Merged
Changes from 3 commits
7 changes: 6 additions & 1 deletion exporter/data_exporter.go
@@ -5,6 +5,7 @@ import (
"fmt"
"log"
"log/slog"
"sync"
"time"

"github.com/thomaspoignant/go-feature-flag/utils/fflog"
@@ -43,6 +44,7 @@ type dataExporterImpl[T ExportableEvent] struct {

daemonChan chan struct{}
ticker *time.Ticker
flushMu sync.Mutex
}

// NewDataExporter create a new DataExporter with the given exporter and his consumer information to consume the data
@@ -86,7 +88,7 @@ func (d *dataExporterImpl[T]) Start() {
}
}

// Stop is flushing the daya and stopping the ticker
// Stop is flushing the data and stopping the ticker
func (d *dataExporterImpl[T]) Stop() {
// we don't start the daemon if we are not in bulk mode
if !d.IsBulk() {
@@ -100,6 +102,9 @@ func (d *dataExporterImpl[T]) Stop() {

// Flush is sending the data to the exporter
func (d *dataExporterImpl[T]) Flush() {
d.flushMu.Lock()
defer d.flushMu.Unlock()
Contributor (medium)

The flushMu mutex in dataExporterImpl is redundant. The eventStore.ProcessPendingEvents method now implements internal per-consumer synchronization using currentConsumer.mutex. Since each DataExporter instance is associated with a specific consumerID, the event store already ensures that concurrent flushes for the same consumer are serialized. Removing this lock reduces unnecessary overhead.

Owner

@hugehoo as Gemini is saying, this mutex is not needed here.
Can you explain why you wanted to add one?

Contributor Author

I added flushMu as a defensive guard because ProcessPendingEvents() released the store lock before the export and only updated the consumer offset afterward. That made concurrent Flush() calls for the same consumer look risky, so I serialized them at the DataExporter layer.
But after reading Gemini's comment, I agree this is not the right place to keep that guarantee. The atomicity should live in the EventStore, not in the DataExporter.
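
To make the interleaving concrete, here is a minimal, self-contained sketch of the race being discussed, using toy types rather than the go-feature-flag API: the store lock is released before a slow export, and the offset only advances afterwards, so two concurrent flushes can both pick up and export the same batch.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyStore mimics the problematic shape: the store lock is released while the
// (slow) export runs, and the consumer offset is only advanced afterwards.
type toyStore struct {
	mu     sync.Mutex
	events []string
	offset int
}

func (s *toyStore) process(export func([]string)) {
	s.mu.Lock()
	batch := s.events[s.offset:] // fetch the pending events
	s.mu.Unlock()                // lock released before the slow export

	export(batch) // two concurrent callers can both reach this with the same batch

	s.mu.Lock()
	s.offset = len(s.events) // offset only advanced after the export
	s.mu.Unlock()
}

func main() {
	store := &toyStore{events: []string{"e1", "e2", "e3"}}

	var countMu sync.Mutex
	exported := 0

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			store.process(func(batch []string) {
				time.Sleep(50 * time.Millisecond) // stand-in for a slow exporter
				countMu.Lock()
				exported += len(batch)
				countMu.Unlock()
			})
		}()
	}
	wg.Wait()

	// With no per-consumer serialization, both flushes usually see all 3 events,
	// so this typically prints 6 instead of 3.
	fmt.Println("events exported:", exported)
}
```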


store := *d.eventStore
err := store.ProcessPendingEvents(d.consumerID, d.sendEvents)
if err != nil {
54 changes: 54 additions & 0 deletions exporter/data_exporter_test.go
@@ -1,10 +1,14 @@
package exporter_test

import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/thomaspoignant/go-feature-flag/exporter"
@@ -14,6 +18,21 @@ import (
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)

type slowExporter struct {
delay time.Duration
exportCall atomic.Int32
}

func (s *slowExporter) Export(_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent) error {
s.exportCall.Add(1)
time.Sleep(s.delay)
return nil
}

func (s *slowExporter) IsBulk() bool {
return true
}

func TestDataExporterFlush_TriggerError(t *testing.T) {
evStore := mock.NewEventStore[exporter.FeatureEvent]()
for i := 0; i < 100; i++ {
@@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) {
})
}
}

func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) {
evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration)
evStore.AddConsumer("slow-consumer")
defer evStore.Stop()

for i := 0; i < 100; i++ {
evStore.Add(exporter.FeatureEvent{Kind: "feature"})
}

slow := &slowExporter{delay: 200 * time.Millisecond}
exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{
Exporter: slow,
FlushInterval: 10 * time.Second,
MaxEventInMemory: 100000,
}, "slow-consumer", &evStore, nil)

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
exp.Flush()
}()
time.Sleep(10 * time.Millisecond)
go func() {
defer wg.Done()
exp.Flush()
}()
wg.Wait()

assert.Equal(t, int32(1), slow.exportCall.Load())
count, err := evStore.GetPendingEventCount("slow-consumer")
assert.Nil(t, err)
assert.Equal(t, int64(0), count)
}
14 changes: 7 additions & 7 deletions exporter/event_store.go
@@ -91,21 +91,21 @@ func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
func (e *eventStoreImpl[T]) ProcessPendingEvents(
consumerID string, processEventsFunc func(context.Context, []T) error) error {
e.mutex.Lock()
defer e.mutex.Unlock()

eventList, err := e.fetchPendingEvents(consumerID)
e.mutex.Unlock()
if err != nil {
return err
}

err = processEventsFunc(context.Background(), eventList.Events)
if err != nil {
return err
}

e.mutex.Lock()
err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
if err != nil {
return err
}
return nil
e.mutex.Unlock()
return err
}
Contributor (medium)

The change to release the global mutex before calling processEventsFunc successfully reduces lock contention for Add() operations. However, it introduces a race condition where multiple concurrent calls to ProcessPendingEvents for the same consumerID can fetch and process the same set of events multiple times (since the offset is only updated after processing).

While the addition of flushMu in DataExporter mitigates this for that specific caller, the EventStore itself is no longer safe for concurrent processing by the same consumer. If the EventStore is intended to be a robust standalone component, consider implementing a per-consumer lock or a "busy" state within eventStoreImpl. This would ensure atomicity for a single consumer's processing while still allowing other consumers and Add() operations to proceed concurrently.
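
For illustration, here is a self-contained sketch of that per-consumer lock idea, again with toy types rather than the repository's eventStoreImpl (the consumer struct, its mutex field, and the consumers map are assumptions, not the actual implementation): the store-wide mutex only guards the short fetch and commit phases, while a mutex owned by each consumer serializes fetch, export, and offset update for that consumer.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// consumer holds the per-consumer state; the mutex is the assumed addition that
// serializes processing for this consumer only.
type consumer struct {
	mu     sync.Mutex
	offset int
}

// store keeps a global mutex for the short fetch/commit phases plus one
// consumer entry per consumerID.
type store struct {
	mu        sync.Mutex
	events    []string
	consumers map[string]*consumer
}

func (s *store) add(e string) {
	s.mu.Lock()
	s.events = append(s.events, e)
	s.mu.Unlock()
}

func (s *store) process(consumerID string, export func([]string) error) error {
	s.mu.Lock()
	c, ok := s.consumers[consumerID]
	s.mu.Unlock()
	if !ok {
		return fmt.Errorf("unknown consumer %q", consumerID)
	}

	// Per-consumer serialization: concurrent calls for the same consumer queue up
	// here, while Add() and other consumers are never blocked by a slow export.
	c.mu.Lock()
	defer c.mu.Unlock()

	s.mu.Lock()
	batch := s.events[c.offset:]
	newOffset := len(s.events)
	s.mu.Unlock()

	if err := export(batch); err != nil {
		return err
	}

	s.mu.Lock()
	c.offset = newOffset
	s.mu.Unlock()
	return nil
}

func main() {
	s := &store{consumers: map[string]*consumer{"c1": {}}}
	for i := 0; i < 3; i++ {
		s.add(fmt.Sprintf("e%d", i))
	}

	exported := 0
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.process("c1", func(batch []string) error {
				time.Sleep(50 * time.Millisecond) // stand-in for a slow exporter
				exported += len(batch)            // safe: serialized by c.mu for "c1"
				return nil
			})
		}()
	}
	wg.Wait()

	// The second flush finds an empty batch because the offset was committed first,
	// so this prints 3, not 6.
	fmt.Println("events exported:", exported)
}
```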


// GetTotalEventCount returns the total number of events in the store.
@@ -177,7 +177,7 @@ func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64
if err != nil {
return err
}
currentConsumer.Offset = e.lastOffset
currentConsumer.Offset = offset
return nil
}

44 changes: 44 additions & 0 deletions exporter/event_store_test.go
@@ -171,6 +171,7 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg := &sync.WaitGroup{}

var countersMu sync.Mutex
consumeFunc := func(eventStore exporter.EventStore[testutils.ExportableMockEvent], consumerName string, eventCounters *map[string]int) {
defer wg.Done()
err := eventStore.ProcessPendingEvents(consumerName,
@@ -186,7 +187,9 @@
err = eventStore.ProcessPendingEvents(consumerName,
func(ctx context.Context, events []testutils.ExportableMockEvent) error {
if eventCounters != nil {
countersMu.Lock()
(*eventCounters)[consumerName] = len(events)
countersMu.Unlock()
}
return nil
})
@@ -275,6 +278,47 @@ func Test_WaitForEmptyClean(t *testing.T) {
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) {
consumerName := "consumer1"
eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
defaultTestCleanQueueDuration,
)
eventStore.AddConsumer(consumerName)
defer eventStore.Stop()

for i := 0; i < 10; i++ {
eventStore.Add(testutils.NewExportableMockEvent("init"))
}

processingStarted := make(chan struct{})
processingDone := make(chan struct{})
go func() {
_ = eventStore.ProcessPendingEvents(consumerName,
func(ctx context.Context, events []testutils.ExportableMockEvent) error {
close(processingStarted)
time.Sleep(200 * time.Millisecond)
return nil
})
close(processingDone)
}()

<-processingStarted

addDone := make(chan struct{})
go func() {
eventStore.Add(testutils.NewExportableMockEvent("during-export"))
close(addDone)
}()

select {
case <-addDone:
case <-time.After(100 * time.Millisecond):
t.Fatal("Add() was blocked while ProcessPendingEvents was running a slow exporter")
}

<-processingDone
}

func startEventProducer(
ctx context.Context,
eventStore exporter.EventStore[testutils.ExportableMockEvent],