Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,11 +1149,6 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
lp.lggr.Errorw("Failed to poll and save logs, retrying later", "err", err)
return
}

if lp.finalityViolated.Load() {
lp.lggr.Info("PollAndSaveLogs completed successfully - removing finality violation flag")
lp.finalityViolated.Store(false)
}
}

func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) {
Expand Down Expand Up @@ -1193,6 +1188,11 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
return fmt.Errorf("unable to get current block: %w", err)
}
currentBlockNumber = currentBlock.Number
// getCurrentBlockMaybeHandleReorg ensured that DB's state matches RPC's, so it's safe to remove finality violation flag if it was set.
if lp.finalityViolated.Load() {
Comment thread
dhaidashenko marked this conversation as resolved.
lp.lggr.Info("getCurrentBlockMaybeHandleReorg completed successfully - removing finality violation flag")
lp.finalityViolated.Store(false)
}

// backfill finalized blocks if we can for performance. If we crash during backfill, we
// may reprocess logs. Log insertion is idempotent so this is ok.
Expand Down
106 changes: 106 additions & 0 deletions pkg/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,112 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
})
}

type mockedLP struct {
ORM *MockORM
Client *clienttest.Client
HeadTracker *headstest.Tracker[*evmtypes.Head, common.Hash]
LP *logPoller
}

func newMockedLP(t *testing.T, opts Opts) *mockedLP {
t.Helper()
mORM := NewMockORM(t)
ht := headstest.NewTracker[*evmtypes.Head, common.Hash](t)
ec := clienttest.NewClient(t)
return &mockedLP{
ORM: mORM,
Client: ec,
HeadTracker: ht,
LP: NewLogPoller(mORM, ec, logger.Test(t), ht, opts),
}
}

// Test_PollAndSaveLogs_FinalityViolationSurvivesTransient documents that
// PollAndSaveLogs only clears finalityViolated after a successful verification that DB's latest block belongs to the canonical chain.
func Test_PollAndSaveLogs_FinalityViolationSurvivesTransient(t *testing.T) {
Comment thread
dhaidashenko marked this conversation as resolved.
t.Parallel()

chainID := testutils.NewRandomEVMChainID()
opts := Opts{PollPeriod: time.Hour, FinalityDepth: 2, BackfillBatchSize: 10, RPCBatchSize: 10}

assertFinalityViolated := func(t *testing.T, lp *logPoller) {
t.Helper()
require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
}

dbBlock := func(n int64) *Block {
h := hashOf(n)
return &Block{
BlockHash: h,
BlockNumber: n,
BlockTimestamp: time.Unix(n, 0),
FinalizedBlockNumber: n,
SafeBlockNumber: n,
}
}

expectLatestSafe := func(ht *headstest.Tracker[*evmtypes.Head, common.Hash], latestNum, finalizedNum int64) {
latest := newHead(latestNum)
finalized := newHead(finalizedNum)
ht.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
ht.EXPECT().LatestSafeBlock(mock.Anything).Return(finalized, nil).Once()
}

t.Run("LatestAndFinalizedBlock returns error", func(t *testing.T) {
m := newMockedLP(t, opts)
m.HeadTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(nil, nil, errors.New("temporary failure fetching latest block")).Once()
m.LP.finalityViolated.Store(true)
m.LP.PollAndSaveLogs(t.Context(), 1, false)
assertFinalityViolated(t, m.LP)
})

t.Run("LatestSafeBlock returns error", func(t *testing.T) {
m := newMockedLP(t, opts)
finalized := newHead(5)
latest := newHead(16)
m.HeadTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
m.HeadTracker.EXPECT().LatestSafeBlock(mock.Anything).Return(nil, errors.New("temporary failure fetching safe block")).Once()
m.LP.finalityViolated.Store(true)
m.LP.PollAndSaveLogs(t.Context(), 1, false)
assertFinalityViolated(t, m.LP)
})

t.Run("latest block number lower than current", func(t *testing.T) {
m := newMockedLP(t, opts)
expectLatestSafe(m.HeadTracker, 3, 1)
m.LP.finalityViolated.Store(true)
m.LP.PollAndSaveLogs(t.Context(), 10, false)
assertFinalityViolated(t, m.LP)
})

t.Run("getCurrentBlockMaybeHandleReorg fails", func(t *testing.T) {
m := newMockedLP(t, opts)
// headerByNumber runs before any ORM read when currentBlock is nil.
expectLatestSafe(m.HeadTracker, 10, 8)
m.Client.EXPECT().HeadByNumber(mock.Anything, mock.MatchedBy(func(n *big.Int) bool { return n.Int64() == 5 })).
Return(nil, errors.New("rpc head by number failed")).Once()
m.LP.finalityViolated.Store(true)
m.LP.PollAndSaveLogs(t.Context(), 5, false)
assertFinalityViolated(t, m.LP)
})

t.Run("happy path clears finality violation", func(t *testing.T) {
m := newMockedLP(t, opts)
m.ORM.EXPECT().SelectBlockByNumber(mock.Anything, int64(9)).Return(dbBlock(9), nil).Once()
m.ORM.EXPECT().InsertLogsWithBlocks(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
expectLatestSafe(m.HeadTracker, 10, 8)
m.Client.EXPECT().ConfiguredChainID().Return(chainID).Maybe()
m.Client.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{
BlockNumber: 10,
BlockHash: hashOf(10),
Topics: []common.Hash{common.HexToHash("0x100")},
}}, nil).Once()
m.LP.finalityViolated.Store(true)
m.LP.PollAndSaveLogs(t.Context(), 10, false)
require.NoError(t, m.LP.HealthReport()[m.LP.Name()])
})
}

func Test_FindBlockAfterLCA(t *testing.T) {
testCases := []struct {
Name string
Expand Down
Loading