Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/data_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (d *dataExporterImpl[T]) Start() {
}
}

// Stop is flushing the daya and stopping the ticker
// Stop is flushing the data and stopping the ticker
func (d *dataExporterImpl[T]) Stop() {
// we don't start the daemon if we are not in bulk mode
if !d.IsBulk() {
Expand Down
54 changes: 54 additions & 0 deletions exporter/data_exporter_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package exporter_test

import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/thomaspoignant/go-feature-flag/exporter"
Expand All @@ -14,6 +18,21 @@ import (
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)

type slowExporter struct {
delay time.Duration
exportCall atomic.Int32
}

func (s *slowExporter) Export(_ context.Context, _ *fflog.FFLogger, _ []exporter.ExportableEvent) error {
s.exportCall.Add(1)
time.Sleep(s.delay)
return nil
}

func (s *slowExporter) IsBulk() bool {
return true
}

func TestDataExporterFlush_TriggerError(t *testing.T) {
evStore := mock.NewEventStore[exporter.FeatureEvent]()
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -133,3 +152,38 @@ func TestDataExporterFlush_TriggerErrorIfExporterFail(t *testing.T) {
})
}
}

func TestDataExporterFlush_ConcurrentFlushNoDuplicate(t *testing.T) {
evStore := exporter.NewEventStore[exporter.FeatureEvent](defaultTestCleanQueueDuration)
evStore.AddConsumer("slow-consumer")
defer evStore.Stop()

for i := 0; i < 100; i++ {
evStore.Add(exporter.FeatureEvent{Kind: "feature"})
}

slow := &slowExporter{delay: 200 * time.Millisecond}
exp := exporter.NewDataExporter[exporter.FeatureEvent](exporter.Config{
Exporter: slow,
FlushInterval: 10 * time.Second,
MaxEventInMemory: 100000,
}, "slow-consumer", &evStore, nil)

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
exp.Flush()
}()
time.Sleep(10 * time.Millisecond)
go func() {
defer wg.Done()
exp.Flush()
}()
wg.Wait()

assert.Equal(t, int32(1), slow.exportCall.Load())
count, err := evStore.GetPendingEventCount("slow-consumer")
assert.Nil(t, err)
assert.Equal(t, int64(0), count)
}
51 changes: 29 additions & 22 deletions exporter/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,51 @@ type EventStoreItem[T ExportableEvent] struct {
Data T
}

// consumer serializes ProcessPendingEvents calls for a single consumer.
// Lock ordering: consumer.mutex must always be acquired before eventStoreImpl.mutex.
// Never acquire consumer.mutex while holding eventStoreImpl.mutex.
type consumer struct {
Comment thread
thomaspoignant marked this conversation as resolved.
Offset int64
mutex sync.Mutex
}

// AddConsumer is adding a new consumer to the Event store.
// note that you can't add a consumer after the Event store has been started.
func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.consumers[consumerID] = &consumer{Offset: e.lastOffset}
}

// ProcessPendingEvents is processing all the available item in the Event store for this consumer
// with the process events function in parameter,
func (e *eventStoreImpl[T]) ProcessPendingEvents(
consumerID string, processEventsFunc func(context.Context, []T) error) error {
e.mutex.Lock()
defer e.mutex.Unlock()

eventList, err := e.fetchPendingEvents(consumerID)
e.mutex.RLock()
currentConsumer, err := e.getConsumer(consumerID)
e.mutex.RUnlock()
if err != nil {
return err
}
err = processEventsFunc(context.Background(), eventList.Events)
if err != nil {
return err

currentConsumer.mutex.Lock()
defer currentConsumer.mutex.Unlock()

e.mutex.RLock()
eventList := e.fetchPendingEvents(currentConsumer)
e.mutex.RUnlock()

if len(eventList.Events) == 0 {
return nil
}
err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
if err != nil {

if err := processEventsFunc(context.Background(), eventList.Events); err != nil {
return err
}
return nil

e.mutex.Lock()
defer e.mutex.Unlock()
return e.updateConsumerOffset(currentConsumer, eventList.NewOffset)
Comment thread
thomaspoignant marked this conversation as resolved.
}

// GetTotalEventCount returns the total number of events in the store.
Expand Down Expand Up @@ -136,11 +151,7 @@ func (e *eventStoreImpl[T]) Add(data T) {

// fetchPendingEvents is returning all the available item in the Event store for this consumer.
// WARNING: please call this function only in a function that has locked the mutex first.
func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T], error) {
currentConsumer, err := e.getConsumer(consumerID)
if err != nil {
return nil, err
}
func (e *eventStoreImpl[T]) fetchPendingEvents(currentConsumer *consumer) *EventList[T] {
events := make([]T, 0)
for _, event := range e.events {
if event.Offset > currentConsumer.Offset {
Expand All @@ -151,7 +162,7 @@ func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T]
Events: events,
InitialOffset: currentConsumer.Offset,
NewOffset: e.lastOffset,
}, nil
}
}

// getConsumer checks if the consumer exists and returns it.
Expand All @@ -165,19 +176,15 @@ func (e *eventStoreImpl[T]) getConsumer(consumerID string) (*consumer, error) {

// updateConsumerOffset updates the offset of the consumer to the new offset.
// WARNING: please call this function only in a function that has locked the mutex first.
func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64) error {
func (e *eventStoreImpl[T]) updateConsumerOffset(currentConsumer *consumer, offset int64) error {
if offset > e.lastOffset {
return fmt.Errorf(
"invalid offset: offset %d is greater than the last offset %d",
offset,
e.lastOffset,
)
}
currentConsumer, err := e.getConsumer(consumerID)
if err != nil {
return err
}
currentConsumer.Offset = e.lastOffset
currentConsumer.Offset = offset
return nil
}

Expand Down
Loading
Loading