diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 717f7c31ea2..39374f2f8e8 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -1855,9 +1855,8 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error) } if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount { if b.espressoStreamer != nil { - b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount)) if b.espressoRestarting { - // Reset only once + // Reset on startup log.Info("resetting streamer to parent chain", "messageCount", batchPosition.MessageCount) // Fallback. For existing queued batches, we don't have the hotshot block number, so we reset to the parent chain. b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx) @@ -1878,6 +1877,14 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error) log.Info("submitted pending transactions after restart", "from", batchPosition.MessageCount, "count", len(queue)) } b.espressoRestarting = false + } else { + if uint64(batchPosition.MessageCount) < b.espressoStreamer.GetCurrentMessagePos() { + // Batch position moved backwards probably due to l1 reorg, try and resync the blocks from hotshot + log.Warn("resetting espresso streamer to parent chain (L1 reorg?)", "messageCount", batchPosition.MessageCount) + b.resetStreamerToParentChainOrConfigHotshotBlock(batchPosition.MessageCount, ctx) + } else { + b.espressoStreamer.AdvanceTo(uint64(batchPosition.MessageCount)) + } } } latestHeader, err := b.l1Reader.LastHeader(ctx) diff --git a/arbnode/espresso_caff_node_test.go b/arbnode/espresso_caff_node_test.go index 564a898f90b..d7264f34cdb 100644 --- a/arbnode/espresso_caff_node_test.go +++ b/arbnode/espresso_caff_node_test.go @@ -50,6 +50,10 @@ func (m *MockEspressoStreamer) GetCurrentEarliestHotShotBlockNumber(pos uint64) return m.currHotShot } +func (m *MockEspressoStreamer) GetCurrentMessagePos() uint64 { + return m.currPos +} + func (m *MockEspressoStreamer) Start(ctx context.Context) error { return nil } diff --git a/espressostreamer/espresso_streamer.go b/espressostreamer/espresso_streamer.go index 56de1d42e7d..15a54675678 100644 --- a/espressostreamer/espresso_streamer.go +++ b/espressostreamer/espresso_streamer.go @@ -44,6 +44,8 @@ type EspressoStreamerInterface interface { Advance() // Reset sets the current message position and the next hotshot block number. Reset(currentMessagePos uint64, currentHostshotBlock uint64) + // GetCurrentMessagePos returns the current message position of the streamer. + GetCurrentMessagePos() uint64 // RecordTimeDurationBetweenHotshotAndCurrentBlock records the time duration between // the next hotshot block and the current block. RecordTimeDurationBetweenHotshotAndCurrentBlock(nextHotshotBlock uint64, blockProductionTime time.Time) @@ -225,6 +227,12 @@ func (s *EspressoStreamer) Advance() { s.currentMessagePos += 1 } +func (s *EspressoStreamer) GetCurrentMessagePos() uint64 { + s.messageLock.RLock() + defer s.messageLock.RUnlock() + return s.currentMessagePos +} + func (s *EspressoStreamer) AdvanceTo(toPos uint64) { s.messageLock.Lock() defer s.messageLock.Unlock() @@ -370,11 +378,8 @@ func (s *EspressoStreamer) parseEspressoTransaction(tx espressoTypes.Bytes, l1He s.messageLock.Lock() defer s.messageLock.Unlock() for i, message := range messages { - var messageWithMetadata arbostypes.MessageWithMetadata - err = rlp.DecodeBytes(message, &messageWithMetadata) - if err != nil { - log.Warn("failed to decode message", "err", err) - // Instead of returnning an error, we should just skip this message + if _, exists := s.messageWithMetadataAndPos[indices[i]]; exists { + log.Warn("duplicate message position, discarding", "pos", indices[i]) continue } @@ -383,6 +388,14 @@ func (s *EspressoStreamer) parseEspressoTransaction(tx espressoTypes.Bytes, l1He continue } + var messageWithMetadata arbostypes.MessageWithMetadata + err = rlp.DecodeBytes(message, &messageWithMetadata) + if err != nil { + log.Warn("failed to decode message", "err", err) + // Instead of returnning an error, we should just skip this message + continue + } + msg := &MessageWithMetadataAndPos{ MessageWithMeta: messageWithMetadata, Pos: indices[i],