Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ You can grab them and copy-paste in your project to start using sdk-go.
* [Responder](./http/responder): Receive and reply to events using the CloudEvents Client.
* [Sender](./http/sender): Send events using the CloudEvents Client.
* [Sender with retries](./http/sender-retry): Send events, retrying in case of a failure.
* [Correlation Extension](./http/correlation): Send and receive events using the Correlation extension.
* [Receiver & Requester with metrics enabled](./http/metrics): Request events and handle events with metrics enabled.
* Kafka
* [Receiver](./kafka/receiver): Receive events using the CloudEvents Client. To run the tests look at [Kafka samples README](./kafka/README.md).
Expand Down
121 changes: 121 additions & 0 deletions samples/http/correlation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Correlation Sample

This sample demonstrates how to use the `CorrelationExtension` to track event relationships and causality in distributed systems.

## Prerequisites

- Go 1.25.0 or later
- Access to a terminal

## Running the Sample

1. Start the receiver in one terminal:
```bash
go run receiver/main.go
```

2. Start the sender in another terminal:
```bash
go run sender/main.go
```

## Expected Output

### Receiver
The receiver will print the incoming events along with their correlation and causation identifiers:
```
Received Event:
Context Attributes,
specversion: 1.0
type: com.example.order.placed
source: https://example.com/orders
id: order-123
Extensions,
correlationid: txn-abc-123
Data,
{
"customerId": "456",
"orderId": "123"
}
Correlation ID: txn-abc-123
-------------------------------------------------
Received Event:
Context Attributes,
specversion: 1.0
type: com.example.payment.processed
source: https://example.com/payments
id: payment-789
Extensions,
causationid: order-123
correlationid: txn-abc-123
Data,
{
"amount": 150,
"currency": "USD"
}
Correlation ID: txn-abc-123
Causation ID: order-123
-------------------------------------------------
Received Event:
Context Attributes,
specversion: 1.0
type: com.example.inventory.checked
source: https://example.com/inventory
id: inventory-456
Extensions,
causationid: order-123
correlationid: txn-abc-123
Data,
{
"available": true,
"items": ["sku-001", "sku-002"]
}
Correlation ID: txn-abc-123
Causation ID: order-123
-------------------------------------------------
Received Event:
Context Attributes,
specversion: 1.0
type: com.example.shipping.scheduled
source: https://example.com/shipping
id: shipping-012
Extensions,
causationid: inventory-456
correlationid: txn-abc-123
Data,
{
"carrier": "FastShip",
"estimatedDelivery": "2024-01-15"
}
Correlation ID: txn-abc-123
Causation ID: inventory-456
-------------------------------------------------
Received Event:
Context Attributes,
specversion: 1.0
type: com.example.notification.email
source: https://example.com/notifications
id: notify-email-890
Extensions,
causationid: shipping-012
correlationid: txn-abc-123
Data,
{
"recipient": "customer@example.com",
"template": "order-fulfilled"
}
Correlation ID: txn-abc-123
Causation ID: shipping-012
-------------------------------------------------
```

### Sender
The sender will log its activity, showing the causation relationship in a tree format:
```
[Correlation ID: txn-abc-123]
└── ID: order-123 (Order Placed) [202]
├── ID: payment-789 (Payment Processed) [202]
└── ID: inventory-456 (Inventory Checked) [202]
└── ID: shipping-012 (Shipping Scheduled) [202]
└── ID: notify-email-890 (Notification Sent) [202]
```
46 changes: 46 additions & 0 deletions samples/http/correlation/receiver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"context"
"fmt"
"log"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/extensions"
)

func main() {
ctx := context.Background()
p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClient(p)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("will listen on :8080\n")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(ctx, receive))
}

func receive(ctx context.Context, event cloudevents.Event) {
fmt.Printf("Received Event:\n%s\n", event)

// Extract the correlation extension
if ext, ok := extensions.GetCorrelationExtension(event); ok {
fmt.Printf("Correlation ID: %s\n", ext.CorrelationID)
if ext.CausationID != "" {
fmt.Printf("Causation ID: %s\n", ext.CausationID)
}
} else {
fmt.Printf("No Correlation Extension found in event\n")
}
fmt.Println("-------------------------------------------------")
}
142 changes: 142 additions & 0 deletions samples/http/correlation/sender/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"context"
"fmt"
"log"
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/extensions"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

func main() {
ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")

p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

// Initial event in a logical flow
correlationID := "txn-abc-123"
fmt.Printf("[Correlation ID: %s]\n", correlationID)

e1 := cloudevents.NewEvent()
e1.SetID("order-123")
e1.SetType("com.example.order.placed")
e1.SetSource("https://example.com/orders")
_ = e1.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"orderId": "123",
"customerId": "456",
})

// Add correlation extension
ext1 := extensions.CorrelationExtension{
CorrelationID: correlationID,
}
ext1.AddCorrelationAttributes(&e1)

send(c, ctx, e1, "└── ", "Order Placed")

// Event B: Payment Processed (triggered by order A)
e2 := cloudevents.NewEvent()
e2.SetID("payment-789")
e2.SetType("com.example.payment.processed")
e2.SetSource("https://example.com/payments")
_ = e2.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"amount": 150.0,
"currency": "USD",
})

ext2 := extensions.CorrelationExtension{
CorrelationID: correlationID,
CausationID: e1.ID(),
}
ext2.AddCorrelationAttributes(&e2)

send(c, ctx, e2, " ├── ", "Payment Processed")

// Event C: Inventory Checked (triggered by order A)
e3 := cloudevents.NewEvent()
e3.SetID("inventory-456")
e3.SetType("com.example.inventory.checked")
e3.SetSource("https://example.com/inventory")
_ = e3.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"items": []string{"sku-001", "sku-002"},
"available": true,
})

ext3 := extensions.CorrelationExtension{
CorrelationID: correlationID,
CausationID: e1.ID(),
}
ext3.AddCorrelationAttributes(&e3)

send(c, ctx, e3, " └── ", "Inventory Checked")

// Event D: Shipping Scheduled (triggered by inventory check C)
e4 := cloudevents.NewEvent()
e4.SetID("shipping-012")
e4.SetType("com.example.shipping.scheduled")
e4.SetSource("https://example.com/shipping")
_ = e4.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"carrier": "FastShip",
"estimatedDelivery": "2024-01-15",
})

ext4 := extensions.CorrelationExtension{
CorrelationID: correlationID,
CausationID: e3.ID(),
}
ext4.AddCorrelationAttributes(&e4)

send(c, ctx, e4, " └── ", "Shipping Scheduled")

// Event E: Notification Sent (triggered by shipping D)
e5 := cloudevents.NewEvent()
e5.SetID("notify-email-890")
e5.SetType("com.example.notification.email")
e5.SetSource("https://example.com/notifications")
_ = e5.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"recipient": "customer@example.com",
"template": "order-fulfilled",
})

ext5 := extensions.CorrelationExtension{
CorrelationID: correlationID,
CausationID: e4.ID(),
}
ext5.AddCorrelationAttributes(&e5)

send(c, ctx, e5, " └── ", "Notification Sent")
}

func send(c cloudevents.Client, ctx context.Context, e cloudevents.Event, prefix string, label string) {
res := c.Send(ctx, e)
if cloudevents.IsUndelivered(res) {
fmt.Printf("%sID: %s (%s) [FAILED: %v]\n", prefix, e.ID(), label, res)
return
}
var httpResult *cehttp.Result
if cloudevents.ResultAs(res, &httpResult) {
status := fmt.Sprintf("%d", httpResult.StatusCode)
if httpResult.StatusCode != http.StatusOK && httpResult.StatusCode != http.StatusAccepted {
status = fmt.Sprintf("FAILED %d: %s", httpResult.StatusCode, fmt.Sprintf(httpResult.Format, httpResult.Args...))
}
fmt.Printf("%sID: %s (%s) [%s]\n", prefix, e.ID(), label, status)
return
}
fmt.Printf("%sID: %s (%s) [%s]\n", prefix, e.ID(), label, res.Error())
}
41 changes: 41 additions & 0 deletions v2/binding/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package binding

import (
"github.com/cloudevents/sdk-go/v2/types"
)

// ExtractMetadata reads metadata extensions from a MessageMetadataReader and maps them to the target pointers.
// It skips empty values to ensure only valid data is written.
func ExtractMetadata[T ~string](reader MessageMetadataReader, mapping map[string]*T) error {
for name, target := range mapping {
v := reader.GetExtension(name)
if v == nil {
continue
}
s, err := types.Format(v)
if err != nil {
return err
}
*target = T(s)
}
return nil
}

// AttachMetadata sets metadata extensions on a MessageMetadataWriter using the provided mapping.
// It skips empty values to ensure only valid data is written.
func AttachMetadata[T ~string](writer MessageMetadataWriter, mapping map[string]T) error {
for name, value := range mapping {
if value == "" {
continue
}
if err := writer.SetExtension(name, value); err != nil {
return err
}
}
return nil
}
Loading