diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 51b95b7f76..d927f1ac10 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1149,11 +1149,6 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int lp.lggr.Errorw("Failed to poll and save logs, retrying later", "err", err) return } - - if lp.finalityViolated.Load() { - lp.lggr.Info("PollAndSaveLogs completed successfully - removing finality violation flag") - lp.finalityViolated.Store(false) - } } func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) { @@ -1193,6 +1188,11 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int return fmt.Errorf("unable to get current block: %w", err) } currentBlockNumber = currentBlock.Number + // getCurrentBlockMaybeHandleReorg ensured that DB's state matches RPC's, so it's safe to remove finality violation flag if it was set. + if lp.finalityViolated.Load() { + lp.lggr.Info("getCurrentBlockMaybeHandleReorg completed successfully - removing finality violation flag") + lp.finalityViolated.Store(false) + } // backfill finalized blocks if we can for performance. If we crash during backfill, we // may reprocess logs. Log insertion is idempotent so this is ok. diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 57b96c3b3f..cce4048d71 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -981,6 +981,112 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { }) } +type mockedLP struct { + ORM *MockORM + Client *clienttest.Client + HeadTracker *headstest.Tracker[*evmtypes.Head, common.Hash] + LP *logPoller +} + +func newMockedLP(t *testing.T, opts Opts) *mockedLP { + t.Helper() + mORM := NewMockORM(t) + ht := headstest.NewTracker[*evmtypes.Head, common.Hash](t) + ec := clienttest.NewClient(t) + return &mockedLP{ + ORM: mORM, + Client: ec, + HeadTracker: ht, + LP: NewLogPoller(mORM, ec, logger.Test(t), ht, opts), + } +} + +// Test_PollAndSaveLogs_FinalityViolationSurvivesTransient documents that +// PollAndSaveLogs only clears finalityViolated after a successful verification that DB's latest block belongs to the canonical chain. +func Test_PollAndSaveLogs_FinalityViolationSurvivesTransient(t *testing.T) { + t.Parallel() + + chainID := testutils.NewRandomEVMChainID() + opts := Opts{PollPeriod: time.Hour, FinalityDepth: 2, BackfillBatchSize: 10, RPCBatchSize: 10} + + assertFinalityViolated := func(t *testing.T, lp *logPoller) { + t.Helper() + require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) + } + + dbBlock := func(n int64) *Block { + h := hashOf(n) + return &Block{ + BlockHash: h, + BlockNumber: n, + BlockTimestamp: time.Unix(n, 0), + FinalizedBlockNumber: n, + SafeBlockNumber: n, + } + } + + expectLatestSafe := func(ht *headstest.Tracker[*evmtypes.Head, common.Hash], latestNum, finalizedNum int64) { + latest := newHead(latestNum) + finalized := newHead(finalizedNum) + ht.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once() + ht.EXPECT().LatestSafeBlock(mock.Anything).Return(finalized, nil).Once() + } + + t.Run("LatestAndFinalizedBlock returns error", func(t *testing.T) { + m := newMockedLP(t, opts) + m.HeadTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(nil, nil, errors.New("temporary failure fetching latest block")).Once() + m.LP.finalityViolated.Store(true) + m.LP.PollAndSaveLogs(t.Context(), 1, false) + assertFinalityViolated(t, m.LP) + }) + + t.Run("LatestSafeBlock returns error", func(t *testing.T) { + m := newMockedLP(t, opts) + finalized := newHead(5) + latest := newHead(16) + m.HeadTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once() + m.HeadTracker.EXPECT().LatestSafeBlock(mock.Anything).Return(nil, errors.New("temporary failure fetching safe block")).Once() + m.LP.finalityViolated.Store(true) + m.LP.PollAndSaveLogs(t.Context(), 1, false) + assertFinalityViolated(t, m.LP) + }) + + t.Run("latest block number lower than current", func(t *testing.T) { + m := newMockedLP(t, opts) + expectLatestSafe(m.HeadTracker, 3, 1) + m.LP.finalityViolated.Store(true) + m.LP.PollAndSaveLogs(t.Context(), 10, false) + assertFinalityViolated(t, m.LP) + }) + + t.Run("getCurrentBlockMaybeHandleReorg fails", func(t *testing.T) { + m := newMockedLP(t, opts) + // headerByNumber runs before any ORM read when currentBlock is nil. + expectLatestSafe(m.HeadTracker, 10, 8) + m.Client.EXPECT().HeadByNumber(mock.Anything, mock.MatchedBy(func(n *big.Int) bool { return n.Int64() == 5 })). + Return(nil, errors.New("rpc head by number failed")).Once() + m.LP.finalityViolated.Store(true) + m.LP.PollAndSaveLogs(t.Context(), 5, false) + assertFinalityViolated(t, m.LP) + }) + + t.Run("happy path clears finality violation", func(t *testing.T) { + m := newMockedLP(t, opts) + m.ORM.EXPECT().SelectBlockByNumber(mock.Anything, int64(9)).Return(dbBlock(9), nil).Once() + m.ORM.EXPECT().InsertLogsWithBlocks(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + expectLatestSafe(m.HeadTracker, 10, 8) + m.Client.EXPECT().ConfiguredChainID().Return(chainID).Maybe() + m.Client.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{ + BlockNumber: 10, + BlockHash: hashOf(10), + Topics: []common.Hash{common.HexToHash("0x100")}, + }}, nil).Once() + m.LP.finalityViolated.Store(true) + m.LP.PollAndSaveLogs(t.Context(), 10, false) + require.NoError(t, m.LP.HealthReport()[m.LP.Name()]) + }) +} + func Test_FindBlockAfterLCA(t *testing.T) { testCases := []struct { Name string