From 382e2a23ae9ec02ea0ced09f0f0a09a86da9bfc3 Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Sat, 18 Apr 2026 22:33:29 +0900 Subject: [PATCH 01/14] perf: shorten event store lock duration --- exporter/event_store.go | 14 ++++++------ exporter/event_store_test.go | 44 ++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/exporter/event_store.go b/exporter/event_store.go index 707d5305fa0..097f24a083d 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -91,21 +91,21 @@ func (e *eventStoreImpl[T]) AddConsumer(consumerID string) { func (e *eventStoreImpl[T]) ProcessPendingEvents( consumerID string, processEventsFunc func(context.Context, []T) error) error { e.mutex.Lock() - defer e.mutex.Unlock() - eventList, err := e.fetchPendingEvents(consumerID) + e.mutex.Unlock() if err != nil { return err } + err = processEventsFunc(context.Background(), eventList.Events) if err != nil { return err } + + e.mutex.Lock() err = e.updateConsumerOffset(consumerID, eventList.NewOffset) - if err != nil { - return err - } - return nil + e.mutex.Unlock() + return err } // GetTotalEventCount returns the total number of events in the store. @@ -177,7 +177,7 @@ func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64 if err != nil { return err } - currentConsumer.Offset = e.lastOffset + currentConsumer.Offset = offset return nil } diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index a15d33aa284..756b8e517e8 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -171,6 +171,7 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { time.Sleep(50 * time.Millisecond) wg := &sync.WaitGroup{} + var countersMu sync.Mutex consumeFunc := func(eventStore exporter.EventStore[testutils.ExportableMockEvent], consumerName string, eventCounters *map[string]int) { defer wg.Done() err := eventStore.ProcessPendingEvents(consumerName, @@ -186,7 +187,9 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { err = eventStore.ProcessPendingEvents(consumerName, func(ctx context.Context, events []testutils.ExportableMockEvent) error { if eventCounters != nil { + countersMu.Lock() (*eventCounters)[consumerName] = len(events) + countersMu.Unlock() } return nil }) @@ -275,6 +278,47 @@ func Test_WaitForEmptyClean(t *testing.T) { assert.Equal(t, int64(0), eventStore.GetTotalEventCount()) } +func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) { + consumerName := "consumer1" + eventStore := exporter.NewEventStore[testutils.ExportableMockEvent]( + defaultTestCleanQueueDuration, + ) + eventStore.AddConsumer(consumerName) + defer eventStore.Stop() + + for i := 0; i < 10; i++ { + eventStore.Add(testutils.NewExportableMockEvent("init")) + } + + processingStarted := make(chan struct{}) + processingDone := make(chan struct{}) + go func() { + _ = eventStore.ProcessPendingEvents(consumerName, + func(ctx context.Context, events []testutils.ExportableMockEvent) error { + close(processingStarted) + time.Sleep(200 * time.Millisecond) + return nil + }) + close(processingDone) + }() + + <-processingStarted + + addDone := make(chan struct{}) + go func() { + eventStore.Add(testutils.NewExportableMockEvent("during-export")) + close(addDone) + }() + + select { + case <-addDone: + case <-time.After(100 * time.Millisecond): + t.Fatal("Add() was blocked while ProcessPendingEvents was running a slow exporter") + } + + <-processingDone +} + func startEventProducer( ctx context.Context, eventStore exporter.EventStore[testutils.ExportableMockEvent], From 6daa6eb3d6c1e4e718b772db72b19b599a8abeb9 Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Sat, 18 Apr 2026 22:33:43 +0900 Subject: [PATCH 02/14] fix: serialize concurrent exporter flushes --- exporter/data_exporter.go | 7 ++++- exporter/data_exporter_test.go | 54 ++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go index 570e2e8f5dd..c55ce38d6e0 100644 --- a/exporter/data_exporter.go +++ b/exporter/data_exporter.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "log/slog" + "sync" "time" "github.com/thomaspoignant/go-feature-flag/utils/fflog" @@ -43,6 +44,7 @@ type dataExporterImpl[T ExportableEvent] struct { daemonChan chan struct{} ticker *time.Ticker + flushMu sync.Mutex } // NewDataExporter create a new DataExporter with the given exporter and his consumer information to consume the data @@ -98,8 +100,11 @@ func (d *dataExporterImpl[T]) Stop() { d.Flush() } -// Flush is sending the data to the exporter +// Flush is sending the data to the exporter. func (d *dataExporterImpl[T]) Flush() { + d.flushMu.Lock() + defer d.flushMu.Unlock() + store := *d.eventStore err := store.ProcessPendingEvents(d.consumerID, d.sendEvents) if err != nil { diff --git a/exporter/data_exporter_test.go b/exporter/data_exporter_test.go index f75583d540b..88a2916eea4 100644 --- a/exporter/data_exporter_test.go +++ b/exporter/data_exporter_test.go @@ -1,10 +1,14 @@ package exporter_test import ( + "context" "fmt" "log/slog" "os" + "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/thomaspoignant/go-feature-flag/exporter" @@ -14,6 +18,21 @@ import ( "github.com/thomaspoignant/go-feature-flag/utils/fflog" ) +type slowExporter struct { + delay time.Duration + exportCall atomic.Int32 +} + +func (s *slowExporter) Export(_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent) error { + s.exportCall.Add(1) + time.Sleep(s.delay) + return nil +} + +func (s *slowExporter) IsBulk() bool { + return true +} + func TestDataExporterFlush_TriggerError(t *testing.T) { evStore := mock.NewEventStore[exporter.FeatureEvent]() for i := 0; i < 100; i++ { @@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) { }) } } + +func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) { + evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration) + evStore.AddConsumer("slow-consumer") + defer evStore.Stop() + + for i := 0; i < 100; i++ { + evStore.Add(exporter.FeatureEvent{Kind: "feature"}) + } + + slow := &slowExporter{delay: 200 * time.Millisecond} + exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{ + Exporter: slow, + FlushInterval: 10 * time.Second, + MaxEventInMemory: 100000, + }, "slow-consumer", &evStore, nil) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + exp.Flush() + }() + time.Sleep(10 * time.Millisecond) + go func() { + defer wg.Done() + exp.Flush() + }() + wg.Wait() + + assert.Equal(t, int32(1), slow.exportCall.Load()) + count, err := evStore.GetPendingEventCount("slow-consumer") + assert.Nil(t, err) + assert.Equal(t, int64(0), count) +} From 67a4e548597f477e94930aed58bfc4375b97efdb Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Sat, 18 Apr 2026 22:38:40 +0900 Subject: [PATCH 03/14] docs: fix typo in exporter stop comment --- exporter/data_exporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go index c55ce38d6e0..ede77a22ec6 100644 --- a/exporter/data_exporter.go +++ b/exporter/data_exporter.go @@ -88,7 +88,7 @@ func (d *dataExporterImpl[T]) Start() { } } -// Stop is flushing the daya and stopping the ticker +// Stop is flushing the data and stopping the ticker func (d *dataExporterImpl[T]) Stop() { // we don't start the daemon if we are not in bulk mode if !d.IsBulk() { @@ -100,7 +100,7 @@ func (d *dataExporterImpl[T]) Stop() { d.Flush() } -// Flush is sending the data to the exporter. +// Flush is sending the data to the exporter func (d *dataExporterImpl[T]) Flush() { d.flushMu.Lock() defer d.flushMu.Unlock() From b0419e5010b48fbd72a6f3921c030c984d396215 Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Sun, 19 Apr 2026 00:15:43 +0900 Subject: [PATCH 04/14] fix: serialize event store processing per consumer --- exporter/event_store.go | 13 +++++++ exporter/event_store_test.go | 74 ++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/exporter/event_store.go b/exporter/event_store.go index 097f24a083d..83a320b2055 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -78,6 +78,7 @@ type EventStoreItem[T ExportableEvent] struct { type consumer struct { Offset int64 + mutex sync.Mutex } // AddConsumer is adding a new consumer to the Event store. @@ -90,6 +91,14 @@ func (e *eventStoreImpl[T]) AddConsumer(consumerID string) { // with the process events function in parameter, func (e *eventStoreImpl[T]) ProcessPendingEvents( consumerID string, processEventsFunc func(context.Context, []T) error) error { + currentConsumer, err := e.getConsumer(consumerID) + if err != nil { + return err + } + + currentConsumer.mutex.Lock() + defer currentConsumer.mutex.Unlock() + e.mutex.Lock() eventList, err := e.fetchPendingEvents(consumerID) e.mutex.Unlock() @@ -97,6 +106,10 @@ func (e *eventStoreImpl[T]) ProcessPendingEvents( return err } + if len(eventList.Events) == 0 { + return nil + } + err = processEventsFunc(context.Background(), eventList.Events) if err != nil { return err diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 756b8e517e8..8765e607ff2 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -319,6 +319,80 @@ func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) { <-processingDone } +func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *testing.T) { + consumerName := "consumer1" + eventStore := exporter.NewEventStore[testutils.ExportableMockEvent]( + defaultTestCleanQueueDuration, + ) + eventStore.AddConsumer(consumerName) + defer eventStore.Stop() + + for i := 0; i < 10; i++ { + eventStore.Add(testutils.NewExportableMockEvent("init")) + } + + var ( + callbackMu sync.Mutex + batchSizes []int + startedOnce sync.Once + started = make(chan struct{}) + release = make(chan struct{}) + wg sync.WaitGroup + ) + + processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error { + callbackMu.Lock() + batchSizes = append(batchSizes, len(events)) + callbackMu.Unlock() + startedOnce.Do(func() { close(started) }) + <-release + return nil + } + + wg.Add(2) + go func() { + defer wg.Done() + assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc)) + }() + <-started + go func() { + defer wg.Done() + assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc)) + }() + + close(release) + wg.Wait() + + callbackMu.Lock() + defer callbackMu.Unlock() + assert.Equal(t, []int{10}, batchSizes) + + count, err := eventStore.GetPendingEventCount(consumerName) + assert.Nil(t, err) + assert.Equal(t, int64(0), count) +} + +func Test_ProcessPendingEvents_EmptyBatchSkipsCallback(t *testing.T) { + consumerName := "consumer1" + eventStore := exporter.NewEventStore[testutils.ExportableMockEvent]( + defaultTestCleanQueueDuration, + ) + eventStore.AddConsumer(consumerName) + defer eventStore.Stop() + + callbackCalled := false + err := eventStore.ProcessPendingEvents( + consumerName, + func(ctx context.Context, events []testutils.ExportableMockEvent) error { + callbackCalled = true + return nil + }, + ) + + assert.Nil(t, err) + assert.False(t, callbackCalled) +} + func startEventProducer( ctx context.Context, eventStore exporter.EventStore[testutils.ExportableMockEvent], From 5ba1427652ed6b66ec4e3b4adde51be5d01b57b9 Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Sun, 19 Apr 2026 00:32:32 +0900 Subject: [PATCH 05/14] test: fix unused context parameter in event store test --- exporter/event_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 8765e607ff2..48cd961fc28 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -340,7 +340,7 @@ func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *test wg sync.WaitGroup ) - processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error { + processFunc := func(_ context.Context, events []testutils.ExportableMockEvent) error { callbackMu.Lock() batchSizes = append(batchSizes, len(events)) callbackMu.Unlock() From 6b3302ecaceb929e52beec96bf2268f08e73fb21 Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Sun, 19 Apr 2026 00:41:10 +0900 Subject: [PATCH 06/14] test: satisfy linter in event store concurrency test --- exporter/event_store_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 48cd961fc28..4087f9cba55 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -340,12 +340,15 @@ func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *test wg sync.WaitGroup ) - processFunc := func(_ context.Context, events []testutils.ExportableMockEvent) error { + processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error { callbackMu.Lock() batchSizes = append(batchSizes, len(events)) callbackMu.Unlock() startedOnce.Do(func() { close(started) }) <-release + if err := ctx.Err(); err != nil { + return err + } return nil } From 7adf54ebb23d2cb833a7ec5b4539483e75a306a3 Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Thu, 23 Apr 2026 09:14:13 +0900 Subject: [PATCH 07/14] fix(exporter): move flush synchronization into event store --- exporter/data_exporter.go | 5 --- exporter/event_store.go | 4 +++ exporter/event_store_test.go | 60 ++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/exporter/data_exporter.go b/exporter/data_exporter.go index ede77a22ec6..7e4a51489fd 100644 --- a/exporter/data_exporter.go +++ b/exporter/data_exporter.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "log/slog" - "sync" "time" "github.com/thomaspoignant/go-feature-flag/utils/fflog" @@ -44,7 +43,6 @@ type dataExporterImpl[T ExportableEvent] struct { daemonChan chan struct{} ticker *time.Ticker - flushMu sync.Mutex } // NewDataExporter create a new DataExporter with the given exporter and his consumer information to consume the data @@ -102,9 +100,6 @@ func (d *dataExporterImpl[T]) Stop() { // Flush is sending the data to the exporter func (d *dataExporterImpl[T]) Flush() { - d.flushMu.Lock() - defer d.flushMu.Unlock() - store := *d.eventStore err := store.ProcessPendingEvents(d.consumerID, d.sendEvents) if err != nil { diff --git a/exporter/event_store.go b/exporter/event_store.go index 83a320b2055..1c65b3aec59 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -84,6 +84,8 @@ type consumer struct { // AddConsumer is adding a new consumer to the Event store. // note that you can't add a consumer after the Event store has been started. func (e *eventStoreImpl[T]) AddConsumer(consumerID string) { + e.mutex.Lock() + defer e.mutex.Unlock() e.consumers[consumerID] = &consumer{Offset: e.lastOffset} } @@ -91,7 +93,9 @@ func (e *eventStoreImpl[T]) AddConsumer(consumerID string) { // with the process events function in parameter, func (e *eventStoreImpl[T]) ProcessPendingEvents( consumerID string, processEventsFunc func(context.Context, []T) error) error { + e.mutex.RLock() currentConsumer, err := e.getConsumer(consumerID) + e.mutex.RUnlock() if err != nil { return err } diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 4087f9cba55..0be879e15f1 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -319,6 +319,66 @@ func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) { <-processingDone } +func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing.T) { + consumerName := "consumer1" + eventStore := exporter.NewEventStore[testutils.ExportableMockEvent]( + defaultTestCleanQueueDuration, + ) + eventStore.AddConsumer(consumerName) + defer eventStore.Stop() + + for i := 0; i < 10; i++ { + eventStore.Add(testutils.NewExportableMockEvent("init")) + } + + processingStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + secondCallStarted := make(chan struct{}) + var wg sync.WaitGroup + + processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error { + select { + case <-processingStarted: + default: + close(processingStarted) + } + <-releaseFirst + return nil + } + + wg.Add(2) + go func() { + defer wg.Done() + assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc)) + }() + + <-processingStarted + + go func() { + close(secondCallStarted) + assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc)) + wg.Done() + }() + + <-secondCallStarted + time.Sleep(20 * time.Millisecond) + + addDone := make(chan struct{}) + go func() { + eventStore.Add(testutils.NewExportableMockEvent("during-queued-flush")) + close(addDone) + }() + + select { + case <-addDone: + case <-time.After(100 * time.Millisecond): + t.Fatal("Add() was blocked while a same-consumer ProcessPendingEvents call was queued") + } + + close(releaseFirst) + wg.Wait() +} + func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *testing.T) { consumerName := "consumer1" eventStore := exporter.NewEventStore[testutils.ExportableMockEvent]( From 3c02665264f15d44685d3d521900f1d30cbec20e Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Thu, 23 Apr 2026 10:15:47 +0900 Subject: [PATCH 08/14] test(exporter): fix unused context parameter --- exporter/event_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 0be879e15f1..0a2a9a5cf1a 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -336,7 +336,7 @@ func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing. secondCallStarted := make(chan struct{}) var wg sync.WaitGroup - processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error { + processFunc := func(_ context.Context, events []testutils.ExportableMockEvent) error { select { case <-processingStarted: default: From 56b926f369dca786a117440f955d7d23e5d5163f Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Thu, 23 Apr 2026 10:29:59 +0900 Subject: [PATCH 09/14] test(exporter): fix unused callback parameter --- exporter/event_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 0a2a9a5cf1a..23a36c1dfc6 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -336,7 +336,7 @@ func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing. secondCallStarted := make(chan struct{}) var wg sync.WaitGroup - processFunc := func(_ context.Context, events []testutils.ExportableMockEvent) error { + processFunc := func(_ context.Context, _ []testutils.ExportableMockEvent) error { select { case <-processingStarted: default: From a1e45cd9d7892ce5bb33693571166a6c83ad2afd Mon Sep 17 00:00:00 2001 From: "elric.lim" Date: Thu, 23 Apr 2026 10:44:45 +0900 Subject: [PATCH 10/14] test(exporter): avoid unparam in queued flush test --- exporter/event_store_test.go | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/exporter/event_store_test.go b/exporter/event_store_test.go index 23a36c1dfc6..ae6a8a40941 100644 --- a/exporter/event_store_test.go +++ b/exporter/event_store_test.go @@ -336,27 +336,37 @@ func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing. secondCallStarted := make(chan struct{}) var wg sync.WaitGroup - processFunc := func(_ context.Context, _ []testutils.ExportableMockEvent) error { - select { - case <-processingStarted: - default: - close(processingStarted) - } - <-releaseFirst - return nil - } - wg.Add(2) go func() { defer wg.Done() - assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc)) + assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, + func(_ context.Context, _ []testutils.ExportableMockEvent) error { + select { + case <-processingStarted: + default: + close(processingStarted) + } + <-releaseFirst + return nil + }, + )) }() <-processingStarted go func() { close(secondCallStarted) - assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc)) + assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, + func(_ context.Context, _ []testutils.ExportableMockEvent) error { + select { + case <-processingStarted: + default: + close(processingStarted) + } + <-releaseFirst + return nil + }, + )) wg.Done() }() From d08e46d1eb8586eba342b410cd92c5f55145a4fd Mon Sep 17 00:00:00 2001 From: hoo Date: Sat, 25 Apr 2026 00:55:55 +0900 Subject: [PATCH 11/14] fix: method signature in subsequent methods in ProcessPendingEvents --- exporter/event_store.go | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/exporter/event_store.go b/exporter/event_store.go index 1c65b3aec59..07c5aa3cc31 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -104,25 +104,20 @@ func (e *eventStoreImpl[T]) ProcessPendingEvents( defer currentConsumer.mutex.Unlock() e.mutex.Lock() - eventList, err := e.fetchPendingEvents(consumerID) + eventList := e.fetchPendingEvents(currentConsumer) e.mutex.Unlock() - if err != nil { - return err - } if len(eventList.Events) == 0 { return nil } - err = processEventsFunc(context.Background(), eventList.Events) - if err != nil { + if err := processEventsFunc(context.Background(), eventList.Events); err != nil { return err } e.mutex.Lock() - err = e.updateConsumerOffset(consumerID, eventList.NewOffset) - e.mutex.Unlock() - return err + defer e.mutex.Unlock() + return e.updateConsumerOffset(currentConsumer, eventList.NewOffset) } // GetTotalEventCount returns the total number of events in the store. @@ -153,11 +148,7 @@ func (e *eventStoreImpl[T]) Add(data T) { // fetchPendingEvents is returning all the available item in the Event store for this consumer. // WARNING: please call this function only in a function that has locked the mutex first. -func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T], error) { - currentConsumer, err := e.getConsumer(consumerID) - if err != nil { - return nil, err - } +func (e *eventStoreImpl[T]) fetchPendingEvents(currentConsumer *consumer) *EventList[T] { events := make([]T, 0) for _, event := range e.events { if event.Offset > currentConsumer.Offset { @@ -168,7 +159,7 @@ func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T] Events: events, InitialOffset: currentConsumer.Offset, NewOffset: e.lastOffset, - }, nil + } } // getConsumer checks if the consumer exists and returns it. @@ -182,7 +173,7 @@ func (e *eventStoreImpl[T]) getConsumer(consumerID string) (*consumer, error) { // updateConsumerOffset updates the offset of the consumer to the new offset. // WARNING: please call this function only in a function that has locked the mutex first. -func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64) error { +func (e *eventStoreImpl[T]) updateConsumerOffset(currentConsumer *consumer, offset int64) error { if offset > e.lastOffset { return fmt.Errorf( "invalid offset: offset %d is greater than the last offset %d", @@ -190,10 +181,6 @@ func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64 e.lastOffset, ) } - currentConsumer, err := e.getConsumer(consumerID) - if err != nil { - return err - } currentConsumer.Offset = offset return nil } From 8bd2fae6a292c767e41436ec2c65c25245344217 Mon Sep 17 00:00:00 2001 From: Elric Date: Sat, 25 Apr 2026 01:26:32 +0900 Subject: [PATCH 12/14] Update exporter/event_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- exporter/event_store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/exporter/event_store.go b/exporter/event_store.go index 07c5aa3cc31..7df0ff00b85 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -103,9 +103,9 @@ func (e *eventStoreImpl[T]) ProcessPendingEvents( currentConsumer.mutex.Lock() defer currentConsumer.mutex.Unlock() - e.mutex.Lock() - eventList := e.fetchPendingEvents(currentConsumer) - e.mutex.Unlock() +e.mutex.RLock() +eventList := e.fetchPendingEvents(currentConsumer) +e.mutex.RUnlock() if len(eventList.Events) == 0 { return nil From 518104184a752c1cc8353b56583841e9cec29d23 Mon Sep 17 00:00:00 2001 From: hoo Date: Sat, 25 Apr 2026 01:32:00 +0900 Subject: [PATCH 13/14] fix: fmt lint --- exporter/event_store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/exporter/event_store.go b/exporter/event_store.go index 7df0ff00b85..ff6459e3e51 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -103,9 +103,9 @@ func (e *eventStoreImpl[T]) ProcessPendingEvents( currentConsumer.mutex.Lock() defer currentConsumer.mutex.Unlock() -e.mutex.RLock() -eventList := e.fetchPendingEvents(currentConsumer) -e.mutex.RUnlock() + e.mutex.RLock() + eventList := e.fetchPendingEvents(currentConsumer) + e.mutex.RUnlock() if len(eventList.Events) == 0 { return nil From 654bb9a56849fe11b7b1949e504adb490fa8e438 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 29 Apr 2026 16:53:57 +0200 Subject: [PATCH 14/14] Apply suggestion from @thomaspoignant --- exporter/event_store.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/exporter/event_store.go b/exporter/event_store.go index ff6459e3e51..042db30c16c 100644 --- a/exporter/event_store.go +++ b/exporter/event_store.go @@ -76,6 +76,9 @@ type EventStoreItem[T ExportableEvent] struct { Data T } +// consumer serializes ProcessPendingEvents calls for a single consumer. +// Lock ordering: consumer.mutex must always be acquired before eventStoreImpl.mutex. +// Never acquire consumer.mutex while holding eventStoreImpl.mutex. type consumer struct { Offset int64 mutex sync.Mutex