fix(exporter): reduce mutex hold time in process pending events #5134

hugehoo wants to merge 16 commits into thomaspoignant:main
Conversation
✅ Deploy Preview for go-feature-flag-doc-preview canceled.
Code Review
This pull request improves concurrency by reducing lock contention in the EventStore and adding a mutex to prevent concurrent flushes in the DataExporter. It also fixes a bug in updateConsumerOffset where the global last offset was incorrectly used instead of the consumer-specific offset. Feedback suggests addressing a potential race condition in EventStore when the same consumer processes events concurrently and recommends an optimization to return early if no events are found during processing.
```diff
 	e.mutex.Lock()
-	defer e.mutex.Unlock()

 	eventList, err := e.fetchPendingEvents(consumerID)
+	e.mutex.Unlock()
 	if err != nil {
 		return err
 	}

 	err = processEventsFunc(context.Background(), eventList.Events)
 	if err != nil {
 		return err
 	}

+	e.mutex.Lock()
 	err = e.updateConsumerOffset(consumerID, eventList.NewOffset)
-	if err != nil {
-		return err
-	}
-	return nil
+	e.mutex.Unlock()
+	return err
 }
```
The change to release the global mutex before calling processEventsFunc successfully reduces lock contention for Add() operations. However, it introduces a race condition where multiple concurrent calls to ProcessPendingEvents for the same consumerID can fetch and process the same set of events multiple times (since the offset is only updated after processing).
While the addition of flushMu in DataExporter mitigates this for that specific caller, the EventStore itself is no longer safe for concurrent processing by the same consumer. If the EventStore is intended to be a robust standalone component, consider implementing a per-consumer lock or a "busy" state within eventStoreImpl. This would ensure atomicity for a single consumer's processing while still allowing other consumers and Add() operations to proceed concurrently.
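A rough sketch of that per-consumer locking idea, in case it helps the discussion. All type, field, and method names here are illustrative assumptions, not the actual go-feature-flag implementation:

```go
package exporter

import (
	"context"
	"fmt"
	"sync"
)

// Event stands in for the store's event type; the real definition lives in
// the exporter package.
type Event struct{}

// consumer carries its own mutex so fetch -> process -> commit is atomic per
// consumer without holding the global store lock for the whole duration.
type consumer struct {
	offset int64
	mutex  sync.Mutex
}

type eventStoreImpl struct {
	mutex     sync.RWMutex // guards the shared event slice and consumer map
	events    []Event
	consumers map[string]*consumer
}

func (e *eventStoreImpl) ProcessPendingEvents(
	consumerID string,
	processEventsFunc func(context.Context, []Event) error,
) error {
	// Resolve the consumer once under the global lock to avoid a racy map read.
	e.mutex.RLock()
	c, ok := e.consumers[consumerID]
	e.mutex.RUnlock()
	if !ok {
		return fmt.Errorf("consumer %q not found", consumerID)
	}

	// Per-consumer lock: concurrent flushes for the same consumer are
	// serialized, while Add() and other consumers proceed concurrently.
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Snapshot the pending events under a short read lock.
	e.mutex.RLock()
	pending := e.events[c.offset:]
	newOffset := int64(len(e.events))
	e.mutex.RUnlock()

	if err := processEventsFunc(context.Background(), pending); err != nil {
		return err
	}
	c.offset = newOffset // commit only after a successful export
	return nil
}
```

With this shape, the global mutex is held only for the map lookup and the snapshot, and duplicate processing for a single consumer is prevented inside the store itself.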
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main    #5134      +/-   ##
==========================================
+ Coverage   85.81%   85.89%   +0.07%
==========================================
  Files         153      153
  Lines        6558     6558
==========================================
+ Hits         5628     5633       +5
+ Misses        699      696       -3
+ Partials      231      229       -2
```

☔ View full report in Codecov by Sentry.
/gemini review
Code Review
This pull request introduces more granular locking in the event exporter to prevent slow export operations from blocking the addition of new events. Key changes include the addition of a per-consumer mutex in the event store and a refactor of the ProcessPendingEvents method to release the global lock during event processing. Feedback highlights a data race in the consumer lookup within ProcessPendingEvents that needs to be addressed by holding the global mutex during the lookup. Additionally, the flushMu in DataExporter is now redundant as the event store handles per-consumer synchronization internally.
```go
	d.flushMu.Lock()
	defer d.flushMu.Unlock()
```
The flushMu mutex in dataExporterImpl is redundant. The eventStore.ProcessPendingEvents method now implements internal per-consumer synchronization using currentConsumer.mutex. Since each DataExporter instance is associated with a specific consumerID, the event store already ensures that concurrent flushes for the same consumer are serialized. Removing this lock reduces unnecessary overhead.
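To make the redundancy argument concrete, a minimal sketch of Flush() leaning on the store's per-consumer lock instead of its own mutex. The interface, type, and method names are assumptions for illustration, not the actual go-feature-flag code:

```go
// EventStore is assumed here; the real interface lives in the exporter package.
type EventStore interface {
	ProcessPendingEvents(consumerID string, f func(context.Context, []Event) error) error
}

// dataExporterImpl sketch: no flushMu field is needed because the event store
// serializes ProcessPendingEvents per consumer internally.
type dataExporterImpl struct {
	consumerID string
	eventStore EventStore
}

func (d *dataExporterImpl) Flush() error {
	// Two concurrent Flush() calls for the same consumerID are serialized by
	// the store's per-consumer mutex, so they cannot double-process events.
	return d.eventStore.ProcessPendingEvents(d.consumerID, d.export)
}

func (d *dataExporterImpl) export(ctx context.Context, events []Event) error {
	// ... push the events to the configured exporter (file, S3, webhook, ...)
	return nil
}
```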
@hugehoo as Gemini says, this mutex is not needed here.
Can you explain why you wanted to add one?
I added flushMu as a defensive guard: ProcessPendingEvents() released the store lock before the export and only updated the consumer offset afterward. That made concurrent Flush() calls for the same consumer look risky, so I serialized them at the DataExporter layer.
But after seeing Gemini's comment, I agree this is not the right place to keep that guarantee. The atomicity should live in EventStore, not in DataExporter.
thomaspoignant left a comment
@hugehoo the different reviews from Gemini help us understand that while you improve the speed of the Add() function, we are not able to ensure atomicity per consumer, which can be an issue.
I am happy to discuss the implementation and your ideas to improve it.
Thanks for the review! I updated the commits based on your and Gemini's feedback.
/gemini review
Code Review
This pull request refactors the EventStore to use per-consumer locking, ensuring that slow event exports do not block the addition of new events and preventing duplicate processing during concurrent flushes. It also fixes a bug where consumer offsets were incorrectly updated and adds several tests to verify the new concurrency behavior. Feedback suggests updating the ProcessPendingEvents signature to accept and propagate a context.Context for better management of long-running I/O operations.
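The suggested signature change could look roughly like this. A hedged sketch reusing the fetchPendingEvents and updateConsumerOffset helpers from the diff quoted earlier, with lock handling elided for brevity:

```go
// Sketch only: ProcessPendingEvents takes the caller's context and forwards
// it to the export callback instead of using context.Background().
func (e *eventStoreImpl) ProcessPendingEvents(
	ctx context.Context,
	consumerID string,
	processEventsFunc func(context.Context, []Event) error,
) error {
	eventList, err := e.fetchPendingEvents(consumerID)
	if err != nil {
		return err
	}
	// A slow exporter upload can now be cancelled or time-boxed by the caller,
	// e.g. via context.WithTimeout.
	if err := processEventsFunc(ctx, eventList.Events); err != nil {
		return err
	}
	return e.updateConsumerOffset(consumerID, eventList.NewOffset)
}
```

A caller could then wrap a flush in context.WithTimeout so a hung export returns instead of holding the consumer lock indefinitely.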
/gemini review
Code Review
This pull request improves the concurrency model of the event store by introducing per-consumer mutexes and fine-grained locking in ProcessPendingEvents. It also fixes a bug in updateConsumerOffset where the wrong offset was being applied and adds comprehensive tests for concurrent scenarios. Review feedback suggests refactoring internal methods to use consumer pointers directly to avoid redundant lookups and potential race conditions if a consumer is re-added during processing. Additionally, it was noted that ProcessPendingEvents should ideally propagate a context to allow for the cancellation of long-running export operations.
/gemini review
Code Review
This pull request introduces fine-grained locking in the EventStore to prevent event processing from blocking the addition of new events. Key changes include adding a per-consumer mutex, refactoring ProcessPendingEvents to use more granular locking, and updating internal methods to work with consumer pointers. The PR also adds several tests to verify concurrent behavior and fixes a typo in the documentation. The review feedback suggests further improving performance by using read locks instead of write locks when fetching events and updating offsets, as these operations do not modify the global store state.
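Applied to the code quoted earlier, that read-lock optimization would be a small change, assuming e.mutex is (or becomes) a sync.RWMutex:

```diff
-	e.mutex.Lock()
+	e.mutex.RLock()   // fetching only reads shared state
 	eventList, err := e.fetchPendingEvents(consumerID)
-	e.mutex.Unlock()
+	e.mutex.RUnlock()
```

With a read lock, several consumers can fetch their pending events concurrently; only Add(), which mutates the shared event slice, still needs the exclusive write lock.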
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
thomaspoignant left a comment
@hugehoo thanks a lot for this update, this is a really good one 🙇.
I will merge this PR and it will be part of the next version.



Description
This PR reduces lock contention in the exporter event store while preserving event delivery guarantees for concurrent flushes.
What was the problem?
- ProcessPendingEvents() held the event store mutex while running processEventsFunc, which can perform slow I/O such as exporter uploads, so Add() could be blocked unnecessarily.
- updateConsumerOffset() always wrote the global lastOffset instead of the offset passed into it.
- Concurrent Flush() calls needed to stay safe after shortening the event store lock scope.

How is it resolved?

- ProcessPendingEvents() now releases the store lock before running processEventsFunc and re-acquires it only to commit the consumer offset.
- updateConsumerOffset() now uses the provided offset argument instead of always writing lastOffset.
- DataExporter.Flush() now serializes concurrent flushes with a mutex so concurrent calls do not double-process or lose events.
- New tests in event_store_test.go cover Add() not being blocked by slow pending-event processing; a sketch of that kind of test follows below.
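For illustration, a test along these lines can check that guarantee. This is a hedged sketch, not the actual test from event_store_test.go; newEventStore, AddConsumer, and Add are assumed names:

```go
func TestAddIsNotBlockedBySlowFlush(t *testing.T) {
	store := newEventStore()        // assumed constructor
	store.AddConsumer("exporter-1") // assumed registration API

	exportStarted := make(chan struct{})
	releaseExport := make(chan struct{})

	// Start a flush whose processEventsFunc simulates a slow exporter upload.
	go func() {
		_ = store.ProcessPendingEvents("exporter-1",
			func(ctx context.Context, events []Event) error {
				close(exportStarted)
				<-releaseExport // keep the "upload" in flight
				return nil
			})
	}()
	<-exportStarted

	// Add() must return while the flush is still running.
	done := make(chan struct{})
	go func() {
		store.Add(Event{})
		close(done)
	}()

	select {
	case <-done:
		// Success: adding events was not blocked by the in-flight flush.
	case <-time.After(time.Second):
		t.Fatal("Add() was blocked by slow pending-event processing")
	}
	close(releaseExport)
}
```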
Closes issue(s)

Resolve #5133
Checklist
- Documentation updated if needed (README.md and /website/docs)