Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/data_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (d *dataExporterImpl[T]) Start() {
}
}

// Stop is flushing the daya and stopping the ticker
// Stop is flushing the data and stopping the ticker
func (d *dataExporterImpl[T]) Stop() {
// we don't start the daemon if we are not in bulk mode
if !d.IsBulk() {
Expand Down
54 changes: 54 additions & 0 deletions exporter/data_exporter_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package exporter_test

import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/thomaspoignant/go-feature-flag/exporter"
Expand All @@ -14,6 +18,21 @@ import (
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)

// slowExporter is a test double for exporter.CommonExporter: it records how
// many times Export is called and then blocks for a configurable delay, so
// tests can overlap a slow export with other store operations.
type slowExporter struct {
	delay      time.Duration // how long each Export call blocks
	exportCall atomic.Int32  // number of Export invocations; read concurrently by tests
}

// Export counts the call and then sleeps for s.delay to simulate a slow
// downstream system. The counter is incremented BEFORE the sleep on purpose:
// tests poll it to detect that an export is in flight.
func (s *slowExporter) Export(_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent) error {
	s.exportCall.Add(1)
	time.Sleep(s.delay)
	return nil
}

// IsBulk reports that this exporter operates in bulk mode, which is the
// code path (flush daemon / ticker) the tests exercise.
func (s *slowExporter) IsBulk() bool {
	return true
}

func TestDataExporterFlush_TriggerError(t *testing.T) {
evStore := mock.NewEventStore[exporter.FeatureEvent]()
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) {
})
}
}

// TestDataExporterFlush_ConcurrentFlushNoDuplicate verifies that two
// concurrent Flush calls on the same consumer export the pending events
// exactly once: the second flush must find the batch already claimed by the
// first and export nothing, leaving zero pending events.
func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) {
	evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration)
	evStore.AddConsumer("slow-consumer")
	defer evStore.Stop()

	for i := 0; i < 100; i++ {
		evStore.Add(exporter.FeatureEvent{Kind: "feature"})
	}

	slow := &slowExporter{delay: 200 * time.Millisecond}
	exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{
		Exporter:         slow,
		FlushInterval:    10 * time.Second,
		MaxEventInMemory: 100000,
	}, "slow-consumer", &evStore, nil)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		exp.Flush()
	}()

	// Wait until the first Flush has actually entered the slow exporter
	// before launching the second one. Polling the call counter (instead of
	// the previous fixed 10ms sleep) removes the scheduling race that could
	// make both flushes run sequentially on a slow CI machine.
	deadline := time.Now().Add(2 * time.Second)
	for slow.exportCall.Load() == 0 && time.Now().Before(deadline) {
		time.Sleep(time.Millisecond)
	}

	go func() {
		defer wg.Done()
		exp.Flush()
	}()
	wg.Wait()

	// Exactly one export must have happened, and the consumer queue must be
	// fully drained.
	assert.Equal(t, int32(1), slow.exportCall.Load())
	count, err := evStore.GetPendingEventCount("slow-consumer")
	assert.Nil(t, err)
	assert.Equal(t, int64(0), count)
}
31 changes: 24 additions & 7 deletions exporter/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,34 +78,51 @@ type EventStoreItem[T ExportableEvent] struct {

type consumer struct {
Comment thread
thomaspoignant marked this conversation as resolved.
Offset int64
mutex sync.Mutex
}

// AddConsumer is adding a new consumer to the Event store.
// note that you can't add a consumer after the Event store has been started.
func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	// A new consumer starts at the store's current last offset, so it only
	// observes events added after its registration.
	e.consumers[consumerID] = &consumer{Offset: e.lastOffset}
}

// ProcessPendingEvents is processing all the available item in the Event store for this consumer
// with the process events function in parameter,
func (e *eventStoreImpl[T]) ProcessPendingEvents(
consumerID string, processEventsFunc func(context.Context, []T) error) error {
e.mutex.Lock()
defer e.mutex.Unlock()
e.mutex.RLock()
currentConsumer, err := e.getConsumer(consumerID)
e.mutex.RUnlock()
if err != nil {
return err
}

currentConsumer.mutex.Lock()
defer currentConsumer.mutex.Unlock()

e.mutex.Lock()
eventList, err := e.fetchPendingEvents(consumerID)
e.mutex.Unlock()
if err != nil {
return err
}

if len(eventList.Events) == 0 {
return nil
}

err = processEventsFunc(context.Background(), eventList.Events)
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated
if err != nil {
return err
}
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated

e.mutex.Lock()
err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
if err != nil {
return err
}
return nil
e.mutex.Unlock()
Comment thread
thomaspoignant marked this conversation as resolved.
Outdated
return err
}

// GetTotalEventCount returns the total number of events in the store.
Expand Down Expand Up @@ -177,7 +194,7 @@ func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64
if err != nil {
return err
}
currentConsumer.Offset = e.lastOffset
currentConsumer.Offset = offset
return nil
}

Expand Down
181 changes: 181 additions & 0 deletions exporter/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
time.Sleep(50 * time.Millisecond)
wg := &sync.WaitGroup{}

var countersMu sync.Mutex
consumeFunc := func(eventStore exporter.EventStore[testutils.ExportableMockEvent], consumerName string, eventCounters *map[string]int) {
defer wg.Done()
err := eventStore.ProcessPendingEvents(consumerName,
Expand All @@ -186,7 +187,9 @@
err = eventStore.ProcessPendingEvents(consumerName,
func(ctx context.Context, events []testutils.ExportableMockEvent) error {
if eventCounters != nil {
countersMu.Lock()
(*eventCounters)[consumerName] = len(events)
countersMu.Unlock()
}
return nil
})
Expand Down Expand Up @@ -275,6 +278,184 @@
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

// Test_ProcessPendingEvents_DoesNotBlockAdd checks that a slow consumer
// callback running inside ProcessPendingEvents does not hold the store-wide
// lock, so Add() can still insert events while an export is in flight.
func Test_ProcessPendingEvents_DoesNotBlockAdd(t *testing.T) {
	const consumerID = "consumer1"
	store := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	store.AddConsumer(consumerID)
	defer store.Stop()

	for i := 0; i < 10; i++ {
		store.Add(testutils.NewExportableMockEvent("init"))
	}

	// Start a slow processing pass and signal when its callback begins/ends.
	started := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		_ = store.ProcessPendingEvents(consumerID,
			func(_ context.Context, _ []testutils.ExportableMockEvent) error {
				close(started)
				time.Sleep(200 * time.Millisecond)
				return nil
			})
		close(finished)
	}()

	<-started

	// While the callback is sleeping, Add must complete quickly.
	addCompleted := make(chan struct{})
	go func() {
		store.Add(testutils.NewExportableMockEvent("during-export"))
		close(addCompleted)
	}()

	select {
	case <-addCompleted:
	case <-time.After(100 * time.Millisecond):
		t.Fatal("Add() was blocked while ProcessPendingEvents was running a slow exporter")
	}

	<-finished
}

func Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd(t *testing.T) {
consumerName := "consumer1"
eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
defaultTestCleanQueueDuration,
)
eventStore.AddConsumer(consumerName)
defer eventStore.Stop()

for i := 0; i < 10; i++ {
eventStore.Add(testutils.NewExportableMockEvent("init"))
}

processingStarted := make(chan struct{})
releaseFirst := make(chan struct{})
secondCallStarted := make(chan struct{})
var wg sync.WaitGroup

processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error {

Check failure on line 339 in exporter/event_store_test.go

View workflow job for this annotation

GitHub Actions / Lint

Test_ProcessPendingEvents_QueuedSameConsumerCallDoesNotBlockAdd$1 - ctx is unused (unparam)
select {
case <-processingStarted:
default:
close(processingStarted)
}
<-releaseFirst
return nil
}

wg.Add(2)
go func() {
defer wg.Done()
assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
}()

<-processingStarted

go func() {
close(secondCallStarted)
assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
wg.Done()
}()

<-secondCallStarted
time.Sleep(20 * time.Millisecond)

addDone := make(chan struct{})
go func() {
eventStore.Add(testutils.NewExportableMockEvent("during-queued-flush"))
close(addDone)
}()

select {
case <-addDone:
case <-time.After(100 * time.Millisecond):
t.Fatal("Add() was blocked while a same-consumer ProcessPendingEvents call was queued")
}

close(releaseFirst)
wg.Wait()
}

// Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate checks
// that two concurrent ProcessPendingEvents calls for the SAME consumer never
// deliver the same events twice: the first call must receive the full batch
// of 10 events and the second must find an empty batch (its callback is
// skipped), leaving zero pending events.
func Test_ProcessPendingEvents_SameConsumerConcurrentCallsDoNotDuplicate(t *testing.T) {
	consumerName := "consumer1"
	eventStore := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	eventStore.AddConsumer(consumerName)
	defer eventStore.Stop()

	for i := 0; i < 10; i++ {
		eventStore.Add(testutils.NewExportableMockEvent("init"))
	}

	var (
		callbackMu  sync.Mutex // guards batchSizes against concurrent callbacks
		batchSizes  []int      // size of each batch handed to a callback
		startedOnce sync.Once  // closes `started` exactly once
		started     = make(chan struct{}) // first callback has begun
		release     = make(chan struct{}) // holds callbacks until both calls are in flight
		wg          sync.WaitGroup
	)

	processFunc := func(ctx context.Context, events []testutils.ExportableMockEvent) error {
		callbackMu.Lock()
		batchSizes = append(batchSizes, len(events))
		callbackMu.Unlock()
		startedOnce.Do(func() { close(started) })
		// Block until the test releases the callbacks, so both
		// ProcessPendingEvents calls overlap.
		<-release
		if err := ctx.Err(); err != nil {
			return err
		}
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	}()
	// Ensure the first call is inside its callback before starting the second.
	<-started
	go func() {
		defer wg.Done()
		assert.Nil(t, eventStore.ProcessPendingEvents(consumerName, processFunc))
	}()

	close(release)
	wg.Wait()

	callbackMu.Lock()
	defer callbackMu.Unlock()
	// Exactly one callback invocation with the full batch: the second call
	// found nothing pending, so its callback never ran.
	assert.Equal(t, []int{10}, batchSizes)

	count, err := eventStore.GetPendingEventCount(consumerName)
	assert.Nil(t, err)
	assert.Equal(t, int64(0), count)
}

// Test_ProcessPendingEvents_EmptyBatchSkipsCallback verifies that the
// processing callback is never invoked when the consumer has no pending
// events, and that the call still succeeds.
func Test_ProcessPendingEvents_EmptyBatchSkipsCallback(t *testing.T) {
	const consumerID = "consumer1"
	store := exporter.NewEventStore[testutils.ExportableMockEvent](
		defaultTestCleanQueueDuration,
	)
	store.AddConsumer(consumerID)
	defer store.Stop()

	invoked := false
	processErr := store.ProcessPendingEvents(
		consumerID,
		func(_ context.Context, _ []testutils.ExportableMockEvent) error {
			invoked = true
			return nil
		},
	)

	assert.Nil(t, processErr)
	assert.False(t, invoked)
}

func startEventProducer(
ctx context.Context,
eventStore exporter.EventStore[testutils.ExportableMockEvent],
Expand Down
Loading