Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions services/activitylog/pkg/service/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -17,8 +18,10 @@
"github.com/owncloud/reva/v2/pkg/storagespace"
"github.com/owncloud/reva/v2/pkg/utils"
microstore "go-micro.dev/v4/store"
"google.golang.org/grpc/metadata"

"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
Expand Down Expand Up @@ -98,78 +101,88 @@
func (a *ActivitylogService) Run() {
for e := range a.events {
var err error

// trace provider is available here, otherwise the activitylog service should have crashed
tp, _ := tracing.GetServiceTraceProvider(a.cfg.Tracing, a.cfg.Service.Name)
evCtx := context.Background()
ctx, span := events.TraceEventConsumer(evCtx, tp, e)

Check failure on line 108 in services/activitylog/pkg/service/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: events.TraceEventConsumer
ctx = metadata.NewOutgoingContext(ctx, e.ExtraInfo)

Check failure on line 109 in services/activitylog/pkg/service/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

e.ExtraInfo undefined (type "github.com/owncloud/reva/v2/pkg/events".Event has no field or method ExtraInfo)
// span is closed at the end of the loop

switch ev := e.Event.(type) {
case events.UploadReady:
err = a.AddActivity(ev.FileRef, e.ID, utils.TSToTime(ev.Timestamp))
err = a.AddActivity(ctx, ev.FileRef, e.ID, utils.TSToTime(ev.Timestamp))
case events.FileTouched:
err = a.AddActivity(ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
err = a.AddActivity(ctx, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
// Disabled https://github.com/owncloud/ocis/issues/10293
//case events.FileDownloaded:
// we are only interested in public link downloads - so no need to store others.
//if ev.ImpersonatingUser.GetDisplayName() == "Public" {
// err = a.AddActivity(ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
// err = a.AddActivity(ctx, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
//}
case events.ContainerCreated:
err = a.AddActivity(ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
err = a.AddActivity(ctx, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
case events.ItemTrashed:
err = a.AddActivityTrashed(ev.ID, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
err = a.AddActivityTrashed(ctx, ev.ID, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
case events.ItemPurged:
err = a.RemoveResource(ev.ID)
err = a.RemoveResource(ev.ID) // no ctx needed at the moment
case events.ItemMoved:
err = a.AddActivity(ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
err = a.AddActivity(ctx, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
case events.ShareCreated:
err = a.AddActivity(toRef(ev.ItemID), e.ID, utils.TSToTime(ev.CTime))
err = a.AddActivity(ctx, toRef(ev.ItemID), e.ID, utils.TSToTime(ev.CTime))
case events.ShareUpdated:
if ev.Sharer != nil && ev.ItemID != nil && ev.Sharer.GetOpaqueId() != ev.ItemID.GetSpaceId() {
err = a.AddActivity(toRef(ev.ItemID), e.ID, utils.TSToTime(ev.MTime))
err = a.AddActivity(ctx, toRef(ev.ItemID), e.ID, utils.TSToTime(ev.MTime))
}
case events.ShareRemoved:
err = a.AddActivity(toRef(ev.ItemID), e.ID, ev.Timestamp)
err = a.AddActivity(ctx, toRef(ev.ItemID), e.ID, ev.Timestamp)
case events.LinkCreated:
err = a.AddActivity(toRef(ev.ItemID), e.ID, utils.TSToTime(ev.CTime))
err = a.AddActivity(ctx, toRef(ev.ItemID), e.ID, utils.TSToTime(ev.CTime))
case events.LinkUpdated:
if ev.Sharer != nil && ev.ItemID != nil && ev.Sharer.GetOpaqueId() != ev.ItemID.GetSpaceId() {
err = a.AddActivity(toRef(ev.ItemID), e.ID, utils.TSToTime(ev.MTime))
err = a.AddActivity(ctx, toRef(ev.ItemID), e.ID, utils.TSToTime(ev.MTime))
}
case events.LinkRemoved:
err = a.AddActivity(toRef(ev.ItemID), e.ID, utils.TSToTime(ev.Timestamp))
err = a.AddActivity(ctx, toRef(ev.ItemID), e.ID, utils.TSToTime(ev.Timestamp))
case events.SpaceShared:
err = a.AddSpaceActivity(ev.ID, e.ID, ev.Timestamp)
err = a.AddSpaceActivity(ev.ID, e.ID, ev.Timestamp) // no ctx needed at the moment
case events.SpaceUnshared:
err = a.AddSpaceActivity(ev.ID, e.ID, ev.Timestamp)
err = a.AddSpaceActivity(ev.ID, e.ID, ev.Timestamp) // no ctx needed at the moment
}

if err != nil {
a.log.Error().Err(err).Interface("event", e).Msg("could not process event")
}

span.End()
}
}

// AddActivity adds the activity to the given resource and all its parents
func (a *ActivitylogService) AddActivity(initRef *provider.Reference, eventID string, timestamp time.Time) error {
func (a *ActivitylogService) AddActivity(ctx context.Context, initRef *provider.Reference, eventID string, timestamp time.Time) error {
gwc, err := a.gws.Next()
if err != nil {
return fmt.Errorf("cant get gateway client: %w", err)
}

ctx, err := utils.GetServiceUserContext(a.cfg.ServiceAccount.ServiceAccountID, gwc, a.cfg.ServiceAccount.ServiceAccountSecret)
ctx2, err := utils.GetServiceUserContextWithContext(ctx, gwc, a.cfg.ServiceAccount.ServiceAccountID, a.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
return fmt.Errorf("cant get service user context: %w", err)
}

return a.addActivity(initRef, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return utils.GetResource(ctx, ref, gwc)
return utils.GetResource(ctx2, ref, gwc)
})
}

// AddActivityTrashed adds the activity to given trashed resource and all its former parents
func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId, reference *provider.Reference, eventID string, timestamp time.Time) error {
func (a *ActivitylogService) AddActivityTrashed(ctx context.Context, resourceID *provider.ResourceId, reference *provider.Reference, eventID string, timestamp time.Time) error {
gwc, err := a.gws.Next()
if err != nil {
return fmt.Errorf("cant get gateway client: %w", err)
}

ctx, err := utils.GetServiceUserContext(a.cfg.ServiceAccount.ServiceAccountID, gwc, a.cfg.ServiceAccount.ServiceAccountSecret)
ctx2, err := utils.GetServiceUserContextWithContext(ctx, gwc, a.cfg.ServiceAccount.ServiceAccountID, a.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
return fmt.Errorf("cant get service user context: %w", err)
}
Expand All @@ -186,7 +199,7 @@
}

return a.addActivity(ref, eventID, timestamp, func(ref *provider.Reference) (*provider.ResourceInfo, error) {
return utils.GetResource(ctx, ref, gwc)
return utils.GetResource(ctx2, ref, gwc)
})
}

Expand Down
7 changes: 5 additions & 2 deletions services/antivirus/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"github.com/owncloud/reva/v2/pkg/events/stream"
"github.com/owncloud/reva/v2/pkg/rhttp"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"

"github.com/owncloud/ocis/v2/ocis-pkg/generators"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
Expand Down Expand Up @@ -178,9 +179,11 @@
}

func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
ctx := e.GetTraceContext(context.Background())
ctx, span := av.tp.Tracer("antivirus").Start(ctx, "processEvent")
evCtx := context.Background()
ctx, span := events.TraceEventConsumer(evCtx, av.tp, e)

Check failure on line 183 in services/antivirus/pkg/service/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: events.TraceEventConsumer
ctx = metadata.NewOutgoingContext(ctx, e.ExtraInfo)

Check failure on line 184 in services/antivirus/pkg/service/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

e.ExtraInfo undefined (type "github.com/owncloud/reva/v2/pkg/events".Event has no field or method ExtraInfo)
defer span.End()

av.l.Info().Str("traceID", span.SpanContext().TraceID().String()).Msg("TraceID")
ev := e.Event.(events.StartPostprocessingStep)
if ev.StepToStart != events.PPStepAntivirus {
Expand Down
8 changes: 7 additions & 1 deletion services/clientlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"

"github.com/owncloud/reva/v2/pkg/events"
"github.com/owncloud/reva/v2/pkg/rgrpc/todo/pool"
Expand Down Expand Up @@ -104,13 +105,18 @@
}

func (cl *ClientlogService) processEvent(event events.Event) {
baseCtx := context.Background()
evCtx, span := events.TraceEventConsumer(baseCtx, cl.tp, event)

Check failure on line 109 in services/clientlog/pkg/service/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: events.TraceEventConsumer
evCtx = metadata.NewOutgoingContext(evCtx, event.ExtraInfo)

Check failure on line 110 in services/clientlog/pkg/service/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

event.ExtraInfo undefined (type "github.com/owncloud/reva/v2/pkg/events".Event has no field or method ExtraInfo)
defer span.End()

gwc, err := cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}

ctx, err := utils.GetServiceUserContextWithContext(context.Background(), gwc, cl.cfg.ServiceAccount.ServiceAccountID, cl.cfg.ServiceAccount.ServiceAccountSecret)
ctx, err := utils.GetServiceUserContextWithContext(evCtx, gwc, cl.cfg.ServiceAccount.ServiceAccountID, cl.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error authenticating service user")
return
Expand Down
12 changes: 9 additions & 3 deletions services/frontend/pkg/command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"github.com/owncloud/reva/v2/pkg/utils"
"github.com/rs/zerolog"
"go-micro.dev/v4/metadata"
grpcmeta "google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/fieldmaskpb"

"github.com/owncloud/ocis/v2/ocis-pkg/generators"
Expand Down Expand Up @@ -92,12 +93,17 @@
for {
select {
case e := <-ch:
ctx2, span := events.TraceEventConsumer(ctx, traceProvider, e)

Check failure on line 96 in services/frontend/pkg/command/events.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: events.TraceEventConsumer
ctx2 = grpcmeta.NewOutgoingContext(ctx2, e.ExtraInfo)

Check failure on line 97 in services/frontend/pkg/command/events.go

View workflow job for this annotation

GitHub Actions / build-and-test

e.ExtraInfo undefined (type "github.com/owncloud/reva/v2/pkg/events".Event has no field or method ExtraInfo)

switch ev := e.Event.(type) {
default:
l.Error().Interface("event", e).Msg("unhandled event")
case events.ShareCreated:
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency)
AutoAcceptShares(ctx2, ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency)
}

span.End()
case <-ctx.Done():
l.Info().Msg("context cancelled")
return
Expand All @@ -112,13 +118,13 @@
}

// AutoAcceptShares automatically accepts shares if configured by the admin or user
func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount, maxConcurrency int) {
func AutoAcceptShares(baseCtx context.Context, ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount, maxConcurrency int) {
gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
return
}
ctx, err := utils.GetServiceUserContextWithContext(context.Background(), gwc, cfg.ServiceAccountID, cfg.ServiceAccountSecret)
ctx, err := utils.GetServiceUserContextWithContext(baseCtx, gwc, cfg.ServiceAccountID, cfg.ServiceAccountSecret)
if err != nil {
l.Error().Err(err).Msg("cannot impersonate user")
return
Expand Down
8 changes: 7 additions & 1 deletion services/graph/pkg/service/v0/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ldapv3 "github.com/go-ldap/ldap/v3"
"github.com/jellydator/ttlcache/v3"
microstore "go-micro.dev/v4/store"
"google.golang.org/grpc/metadata"

"github.com/owncloud/reva/v2/pkg/events"
"github.com/owncloud/reva/v2/pkg/rgrpc/todo/pool"
Expand Down Expand Up @@ -548,14 +549,19 @@
for loop := true; loop; {
select {
case e := <-evChannel:
ctx2, span := events.TraceEventConsumer(ctx, g.traceProvider, e)

Check failure on line 552 in services/graph/pkg/service/v0/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: events.TraceEventConsumer
ctx2 = metadata.NewOutgoingContext(ctx, e.ExtraInfo)

Check failure on line 553 in services/graph/pkg/service/v0/service.go

View workflow job for this annotation

GitHub Actions / build-and-test

e.ExtraInfo undefined (type "github.com/owncloud/reva/v2/pkg/events".Event has no field or method ExtraInfo)

switch ev := e.Event.(type) {
default:
l.Error().Interface("event", e).Msg("unhandled event")
case events.UserSignedIn:
if err := g.identityBackend.UpdateLastSignInDate(ctx, ev.Executant.OpaqueId, utils.TSToTime(ev.Timestamp)); err != nil {
if err := g.identityBackend.UpdateLastSignInDate(ctx2, ev.Executant.OpaqueId, utils.TSToTime(ev.Timestamp)); err != nil {
l.Error().Err(err).Str("userid", ev.Executant.OpaqueId).Msg("Error updating last sign in date")
}
}

span.End()
case <-ctx.Done():
l.Info().Msg("context cancelled")
loop = false
Expand Down
2 changes: 1 addition & 1 deletion services/notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Server(cfg *config.Config) *cli.Command {
store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword),
)

svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService,
svc := service.NewEventsNotifier(traceProvider, evts, channel, logger, gatewaySelector, valueService,
cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret,
cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL,
cfg.Notifications.TranslationPath, cfg.Notifications.SMTP.Sender, notificationStore, historyClient, registeredEvents)
Expand Down
14 changes: 6 additions & 8 deletions services/notifications/pkg/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/rs/zerolog"
)

func (s eventsNotifier) sendGroupedEmailsJob(sendEmailsEvent events.SendEmailsEvent, eventId string) {
func (s eventsNotifier) sendGroupedEmailsJob(ctx context.Context, sendEmailsEvent events.SendEmailsEvent, eventId string) {
logger := s.logger.With().
Str("event", "SendEmailsEvent").
Str("eventId", eventId).
Expand All @@ -29,8 +29,6 @@ func (s eventsNotifier) sendGroupedEmailsJob(sendEmailsEvent events.SendEmailsEv
return
}

ctx := context.Background()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to double-check if we can remove this. The potential problem is if the context is canceled.
If we require the goroutine below to finish, we'll need a new context, but we'll need to move the metadata from one context to another.


jobs := make(chan string, 10)
go func() {
for _, key := range keys {
Expand Down Expand Up @@ -63,7 +61,7 @@ func (s eventsNotifier) createGroupedMail(ctx context.Context, logger zerolog.Lo
Str("eventId", te.ID.OpaqueId).
Logger()

executant, spaceName, shareLink, _, err := s.prepareSpaceShared(logger, te)
executant, spaceName, shareLink, _, err := s.prepareSpaceShared(ctx, logger, te)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for grouped email")
continue
Expand All @@ -81,7 +79,7 @@ func (s eventsNotifier) createGroupedMail(ctx context.Context, logger zerolog.Lo
Str("eventId", te.ID.OpaqueId).
Logger()

executant, spaceName, shareLink, _, err := s.prepareSpaceUnshared(logger, te)
executant, spaceName, shareLink, _, err := s.prepareSpaceUnshared(ctx, logger, te)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for grouped email")
continue
Expand All @@ -104,7 +102,7 @@ func (s eventsNotifier) createGroupedMail(ctx context.Context, logger zerolog.Lo
Str("eventId", te.ItemID.OpaqueId).
Logger()

owner, shareFolder, shareLink, _, err := s.prepareShareCreated(logger, te)
owner, shareFolder, shareLink, _, err := s.prepareShareCreated(ctx, logger, te)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for grouped email")
continue
Expand All @@ -121,7 +119,7 @@ func (s eventsNotifier) createGroupedMail(ctx context.Context, logger zerolog.Lo
Str("eventId", te.ItemID.OpaqueId).
Logger()

shareFolder, _, err := s.prepareShareExpired(logger, te)
shareFolder, _, err := s.prepareShareExpired(ctx, logger, te)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for grouped email")
continue
Expand All @@ -136,7 +134,7 @@ func (s eventsNotifier) createGroupedMail(ctx context.Context, logger zerolog.Lo
Str("event", "ShareRemoved").
Str("eventId", te.ItemID.OpaqueId).
Logger()
executant, shareFolder, _, err := s.prepareShareRemoved(logger, te)
executant, shareFolder, _, err := s.prepareShareRemoved(ctx, logger, te)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for grouped email")
continue
Expand Down
4 changes: 2 additions & 2 deletions services/notifications/pkg/service/sciencemesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/owncloud/reva/v2/pkg/utils"
)

func (s eventsNotifier) handleScienceMeshInviteTokenGenerated(e events.ScienceMeshInviteTokenGenerated) {
func (s eventsNotifier) handleScienceMeshInviteTokenGenerated(baseCtx context.Context, e events.ScienceMeshInviteTokenGenerated) {
logger := s.logger.With().
Str("event", "ScienceMeshInviteTokenGenerated").
Logger()
Expand All @@ -20,7 +20,7 @@ func (s eventsNotifier) handleScienceMeshInviteTokenGenerated(e events.ScienceMe
return
}

ctx, err := utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret)
ctx, err := utils.GetServiceUserContextWithContext(baseCtx, gatewayClient, s.serviceAccountID, s.serviceAccountSecret)
if err != nil {
logger.Error().Err(err).Msg("Could not impersonate service user")
return
Expand Down
Loading