10 changes: 8 additions & 2 deletions arbnode/batch_poster.go

@@ -1855,9 +1855,15 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
 	}
 	if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount {
 		if b.espressoStreamer != nil {
-			b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
+			if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
+				// Batch position moved backwards, probably due to an L1 reorg; try to resync the blocks from hotshot
+				log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount)
+				b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
+			} else {
+				b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
+			}
medium

The current logic performs a reorg check and potential reset on every iteration where a new batch is being built. On startup (b.espressoRestarting == true), this leads to redundant operations:

1. b.espressoStreamer.GetCurrentMessagePos() is initialized to 1, so if batchPosition.MessageCount is 0, it triggers a warning and a reset at line 1861.
2. Then, line 1869 calls the exact same reset function again.
3. If batchPosition.MessageCount > 0, it calls AdvanceTo and then immediately Reset at line 1869, making the AdvanceTo call redundant.

Wrapping the reorg check in if !b.espressoRestarting avoids these redundant calls and prevents a false-positive reorg warning on startup.

			if !b.espressoRestarting {
				if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
					// Batch position moved backwards, probably due to an L1 reorg; try to resync the blocks from hotshot
					log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount)
					b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
				} else {
					b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
				}
			}

 			if b.espressoRestarting {
-				// Reset only once
+				// Reset on startup
 				log.Info("resetting streamer to parent chain", "messageCount", batchPosition.MessageCount)
 				// Fallback. For existing queued batches, we don't have the hotshot block number, so we reset to the parent chain.
 				b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
4 changes: 4 additions & 0 deletions arbnode/espresso_caff_node_test.go

@@ -50,6 +50,10 @@ func (m *MockEspressoStreamer) GetCurrentEarliestHotShotBlockNumber(pos uint64)
 	return m.currHotShot
 }
 
+func (m *MockEspressoStreamer) GetCurrentMessagePos() uint64 {
+	return m.currPos
+}
+
 func (m *MockEspressoStreamer) Start(ctx context.Context) error {
 	return nil
 }
13 changes: 13 additions & 0 deletions espressostreamer/espresso_streamer.go

@@ -44,6 +44,8 @@ type EspressoStreamerInterface interface {
 	Advance()
 	// Reset sets the current message position and the next hotshot block number.
 	Reset(currentMessagePos uint64, currentHostshotBlock uint64)
+	// GetCurrentMessagePos returns the current message position of the streamer.
+	GetCurrentMessagePos() uint64
 	// RecordTimeDurationBetweenHotshotAndCurrentBlock records the time duration between
 	// the next hotshot block and the current block.
 	RecordTimeDurationBetweenHotshotAndCurrentBlock(nextHotshotBlock uint64, blockProductionTime time.Time)
@@ -225,6 +227,12 @@ func (s *EspressoStreamer) Advance() {
 	s.currentMessagePos += 1
 }
 
+func (s *EspressoStreamer) GetCurrentMessagePos() uint64 {
+	s.messageLock.RLock()
+	defer s.messageLock.RUnlock()
+	return s.currentMessagePos
+}
+
 func (s *EspressoStreamer) AdvanceTo(toPos uint64) {
 	s.messageLock.Lock()
 	defer s.messageLock.Unlock()
@@ -383,6 +391,11 @@ func (s *EspressoStreamer) parseEspressoTransaction(tx espressoTypes.Bytes, l1He
 			continue
 		}
 
+		if _, exists := s.messageWithMetadataAndPos[indices[i]]; exists {
+			log.Warn("duplicate message position, discarding", "pos", indices[i])
+			continue
+		}
medium

This check for duplicate message positions is a good addition for robustness. However, it would be more efficient to perform this check before the RLP decoding of the message (around line 382), as indices[i] is already available. This would save CPU cycles by avoiding unnecessary decoding for messages that are already in the buffer.
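A minimal sketch of the suggested reordering, assuming the loop shape visible in this diff. The decode step is a hypothetical stand-in for the RLP decoding the comment says happens around line 382; messageBytes and the exact decode call are assumptions, not the verified source:

	// Hypothetical shape of the indices loop in parseEspressoTransaction:
	// reject already-buffered positions before paying for RLP decoding.
	if _, exists := s.messageWithMetadataAndPos[indices[i]]; exists {
		log.Warn("duplicate message position, discarding", "pos", indices[i])
		continue
	}
	// Stand-in for the existing decode step (assumed rlp.DecodeBytes into an
	// arbostypes.MessageWithMetadata); now only runs for unseen positions.
	var messageWithMetadata arbostypes.MessageWithMetadata
	if err := rlp.DecodeBytes(messageBytes[i], &messageWithMetadata); err != nil {
		log.Warn("failed to decode espresso message, skipping", "pos", indices[i], "err", err)
		continue
	}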


+
 		msg := &MessageWithMetadataAndPos{
 			MessageWithMeta: messageWithMetadata,
 			Pos:             indices[i],