Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions v2/extensions/sequence_extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package extensions

import (
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/types"
)

const SequenceExtensionKey = "sequence"

// SequenceExtension represents the CloudEvents Sequence extension,
// which describes the position of an event in the ordered sequence
// of events produced by a unique event source.
// See https://github.com/cloudevents/spec/blob/main/cloudevents/extensions/sequence.md
// for more info.
type SequenceExtension struct {
Sequence string `json:"sequence"`
}

// AddSequenceExtension sets the sequence extension attribute on the event.
// The value MUST be a non-empty, lexicographically-orderable string.
func AddSequenceExtension(e *event.Event, sequence string) {
if sequence != "" {
e.SetExtension(SequenceExtensionKey, sequence)
}
}

// GetSequenceExtension extracts the sequence extension from a CloudEvent.
// It returns the extension and true if found, or an empty extension and false otherwise.
func GetSequenceExtension(e event.Event) (SequenceExtension, bool) {
if val, ok := e.Extensions()[SequenceExtensionKey]; ok {
if s, err := types.ToString(val); err == nil && s != "" {
return SequenceExtension{Sequence: s}, true
}
}
return SequenceExtension{}, false
}

// ReadTransformer returns a binding.TransformerFunc that reads the sequence
// extension from a message into this SequenceExtension struct.
func (s *SequenceExtension) ReadTransformer() binding.TransformerFunc {
return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error {
val := reader.GetExtension(SequenceExtensionKey)
if val != nil {
formatted, err := types.Format(val)
if err != nil {
return err
}
s.Sequence = formatted
}
return nil
}
}

// WriteTransformer returns a binding.TransformerFunc that writes the sequence
// extension from this SequenceExtension struct into a message.
func (s *SequenceExtension) WriteTransformer() binding.TransformerFunc {
return func(reader binding.MessageMetadataReader, writer binding.MessageMetadataWriter) error {
if s.Sequence != "" {
return writer.SetExtension(SequenceExtensionKey, s.Sequence)
}
return nil
}
}
161 changes: 161 additions & 0 deletions v2/extensions/sequence_extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package extensions_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/cloudevents/sdk-go/v2/binding"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/extensions"
"github.com/cloudevents/sdk-go/v2/test"
)

func TestAddSequenceExtension(t *testing.T) {
e := event.New()
e.SetSource("https://example.com/source")
e.SetType("com.example.test")
e.SetID("123")

extensions.AddSequenceExtension(&e, "001")

got, ok := e.Extensions()[extensions.SequenceExtensionKey]
require.True(t, ok)
require.Equal(t, "001", got)
}

func TestAddSequenceExtension_Empty(t *testing.T) {
e := event.New()
e.SetSource("https://example.com/source")
e.SetType("com.example.test")
e.SetID("123")

extensions.AddSequenceExtension(&e, "")

_, ok := e.Extensions()[extensions.SequenceExtensionKey]
require.False(t, ok)
}

func TestGetSequenceExtension(t *testing.T) {
e := event.New()
e.SetSource("https://example.com/source")
e.SetType("com.example.test")
e.SetID("123")
e.SetExtension(extensions.SequenceExtensionKey, "042")

ext, ok := extensions.GetSequenceExtension(e)
require.True(t, ok)
require.Equal(t, "042", ext.Sequence)
}

func TestGetSequenceExtension_NotPresent(t *testing.T) {
e := event.New()
e.SetSource("https://example.com/source")
e.SetType("com.example.test")
e.SetID("123")

ext, ok := extensions.GetSequenceExtension(e)
require.False(t, ok)
require.Equal(t, "", ext.Sequence)
}

func TestSequenceExtension_ReadTransformer_Empty(t *testing.T) {
e := test.MinEvent()
e.Context = e.Context.AsV1()

tests := []bindingtest.TransformerTestArgs{
{
Name: "Read from Mock Structured message",
InputMessage: bindingtest.MustCreateMockStructuredMessage(t, e),
WantEvent: e,
},
{
Name: "Read from Mock Binary message",
InputMessage: bindingtest.MustCreateMockBinaryMessage(e),
WantEvent: e,
},
{
Name: "Read from Event message",
InputEvent: e,
WantEvent: e,
},
}
for _, tt := range tests {
ext := extensions.SequenceExtension{}
tt.Transformers = binding.Transformers{ext.ReadTransformer()}
bindingtest.RunTransformerTests(t, context.TODO(), []bindingtest.TransformerTestArgs{tt})
require.Zero(t, ext.Sequence)
}
}

func TestSequenceExtension_ReadTransformer(t *testing.T) {
e := test.MinEvent()
e.Context = e.Context.AsV1()
wantExt := extensions.SequenceExtension{
Sequence: "00042",
}
extensions.AddSequenceExtension(&e, wantExt.Sequence)

tests := []bindingtest.TransformerTestArgs{
{
Name: "Read from Mock Structured message",
InputMessage: bindingtest.MustCreateMockStructuredMessage(t, e),
WantEvent: e,
},
{
Name: "Read from Mock Binary message",
InputMessage: bindingtest.MustCreateMockBinaryMessage(e),
WantEvent: e,
},
{
Name: "Read from Event message",
InputEvent: e,
WantEvent: e,
},
}
for _, tt := range tests {
haveExt := extensions.SequenceExtension{}
tt.Transformers = binding.Transformers{haveExt.ReadTransformer()}
bindingtest.RunTransformerTests(t, context.TODO(), []bindingtest.TransformerTestArgs{tt})
require.Equal(t, wantExt.Sequence, haveExt.Sequence)
}
}

func TestSequenceExtension_WriteTransformer(t *testing.T) {
e := test.MinEvent()
e.Context = e.Context.AsV1()

ext := extensions.SequenceExtension{
Sequence: "00042",
}
want := e.Clone()
extensions.AddSequenceExtension(&want, ext.Sequence)

bindingtest.RunTransformerTests(t, context.TODO(), []bindingtest.TransformerTestArgs{
{
Name: "Write to Mock Structured message",
InputMessage: bindingtest.MustCreateMockStructuredMessage(t, e),
WantEvent: want,
Transformers: binding.Transformers{ext.WriteTransformer()},
},
{
Name: "Write to Mock Binary message",
InputMessage: bindingtest.MustCreateMockBinaryMessage(e),
WantEvent: want,
Transformers: binding.Transformers{ext.WriteTransformer()},
},
{
Name: "Write to Event message",
InputEvent: e,
WantEvent: want,
Transformers: binding.Transformers{ext.WriteTransformer()},
},
})
}
Loading