diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go
index 570e2e8f5dd..7e4a51489fd 100644
--- a/exporter/data_exporter.go
+++ b/exporter/data_exporter.go
@@ -86,7 +86,7 @@ func (d *dataExporterImpl[T]) Start() {
 	}
 }
 
-// Stop is flushing the daya and stopping the ticker
+// Stop is flushing the data and stopping the ticker
 func (d *dataExporterImpl[T]) Stop() {
 	// we don't start the daemon if we are not in bulk mode
 	if !d.IsBulk() {
diff --git a/exporter/data_exporter_test.go b/exporter/data_exporter_test.go
index f75583d540b..88a2916eea4 100644
--- a/exporter/data_exporter_test.go
+++ b/exporter/data_exporter_test.go
@@ -1,10 +1,14 @@
 package exporter_test
 
 import (
+	"context"
 	"fmt"
 	"log/slog"
 	"os"
+	"sync"
+	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/thomaspoignant/go-feature-flag/exporter"
@@ -14,6 +18,21 @@ import (
 	"github.com/thomaspoignant/go-feature-flag/utils/fflog"
 )
 
+type slowExporter struct {
+	delay      time.Duration
+	exportCall atomic.Int32
+}
+
+func (s *slowExporter) Export(_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent) error {
+	s.exportCall.Add(1)
+	time.Sleep(s.delay)
+	return nil
+}
+
+func (s *slowExporter) IsBulk() bool {
+	return true
+}
+
 func TestDataExporterFlush_TriggerError(t *testing.T) {
 	evStore := mock.NewEventStore[exporter.FeatureEvent]()
 	for i := 0; i < 100; i++ {
@@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) {
 		})
 	}
 }
+
+func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) {
+	evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration)
+	evStore.AddConsumer("slow-consumer")
+	defer evStore.Stop()
+
+	for i := 0; i < 100; i++ {
+		evStore.Add(exporter.FeatureEvent{Kind: "feature"})
+	}
+
+	slow := &slowExporter{delay: 200 * time.Millisecond}
+	exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{
+		Exporter:         slow,
+		FlushInterval:    10 * time.Second,
+		MaxEventInMemory: 100000,
+	}, "slow-consumer", &evStore, nil)
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		exp.Flush()
+	}()
+	time.Sleep(10 * time.Millisecond)
+	go func() {
+		defer wg.Done()
+		exp.Flush()
+	}()
+	wg.Wait()
+
+	assert.Equal(t, int32(1), slow.exportCall.Load())
+	count, err := evStore.GetPendingEventCount("slow-consumer")
+	assert.Nil(t, err)
+	assert.Equal(t, int64(0), count)
+}
diff --git a/exporter/event_store.go b/exporter/event_store.go
index 707d5305fa0..042db30c16c 100644
--- a/exporter/event_store.go
+++ b/exporter/event_store.go
@@ -76,13 +76,19 @@ type EventStoreItem[T ExportableEvent] struct {
 	Data T
 }
 
+// consumer serializes ProcessPendingEvents calls for a single consumer.
+// Lock ordering: consumer.mutex must always be acquired before eventStoreImpl.mutex.
+// Never acquire consumer.mutex while holding eventStoreImpl.mutex.
 type consumer struct {
 	Offset int64
+	mutex  sync.Mutex
 }
 
 // AddConsumer is adding a new consumer to the Event store.
 // note that you can't add a consumer after the Event store has been started.
 func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
 	e.consumers[consumerID] = &consumer{Offset: e.lastOffset}
 }
 
@@ -90,22 +96,31 @@ func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
 // with the process events function in parameter,
 func (e *eventStoreImpl[T]) ProcessPendingEvents(
 	consumerID string, processEventsFunc func(context.Context, []T) error) error {
-	e.mutex.Lock()
-	defer e.mutex.Unlock()
-
-	eventList, err := e.fetchPendingEvents(consumerID)
+	e.mutex.RLock()
+	currentConsumer, err := e.getConsumer(consumerID)
+	e.mutex.RUnlock()
 	if err != nil {
 		return err
 	}
-	err = processEventsFunc(context.Background(), eventList.Events)
-	if err != nil {
-		return err
+
+	currentConsumer.mutex.Lock()
+	defer currentConsumer.mutex.Unlock()
+
+	e.mutex.RLock()
+	eventList := e.fetchPendingEvents(currentConsumer)
+	e.mutex.RUnlock()
+
+	if len(eventList.Events) == 0 {
+		return nil
 	}
-	err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
-	if err != nil {
+
+	if err := processEventsFunc(context.Background(), eventList.Events); err != nil {
 		return err
 	}
-	return nil
+
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	return e.updateConsumerOffset(currentConsumer, eventList.NewOffset)
 }
 
 // GetTotalEventCount returns the total number of events in the store.
@@ -136,11 +151,7 @@ func (e *eventStoreImpl[T]) Add(data T) {
 
 // fetchPendingEvents is returning all the available item in the Event store for this consumer.
 // WARNING: please call this function only in a function that has locked the mutex first.
-func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T], error) {
-	currentConsumer, err := e.getConsumer(consumerID)
-	if err != nil {
-		return nil, err
-	}
+func (e *eventStoreImpl[T]) fetchPendingEvents(currentConsumer *consumer) *EventList[T] {
 	events := make([]T, 0)
 	for _, event := range e.events {
 		if event.Offset > currentConsumer.Offset {
@@ -151,7 +162,7 @@ func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T]
 		Events:        events,
 		InitialOffset: currentConsumer.Offset,
 		NewOffset:     e.lastOffset,
-	}, nil
+	}
 }
 
 // getConsumer checks if the consumer exists and returns it.
@@ -165,7 +176,7 @@ func (e *eventStoreImpl[T]) getConsumer(consumerID string) (*consumer, error) {
 
 // updateConsumerOffset updates the offset of the consumer to the new offset.
 // WARNING: please call this function only in a function that has locked the mutex first.
-func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64) error {
+func (e *eventStoreImpl[T]) updateConsumerOffset(currentConsumer *consumer, offset int64) error {
 	if offset > e.lastOffset {
 		return fmt.Errorf(
 			"invalid offset: offset %d is greater than the last offset %d",
@@ -173,11 +184,7 @@ func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64
 			e.lastOffset,
 		)
 	}
-	currentConsumer, err := e.getConsumer(consumerID)
-	if err != nil {
-		return err
-	}
-	currentConsumer.Offset = e.lastOffset
+	currentConsumer.Offset = offset
 	return nil
 }
 
diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go
index a15d33aa284..ae6a8a40941 100644
--- a/exporter/event_store_test.go
+++ b/exporter/event_store_test.go
@@ -171,6 +171,7 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
 	time.Sleep(50 * time.Millisecond)
 
 	wg := &sync.WaitGroup{}
+	var countersMu sync.Mutex
 	consumeFunc := func(eventStore exporter.EventStore[testutils.ExportableMockEvent], consumerName string, eventCounters *map[string]int) {
 		defer wg.Done()
 		err := eventStore.ProcessPendingEvents(consumerName,
@@ -186,7 +187,9 @@
 		err = eventStore.ProcessPendingEvents(consumerName,
 			func(ctx context.Context, events []testutils.ExportableMockEvent) error {
 				if eventCounters != nil {
+					countersMu.Lock()
 					(*eventCounters)[consumerName] = len(events)
+					countersMu.Unlock()
 				}
 				return nil
 			})
@@ -275,6 +278,194 @@ func Test_WaitForEmptyClean(t *testing.T) {
 	assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
 }
 
+func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) {
+	consumerName := "consumer1"
+	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
+		defaultTestCleanQueueDuration,
+	)
+	eventStore.AddConsumer(consumerName)
+	defer eventStore.Stop()
+
+	for i := 0; i < 10; i++ {
+		eventStore.Add(testutils.NewExportableMockEvent("init"))
+	}
+
+	processingStarted := make(chan struct{})
+	processingDone := make(chan struct{})
+	go func() {
+		_ = eventStore.ProcessPendingEvents(consumerName,
+			func(ctx context.Context, events []testutils.ExportableMockEvent) error {
+				close(processingStarted)
+				time.Sleep(200 * time.Millisecond)
+				return nil
+			})
+		close(processingDone)
+	}()
+
+	<-processingStarted
+
+	addDone := make(chan struct{})
+	go func() {
+		eventStore.Add(testutils.NewExportableMockEvent("during-export"))
+		close(addDone)
+	}()
+
+	select {
+	case <-addDone:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("Add() was blocked while ProcessPendingEvents was running a slow exporter")
+	}
+
+	<-processingDone
+}
+
+func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing.T) {
+	consumerName := "consumer1"
+	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
+		defaultTestCleanQueueDuration,
+	)
+	eventStore.AddConsumer(consumerName)
+	defer eventStore.Stop()
+
+	for i := 0; i < 10; i++ {
+		eventStore.Add(testutils.NewExportableMockEvent("init"))
+	}
+
+	processingStarted := make(chan struct{})
+	releaseFirst := make(chan struct{})
+	secondCallStarted := make(chan struct{})
+	var wg sync.WaitGroup
+
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName,
+			func(_ context.Context, _ []testutils.ExportableMockEvent) error {
+				select {
+				case <-processingStarted:
+				default:
+					close(processingStarted)
+				}
+				<-releaseFirst
+				return nil
+			},
+		))
+	}()
+
+	<-processingStarted
+
+	go func() {
+		close(secondCallStarted)
+		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName,
+			func(_ context.Context, _ []testutils.ExportableMockEvent) error {
+				select {
+				case <-processingStarted:
+				default:
+					close(processingStarted)
+				}
+				<-releaseFirst
+				return nil
+			},
+		))
+		wg.Done()
+	}()
+
+	<-secondCallStarted
+	time.Sleep(20 * time.Millisecond)
+
+	addDone := make(chan struct{})
+	go func() {
+		eventStore.Add(testutils.NewExportableMockEvent("during-queued-flush"))
+		close(addDone)
+	}()
+
+	select {
+	case <-addDone:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("Add() was blocked while a same-consumer ProcessPendingEvents call was queued")
+	}
+
+	close(releaseFirst)
+	wg.Wait()
+}
+
+func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *testing.T) {
+	consumerName := "consumer1"
+	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
+		defaultTestCleanQueueDuration,
+	)
+	eventStore.AddConsumer(consumerName)
+	defer eventStore.Stop()
+
+	for i := 0; i < 10; i++ {
+		eventStore.Add(testutils.NewExportableMockEvent("init"))
+	}
+
+	var (
+		callbackMu  sync.Mutex
+		batchSizes  []int
+		startedOnce sync.Once
+		started     = make(chan struct{})
+		release     = make(chan struct{})
+		wg          sync.WaitGroup
+	)
+
+	processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error {
+		callbackMu.Lock()
+		batchSizes = append(batchSizes, len(events))
+		callbackMu.Unlock()
+		startedOnce.Do(func() { close(started) })
+		<-release
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
+	}()
+	<-started
+	go func() {
+		defer wg.Done()
+		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
+	}()
+
+	close(release)
+	wg.Wait()
+
+	callbackMu.Lock()
+	defer callbackMu.Unlock()
+	assert.Equal(t, []int{10}, batchSizes)
+
+	count, err := eventStore.GetPendingEventCount(consumerName)
+	assert.Nil(t, err)
+	assert.Equal(t, int64(0), count)
+}
+
+func Test_ProcessPendingEvents_EmptyBatchSkipsCallback(t *testing.T) {
+	consumerName := "consumer1"
+	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
+		defaultTestCleanQueueDuration,
+	)
+	eventStore.AddConsumer(consumerName)
+	defer eventStore.Stop()
+
+	callbackCalled := false
+	err := eventStore.ProcessPendingEvents(
+		consumerName,
+		func(ctx context.Context, events []testutils.ExportableMockEvent) error {
+			callbackCalled = true
+			return nil
+		},
+	)
+
+	assert.Nil(t, err)
+	assert.False(t, callbackCalled)
+}
+
 func startEventProducer(
 	ctx context.Context,
 	eventStore exporter.EventStore[testutils.ExportableMockEvent],