Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1855,9 +1855,8 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
}
if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount {
if b.espressoStreamer != nil {
b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
if b.espressoRestarting {
// Reset only once
// Reset on startup
log.Info("resetting streamer to parent chain", "messageCount", batchPosition.MessageCount)
// Fallback. For existing queued batches, we don't have the hotshot block number, so we reset to the parent chain.
b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
Expand All @@ -1878,6 +1877,14 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
log.Info("submitted pending transactions after restart", "from", batchPosition.MessageCount, "count", len(queue))
}
b.espressoRestarting = false
} else {
if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() {
Copy link
Copy Markdown
Collaborator

@Sneh1999 Sneh1999 May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the MessagePos is after the current message pos and has not yet arrived in the streamer? Will we keep resetting then in this loop?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont quite understand the question. But how it works now, is we only advance the streamer when the batch is posted (its part of this else check). So streamer current position will never be ahead of batch position, only equal to it. So, it will only reset if for some reason the batch position is moved backwards. Does this make sense?

// Batch position moved backwards probably due to l1 reorg, try and resync the blocks from hotshot
log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount)
b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The reorg detection compares uint64(batchPosition.MessageCount) against GetCurrentMessagePos(), but the Reset() call passes batchPosition.MessageCount (type arbutil.MessageIndex) — which resetStreamerToParentChainOrConfigHotshotBlock then casts to uint64 internally. This is fine and consistent with the startup path on line 1862, just noting that the types match up.

One thing worth considering: after the reorg reset, the streamer's messageWithMetadataAndPos buffer is fully cleared (line 193 of espresso_streamer.go). If the streamer's background goroutine has already fetched and buffered messages for positions beyond the new currentMessagePos, those will be lost and need to be re-fetched. This is the correct behavior for a reorg scenario, but I want to confirm: is there any risk that the background fetcher could race and re-populate stale data between the GetCurrentMessagePos() check and the Reset() call? Both happen under different lock acquisitions.

The window is small, but in theory:

  1. GetCurrentMessagePos() acquires RLock, reads pos, releases
  2. Background goroutine acquires Lock, inserts messages at old positions
  3. Reset() (via resetStreamerToParentChainOrConfigHotshotBlock) acquires Lock, clears everything

Step 3 would clean up step 2's stale data, so this is safe in practice. Just flagging for awareness.

} else {
b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount))
}
}
}
latestHeader, err := b.l1Reader.LastHeader(ctx)
Expand Down
4 changes: 4 additions & 0 deletions arbnode/espresso_caff_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (m *MockEspressoStreamer) GetCurrentEarliestHotShotBlockNumber(pos uint64)
return m.currHotShot
}

func (m *MockEspressoStreamer) GetCurrentMessagePos() uint64 {
return m.currPos
}

func (m *MockEspressoStreamer) Start(ctx context.Context) error {
return nil
}
Expand Down
23 changes: 18 additions & 5 deletions espressostreamer/espresso_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type EspressoStreamerInterface interface {
Advance()
// Reset sets the current message position and the next hotshot block number.
Reset(currentMessagePos uint64, currentHostshotBlock uint64)
// GetCurrentMessagePos returns the current message position of the streamer.
GetCurrentMessagePos() uint64
// RecordTimeDurationBetweenHotshotAndCurrentBlock records the time duration between
// the next hotshot block and the current block.
RecordTimeDurationBetweenHotshotAndCurrentBlock(nextHotshotBlock uint64, blockProductionTime time.Time)
Expand Down Expand Up @@ -225,6 +227,12 @@ func (s *EspressoStreamer) Advance() {
s.currentMessagePos += 1
}

func (s *EspressoStreamer) GetCurrentMessagePos() uint64 {
s.messageLock.RLock()
defer s.messageLock.RUnlock()
return s.currentMessagePos
}

func (s *EspressoStreamer) AdvanceTo(toPos uint64) {
s.messageLock.Lock()
defer s.messageLock.Unlock()
Expand Down Expand Up @@ -370,11 +378,8 @@ func (s *EspressoStreamer) parseEspressoTransaction(tx espressoTypes.Bytes, l1He
s.messageLock.Lock()
defer s.messageLock.Unlock()
for i, message := range messages {
var messageWithMetadata arbostypes.MessageWithMetadata
err = rlp.DecodeBytes(message, &messageWithMetadata)
if err != nil {
log.Warn("failed to decode message", "err", err)
// Instead of returnning an error, we should just skip this message
if _, exists := s.messageWithMetadataAndPos[indices[i]]; exists {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this code doing? I am not sure I understand

Copy link
Copy Markdown
Author

@lukeiannucci lukeiannucci May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it just checks if the position is already in the streamer. Without this check it was overwriting an existing message with the later message instead of just taking the first message

log.Warn("duplicate message position, discarding", "pos", indices[i])
continue
}
Comment thread
lukeiannucci marked this conversation as resolved.

Expand All @@ -383,6 +388,14 @@ func (s *EspressoStreamer) parseEspressoTransaction(tx espressoTypes.Bytes, l1He
continue
}

var messageWithMetadata arbostypes.MessageWithMetadata
err = rlp.DecodeBytes(message, &messageWithMetadata)
if err != nil {
log.Warn("failed to decode message", "err", err)
// Instead of returnning an error, we should just skip this message
continue
}

msg := &MessageWithMetadataAndPos{
MessageWithMeta: messageWithMetadata,
Pos: indices[i],
Expand Down
Loading