Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/data_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (d *dataExporterImpl[T]) Start() {
}
}

// Stop is flushing the daya and stopping the ticker
// Stop is flushing the data and stopping the ticker
func (d *dataExporterImpl[T]) Stop() {
// we don't start the daemon if we are not in bulk mode
if !d.IsBulk() {
Expand Down
54 changes: 54 additions & 0 deletions exporter/data_exporter_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package exporter_test

import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/thomaspoignant/go-feature-flag/exporter"
Expand All @@ -14,6 +18,21 @@ import (
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)

// slowExporter is a test double that records how many times Export is
// invoked and blocks for a configurable duration, simulating a slow sink.
type slowExporter struct {
	delay      time.Duration // how long every Export call blocks
	exportCall atomic.Int32  // number of Export invocations, read by tests
}

// Export counts the invocation, then sleeps for the configured delay
// before reporting success. It never returns an error.
func (se *slowExporter) Export(
	_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent,
) error {
	se.exportCall.Add(1)
	time.Sleep(se.delay)
	return nil
}

// IsBulk reports that this exporter operates in bulk mode, so the data
// exporter buffers events and flushes them as batches.
func (se *slowExporter) IsBulk() bool {
	return true
}

func TestDataExporterFlush_TriggerError(t *testing.T) {
evStore := mock.NewEventStore[exporter.FeatureEvent]()
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) {
})
}
}

// TestDataExporterFlush_ConcurrentFlushNoDuplicate checks that two
// overlapping Flush() calls on the same consumer do not export the same
// batch twice: the slow exporter must be called exactly once, and the
// consumer's pending-event count must reach zero.
func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) {
	evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration)
	evStore.AddConsumer("slow-consumer")
	defer evStore.Stop()

	// Seed 100 events so the first Flush has a non-empty batch to export.
	for i := 0; i < 100; i++ {
		evStore.Add(exporter.FeatureEvent{Kind: "feature"})
	}

	// Each Export call blocks 200ms, long enough for the second Flush to
	// arrive while the first is still in flight.
	slow := &slowExporter{delay: 200 * time.Millisecond}
	// Long FlushInterval and a huge MaxEventInMemory ensure no automatic
	// flush fires during the test; only the explicit Flush() calls matter.
	exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{
		Exporter:         slow,
		FlushInterval:    10 * time.Second,
		MaxEventInMemory: 100000,
	}, "slow-consumer", &evStore, nil)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		exp.Flush()
	}()
	// Give the first Flush a head start so the second call overlaps the
	// slow export rather than racing to start first.
	time.Sleep(10 * time.Millisecond)
	go func() {
		defer wg.Done()
		exp.Flush()
	}()
	wg.Wait()

	// The second Flush should find an empty batch and skip the exporter,
	// so exactly one Export call is observed.
	assert.Equal(t, int32(1), slow.exportCall.Load())
	count, err := evStore.GetPendingEventCount("slow-consumer")
	assert.Nil(t, err)
	assert.Equal(t, int64(0), count)
}
48 changes: 26 additions & 22 deletions exporter/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,34 +78,46 @@ type EventStoreItem[T ExportableEvent] struct {

type consumer struct {
Comment thread
thomaspoignant marked this conversation as resolved.
Offset int64
mutex sync.Mutex
}

// AddConsumer registers a new consumer in the Event store, positioned at the
// store's current last offset (it will only observe events added afterwards).
// Note that you can't add a consumer after the Event store has been started.
func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
	e.mutex.Lock()
	e.consumers[consumerID] = &consumer{Offset: e.lastOffset}
	e.mutex.Unlock()
}

// ProcessPendingEvents is processing all the available item in the Event store for this consumer
// with the process events function in parameter,
func (e *eventStoreImpl[T]) ProcessPendingEvents(
consumerID string, processEventsFunc func(context.Context, []T) error) error {
e.mutex.Lock()
defer e.mutex.Unlock()

eventList, err := e.fetchPendingEvents(consumerID)
e.mutex.RLock()
currentConsumer, err := e.getConsumer(consumerID)
e.mutex.RUnlock()
if err != nil {
return err
}
err = processEventsFunc(context.Background(), eventList.Events)
if err != nil {
return err

currentConsumer.mutex.Lock()
defer currentConsumer.mutex.Unlock()

e.mutex.Lock()
eventList := e.fetchPendingEvents(currentConsumer)
e.mutex.Unlock()
Comment thread
hugehoo marked this conversation as resolved.
Outdated

if len(eventList.Events) == 0 {
return nil
}
err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
if err != nil {

if err := processEventsFunc(context.Background(), eventList.Events); err != nil {
return err
}
return nil

e.mutex.Lock()
defer e.mutex.Unlock()
return e.updateConsumerOffset(currentConsumer, eventList.NewOffset)
Comment thread
thomaspoignant marked this conversation as resolved.
}

// GetTotalEventCount returns the total number of events in the store.
Expand Down Expand Up @@ -136,11 +148,7 @@ func (e *eventStoreImpl[T]) Add(data T) {

// fetchPendingEvents is returning all the available item in the Event store for this consumer.
// WARNING: please call this function only in a function that has locked the mutex first.
func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T], error) {
currentConsumer, err := e.getConsumer(consumerID)
if err != nil {
return nil, err
}
func (e *eventStoreImpl[T]) fetchPendingEvents(currentConsumer *consumer) *EventList[T] {
events := make([]T, 0)
for _, event := range e.events {
if event.Offset > currentConsumer.Offset {
Expand All @@ -151,7 +159,7 @@ func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T]
Events: events,
InitialOffset: currentConsumer.Offset,
NewOffset: e.lastOffset,
}, nil
}
}

// getConsumer checks if the consumer exists and returns it.
Expand All @@ -165,19 +173,15 @@ func (e *eventStoreImpl[T]) getConsumer(consumerID string) (*consumer, error) {

// updateConsumerOffset updates the offset of the consumer to the new offset.
// WARNING: please call this function only in a function that has locked the mutex first.
func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64) error {
func (e *eventStoreImpl[T]) updateConsumerOffset(currentConsumer *consumer, offset int64) error {
if offset > e.lastOffset {
return fmt.Errorf(
"invalid offset: offset %d is greater than the last offset %d",
offset,
e.lastOffset,
)
}
currentConsumer, err := e.getConsumer(consumerID)
if err != nil {
return err
}
currentConsumer.Offset = e.lastOffset
currentConsumer.Offset = offset
return nil
}

Expand Down
191 changes: 191 additions & 0 deletions exporter/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg := &sync.WaitGroup{}

var countersMu sync.Mutex
consumeFunc := func(eventStore exporter.EventStore[testutils.ExportableMockEvent], consumerName string, eventCounters *map[string]int) {
defer wg.Done()
err := eventStore.ProcessPendingEvents(consumerName,
Expand All @@ -186,7 +187,9 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
err = eventStore.ProcessPendingEvents(consumerName,
func(ctx context.Context, events []testutils.ExportableMockEvent) error {
if eventCounters != nil {
countersMu.Lock()
(*eventCounters)[consumerName] = len(events)
countersMu.Unlock()
}
return nil
})
Expand Down Expand Up @@ -275,6 +278,194 @@ func Test_WaitForEmptyClean(t *testing.T) {
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

// Test_ProcessPendingEvents_DoesNotBlockAdd verifies that a slow processing
// callback inside ProcessPendingEvents does not hold the lock Add() needs,
// i.e. producers keep making progress while an export is in flight.
func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	// Seed the store so the consumer has a non-empty batch to process.
	for i := 0; i < 10; i++ {
		eventStore.Add(testutils.NewExportableMockEvent("init"))
	}

	started := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		_ = eventStore.ProcessPendingEvents(consumerName,
			func(ctx context.Context, events []testutils.ExportableMockEvent) error {
				close(started) // signal: slow callback is now running
				time.Sleep(200 * time.Millisecond)
				return nil
			})
		close(finished)
	}()

	// Only try to Add() once the slow callback is definitely in flight.
	<-started

	added := make(chan struct{})
	go func() {
		eventStore.Add(testutils.NewExportableMockEvent("during-export"))
		close(added)
	}()

	select {
	case <-added:
		// Add() returned while the callback was still sleeping: not blocked.
	case <-time.After(100 * time.Millisecond):
		t.Fatal("Add() was blocked while ProcessPendingEvents was running a slow exporter")
	}

	<-finished
}

// Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd verifies
// that while one ProcessPendingEvents call is in flight and a second call
// for the SAME consumer is queued behind it, Add() still completes — i.e.
// waiting on the per-consumer lock must not hold the store-wide lock.
func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	// Seed events so the first call has a non-empty batch and its callback runs.
	for i := 0; i < 10; i++ {
		eventStore.Add(testutils.NewExportableMockEvent("init"))
	}

	processingStarted := make(chan struct{}) // closed when a callback first runs
	releaseFirst := make(chan struct{})      // closed to let blocked callbacks finish
	secondCallStarted := make(chan struct{}) // closed when the 2nd goroutine launches
	var wg sync.WaitGroup

	wg.Add(2)
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName,
			func(_ context.Context, _ []testutils.ExportableMockEvent) error {
				// Close processingStarted only once; the select guards
				// against a double-close if this callback runs again.
				select {
				case <-processingStarted:
				default:
					close(processingStarted)
				}
				// Park here so the first call stays "in flight".
				<-releaseFirst
				return nil
			},
		))
	}()

	// Wait until the first call's callback is definitely running.
	<-processingStarted

	go func() {
		close(secondCallStarted)
		// This same-consumer call should queue behind the first one.
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName,
			func(_ context.Context, _ []testutils.ExportableMockEvent) error {
				select {
				case <-processingStarted:
				default:
					close(processingStarted)
				}
				<-releaseFirst
				return nil
			},
		))
		wg.Done()
	}()

	// Give the second call time to actually reach the queued/blocked state.
	// NOTE(review): this is a sleep-based ordering assumption — the 20ms is
	// presumed long enough for the goroutine to block; verify under -race/CI load.
	<-secondCallStarted
	time.Sleep(20 * time.Millisecond)

	addDone := make(chan struct{})
	go func() {
		eventStore.Add(testutils.NewExportableMockEvent("during-queued-flush"))
		close(addDone)
	}()

	select {
	case <-addDone:
	case <-time.After(100 * time.Millisecond):
		t.Fatal("Add() was blocked while a same-consumer ProcessPendingEvents call was queued")
	}

	// Unblock the first callback; the queued call can then proceed (and may
	// process the event added above), after which both goroutines finish.
	close(releaseFirst)
	wg.Wait()
}

// Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate checks
// that two concurrent ProcessPendingEvents calls for the same consumer never
// deliver the same events twice: exactly one callback fires, with the full
// batch of 10, and the pending count then drops to zero.
func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	// Seed exactly 10 events; the expected single batch size below.
	for i := 0; i < 10; i++ {
		eventStore.Add(testutils.NewExportableMockEvent("init"))
	}

	var (
		callbackMu  sync.Mutex    // guards batchSizes (callbacks may race)
		batchSizes  []int         // size of every delivered batch
		startedOnce sync.Once     // closes started exactly once
		started     = make(chan struct{})
		release     = make(chan struct{})
		wg          sync.WaitGroup
	)

	processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error {
		callbackMu.Lock()
		batchSizes = append(batchSizes, len(events))
		callbackMu.Unlock()
		startedOnce.Do(func() { close(started) })
		// Hold the batch open until both calls are in flight.
		<-release
		// ctx is context.Background() here, so this check is effectively a
		// no-op; kept to honor cancellation if the store ever passes one.
		if err := ctx.Err(); err != nil {
			return err
		}
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	}()
	// Ensure the first call's callback is running before starting the second,
	// so the second truly overlaps (and must queue behind) the first.
	<-started
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	}()

	close(release)
	wg.Wait()

	// A single batch of 10: the second call found nothing pending and must
	// have skipped its callback entirely.
	callbackMu.Lock()
	defer callbackMu.Unlock()
	assert.Equal(t, []int{10}, batchSizes)

	count, err := eventStore.GetPendingEventCount(consumerName)
	assert.Nil(t, err)
	assert.Equal(t, int64(0), count)
}

// Test_ProcessPendingEvents_EmptyBatchSkipsCallback ensures that when the
// consumer has no pending events, ProcessPendingEvents returns nil without
// ever invoking the processing callback.
func Test_ProcessPendingEvents_EmptyBatchSkipsCallback(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	invoked := false
	processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error {
		invoked = true
		return nil
	}

	// No events were added, so the batch is empty and the callback is skipped.
	assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	assert.False(t, invoked)
}

func startEventProducer(
ctx context.Context,
eventStore exporter.EventStore[testutils.ExportableMockEvent],
Expand Down
Loading