Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion exporter/data_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"log/slog"
"sync"
"time"

"github.com/thomaspoignant/go-feature-flag/utils/fflog"
Expand Down Expand Up @@ -43,6 +44,7 @@ type dataExporterImpl[T ExportableEvent] struct {

daemonChan chan struct{}
ticker *time.Ticker
flushMu sync.Mutex
}

// NewDataExporter create a new DataExporter with the given exporter and his consumer information to consume the data
Expand Down Expand Up @@ -86,7 +88,7 @@ func (d *dataExporterImpl[T]) Start() {
}
}

// Stop is flushing the daya and stopping the ticker
// Stop is flushing the data and stopping the ticker
func (d *dataExporterImpl[T]) Stop() {
// we don't start the daemon if we are not in bulk mode
if !d.IsBulk() {
Expand All @@ -100,6 +102,9 @@ func (d *dataExporterImpl[T]) Stop() {

// Flush is sending the data to the exporter
func (d *dataExporterImpl[T]) Flush() {
d.flushMu.Lock()
defer d.flushMu.Unlock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The flushMu mutex in dataExporterImpl is redundant. The eventStore.ProcessPendingEvents method now implements internal per-consumer synchronization using currentConsumer.mutex. Since each DataExporter instance is associated with a specific consumerID, the event store already ensures that concurrent flushes for the same consumer are serialized. Removing this lock reduces unnecessary overhead.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hugehoo, as Gemini says, this mutex is not needed here.
Can you explain why you wanted to add one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added flushMu as a defensive guard because ProcessPendingEvents() released the store lock before export and only updated the consumer offset afterward. That made concurrent Flush() calls for the same consumer look risky, so I serialized them at the DataExporter layer.
But after seeing what Gemini said, I agree this is not the right place to keep that guarantee. The atomicity should live in EventStore, not in DataExporter.


store := *d.eventStore
err := store.ProcessPendingEvents(d.consumerID, d.sendEvents)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions exporter/data_exporter_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package exporter_test

import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/thomaspoignant/go-feature-flag/exporter"
Expand All @@ -14,6 +18,21 @@ import (
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)

// slowExporter is a test exporter whose Export call blocks for a
// configurable delay, simulating a slow bulk export. It counts Export
// invocations so tests can assert that overlapping flushes are serialized
// rather than exporting the same pending batch twice.
type slowExporter struct {
	delay      time.Duration // how long each Export call sleeps before returning
	exportCall atomic.Int32  // number of times Export has been invoked (read via Load)
}

// Export records the invocation, then sleeps for s.delay so the export
// stays "in flight" long enough for a concurrent Flush to overlap it.
// The counter is bumped BEFORE the sleep on purpose: a second concurrent
// call would be visible as a count > 1 even while the first is mid-export.
func (s *slowExporter) Export(_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent) error {
	s.exportCall.Add(1)
	time.Sleep(s.delay)
	return nil
}

// IsBulk reports bulk mode so the DataExporter buffers events and only
// sends them on an explicit Flush (or ticker), the code path under test.
func (s *slowExporter) IsBulk() bool {
	return true
}

func TestDataExporterFlush_TriggerError(t *testing.T) {
evStore := mock.NewEventStore[exporter.FeatureEvent]()
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) {
})
}
}

// TestDataExporterFlush_ConcurrentFlushNoDuplicate verifies that two
// overlapping Flush() calls on the same DataExporter do not export the
// same pending events twice: the slow exporter must be invoked exactly
// once, and the consumer's pending queue must end up empty.
func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) {
	evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration)
	evStore.AddConsumer("slow-consumer")
	defer evStore.Stop()

	// Seed the store so there is a non-empty batch to flush.
	for i := 0; i < 100; i++ {
		evStore.Add(exporter.FeatureEvent{Kind: "feature"})
	}

	// 200ms delay keeps the first flush in flight while the second starts.
	slow := &slowExporter{delay: 200 * time.Millisecond}
	// Large FlushInterval / MaxEventInMemory ensure only the explicit
	// Flush() calls below can trigger an export during the test.
	exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{
		Exporter:         slow,
		FlushInterval:    10 * time.Second,
		MaxEventInMemory: 100000,
	}, "slow-consumer", &evStore, nil)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		exp.Flush()
	}()
	// Head start so the first flush is inside the slow Export when the
	// second one arrives (timing-based: 10ms << the 200ms export delay).
	time.Sleep(10 * time.Millisecond)
	go func() {
		defer wg.Done()
		exp.Flush()
	}()
	wg.Wait()

	// Exactly one export call: the second flush must have found no
	// pending events left for the consumer.
	assert.Equal(t, int32(1), slow.exportCall.Load())
	count, err := evStore.GetPendingEventCount("slow-consumer")
	assert.Nil(t, err)
	assert.Equal(t, int64(0), count)
}
27 changes: 20 additions & 7 deletions exporter/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type EventStoreItem[T ExportableEvent] struct {

type consumer struct {
Comment thread
thomaspoignant marked this conversation as resolved.
Offset int64
mutex sync.Mutex
}

// AddConsumer is adding a new consumer to the Event store.
Expand All @@ -90,22 +91,34 @@ func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
// with the process events function in parameter,
func (e *eventStoreImpl[T]) ProcessPendingEvents(
consumerID string, processEventsFunc func(context.Context, []T) error) error {
e.mutex.Lock()
defer e.mutex.Unlock()
currentConsumer, err := e.getConsumer(consumerID)
if err != nil {
return err
}

currentConsumer.mutex.Lock()
defer currentConsumer.mutex.Unlock()

e.mutex.Lock()
eventList, err := e.fetchPendingEvents(consumerID)
e.mutex.Unlock()
if err != nil {
return err
}
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated

if len(eventList.Events) == 0 {
return nil
}

err = processEventsFunc(context.Background(), eventList.Events)
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated
if err != nil {
return err
}
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated

e.mutex.Lock()
err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
if err != nil {
return err
}
return nil
e.mutex.Unlock()
return err
}

// GetTotalEventCount returns the total number of events in the store.
Expand Down Expand Up @@ -177,7 +190,7 @@ func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64
if err != nil {
return err
}
currentConsumer.Offset = e.lastOffset
currentConsumer.Offset = offset
return nil
}

Expand Down
121 changes: 121 additions & 0 deletions exporter/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg := &sync.WaitGroup{}

var countersMu sync.Mutex
consumeFunc := func(eventStore exporter.EventStore[testutils.ExportableMockEvent], consumerName string, eventCounters *map[string]int) {
defer wg.Done()
err := eventStore.ProcessPendingEvents(consumerName,
Expand All @@ -186,7 +187,9 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
err = eventStore.ProcessPendingEvents(consumerName,
func(ctx context.Context, events []testutils.ExportableMockEvent) error {
if eventCounters != nil {
countersMu.Lock()
(*eventCounters)[consumerName] = len(events)
countersMu.Unlock()
}
return nil
})
Expand Down Expand Up @@ -275,6 +278,124 @@ func Test_WaitForEmptyClean(t *testing.T) {
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

// Test_ProcessPendingEvents_DoesNotBlockAdd verifies that a slow
// ProcessPendingEvents callback does not hold the store-wide lock for the
// duration of the export: Add() must still complete while the callback
// is sleeping.
func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	// Seed events so the processing callback is actually invoked.
	for i := 0; i < 10; i++ {
		eventStore.Add(testutils.NewExportableMockEvent("init"))
	}

	processingStarted := make(chan struct{})
	processingDone := make(chan struct{})
	go func() {
		_ = eventStore.ProcessPendingEvents(consumerName,
			func(ctx context.Context, events []testutils.ExportableMockEvent) error {
				close(processingStarted)           // signal: export callback is running
				time.Sleep(200 * time.Millisecond) // simulate a slow exporter
				return nil
			})
		close(processingDone)
	}()

	// Block until the callback is definitely in progress.
	<-processingStarted

	addDone := make(chan struct{})
	go func() {
		eventStore.Add(testutils.NewExportableMockEvent("during-export"))
		close(addDone)
	}()

	// Add() must return well before the 200ms export finishes; if it is
	// serialized behind the export, the 100ms timeout fires and we fail.
	select {
	case <-addDone:
	case <-time.After(100 * time.Millisecond):
		t.Fatal("Add() was blocked while ProcessPendingEvents was running a slow exporter")
	}

	// Let the background export finish before the deferred Stop() runs.
	<-processingDone
}

// Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate
// verifies per-consumer serialization: when two ProcessPendingEvents calls
// for the same consumer overlap, the pending batch reaches the callback
// exactly once (a single batch of 10 events, never a duplicate batch).
func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	// Seed a known number of pending events for the consumer.
	for i := 0; i < 10; i++ {
		eventStore.Add(testutils.NewExportableMockEvent("init"))
	}

	var (
		callbackMu  sync.Mutex // guards batchSizes (callback may run concurrently in principle)
		batchSizes  []int      // size of every batch handed to the callback
		startedOnce sync.Once  // ensures `started` is closed only by the first invocation
		started     = make(chan struct{})
		release     = make(chan struct{}) // holds the first callback open until the test says go
		wg          sync.WaitGroup
	)

	processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error {
		callbackMu.Lock()
		batchSizes = append(batchSizes, len(events))
		callbackMu.Unlock()
		startedOnce.Do(func() { close(started) })
		// Park here so the first call keeps the consumer busy while the
		// second ProcessPendingEvents call is issued.
		<-release
		if err := ctx.Err(); err != nil {
			return err
		}
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	}()
	// Wait until the first call is inside its callback, then start the
	// second call so the two provably overlap.
	<-started
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	}()

	close(release)
	wg.Wait()

	// Exactly one batch of 10: the second call must have seen an empty
	// queue (offset already advanced) and skipped the callback entirely.
	callbackMu.Lock()
	defer callbackMu.Unlock()
	assert.Equal(t, []int{10}, batchSizes)

	count, err := eventStore.GetPendingEventCount(consumerName)
	assert.Nil(t, err)
	assert.Equal(t, int64(0), count)
}

// Test_ProcessPendingEvents_EmptyBatchSkipsCallback checks that when a
// consumer has no pending events, ProcessPendingEvents returns nil
// without ever invoking the processing callback.
func Test_ProcessPendingEvents_EmptyBatchSkipsCallback(t *testing.T) {
	const consumerID = "consumer1"
	store := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	store.AddConsumer(consumerID)
	defer store.Stop()

	// No events were added, so the callback must never fire.
	invoked := false
	processFunc := func(_ context.Context, _ []testutils.ExportableMockEvent) error {
		invoked = true
		return nil
	}

	err := store.ProcessPendingEvents(consumerID, processFunc)

	assert.Nil(t, err)
	assert.False(t, invoked)
}

func startEventProducer(
ctx context.Context,
eventStore exporter.EventStore[testutils.ExportableMockEvent],
Expand Down
Loading