Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 68 additions & 3 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ package events

import (
"context"
"encoding/json"
"log"
"reflect"

ctxpkg "github.com/owncloud/reva/v2/pkg/ctx"
"github.com/google/uuid"
ctxpkg "github.com/owncloud/reva/v2/pkg/ctx"
"go-micro.dev/v4/events"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
)

var (
Expand All @@ -47,6 +51,11 @@ var (

// MetadatakeyInitiatorID is the key used for the initiator id in the metadata map of the event
MetadatakeyInitiatorID = "initiatorid"

// MetadatakeyExtraInfo is the key used for the extra information associated to the event.
// This usually includes information that should be propagated across services.
// The information is a map[string][]string encoded as a JSON string
MetadatakeyExtraInfo = "extrainfo"
)

type (
Expand Down Expand Up @@ -77,6 +86,7 @@ type (
ID string
TraceParent string
InitiatorID string
ExtraInfo metadata.MD
Event interface{}
}
)
Expand Down Expand Up @@ -112,11 +122,14 @@ func Consume(s Consumer, group string, evs ...Unmarshaller) (<-chan Event, error
continue
}

var md metadata.MD
json.Unmarshal([]byte(e.Metadata[MetadatakeyExtraInfo]), &md)
outchan <- Event{
Type: et,
ID: e.Metadata[MetadatakeyEventID],
TraceParent: e.Metadata[MetadatakeyTraceParent],
InitiatorID: e.Metadata[MetadatakeyInitiatorID],
ExtraInfo: md,
Event: event,
}
}
Expand All @@ -135,11 +148,14 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) {
go func() {
for {
e := <-c
var md metadata.MD
json.Unmarshal([]byte(e.Metadata[MetadatakeyExtraInfo]), &md)
outchan <- Event{
Type: e.Metadata[MetadatakeyEventType],
ID: e.Metadata[MetadatakeyEventID],
TraceParent: e.Metadata[MetadatakeyTraceParent],
InitiatorID: e.Metadata[MetadatakeyInitiatorID],
ExtraInfo: md,
Event: e.Payload,
}
}
Expand All @@ -150,14 +166,21 @@ func ConsumeAll(s Consumer, group string) (<-chan Event, error) {
// Publish publishes the ev to the MainQueue from where it is distributed to all subscribers
// NOTE: needs to use reflect on runtime
func Publish(ctx context.Context, s Publisher, ev interface{}) error {
prevSpan := trace.SpanFromContext(ctx)
ctx2, span := TraceEventProducer(ctx, prevSpan.TracerProvider(), ev)
defer span.End()

evName := reflect.TypeOf(ev).String()
traceParent := getTraceParentFromCtx(ctx)
iid, _ := ctxpkg.ContextGetInitiator(ctx)
traceParent := getTraceParentFromCtx(ctx2)
iid, _ := ctxpkg.ContextGetInitiator(ctx2)
md, _ := metadata.FromOutgoingContext(ctx2)
extraInfoBytes, _ := json.Marshal(md)
return s.Publish(MainQueueName, ev, events.WithMetadata(map[string]string{
MetadatakeyEventType: evName,
MetadatakeyEventID: uuid.New().String(),
MetadatakeyTraceParent: traceParent,
MetadatakeyInitiatorID: iid,
MetadatakeyExtraInfo: string(extraInfoBytes),
}))
}

Expand All @@ -178,3 +201,45 @@ func getTraceParentFromCtx(ctx context.Context) string {
tc.Inject(ctx, &mc)
return mc["traceparent"]
}

func TraceEventProducer(ctx context.Context, tp trace.TracerProvider, evPayload interface{}) (context.Context, trace.Span) {
tracer := tp.Tracer("github.com/owncloud/reva/pkg/events")
evType := reflect.TypeOf(evPayload).String()
iid, _ := ctxpkg.ContextGetInitiator(ctx)

newCtx, span := tracer.Start(
ctx,
"Event "+evType,
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
attribute.String("ocis.event.type", evType),
attribute.String("ocis.event.initiator", iid),
),
)
return newCtx, span
}

func TraceEventConsumer(ctx context.Context, tp trace.TracerProvider, ev Event) (context.Context, trace.Span) {
tracer := tp.Tracer("github.com/owncloud/reva/pkg/events")
return TraceEventConsumerWithTracer(ctx, tracer, ev)
}

func TraceEventConsumerWithTracer(ctx context.Context, tracer trace.Tracer, ev Event) (context.Context, trace.Span) {
evCtx := ev.GetTraceContext(ctx)

newCtx, span := tracer.Start(
ctx,
"Event "+ev.Type,
trace.WithNewRoot(),
trace.WithSpanKind(trace.SpanKindConsumer),
trace.WithAttributes(
attribute.String("ocis.event.type", ev.Type),
attribute.String("ocis.event.id", ev.ID),
attribute.String("ocis.event.initiator", ev.InitiatorID),
),
trace.WithLinks(
trace.LinkFromContext(evCtx),
),
)
return newCtx, span
}
14 changes: 11 additions & 3 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/owncloud/reva/v2/pkg/appctx"
ctxpkg "github.com/owncloud/reva/v2/pkg/ctx"
"github.com/owncloud/reva/v2/pkg/errtypes"
Expand All @@ -44,13 +46,13 @@ import (
"github.com/owncloud/reva/v2/pkg/share/manager/registry"
"github.com/owncloud/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages
"github.com/owncloud/reva/v2/pkg/storagespace"
revatrace "github.com/owncloud/reva/v2/pkg/trace"
"github.com/owncloud/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"
grpcmetadata "google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

Expand Down Expand Up @@ -278,7 +280,13 @@ func (m *Manager) ProcessEvents(ch <-chan events.Event) {

if ev, ok := event.Event.(events.SpaceDeleted); ok {
log.Debug().Msgf("space deleted event: %v", ev)
go func() { m.purgeSpace(ctx, ev.ID) }()
go func() {
ctx2, span := events.TraceEventConsumer(ctx, revatrace.DefaultProvider(), event)
ctx2 = grpcmetadata.NewOutgoingContext(ctx2, event.ExtraInfo)
defer span.End()

m.purgeSpace(ctx2, ev.ID)
}()
}
}
}
Expand Down
Loading
Loading