diff --git a/samples/README.md b/samples/README.md index 6edc8f03d..57f3c1665 100644 --- a/samples/README.md +++ b/samples/README.md @@ -20,6 +20,7 @@ You can grab them and copy-paste in your project to start using sdk-go. * [Responder](./http/responder): Receive and reply to events using the CloudEvents Client. * [Sender](./http/sender): Send events using the CloudEvents Client. * [Sender with retries](./http/sender-retry): Send events, retrying in case of a failure. + * [Correlation Extension](./http/correlation): Send and receive events using the Correlation extension. * [Receiver & Requester with metrics enabled](./http/metrics): Request events and handle events with metrics enabled. * Kafka * [Receiver](./kafka/receiver): Receive events using the CloudEvents Client. To run the tests look at [Kafka samples README](./kafka/README.md). diff --git a/samples/http/correlation/README.md b/samples/http/correlation/README.md new file mode 100644 index 000000000..dece2f634 --- /dev/null +++ b/samples/http/correlation/README.md @@ -0,0 +1,121 @@ +# Correlation Sample + +This sample demonstrates how to use the `CorrelationExtension` to track event relationships and causality in distributed systems. + +## Prerequisites + +- Go 1.25.0 or later +- Access to a terminal + +## Running the Sample + +1. Start the receiver in one terminal: + ```bash + go run receiver/main.go + ``` + +2. Start the sender in another terminal: + ```bash + go run sender/main.go + ``` + +## Expected Output + +### Receiver +The receiver will print the incoming events along with their correlation and causation identifiers: +``` +Received Event: +Context Attributes, + specversion: 1.0 + type: com.example.order.placed + source: https://example.com/orders + id: order-123 +Extensions, + correlationid: txn-abc-123 +Data, + { + "customerId": "456", + "orderId": "123" + } +Correlation ID: txn-abc-123 +------------------------------------------------- +Received Event: +Context Attributes, + specversion: 1.0 + type: com.example.payment.processed + source: https://example.com/payments + id: payment-789 +Extensions, + causationid: order-123 + correlationid: txn-abc-123 +Data, + { + "amount": 150, + "currency": "USD" + } +Correlation ID: txn-abc-123 +Causation ID: order-123 +------------------------------------------------- +Received Event: +Context Attributes, + specversion: 1.0 + type: com.example.inventory.checked + source: https://example.com/inventory + id: inventory-456 +Extensions, + causationid: order-123 + correlationid: txn-abc-123 +Data, + { + "available": true, + "items": ["sku-001", "sku-002"] + } +Correlation ID: txn-abc-123 +Causation ID: order-123 +------------------------------------------------- +Received Event: +Context Attributes, + specversion: 1.0 + type: com.example.shipping.scheduled + source: https://example.com/shipping + id: shipping-012 +Extensions, + causationid: inventory-456 + correlationid: txn-abc-123 +Data, + { + "carrier": "FastShip", + "estimatedDelivery": "2024-01-15" + } +Correlation ID: txn-abc-123 +Causation ID: inventory-456 +------------------------------------------------- +Received Event: +Context Attributes, + specversion: 1.0 + type: com.example.notification.email + source: https://example.com/notifications + id: notify-email-890 +Extensions, + causationid: shipping-012 + correlationid: txn-abc-123 +Data, + { + "recipient": "customer@example.com", + "template": "order-fulfilled" + } +Correlation ID: txn-abc-123 +Causation ID: shipping-012 +------------------------------------------------- +``` + +### Sender +The sender will log its activity, showing the causation relationship in a tree format: +``` +[Correlation ID: txn-abc-123] +└── ID: order-123 (Order Placed) [202] + ├── ID: payment-789 (Payment Processed) [202] + └── ID: inventory-456 (Inventory Checked) [202] + └── ID: shipping-012 (Shipping Scheduled) [202] + └── ID: notify-email-890 (Notification Sent) [202] +``` diff --git a/samples/http/correlation/receiver/main.go b/samples/http/correlation/receiver/main.go new file mode 100644 index 000000000..7150a40ce --- /dev/null +++ b/samples/http/correlation/receiver/main.go @@ -0,0 +1,46 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/extensions" +) + +func main() { + ctx := context.Background() + p, err := cloudevents.NewHTTP() + if err != nil { + log.Fatalf("failed to create protocol: %s", err.Error()) + } + + c, err := cloudevents.NewClient(p) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + log.Printf("will listen on :8080\n") + log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, receive)) +} + +func receive(ctx context.Context, event cloudevents.Event) { + fmt.Printf("Received Event:\n%s\n", event) + + // Extract the correlation extension + if ext, ok := extensions.GetCorrelationExtension(event); ok { + fmt.Printf("Correlation ID: %s\n", ext.CorrelationID) + if ext.CausationID != "" { + fmt.Printf("Causation ID: %s\n", ext.CausationID) + } + } else { + fmt.Printf("No Correlation Extension found in event\n") + } + fmt.Println("-------------------------------------------------") +} diff --git a/samples/http/correlation/sender/main.go b/samples/http/correlation/sender/main.go new file mode 100644 index 000000000..12879e8c5 --- /dev/null +++ b/samples/http/correlation/sender/main.go @@ -0,0 +1,142 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/extensions" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +func main() { + ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/") + + p, err := cloudevents.NewHTTP() + if err != nil { + log.Fatalf("failed to create protocol: %s", err.Error()) + } + + c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + // Initial event in a logical flow + correlationID := "txn-abc-123" + fmt.Printf("[Correlation ID: %s]\n", correlationID) + + e1 := cloudevents.NewEvent() + e1.SetID("order-123") + e1.SetType("com.example.order.placed") + e1.SetSource("https://example.com/orders") + _ = e1.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "orderId": "123", + "customerId": "456", + }) + + // Add correlation extension + ext1 := extensions.CorrelationExtension{ + CorrelationID: correlationID, + } + ext1.AddCorrelationAttributes(&e1) + + send(c, ctx, e1, "└── ", "Order Placed") + + // Event B: Payment Processed (triggered by order A) + e2 := cloudevents.NewEvent() + e2.SetID("payment-789") + e2.SetType("com.example.payment.processed") + e2.SetSource("https://example.com/payments") + _ = e2.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "amount": 150.0, + "currency": "USD", + }) + + ext2 := extensions.CorrelationExtension{ + CorrelationID: correlationID, + CausationID: e1.ID(), + } + ext2.AddCorrelationAttributes(&e2) + + send(c, ctx, e2, " ├── ", "Payment Processed") + + // Event C: Inventory Checked (triggered by order A) + e3 := cloudevents.NewEvent() + e3.SetID("inventory-456") + e3.SetType("com.example.inventory.checked") + e3.SetSource("https://example.com/inventory") + _ = e3.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "items": []string{"sku-001", "sku-002"}, + "available": true, + }) + + ext3 := extensions.CorrelationExtension{ + CorrelationID: correlationID, + CausationID: e1.ID(), + } + ext3.AddCorrelationAttributes(&e3) + + send(c, ctx, e3, " └── ", "Inventory Checked") + + // Event D: Shipping Scheduled (triggered by inventory check C) + e4 := cloudevents.NewEvent() + e4.SetID("shipping-012") + e4.SetType("com.example.shipping.scheduled") + e4.SetSource("https://example.com/shipping") + _ = e4.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "carrier": "FastShip", + "estimatedDelivery": "2024-01-15", + }) + + ext4 := extensions.CorrelationExtension{ + CorrelationID: correlationID, + CausationID: e3.ID(), + } + ext4.AddCorrelationAttributes(&e4) + + send(c, ctx, e4, " └── ", "Shipping Scheduled") + + // Event E: Notification Sent (triggered by shipping D) + e5 := cloudevents.NewEvent() + e5.SetID("notify-email-890") + e5.SetType("com.example.notification.email") + e5.SetSource("https://example.com/notifications") + _ = e5.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "recipient": "customer@example.com", + "template": "order-fulfilled", + }) + + ext5 := extensions.CorrelationExtension{ + CorrelationID: correlationID, + CausationID: e4.ID(), + } + ext5.AddCorrelationAttributes(&e5) + + send(c, ctx, e5, " └── ", "Notification Sent") +} + +func send(c cloudevents.Client, ctx context.Context, e cloudevents.Event, prefix string, label string) { + res := c.Send(ctx, e) + if cloudevents.IsUndelivered(res) { + fmt.Printf("%sID: %s (%s) [FAILED: %v]\n", prefix, e.ID(), label, res) + return + } + var httpResult *cehttp.Result + if cloudevents.ResultAs(res, &httpResult) { + status := fmt.Sprintf("%d", httpResult.StatusCode) + if httpResult.StatusCode != http.StatusOK && httpResult.StatusCode != http.StatusAccepted { + status = fmt.Sprintf("FAILED %d: %s", httpResult.StatusCode, fmt.Sprintf(httpResult.Format, httpResult.Args...)) + } + fmt.Printf("%sID: %s (%s) [%s]\n", prefix, e.ID(), label, status) + return + } + fmt.Printf("%sID: %s (%s) [%s]\n", prefix, e.ID(), label, res.Error()) +} diff --git a/v2/binding/metadata.go b/v2/binding/metadata.go new file mode 100644 index 000000000..532b340e8 --- /dev/null +++ b/v2/binding/metadata.go @@ -0,0 +1,41 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package binding + +import ( + "github.com/cloudevents/sdk-go/v2/types" +) + +// ExtractMetadata reads metadata extensions from a MessageMetadataReader and maps them to the target pointers. +// It skips empty values to ensure only valid data is written. +func ExtractMetadata[T ~string](reader MessageMetadataReader, mapping map[string]*T) error { + for name, target := range mapping { + v := reader.GetExtension(name) + if v == nil { + continue + } + s, err := types.Format(v) + if err != nil { + return err + } + *target = T(s) + } + return nil +} + +// AttachMetadata sets metadata extensions on a MessageMetadataWriter using the provided mapping. +// It skips empty values to ensure only valid data is written. +func AttachMetadata[T ~string](writer MessageMetadataWriter, mapping map[string]T) error { + for name, value := range mapping { + if value == "" { + continue + } + if err := writer.SetExtension(name, value); err != nil { + return err + } + } + return nil +} diff --git a/v2/binding/metadata_test.go b/v2/binding/metadata_test.go new file mode 100644 index 000000000..b241233c9 --- /dev/null +++ b/v2/binding/metadata_test.go @@ -0,0 +1,81 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package binding + +import ( + "testing" + + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/stretchr/testify/assert" +) + +type mockMetadataReader struct { + extensions map[string]interface{} +} + +func (m *mockMetadataReader) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { + return nil, nil +} + +func (m *mockMetadataReader) GetExtension(name string) interface{} { + return m.extensions[name] +} + +type mockMetadataWriter struct { + extensions map[string]interface{} +} + +func (m *mockMetadataWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + return nil +} + +func (m *mockMetadataWriter) SetExtension(name string, value interface{}) error { + if m.extensions == nil { + m.extensions = make(map[string]interface{}) + } + m.extensions[name] = value + return nil +} + +func TestExtractMetadata(t *testing.T) { + reader := &mockMetadataReader{ + extensions: map[string]interface{}{ + "test1": "value1", + "test2": "value2", + }, + } + + var v1, v2, v3 string + mapping := map[string]*string{ + "test1": &v1, + "test2": &v2, + "test3": &v3, + } + + err := ExtractMetadata(reader, mapping) + + assert.NoError(t, err) + assert.Equal(t, "value1", v1) + assert.Equal(t, "value2", v2) + assert.Equal(t, "", v3) +} + +func TestAttachMetadata(t *testing.T) { + writer := &mockMetadataWriter{} + mapping := map[string]string{ + "test1": "value1", + "test2": "value2", + "test3": "", + } + + err := AttachMetadata(writer, mapping) + + assert.NoError(t, err) + assert.Equal(t, "value1", writer.extensions["test1"]) + assert.Equal(t, "value2", writer.extensions["test2"]) + _, ok := writer.extensions["test3"] + assert.False(t, ok) +} diff --git a/v2/event/extensions.go b/v2/event/extensions.go index 87e5de572..d2bc52638 100644 --- a/v2/event/extensions.go +++ b/v2/event/extensions.go @@ -9,6 +9,8 @@ import ( "errors" "fmt" "strings" + + "github.com/cloudevents/sdk-go/v2/types" ) const ( @@ -55,3 +57,34 @@ func validateExtensionName(key string) error { } return nil } + +// ExtractExtensions reads multiple extension attributes from an EventReader into the provided mapping. +// It returns true if at least one extension was found and successfully mapped. +func ExtractExtensions[T ~string](reader EventReader, mapping map[string]*T) bool { + found := false + extensions := reader.Extensions() + for name, target := range mapping { + v, ok := extensions[name] + if !ok { + continue + } + s, err := types.ToString(v) + if err != nil { + continue + } + *target = T(s) + found = true + } + return found +} + +// AttachExtensions sets multiple extension attributes on an EventWriter using the provided mapping. +// It skips empty values to ensure only valid data is written. +func AttachExtensions[T ~string](writer EventWriter, mapping map[string]T) { + for name, value := range mapping { + if value == "" { + continue + } + writer.SetExtension(name, string(value)) + } +} diff --git a/v2/event/extensions_test.go b/v2/event/extensions_test.go index 1181a2724..978ededee 100644 --- a/v2/event/extensions_test.go +++ b/v2/event/extensions_test.go @@ -38,3 +38,70 @@ func TestEvent_validateExtensionName(t *testing.T) { }) } } + +func TestExtractExtensions(t *testing.T) { + e := New() + e.SetExtension("test1", "value1") + e.SetExtension("test2", "value2") + + var v1, v2, v3 string + mapping := map[string]*string{ + "test1": &v1, + "test2": &v2, + "test3": &v3, + } + + found := ExtractExtensions(&e, mapping) + + if !found { + t.Errorf("expected found to be true") + } + if v1 != "value1" { + t.Errorf("expected v1 to be value1, got %s", v1) + } + if v2 != "value2" { + t.Errorf("expected v2 to be value2, got %s", v2) + } + if v3 != "" { + t.Errorf("expected v3 to be empty, got %s", v3) + } +} + +func TestExtractExtensions_NotFound(t *testing.T) { + e := New() + + var v1 string + mapping := map[string]*string{ + "test1": &v1, + } + + found := ExtractExtensions(&e, mapping) + + if found { + t.Errorf("expected found to be false") + } + if v1 != "" { + t.Errorf("expected v1 to be empty, got %s", v1) + } +} + +func TestAttachExtensions(t *testing.T) { + e := New() + mapping := map[string]string{ + "test1": "value1", + "test2": "value2", + "test3": "", + } + + AttachExtensions(&e, mapping) + + if e.Extensions()["test1"] != "value1" { + t.Errorf("expected test1 to be value1, got %v", e.Extensions()["test1"]) + } + if e.Extensions()["test2"] != "value2" { + t.Errorf("expected test2 to be value2, got %v", e.Extensions()["test2"]) + } + if _, ok := e.Extensions()["test3"]; ok { + t.Errorf("expected test3 to be missing") + } +} diff --git a/v2/extensions/correlation_extension.go b/v2/extensions/correlation_extension.go new file mode 100644 index 000000000..6876970ce --- /dev/null +++ b/v2/extensions/correlation_extension.go @@ -0,0 +1,64 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package extensions + +import ( + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" +) + +const ( + // CorrelationIDExtension is the CloudEvents extension attribute for correlationid. + CorrelationIDExtension = "correlationid" + // CausationIDExtension is the CloudEvents extension attribute for causationid. + CausationIDExtension = "causationid" +) + +// CorrelationExtension represents the correlation CloudEvents extension. +type CorrelationExtension struct { + // CorrelationID is an identifier that groups related events within the same logical flow or business transaction. + CorrelationID string `json:"correlationid"` + // CausationID is the unique identifier of the event that directly caused this event to be generated. + CausationID string `json:"causationid"` +} + +// AddCorrelationAttributes adds the correlation attributes to the cloudevents context. +func (c CorrelationExtension) AddCorrelationAttributes(e event.EventWriter) { + event.AttachExtensions(e, map[string]string{ + CorrelationIDExtension: c.CorrelationID, + CausationIDExtension: c.CausationID, + }) +} + +// GetCorrelationExtension extracts the correlation extension from the event. +func GetCorrelationExtension(e event.Event) (CorrelationExtension, bool) { + c := CorrelationExtension{} + found := event.ExtractExtensions(e, map[string]*string{ + CorrelationIDExtension: &c.CorrelationID, + CausationIDExtension: &c.CausationID, + }) + return c, found +} + +// ReadTransformer returns a transformer that reads the correlation extension from the message metadata. +func (c *CorrelationExtension) ReadTransformer() binding.TransformerFunc { + return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error { + return binding.ExtractMetadata(reader, map[string]*string{ + CorrelationIDExtension: &c.CorrelationID, + CausationIDExtension: &c.CausationID, + }) + } +} + +// WriteTransformer returns a transformer that writes the correlation extension to the message metadata. +func (c CorrelationExtension) WriteTransformer() binding.TransformerFunc { + return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error { + return binding.AttachMetadata(writer, map[string]string{ + CorrelationIDExtension: c.CorrelationID, + CausationIDExtension: c.CausationID, + }) + } +} diff --git a/v2/extensions/correlation_extension_test.go b/v2/extensions/correlation_extension_test.go new file mode 100644 index 000000000..a015e49a6 --- /dev/null +++ b/v2/extensions/correlation_extension_test.go @@ -0,0 +1,123 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package extensions_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/cloudevents/sdk-go/v2/binding" + bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/extensions" + "github.com/cloudevents/sdk-go/v2/test" +) + +func TestCorrelationExtension(t *testing.T) { + testCases := []struct { + name string + extension extensions.CorrelationExtension + expectedEvent func() event.Event + }{ + { + name: "both attributes", + extension: extensions.CorrelationExtension{ + CorrelationID: "corr-1", + CausationID: "caus-1", + }, + expectedEvent: func() event.Event { + e := test.MinEvent() + e.SetExtension("correlationid", "corr-1") + e.SetExtension("causationid", "caus-1") + return e + }, + }, + { + name: "only correlationid", + extension: extensions.CorrelationExtension{ + CorrelationID: "corr-1", + }, + expectedEvent: func() event.Event { + e := test.MinEvent() + e.SetExtension("correlationid", "corr-1") + return e + }, + }, + { + name: "only causationid", + extension: extensions.CorrelationExtension{ + CausationID: "caus-1", + }, + expectedEvent: func() event.Event { + e := test.MinEvent() + e.SetExtension("causationid", "caus-1") + return e + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + e := test.MinEvent() + tc.extension.AddCorrelationAttributes(&e) + + expected := tc.expectedEvent() + require.Equal(t, expected.Extensions(), e.Extensions()) + + // Test GetCorrelationExtension + got, ok := extensions.GetCorrelationExtension(e) + require.True(t, ok) + require.Equal(t, tc.extension, got) + }) + } +} + +func TestCorrelationExtension_GetNotSet(t *testing.T) { + e := test.MinEvent() + _, ok := extensions.GetCorrelationExtension(e) + require.False(t, ok) +} + +func TestCorrelationExtension_ReadTransformer(t *testing.T) { + e := test.MinEvent() + e.SetExtension("correlationid", "corr-1") + e.SetExtension("causationid", "caus-1") + + ext := extensions.CorrelationExtension{} + bindingtest.RunTransformerTests(t, context.TODO(), []bindingtest.TransformerTestArgs{ + { + Name: "Read from Mock Structured message", + InputMessage: bindingtest.MustCreateMockStructuredMessage(t, e), + WantEvent: e, + Transformers: binding.Transformers{ext.ReadTransformer()}, + }, + }) + require.Equal(t, "corr-1", ext.CorrelationID) + require.Equal(t, "caus-1", ext.CausationID) +} + +func TestCorrelationExtension_WriteTransformer(t *testing.T) { + e := test.MinEvent() + ext := extensions.CorrelationExtension{ + CorrelationID: "corr-1", + CausationID: "caus-1", + } + + want := e.Clone() + want.SetExtension("correlationid", "corr-1") + want.SetExtension("causationid", "caus-1") + + bindingtest.RunTransformerTests(t, context.TODO(), []bindingtest.TransformerTestArgs{ + { + Name: "Write to Mock Structured message", + InputMessage: bindingtest.MustCreateMockStructuredMessage(t, e), + WantEvent: want, + Transformers: binding.Transformers{ext.WriteTransformer()}, + }, + }) +} diff --git a/v2/extensions/distributed_tracing_extension.go b/v2/extensions/distributed_tracing_extension.go index 12c2e06fc..77111f40c 100644 --- a/v2/extensions/distributed_tracing_extension.go +++ b/v2/extensions/distributed_tracing_extension.go @@ -6,13 +6,8 @@ package extensions import ( - "reflect" - "strings" - "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/event" - - "github.com/cloudevents/sdk-go/v2/types" ) const ( @@ -29,64 +24,36 @@ type DistributedTracingExtension struct { // AddTracingAttributes adds the tracing attributes traceparent and tracestate to the cloudevents context func (d DistributedTracingExtension) AddTracingAttributes(e event.EventWriter) { if d.TraceParent != "" { - value := reflect.ValueOf(d) - typeOf := value.Type() - - for i := 0; i < value.NumField(); i++ { - k := strings.ToLower(typeOf.Field(i).Name) - v := value.Field(i).Interface() - if k == TraceStateExtension && v == "" { - continue - } - e.SetExtension(k, v) - } + event.AttachExtensions(e, map[string]string{ + TraceParentExtension: d.TraceParent, + TraceStateExtension: d.TraceState, + }) } } -func GetDistributedTracingExtension(event event.Event) (DistributedTracingExtension, bool) { - if tp, ok := event.Extensions()[TraceParentExtension]; ok { - if tpStr, err := types.ToString(tp); err == nil { - var tsStr string - if ts, ok := event.Extensions()[TraceStateExtension]; ok { - tsStr, _ = types.ToString(ts) - } - return DistributedTracingExtension{TraceParent: tpStr, TraceState: tsStr}, true - } - } - return DistributedTracingExtension{}, false +func GetDistributedTracingExtension(e event.Event) (DistributedTracingExtension, bool) { + d := DistributedTracingExtension{} + ok := event.ExtractExtensions(e, map[string]*string{ + TraceParentExtension: &d.TraceParent, + TraceStateExtension: &d.TraceState, + }) + return d, ok } func (d *DistributedTracingExtension) ReadTransformer() binding.TransformerFunc { return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error { - tp := reader.GetExtension(TraceParentExtension) - if tp != nil { - tpFormatted, err := types.Format(tp) - if err != nil { - return err - } - d.TraceParent = tpFormatted - } - ts := reader.GetExtension(TraceStateExtension) - if ts != nil { - tsFormatted, err := types.Format(ts) - if err != nil { - return err - } - d.TraceState = tsFormatted - } - return nil + return binding.ExtractMetadata(reader, map[string]*string{ + TraceParentExtension: &d.TraceParent, + TraceStateExtension: &d.TraceState, + }) } } func (d *DistributedTracingExtension) WriteTransformer() binding.TransformerFunc { return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error { - err := writer.SetExtension(TraceParentExtension, d.TraceParent) - if err != nil { - return nil - } - if d.TraceState != "" { - return writer.SetExtension(TraceStateExtension, d.TraceState) - } - return nil + return binding.AttachMetadata(writer, map[string]string{ + TraceParentExtension: d.TraceParent, + TraceStateExtension: d.TraceState, + }) } } diff --git a/v2/extensions/distributed_tracing_extension_test.go b/v2/extensions/distributed_tracing_extension_test.go index 5b3be63f1..3a9afa361 100644 --- a/v2/extensions/distributed_tracing_extension_test.go +++ b/v2/extensions/distributed_tracing_extension_test.go @@ -265,6 +265,26 @@ func TestDistributedTracingExtension_ReadTransformer(t *testing.T) { } } +func TestGetDistributedTracingExtension(t *testing.T) { + wantExt := extensions.DistributedTracingExtension{ + TraceParent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + TraceState: "rojo=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01,congo=lZWRzIHRoNhcm5hbCBwbGVhc3VyZS4=", + } + e := test.MinEvent() + wantExt.AddTracingAttributes(&e) + + gotExt, ok := extensions.GetDistributedTracingExtension(e) + require.True(t, ok) + require.Equal(t, wantExt, gotExt) +} + +func TestGetDistributedTracingExtension_NotSet(t *testing.T) { + e := test.MinEvent() + gotExt, ok := extensions.GetDistributedTracingExtension(e) + require.False(t, ok) + require.Zero(t, gotExt) +} + func TestDistributedTracingExtension_WriteTransformer(t *testing.T) { e := test.MinEvent() e.Context = e.Context.AsV1()