diff --git a/pkg/events/events.go b/pkg/events/events.go index 35b2999c60e..248408dd2be 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -20,13 +20,17 @@ package events import ( "context" + "encoding/json" "log" "reflect" - ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" "github.com/google/uuid" + ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" "go-micro.dev/v4/events" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/metadata" ) var ( @@ -47,6 +51,11 @@ var ( // MetadatakeyInitiatorID is the key used for the initiator id in the metadata map of the event MetadatakeyInitiatorID = "initiatorid" + + // MetadatakeyExtraInfo is the key used for the extra information associated to the event. + // This usually includes information that should be propagated across services. + // The information is a map[string][]string encoded as a JSON string + MetadatakeyExtraInfo = "extrainfo" ) type ( @@ -77,6 +86,7 @@ type ( ID string TraceParent string InitiatorID string + ExtraInfo metadata.MD Event interface{} } ) @@ -112,11 +122,14 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error continue } + var md metadata.MD + json.Unmarshal([]byte(e.Metadata[MetadatakeyExtraInfo]), &md) outchan <- Event{ Type: et, ID: e.Metadata[MetadatakeyEventID], TraceParent: e.Metadata[MetadatakeyTraceParent], InitiatorID: e.Metadata[MetadatakeyInitiatorID], + ExtraInfo: md, Event: event, } } @@ -135,11 +148,14 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) { go func() { for { e := <-c + var md metadata.MD + json.Unmarshal([]byte(e.Metadata[MetadatakeyExtraInfo]), &md) outchan <- Event{ Type: e.Metadata[MetadatakeyEventType], ID: e.Metadata[MetadatakeyEventID], TraceParent: e.Metadata[MetadatakeyTraceParent], InitiatorID: e.Metadata[MetadatakeyInitiatorID], + ExtraInfo: md, Event: e.Payload, } } @@ -150,14 +166,21 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) { // Publish publishes the ev to the MainQueue from where it is distributed to all subscribers // NOTE: needs to use reflect on runtime func Publish(ctx context.Context, s Publisher, ev interface{}) error { + prevSpan := trace.SpanFromContext(ctx) + ctx2, span := TraceEventProducer(ctx, prevSpan.TracerProvider(), ev) + defer span.End() + evName := reflect.TypeOf(ev).String() - traceParent := getTraceParentFromCtx(ctx) - iid, _ := ctxpkg.ContextGetInitiator(ctx) + traceParent := getTraceParentFromCtx(ctx2) + iid, _ := ctxpkg.ContextGetInitiator(ctx2) + md, _ := metadata.FromOutgoingContext(ctx2) + extraInfoBytes, _ := json.Marshal(md) return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{ MetadatakeyEventType: evName, MetadatakeyEventID: uuid.New().String(), MetadatakeyTraceParent: traceParent, MetadatakeyInitiatorID: iid, + MetadatakeyExtraInfo: string(extraInfoBytes), })) } @@ -178,3 +201,45 @@ func getTraceParentFromCtx(ctx context.Context) string { tc.Inject(ctx, &mc) return mc["traceparent"] } + +func TraceEventProducer(ctx context.Context, tp trace.TracerProvider, evPayload interface{}) (context.Context, trace.Span) { + tracer := tp.Tracer("github.com/owncloud/reva/pkg/events") + evType := reflect.TypeOf(evPayload).String() + iid, _ := ctxpkg.ContextGetInitiator(ctx) + + newCtx, span := tracer.Start( + ctx, + "Event "+evType, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + attribute.String("ocis.event.type", evType), + attribute.String("ocis.event.initiator", iid), + ), + ) + return newCtx, span +} + +func TraceEventConsumer(ctx context.Context, tp trace.TracerProvider, ev Event) (context.Context, trace.Span) { + tracer := tp.Tracer("github.com/owncloud/reva/pkg/events") + return TraceEventConsumerWithTracer(ctx, tracer, ev) +} + +func TraceEventConsumerWithTracer(ctx context.Context, tracer trace.Tracer, ev Event) (context.Context, trace.Span) { + evCtx := ev.GetTraceContext(ctx) + + newCtx, span := tracer.Start( + ctx, + "Event "+ev.Type, + trace.WithNewRoot(), + trace.WithSpanKind(trace.SpanKindConsumer), + trace.WithAttributes( + attribute.String("ocis.event.type", ev.Type), + attribute.String("ocis.event.id", ev.ID), + attribute.String("ocis.event.initiator", ev.InitiatorID), + ), + trace.WithLinks( + trace.LinkFromContext(evCtx), + ), + ) + return newCtx, span +} diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index df2ca50dc24..cab9b170cb0 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -29,6 +29,8 @@ import ( rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" "github.com/owncloud/reva/v2/pkg/appctx" ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" "github.com/owncloud/reva/v2/pkg/errtypes" @@ -44,13 +46,13 @@ import ( "github.com/owncloud/reva/v2/pkg/share/manager/registry" "github.com/owncloud/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages "github.com/owncloud/reva/v2/pkg/storagespace" + revatrace "github.com/owncloud/reva/v2/pkg/trace" "github.com/owncloud/reva/v2/pkg/utils" - "github.com/google/uuid" - "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "google.golang.org/genproto/protobuf/field_mask" + grpcmetadata "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -278,7 +280,13 @@ func (m *Manager) ProcessEvents(ch <-chan events.Event) { if ev, ok := event.Event.(events.SpaceDeleted); ok { log.Debug().Msgf("space deleted event: %v", ev) - go func() { m.purgeSpace(ctx, ev.ID) }() + go func() { + ctx2, span := events.TraceEventConsumer(ctx, revatrace.DefaultProvider(), event) + ctx2 = grpcmetadata.NewOutgoingContext(ctx2, event.ExtraInfo) + defer span.End() + + m.purgeSpace(ctx2, ev.ID) + }() } } } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index c4c4fd1e08f..f06a82713d6 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -41,6 +41,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" + grpcmetadata "google.golang.org/grpc/metadata" ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" "github.com/owncloud/reva/v2/pkg/errtypes" @@ -277,326 +278,332 @@ func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (stor // Postprocessing starts the postprocessing result collector func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { - ctx := context.TODO() // we should pass the trace id in the event and initialize the trace provider here - ctx, span := tracer.Start(ctx, "Postprocessing") - defer span.End() log := logger.New() for event := range ch { - switch ev := event.Event.(type) { - case events.PostprocessingFinished: - sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger() - if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { - sublog.Debug().Msg("ignoring event for different storage") - continue - } - session, err := fs.sessionStore.Get(ctx, ev.UploadID) - if err != nil { - sublog.Error().Err(err).Msg("Failed to get upload") - continue // NOTE: since we can't get the upload, we can't delete the blob - } + evCtx := context.Background() + fs.processEvent(evCtx, event, log) + } +} - ctx = session.Context(ctx) +func (fs *Decomposedfs) processEvent(evCtx context.Context, event events.Event, log *zerolog.Logger) { + ctx, span := events.TraceEventConsumerWithTracer(evCtx, tracer, event) + ctx = grpcmetadata.NewOutgoingContext(ctx, event.ExtraInfo) + defer span.End() - n, err := session.Node(ctx) - if err != nil { - sublog.Error().Err(err).Msg("could not read node") - continue - } - sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() - if !n.Exists { - sublog.Debug().Msg("node no longer exists") - session.Cleanup(false, true, true, false) - continue - } + switch ev := event.Event.(type) { + case events.PostprocessingFinished: + sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger() + if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { + sublog.Debug().Msg("ignoring event for different storage") + return + } + session, err := fs.sessionStore.Get(ctx, ev.UploadID) + if err != nil { + sublog.Error().Err(err).Msg("Failed to get upload") + return // NOTE: since we can't get the upload, we can't delete the blob + } + + ctx = session.Context(ctx) - var ( - failed bool - revertNodeMetadata bool - keepUpload bool - ) - unmarkPostprocessing := true - - switch ev.Outcome { - default: - sublog.Error().Str("outcome", string(ev.Outcome)).Msg("unknown postprocessing outcome - aborting") - fallthrough - case events.PPOutcomeAbort: + n, err := session.Node(ctx) + if err != nil { + sublog.Error().Err(err).Msg("could not read node") + return + } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() + if !n.Exists { + sublog.Debug().Msg("node no longer exists") + session.Cleanup(false, true, true, false) + return + } + + var ( + failed bool + revertNodeMetadata bool + keepUpload bool + ) + unmarkPostprocessing := true + + switch ev.Outcome { + default: + sublog.Error().Str("outcome", string(ev.Outcome)).Msg("unknown postprocessing outcome - aborting") + fallthrough + case events.PPOutcomeAbort: + failed = true + revertNodeMetadata = true + keepUpload = true + metrics.UploadSessionsAborted.Inc() + case events.PPOutcomeContinue: + if err := session.Finalize(ctx); err != nil { + sublog.Error().Err(err).Msg("could not finalize upload") failed = true - revertNodeMetadata = true + revertNodeMetadata = false keepUpload = true - metrics.UploadSessionsAborted.Inc() - case events.PPOutcomeContinue: - if err := session.Finalize(ctx); err != nil { - sublog.Error().Err(err).Msg("could not finalize upload") - failed = true - revertNodeMetadata = false - keepUpload = true - // keep postprocessing status so the upload is not deleted during housekeeping - unmarkPostprocessing = false - } else { - metrics.UploadSessionsFinalized.Inc() - } - case events.PPOutcomeDelete: - failed = true - revertNodeMetadata = true - metrics.UploadSessionsDeleted.Inc() + // keep postprocessing status so the upload is not deleted during housekeeping + unmarkPostprocessing = false + } else { + metrics.UploadSessionsFinalized.Inc() } + case events.PPOutcomeDelete: + failed = true + revertNodeMetadata = true + metrics.UploadSessionsDeleted.Inc() + } - getParent := func() *node.Node { - p, err := n.Parent(ctx) - if err != nil { - sublog.Error().Err(err).Msg("could not read parent") - return nil - } - return p + getParent := func() *node.Node { + p, err := n.Parent(ctx) + if err != nil { + sublog.Error().Err(err).Msg("could not read parent") + return nil } + return p + } - now := time.Now() - if failed { - // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") - latestSession, err := n.ProcessingID(ctx) - if err != nil { - sublog.Error().Err(err).Msg("reading node for session failed") - } - if latestSession == session.ID() { - // propagate reverted sizeDiff after failed postprocessing - if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { - sublog.Error().Err(err).Msg("could not propagate tree size change") - } - } - } else if p := getParent(); p != nil { - // update parent tmtime to propagate etag change after successful postprocessing - _ = p.SetTMTime(ctx, &now) - if err := fs.tp.Propagate(ctx, p, 0); err != nil { - sublog.Error().Err(err).Msg("could not propagate etag change") + now := time.Now() + if failed { + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + sublog.Error().Err(err).Msg("reading node for session failed") + } + if latestSession == session.ID() { + // propagate reverted sizeDiff after failed postprocessing + if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { + sublog.Error().Err(err).Msg("could not propagate tree size change") } } + } else if p := getParent(); p != nil { + // update parent tmtime to propagate etag change after successful postprocessing + _ = p.SetTMTime(ctx, &now) + if err := fs.tp.Propagate(ctx, p, 0); err != nil { + sublog.Error().Err(err).Msg("could not propagate etag change") + } + } - session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload, unmarkPostprocessing) + session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload, unmarkPostprocessing) - var isVersion bool - if session.NodeExists() { - info, err := session.GetInfo(ctx) - if err == nil && info.MetaData["versionsPath"] != "" { - isVersion = true - } + var isVersion bool + if session.NodeExists() { + info, err := session.GetInfo(ctx) + if err == nil && info.MetaData["versionsPath"] != "" { + isVersion = true } + } - if err := events.Publish( - ctx, - fs.stream, - events.UploadReady{ - UploadID: ev.UploadID, - Failed: failed, - ExecutingUser: ev.ExecutingUser, - Filename: ev.Filename, - FileRef: &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: session.ProviderID(), - SpaceId: session.SpaceID(), - OpaqueId: session.SpaceID(), - }, - Path: utils.MakeRelativePath(filepath.Join(session.Dir(), session.Filename())), - }, - ResourceID: &provider.ResourceId{ + if err := events.Publish( + ctx, + fs.stream, + events.UploadReady{ + UploadID: ev.UploadID, + Failed: failed, + ExecutingUser: ev.ExecutingUser, + Filename: ev.Filename, + FileRef: &provider.Reference{ + ResourceId: &provider.ResourceId{ StorageId: session.ProviderID(), SpaceId: session.SpaceID(), - OpaqueId: session.NodeID(), + OpaqueId: session.SpaceID(), }, - Timestamp: utils.TimeToTS(now), - SpaceOwner: n.SpaceOwnerOrManager(ctx), - IsVersion: isVersion, - ImpersonatingUser: ev.ImpersonatingUser, + Path: utils.MakeRelativePath(filepath.Join(session.Dir(), session.Filename())), }, - ); err != nil { - sublog.Error().Err(err).Msg("Failed to publish UploadReady event") - } - case events.RestartPostprocessing: - sublog := log.With().Str("event", "RestartPostprocessing").Str("uploadid", ev.UploadID).Logger() - session, err := fs.sessionStore.Get(ctx, ev.UploadID) - if err != nil { - sublog.Error().Err(err).Msg("Failed to get upload") - continue - } - n, err := session.Node(ctx) - if err != nil { - sublog.Error().Err(err).Msg("could not read node") - continue - } - sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() - s, err := session.URL(ctx) - if err != nil { - sublog.Error().Err(err).Msg("could not create url") - continue - } - - metrics.UploadSessionsRestarted.Inc() - - // restart postprocessing - if err := events.Publish(ctx, fs.stream, events.BytesReceived{ - UploadID: session.ID(), - URL: s, - SpaceOwner: n.SpaceOwnerOrManager(ctx), - ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? - ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, - Filename: session.Filename(), - Filesize: uint64(session.Size()), - }); err != nil { - sublog.Error().Err(err).Msg("Failed to publish BytesReceived event") - } - case events.CleanUpload: - sublog := log.With().Str("event", "CleanUpload").Str("uploadid", ev.UploadID).Logger() - session, err := fs.sessionStore.Get(ctx, ev.UploadID) - if err != nil { - sublog.Error().Err(err).Msg("Failed to get upload") - continue // NOTE: since we can't get the upload, we can't delete the blob - } - session.Cleanup(true, !ev.KeepUpload, !ev.KeepUpload, true) - case events.RevertRevision: - sublog := log.With().Str("event", "RevertRevision").Interface("nodeid", ev.ResourceID).Logger() - if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { - sublog.Debug().Msg("ignoring event for different storage") - continue - } - n, err := fs.lu.NodeFromID(ctx, ev.ResourceID) - if err != nil { - sublog.Error().Err(err).Msg("Failed to get node") - continue - } + ResourceID: &provider.ResourceId{ + StorageId: session.ProviderID(), + SpaceId: session.SpaceID(), + OpaqueId: session.NodeID(), + }, + Timestamp: utils.TimeToTS(now), + SpaceOwner: n.SpaceOwnerOrManager(ctx), + IsVersion: isVersion, + ImpersonatingUser: ev.ImpersonatingUser, + }, + ); err != nil { + sublog.Error().Err(err).Msg("Failed to publish UploadReady event") + } + case events.RestartPostprocessing: + sublog := log.With().Str("event", "RestartPostprocessing").Str("uploadid", ev.UploadID).Logger() + session, err := fs.sessionStore.Get(ctx, ev.UploadID) + if err != nil { + sublog.Error().Err(err).Msg("Failed to get upload") + return + } + n, err := session.Node(ctx) + if err != nil { + sublog.Error().Err(err).Msg("could not read node") + return + } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() + s, err := session.URL(ctx) + if err != nil { + sublog.Error().Err(err).Msg("could not create url") + return + } - if err := n.RevertCurrentRevision(ctx); err != nil { - sublog.Error().Err(err).Msg("Failed to revert revision") - continue - } - case events.PostprocessingStepFinished: - sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger() - if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { - sublog.Debug().Msg("ignoring event for different storage") - continue - } - if ev.FinishedStep != events.PPStepAntivirus { - // atm we are only interested in antivirus results - continue - } + metrics.UploadSessionsRestarted.Inc() + + // restart postprocessing + if err := events.Publish(ctx, fs.stream, events.BytesReceived{ + UploadID: session.ID(), + URL: s, + SpaceOwner: n.SpaceOwnerOrManager(ctx), + ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? + ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, + Filename: session.Filename(), + Filesize: uint64(session.Size()), + }); err != nil { + sublog.Error().Err(err).Msg("Failed to publish BytesReceived event") + } + case events.CleanUpload: + sublog := log.With().Str("event", "CleanUpload").Str("uploadid", ev.UploadID).Logger() + session, err := fs.sessionStore.Get(ctx, ev.UploadID) + if err != nil { + sublog.Error().Err(err).Msg("Failed to get upload") + return // NOTE: since we can't get the upload, we can't delete the blob + } + session.Cleanup(true, !ev.KeepUpload, !ev.KeepUpload, true) + case events.RevertRevision: + sublog := log.With().Str("event", "RevertRevision").Interface("nodeid", ev.ResourceID).Logger() + if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { + sublog.Debug().Msg("ignoring event for different storage") + return + } + n, err := fs.lu.NodeFromID(ctx, ev.ResourceID) + if err != nil { + sublog.Error().Err(err).Msg("Failed to get node") + return + } - res := ev.Result.(events.VirusscanResult) - if res.ErrorMsg != "" { - // scan failed somehow - // Should we handle this here? - continue - } - sublog = log.With().Str("scan_description", res.Description).Bool("infected", res.Infected).Logger() + if err := n.RevertCurrentRevision(ctx); err != nil { + sublog.Error().Err(err).Msg("Failed to revert revision") + return + } + case events.PostprocessingStepFinished: + sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger() + if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { + sublog.Debug().Msg("ignoring event for different storage") + return + } + if ev.FinishedStep != events.PPStepAntivirus { + // atm we are only interested in antivirus results + return + } - var n *node.Node - switch ev.UploadID { - case "": - // uploadid is empty -> this was an on-demand scan - /* ON DEMAND SCANNING NOT SUPPORTED ATM - ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) - ref := &provider.Reference{ResourceId: ev.ResourceID} + res := ev.Result.(events.VirusscanResult) + if res.ErrorMsg != "" { + // scan failed somehow + // Should we handle this here? + return + } + sublog = log.With().Str("scan_description", res.Description).Bool("infected", res.Infected).Logger() - no, err := fs.lu.NodeFromResource(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") - continue + var n *node.Node + switch ev.UploadID { + case "": + // uploadid is empty -> this was an on-demand scan + /* ON DEMAND SCANNING NOT SUPPORTED ATM + ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) + ref := &provider.Reference{ResourceId: ev.ResourceID} - } - n = no - if ev.Outcome == events.PPOutcomeDelete { - // antivir wants us to delete the file. We must obey and need to - - // check if there a previous versions existing - revs, err := fs.ListRevisions(ctx, ref) - if len(revs) == 0 { - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. Fallback to delete file") - } - - // no versions -> trash file - err := fs.Delete(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") - continue - } - - // now purge it from the recycle bin - if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - continue - } + no, err := fs.lu.NodeFromResource(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") + continue - // we have versions - find the newest - versions := make(map[uint64]string) // remember all versions - we need them later - var nv uint64 - for _, v := range revs { - versions[v.Mtime] = v.Key - if v.Mtime > nv { - nv = v.Mtime - } - } + } + n = no + if ev.Outcome == events.PPOutcomeDelete { + // antivir wants us to delete the file. We must obey and need to - // restore newest version - if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") - continue + // check if there a previous versions existing + revs, err := fs.ListRevisions(ctx, ref) + if len(revs) == 0 { + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. Fallback to delete file") } - // now find infected version - revs, err = fs.ListRevisions(ctx, ref) + // no versions -> trash file + err := fs.Delete(ctx, ref) if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") + continue } - for _, v := range revs { - // we looking for a version that was previously not there - if _, ok := versions[v.Mtime]; ok { - continue - } - - if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") - } + // now purge it from the recycle bin + if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") } // remove cache entry in gateway fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) continue } - */ - default: - // uploadid is not empty -> this is an async upload - session, err := fs.sessionStore.Get(ctx, ev.UploadID) - if err != nil { - sublog.Error().Err(err).Msg("Failed to get upload") + + // we have versions - find the newest + versions := make(map[uint64]string) // remember all versions - we need them later + var nv uint64 + for _, v := range revs { + versions[v.Mtime] = v.Key + if v.Mtime > nv { + nv = v.Mtime + } + } + + // restore newest version + if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") continue } - n, err = session.Node(ctx) + // now find infected version + revs, err = fs.ListRevisions(ctx, ref) if err != nil { - sublog.Error().Err(err).Msg("Failed to get node after scan") - continue + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") } - sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() - session.SetScanData(res.Description, res.Scandate) - if err := session.Persist(ctx); err != nil { - sublog.Error().Err(err).Msg("Failed to persist scan results") + for _, v := range revs { + // we looking for a version that was previously not there + if _, ok := versions[v.Mtime]; ok { + continue + } + + if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") + } } - } - if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { - sublog.Error().Err(err).Msg("Failed to set scan results") + // remove cache entry in gateway + fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) continue } - - metrics.UploadSessionsScanned.Inc() + */ default: - log.Error().Interface("event", ev).Msg("Unknown event") + // uploadid is not empty -> this is an async upload + session, err := fs.sessionStore.Get(ctx, ev.UploadID) + if err != nil { + sublog.Error().Err(err).Msg("Failed to get upload") + return + } + + n, err = session.Node(ctx) + if err != nil { + sublog.Error().Err(err).Msg("Failed to get node after scan") + return + } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() + + session.SetScanData(res.Description, res.Scandate) + if err := session.Persist(ctx); err != nil { + sublog.Error().Err(err).Msg("Failed to persist scan results") + } } + + if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { + sublog.Error().Err(err).Msg("Failed to set scan results") + return + } + + metrics.UploadSessionsScanned.Inc() + default: + log.Error().Interface("event", ev).Msg("Unknown event") } }