fix: Reset streamer in case of L1 reorg causing batch tx not to go through #1045
lukeiannucci wants to merge 2 commits into
Conversation
Code Review
This pull request introduces L1 reorg handling by resetting the Espresso streamer if the batch position moves backwards and adds a mechanism to discard duplicate message positions during transaction parsing. Feedback suggests optimizing the reorg logic to prevent redundant resets during startup and moving the duplicate message check before RLP decoding to enhance efficiency.
```go
if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
	// Batch position moved backwards probably due to l1 reorg, try and resync the blocks from hotshot
	log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount)
	b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
} else {
	b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
}
```
The current logic performs a reorg check and potential reset on every iteration where a new batch is being built. On startup (`b.espressoRestarting == true`), this leads to redundant operations:

1. `b.espressoStreamer.GetCurrentMessagePos()` is initialized to 1, so if `batchPosition.MessageCount` is 0, it triggers a warning and a reset at line 1861.
2. Then, line 1869 calls the exact same reset function again.
3. If `batchPosition.MessageCount > 0`, it calls `AdvanceTo` and then immediately `Reset` at line 1869, making the `AdvanceTo` call redundant.

Wrapping the reorg check in `if !b.espressoRestarting` avoids these redundant calls and prevents a false-positive reorg warning on startup:
```go
if !b.espressoRestarting {
	if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
		// Batch position moved backwards probably due to l1 reorg, try and resync the blocks from hotshot
		log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount)
		b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
	} else {
		b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
	}
}
```

```go
if _, exists := s.messageWithMetadataAndPos[indices[i]]; exists {
	log.Warn("duplicate message position, discarding", "pos", indices[i])
	continue
}
```
This check for duplicate message positions is a good addition for robustness. However, it would be more efficient to perform this check before the RLP decoding of the message (around line 382), as `indices[i]` is already available. This would save CPU cycles by avoiding unnecessary decoding for messages that are already in the buffer.
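For illustration only, here is a minimal, self-contained sketch of the reordering suggested above: the duplicate-position check runs before the decode step. The loop shape, the `decode` stub, and all names other than `indices[i]` are assumptions for the sketch, not the actual parser code.

```go
package main

import "fmt"

// decode stands in for the expensive RLP decode step in the real parser;
// everything in this file is illustrative, not the actual streamer code.
func decode(b []byte) (string, error) {
	return string(b), nil
}

func main() {
	indices := []uint64{7, 8, 7} // position 7 appears twice
	payloads := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	buffered := map[uint64]string{}

	for i := range indices {
		// Cheap duplicate check first: indices[i] is already available,
		// so a repeated position is skipped without decoding anything.
		if _, exists := buffered[indices[i]]; exists {
			fmt.Println("duplicate message position, discarding", indices[i])
			continue
		}

		msg, err := decode(payloads[i]) // only reached for new positions
		if err != nil {
			fmt.Println("failed to decode message", err)
			continue
		}
		buffered[indices[i]] = msg
	}

	fmt.Println(buffered) // map[7:a 8:b]: the first message for position 7 wins
}
```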
✅ All tests successful. No failed tests were found. (Codecov)
Claude finished @lukeiannucci's task in 1m 46s
PR #1045 Review — Reset streamer on L1 reorg
Overall: This PR looks good. The changes are well-structured and address a real failure mode — an L1 reorg causing the batch transaction not to go through.
Summary of changes
What looks good
Minor feedback: I left two inline comments.
No blocking issues found.
```go
if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
	// Batch position moved backwards probably due to l1 reorg, try and resync the blocks from hotshot
	log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount)
	b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
```
nit: The reorg detection compares `uint64(batchPosition.MessageCount)` against `GetCurrentMessagePos()`, but the `Reset()` call passes `batchPosition.MessageCount` (type `arbutil.MessageIndex`) — which `resetStreamerToParentChainOrConfigHotshotBlock` then casts to `uint64` internally. This is fine and consistent with the startup path on line 1862, just noting that the types match up.
One thing worth considering: after the reorg reset, the streamer's `messageWithMetadataAndPos` buffer is fully cleared (line 193 of espresso_streamer.go). If the streamer's background goroutine has already fetched and buffered messages for positions beyond the new `currentMessagePos`, those will be lost and need to be re-fetched. This is the correct behavior for a reorg scenario, but I want to confirm: is there any risk that the background fetcher could race and re-populate stale data between the `GetCurrentMessagePos()` check and the `Reset()` call? Both happen under different lock acquisitions.
The window is small, but in theory:

1. `GetCurrentMessagePos()` acquires `RLock`, reads the position, releases.
2. The background goroutine acquires `Lock`, inserts messages at old positions.
3. `Reset()` (via `resetStreamerToParentChainOrConfigHotshotBlock`) acquires `Lock`, clears everything.

Step 3 would clean up step 2's stale data, so this is safe in practice. Just flagging for awareness.
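To make that interleaving concrete, here is a minimal, self-contained sketch under the assumption that the streamer guards its buffer with a single `sync.RWMutex`; the struct, field, and method names are illustrative stand-ins, not the actual espresso_streamer.go API. Because the reset replaces the whole map under the write lock, anything a racing insert added after the position check is still discarded.

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-in for the streamer; the names below are assumptions
// for this sketch, not the real espresso_streamer.go types.
type streamer struct {
	mu                        sync.RWMutex
	currentMessagePos         uint64
	messageWithMetadataAndPos map[uint64]string
}

func (s *streamer) GetCurrentMessagePos() uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.currentMessagePos
}

// insert models the background fetcher buffering a message.
func (s *streamer) insert(pos uint64, msg string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.messageWithMetadataAndPos[pos] = msg
}

// Reset rewinds the position and drops the whole buffer, so anything a
// racing insert added between the position check and this call is discarded.
func (s *streamer) Reset(pos uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.currentMessagePos = pos
	s.messageWithMetadataAndPos = make(map[uint64]string)
}

func main() {
	s := &streamer{currentMessagePos: 10, messageWithMetadataAndPos: map[uint64]string{}}
	_ = s.GetCurrentMessagePos() // step 1: read position under RLock
	s.insert(9, "stale")         // step 2: fetcher races in stale data
	s.Reset(5)                   // step 3: reset clears the stale entry as well
	fmt.Println(len(s.messageWithMetadataAndPos)) // 0
}
```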
Sneh1999 left a comment
Can we not create a mock L1 docker image similar to what we did in proxy and use that to do the reorg?
```go
	}
	b.espressoRestarting = false
} else {
	if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
```
What if the MessagePos is ahead of the current message pos and has not yet arrived in the streamer? Will we then keep resetting in this loop?
I don't quite understand the question, but here is how it works now: we only advance the streamer when the batch is posted (it's part of this else check). So the streamer's current position will never be ahead of the batch position, only equal to it. It will only reset if for some reason the batch position moves backwards. Does this make sense?
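A minimal, runnable sketch of the invariant described above; the types and function names are illustrative stand-ins, not the real batch poster or streamer API. It assumes the streamer is advanced only to positions the batch has already reached, so a reset can only be triggered by the batch position moving backwards.

```go
package main

import "fmt"

// Toy model of the invariant: the streamer position is only advanced to the
// batch position after a batch is posted, so it can never be ahead of it;
// a reset fires only when the batch position moves backwards (L1 reorg).
type toyStreamer struct{ pos uint64 }

func (s *toyStreamer) AdvanceTo(p uint64) { s.pos = p }
func (s *toyStreamer) Reset(p uint64)     { s.pos = p; fmt.Println("reset to", p) }

func step(s *toyStreamer, batchPos uint64) {
	if batchPos < s.pos {
		// Batch position moved backwards: only possible via an L1 reorg.
		s.Reset(batchPos)
	} else {
		// Normal path: batchPos >= s.pos, so just catch up.
		s.AdvanceTo(batchPos)
	}
}

func main() {
	s := &toyStreamer{pos: 1}
	step(s, 5) // batch posted through message 5, streamer catches up
	step(s, 5) // no progress, positions stay equal
	step(s, 3) // L1 reorg rewound the batch position: reset fires
	fmt.Println("final pos", s.pos) // 3
}
```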
```go
if err != nil {
	log.Warn("failed to decode message", "err", err)
	// Instead of returnning an error, we should just skip this message
	if _, exists := s.messageWithMetadataAndPos[indices[i]]; exists {
```
What is this code doing? I am not sure I understand.
It just checks whether the position is already in the streamer. Without this check, it was overwriting an existing message with a later one instead of just keeping the first message.
Closes #<ISSUE_NUMBER>
This PR:
This PR does not:
Key places to review: