From b65f4ab228d8fe5848e6219c87c4a9f77d2680ee Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 13:51:16 +0100 Subject: [PATCH 01/13] Test: reorg during replay --- pkg/logpoller/log_poller_test.go | 110 +++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 9e3571e904..739ac64ea2 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -2155,3 +2155,113 @@ func TestWhere(t *testing.T) { assert.Equal(t, []query.Expression{}, result) }) } + +func TestLogPoller_Reorg_On_Replay(t *testing.T) { + // TestCase: + // 1. LogPoller processes blocks to block 11 + // 2. Reorg replaces block 11 with a new block (some additional blocks may be added on top of it) + // 3. Replay is initiated from block below 11. + // Expected behaviour: + // 1. LogPoller should replace reorged block 11 with a new data. + // 2. DB must not contain at any point logs from both old and new block 11. + // 3. Finality Violation must not occur, since chain did not violate finality depth. + t.Parallel() + const reorgedBlockNumber = 11 + testCases := []struct { + name string + numberOfBlocksAfterReorg int + }{ + { + name: "Replay start right after reorg", + numberOfBlocksAfterReorg: 0, + }, + { + name: "Replay start a few blocks after reorg", + numberOfBlocksAfterReorg: 1, + }, + { + name: "Replay start once reorged block is finalized", + numberOfBlocksAfterReorg: 5, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lpOpts := logpoller.Opts{ + PollPeriod: 24 * time.Hour, // effectively disable automatic polling, so we can control when we poll in the test + UseFinalityTag: false, + FinalityDepth: 3, + BackfillBatchSize: 3, + RPCBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) + + // Set up a log poller listening for log emitter logs. + err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{ + Name: "Test Emitter 1", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, + }) + require.NoError(t, err) + + // populate chain with data + for range reorgedBlockNumber - 1 { + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(1))}) + require.NoError(t, err) + th.Backend.Commit() + } + + // start LogPoller and wait for it to complete first poll. Second poll won't happen until we call PollAndSaveLogs again, since poll period is very long. + require.NoError(t, th.LogPoller.Start(t.Context())) + defer func() { + require.NoError(t, th.LogPoller.Close()) + }() + testutils.RequireEventually(t, func() bool { + latest, err := th.LogPoller.LatestBlock(t.Context()) + return err == nil && latest.BlockNumber == reorgedBlockNumber + }) + + reorgedBlock, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64()) + + // Replace block 11 with a new block and burry it under 1 new block + require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash())) + const newLogData = int64(123) + // Commit reorgedBlock and numberOfBlocksAfterReorg on top of it + for range tc.numberOfBlocksAfterReorg + 1 { + // emit log that is not tracked by LP to ensure that tracked log has a different index. 
+ // So if reorg is not properly handled and both logs end up in the database + _, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) + require.NoError(t, err) + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + th.Backend.Commit() + } + + newReorgedBlock, err := th.Client.BlockByNumber(t.Context(), big.NewInt(reorgedBlockNumber)) + require.NoError(t, err) + require.NotEqual(t, reorgedBlock.Hash().String(), newReorgedBlock.Hash().String()) + + latest, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), latest.Number().Int64()) + + // Trigger replay, which should gracefully handle the reorg and end up on the new latest block + err = th.LogPoller.Replay(t.Context(), 5) + require.NoError(t, err) + // LP should be on latest block now + lpLatest, err := th.LogPoller.LatestBlock(t.Context()) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), lpLatest.BlockNumber) + logs, err := th.ORM.SelectLogsByBlockRange(t.Context(), reorgedBlockNumber, reorgedBlockNumber) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's log was properly removed during replay") + // Ensure reorged block was replaced by a new one + dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64()) + require.NoError(t, err) + require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber) + require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash) + }) + } +} From 64e2c6375f91e599d40d64c730c81de076fee568 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 14:37:28 +0100 Subject: [PATCH 02/13] Fix reorg handling during replay --- pkg/logpoller/helper_test.go | 2 +- pkg/logpoller/log_poller.go | 163 +++++++++++++++------- pkg/logpoller/log_poller_internal_test.go | 12 +- pkg/logpoller/log_poller_test.go | 14 +- 4 files changed, 130 insertions(+), 61 deletions(-) diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index e4f6a0f745..cb79fb721c 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -164,7 +164,7 @@ func (th *TestHarness) AdjustTime(t *testing.T, d time.Duration) { } func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 { - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) latest, _ := th.LogPoller.LatestBlock(ctx) return latest.BlockNumber + 1 } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 91d7dfde68..47fb6fc12c 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -77,7 +77,7 @@ type LogPoller interface { type LogPollerTest interface { LogPoller - PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) + PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) BackupPollAndSaveLogs(ctx context.Context) error Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) @@ -673,7 +673,7 @@ func (lp *logPoller) run() { } else { start = lastProcessed.BlockNumber + 1 } - lp.PollAndSaveLogs(ctx, start) + lp.PollAndSaveLogs(ctx, start, false) case <-backupLogPollTicker.C: if lp.backupPollerBlockDelay == 0 { 
continue // backup poller is disabled @@ -783,7 +783,7 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64 if err == nil { // Serially process replay requests. lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) - lp.PollAndSaveLogs(ctx, fromBlock) + lp.PollAndSaveLogs(ctx, fromBlock, true) lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) } } else { @@ -965,6 +965,26 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { return nil } +func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { + // If we don't have the current block already, lets get it. + header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber)) + if err != nil { + lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", blockNumber, err) + } + // Additional sanity checks, don't necessarily trust the RPC. + if header == nil { + lp.lggr.Errorw("Unexpected nil block from RPC", "blockNumber", blockNumber) + return nil, fmt.Errorf("got nil block for %d", blockNumber) + } + if header.Number != blockNumber { + lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "blockNumber", blockNumber, "got", header.Number) + return nil, fmt.Errorf("block mismatch have %d want %d", header.Number, blockNumber) + } + + return header, nil +} + // getCurrentBlockMaybeHandleReorg accepts a block number // and will return that block if its parent points to our last saved block. // One can optionally pass the block header if it has already been queried to avoid an extra RPC call. @@ -973,23 +993,12 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { // 1. Find the LCA by following parent hashes. // 2. Delete all logs and blocks after the LCA // 3. Return the LCA+1, i.e. our new current (unprocessed) block. -func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) { +func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) { var err1 error if currentBlock == nil { - // If we don't have the current block already, lets get it. - currentBlock, err1 = lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(currentBlockNumber)) + currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber) if err1 != nil { - lp.lggr.Warnw("Unable to get currentBlock", "err", err1, "currentBlockNumber", currentBlockNumber) - return nil, err1 - } - // Additional sanity checks, don't necessarily trust the RPC. - if currentBlock == nil { - lp.lggr.Errorw("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber) - return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber) - } - if currentBlock.Number != currentBlockNumber { - lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number) - return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1) } } // Does this currentBlock point to the same parent that we have saved? 
@@ -1008,39 +1017,99 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren } // Check for reorg. if currentBlock.ParentHash != expectedParent.BlockHash { - // There can be another reorg while we're finding the LCA. - // That is ok, since we'll detect it on the next iteration. - // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. - blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber) - if err2 != nil { - return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) - } - - lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber) - // We truncate all the blocks and logs after the LCA. - // We could preserve the logs for forensics, since its possible - // that applications see them and take action upon it, however that - // results in significantly slower reads since we must then compute - // the canonical set per read. Typically, if an application took action on a log - // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. - // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) - if err2 != nil { - // If we error on db commit, we can't know if the tx went through or not. - // We return an error here which will cause us to restart polling from lastBlockSaved + 1 - return nil, err2 - } - return blockAfterLCA, nil - } - // No reorg, return current block. + return lp.handleReorg(ctx, currentBlock) + } + + if !isReplay { + // During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock. + return currentBlock, nil + } + + // Ensure that if DB contains current block it matches the current block from RPC. + currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber) + if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to get current block from DB %d: %w", currentBlockNumber, err) + } + + if currentBlockDB != nil && currentBlock.Hash != currentBlockDB.BlockHash { + return lp.handleReorg(ctx, currentBlock) + } + + // No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation. + latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx) + if err1 != nil { + if pkgerrors.Is(err1, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. 
Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1) + } + return nil, pkgerrors.Wrap(err1, "unable to get latest block") + } + + if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber { + // currentBlock is newest, nothing more to check + return currentBlock, nil + } + + latestBlockRPC, err := lp.headerByNumber(ctx, latestBlockDB.BlockNumber) + if err != nil { + return nil, fmt.Errorf("unable to get latest block header for block number %d: %w", latestBlockDB.BlockNumber, err) + } + + if latestBlockRPC.Hash != latestBlockDB.BlockHash { + // Reorg detected, handle it + lca, err := lp.handleReorg(ctx, latestBlockRPC) + if err != nil { + return nil, fmt.Errorf("failed to handle reorg: %w", err) + } + + if lca.Number < currentBlock.BlockNumber() { + // LCA is older than current block, we need to get the new current block after reorg + return lca, nil + } + } + return currentBlock, nil } +func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Head) (*evmtypes.Head, error) { + // during replay currentBlock may be older than the latest block, thus it's possible to miss finality violation, + // if we use its view on latest finalized block. To be safe, we get the latest block from the db. + latestBlock, err := lp.orm.SelectLatestBlock(ctx) + if err != nil { + if pkgerrors.Is(err, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when handling reorg, but got no rows", "currentBlockNumber", currentBlock.Number, "err", err) + } + return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db") + } + // There can be another reorg while we're finding the LCA. + // That is ok, since we'll detect it on the next iteration. + // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. + blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber) + if err2 != nil { + return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) + } + + lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlock.Number) + // We truncate all the blocks and logs after the LCA. + // We could preserve the logs for forensics, since its possible + // that applications see them and take action upon it, however that + // results in significantly slower reads since we must then compute + // the canonical set per read. Typically, if an application took action on a log + // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. + // Its also nicely analogous to reading from the chain itself. + err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) + if err2 != nil { + // If we error on db commit, we can't know if the tx went through or not. + // We return an error here which will cause us to restart polling from lastBlockSaved + 1 + return nil, err2 + } + return blockAfterLCA, nil +} + // PollAndSaveLogs On startup/crash current is the first block after the last processed block. // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal // conditions this would be equal to lastProcessed.BlockNumber + 1. 
-func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) { - err := lp.pollAndSaveLogs(ctx, currentBlockNumber) +func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) { + err := lp.pollAndSaveLogs(ctx, currentBlockNumber, isReplay) if errors.Is(err, commontypes.ErrFinalityViolated) { lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err) lp.finalityViolated.Store(true) @@ -1059,7 +1128,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int } } -func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) { +func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) { lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber) // Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks() // latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration @@ -1089,7 +1158,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int } // Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks. // Returns (currentBlock || LCA+1 if reorg detected, error) - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved and retry. @@ -1115,7 +1184,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int for { if currentBlockNumber > currentBlock.Number { - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved. 
diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 1ff3daf6bc..64c3512107 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -380,7 +380,7 @@ func assertBackupPollerStartup(t *testing.T, head *evmtypes.Head, finalizedHead assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) - lp.PollAndSaveLogs(ctx, head.Number) + lp.PollAndSaveLogs(ctx, head.Number, false) lastProcessed, err := lp.orm.SelectLatestBlock(ctx) require.NoError(t, err) @@ -489,7 +489,7 @@ func TestLogPoller_Replay(t *testing.T) { { ctx := testutils.Context(t) // process 1 log in block 3 - lp.PollAndSaveLogs(ctx, 4) + lp.PollAndSaveLogs(ctx, 4, false) latest, err := lp.LatestBlock(ctx) require.NoError(t, err) require.Equal(t, int64(4), latest.BlockNumber) @@ -894,7 +894,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { // insert finalized block with different hash than in RPC require.NoError(t, orm.InsertBlock(t.Context(), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2, 2)) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) { @@ -915,7 +915,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { return evmtypes.Head{Number: num, Hash: utils.NewHash()} }) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Log's hash does not match block's", func(t *testing.T) { @@ -933,7 +933,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Happy path", func(t *testing.T) { @@ -953,7 +953,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.NoError(t, lp.HealthReport()[lp.Name()]) }) } diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 739ac64ea2..a3aa3b44cb 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -377,7 +377,7 @@ func Test_BackupLogPoller(t *testing.T) { th.finalizeThroughBlock(t, 64) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) require.NoError(t, th.LogPoller.BackupPollAndSaveLogs(ctx)) currentBlock, err := th.LogPoller.LatestBlock(ctx) require.NoError(t, err) @@ -718,7 +718,7 
@@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { backend.Commit() } currentBlockNumber := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) currentBlock, err := lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) matchesGeth := func() bool { @@ -770,7 +770,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false) currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } @@ -1370,7 +1370,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber)) // after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB - th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1) + th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1, false) block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3) require.NoError(t, err) assert.Equal(t, 3, int(block.BlockNumber)) @@ -1624,7 +1624,7 @@ func TestTooManyLogResults(t *testing.T) { Addresses: []common.Address{addr}, }) require.NoError(t, err) - lp.PollAndSaveLogs(ctx, 5) + lp.PollAndSaveLogs(ctx, 5, false) block, err2 := o.SelectLatestBlock(ctx) require.NoError(t, err2) assert.Equal(t, int64(298), block.BlockNumber) @@ -1656,7 +1656,7 @@ func TestTooManyLogResults(t *testing.T) { return []types.Log{}, tooLargeErr // return "too many results" error if block range spans 4 or more blocks }) - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself @@ -1690,7 +1690,7 @@ func TestTooManyLogResults(t *testing.T) { headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself From ccc00f846a3d24dd26044555b816d2be3551ca5c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 17 Feb 2026 17:57:35 +0100 Subject: [PATCH 03/13] Fix nits --- pkg/logpoller/log_poller.go | 8 +++----- pkg/logpoller/log_poller_test.go | 9 +++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 47fb6fc12c..a64584d969 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -966,7 +966,6 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { - // If we don't have the current block already, lets get it. 
header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber)) if err != nil { lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber) @@ -1056,14 +1055,13 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren if latestBlockRPC.Hash != latestBlockDB.BlockHash { // Reorg detected, handle it - lca, err := lp.handleReorg(ctx, latestBlockRPC) + blockAfterLCA, err := lp.handleReorg(ctx, latestBlockRPC) if err != nil { return nil, fmt.Errorf("failed to handle reorg: %w", err) } - if lca.Number < currentBlock.BlockNumber() { - // LCA is older than current block, we need to get the new current block after reorg - return lca, nil + if blockAfterLCA.Number < currentBlock.BlockNumber() { + return blockAfterLCA, nil } } diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index a3aa3b44cb..818d97a85c 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -2164,7 +2164,7 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { // Expected behaviour: // 1. LogPoller should replace reorged block 11 with a new data. // 2. DB must not contain at any point logs from both old and new block 11. - // 3. Finality Violation must not occur, since chain did not violate finality depth. + // 3. Finality Violation must not occur, since chain did not violate finality depth. t.Parallel() const reorgedBlockNumber = 11 testCases := []struct { @@ -2211,7 +2211,7 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { th.Backend.Commit() } - // start LogPoller and wait for it to complete first poll. Second poll won't happen until we call PollAndSaveLogs again, since poll period is very long. + // Start LogPoller and wait for it to complete the first poll. The second poll won't happen until we call Replay, since the poll period is very long. require.NoError(t, th.LogPoller.Start(t.Context())) defer func() { require.NoError(t, th.LogPoller.Close()) @@ -2225,16 +2225,17 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64()) - // Replace block 11 with a new block and burry it under 1 new block + // Replace block 11 with a new block and bury it under 1 new block require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash())) const newLogData = int64(123) // Commit reorgedBlock and numberOfBlocksAfterReorg on top of it for range tc.numberOfBlocksAfterReorg + 1 { // emit log that is not tracked by LP to ensure that tracked log has a different index. - // So if reorg is not properly handled and both logs end up in the database + // So if reorg is not properly handled, both logs end up in the database and the test fails. 
_, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) require.NoError(t, err) _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + require.NoError(t, err) th.Backend.Commit() } From a6119cfddb5182e3ea3a1549626abfd2b27dd591 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 19:52:51 +0100 Subject: [PATCH 04/13] ORM methods to batch insert --- pkg/logpoller/helper_test.go | 2 +- pkg/logpoller/log_poller.go | 6 ++--- pkg/logpoller/observability.go | 14 +++++----- pkg/logpoller/observability_test.go | 40 ++++++++++++++++----------- pkg/logpoller/orm.go | 41 +++++++++++++++++++--------- pkg/logpoller/orm_test.go | 42 ++++++++++++++++++++++++----- 6 files changed, 98 insertions(+), 47 deletions(-) diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index cb79fb721c..24c3b072fd 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -182,7 +182,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) { require.NoError(t, err, "block %v", i) chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i))) require.NoError(t, err) - assert.Equal(t, chainBlk.Hash().Bytes(), blk.BlockHash.Bytes(), "block %v", i) + assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i) } } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index a64584d969..a43cd35681 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -956,7 +956,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } lp.lggr.Debugw("Inserting backfilled logs with batch endblock", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks) - err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock) + err = lp.orm.InsertLogsWithBlocks(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), []Block{endblock}) if err != nil { lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to) return err @@ -1206,10 +1206,10 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int FinalizedBlockNumber: latestFinalizedBlockNumber, SafeBlockNumber: safeBlockNumber, } - err = lp.orm.InsertLogsWithBlock( + err = lp.orm.InsertLogsWithBlocks( ctx, convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()), - block, + []Block{block}, ) if err != nil { lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber) diff --git a/pkg/logpoller/observability.go b/pkg/logpoller/observability.go index 0cdd040df0..ffa1b47456 100644 --- a/pkg/logpoller/observability.go +++ b/pkg/logpoller/observability.go @@ -52,16 +52,16 @@ func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error { err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error { return o.ORM.InsertLogs(ctx, logs) }) - trackInsertedLogsAndBlock(ctx, o, logs, nil, err) + trackInsertedLogsAndBlocks(ctx, o, logs, nil, err) trackInsertedBlockLatency(ctx, o, logs, err) return err } -func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error { +func (o *ObservedORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error { err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error { - return o.ORM.InsertLogsWithBlock(ctx, logs, block) + return o.ORM.InsertLogsWithBlocks(ctx, 
logs, blocks) }) - trackInsertedLogsAndBlock(ctx, o, logs, &block, err) + trackInsertedLogsAndBlocks(ctx, o, logs, blocks, err) trackInsertedBlockLatency(ctx, o, logs, err) return err } @@ -290,15 +290,15 @@ func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryTy return exec() } -func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) { +func trackInsertedLogsAndBlocks(ctx context.Context, o *ObservedORM, logs []Log, blocks []Block, err error) { if err != nil { return } ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() o.metrics.IncrementLogsInserted(ctx, int64(len(logs))) - if block != nil { - o.metrics.IncrementBlocksInserted(ctx, 1) + if len(blocks) > 0 { + o.metrics.IncrementBlocksInserted(ctx, int64(len(blocks))) } } diff --git a/pkg/logpoller/observability_test.go b/pkg/logpoller/observability_test.go index 3945d52424..7a82906e9f 100644 --- a/pkg/logpoller/observability_test.go +++ b/pkg/logpoller/observability_test.go @@ -41,9 +41,11 @@ func TestMultipleMetricsArePublished(t *testing.T) { _, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(ctx, 0, []common.Address{{}}, []common.Hash{{}}, 1) _, _ = orm.SelectIndexedLogsCreatedAfter(ctx, common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0) _ = orm.InsertLogs(ctx, []Log{}) - _ = orm.InsertLogsWithBlock(ctx, []Log{}, Block{ - BlockNumber: 1, - BlockTimestamp: time.Now(), + _ = orm.InsertLogsWithBlocks(ctx, []Log{}, []Block{ + { + BlockNumber: 1, + BlockTimestamp: time.Now(), + }, }) require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration)) @@ -113,21 +115,25 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency)) // Insert 5 more logs with block - require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{ - BlockHash: utils.RandomBytes32(), - BlockNumber: 10, - BlockTimestamp: time.Now(), - FinalizedBlockNumber: 5, + require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[10:15], []Block{ + { + BlockHash: utils.RandomBytes32(), + BlockNumber: 10, + BlockTimestamp: time.Now(), + FinalizedBlockNumber: 5, + }, })) assert.Equal(t, 15, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 1, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420")))) // Insert 5 more logs with block - require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[15:], Block{ - BlockHash: utils.RandomBytes32(), - BlockNumber: 15, - BlockTimestamp: time.Now(), - FinalizedBlockNumber: 5, + require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[15:], []Block{ + { + BlockHash: utils.RandomBytes32(), + BlockNumber: 15, + BlockTimestamp: time.Now(), + FinalizedBlockNumber: 5, + }, })) assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420")))) @@ -143,9 +149,11 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, network, "420", "DeleteBlocksBefore", "delete")) // Don't update counters in case of an error - require.Error(t, orm.InsertLogsWithBlock(ctx, logs, Block{ - BlockHash: utils.RandomBytes32(), - BlockTimestamp: time.Now(), + require.Error(t, orm.InsertLogsWithBlocks(ctx, logs, []Block{ + { + BlockHash: 
utils.RandomBytes32(), + BlockTimestamp: time.Now(), + }, })) assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420")))) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index eaf489ed32..8098ada382 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -24,7 +24,7 @@ import ( // What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM. type ORM interface { InsertLogs(ctx context.Context, logs []Log) error - InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error + InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error InsertFilter(ctx context.Context, filter Filter) error LoadFilters(ctx context.Context) (map[string]Filter, error) @@ -112,6 +112,18 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum return err } +func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error { + const q = `INSERT INTO evm.log_poller_blocks + (evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at, safe_block_number) + VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW(), :safe_block_number) + ON CONFLICT DO NOTHING` + // maintain behaviour of InsertBlock + for i := range blocks { + blocks[i].EVMChainID = ubig.New(o.chainID) + } + return batchInsert(ctx, o.ds, q, blocks, 1000) +} + // InsertFilter is idempotent. // // Each address/event pair must have a unique job id, so it may be removed when the job is deleted. @@ -544,10 +556,10 @@ func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error { }) } -func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error { +func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error { // Optimization, don't open TX when there is only a block to be persisted if len(logs) == 0 { - return o.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber) + return o.InsertBlocks(ctx, blocks) } if err := o.validateLogs(logs); err != nil { @@ -556,7 +568,7 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block // Block and logs goes with the same TX to ensure atomicity return o.Transact(ctx, func(orm *DSORM) error { - err := orm.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber) + err := orm.InsertBlocks(ctx, blocks) if err != nil { return err } @@ -565,20 +577,23 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block } func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error { - batchInsertSize := 4000 - for i := 0; i < len(logs); i += batchInsertSize { - start, end := i, i+batchInsertSize - if end > len(logs) { - end = len(logs) - } - - query := `INSERT INTO evm.logs + const q = `INSERT INTO evm.logs (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) VALUES (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) ON CONFLICT DO NOTHING` + return batchInsert(ctx, tx, q, logs, 4000) +} + +func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query 
string, objs []T, batchSize int) error { + batchInsertSize := 4000 + for i := 0; i < len(objs); i += batchInsertSize { + start, end := i, i+batchInsertSize + if end > len(objs) { + end = len(objs) + } - _, err := tx.NamedExecContext(ctx, query, logs[start:end]) + _, err := ds.NamedExecContext(ctx, query, objs[start:end]) if err != nil { if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 { // In case of DB timeouts, try to insert again with a smaller batch upto a limit diff --git a/pkg/logpoller/orm_test.go b/pkg/logpoller/orm_test.go index d4f25c9027..3318e9bb81 100644 --- a/pkg/logpoller/orm_test.go +++ b/pkg/logpoller/orm_test.go @@ -94,6 +94,32 @@ func TestLogPoller_Batching(t *testing.T) { require.Equal(t, len(logs), len(lgs)) } +func TestLogPoller_Blocks_Batching(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + th := SetupTH(t, lpOpts) + var blocks []logpoller.Block + var logs []logpoller.Log + const numBlocks = 2000 + for i := 0; i < numBlocks; i++ { + blockHash := common.HexToHash(fmt.Sprintf("0x%d", i+1)) + blocks = append(blocks, logpoller.Block{ + EVMChainID: ubig.New(th.ChainID), + BlockHash: blockHash, + BlockNumber: int64(i + 1), + }) + logs = append(logs, GenLog(th.ChainID, int64(i), int64(i+1), blockHash.String(), EmitterABI.Events["Log1"].ID.Bytes(), th.EmitterAddress1)) + } + require.NoError(t, th.ORM.InsertLogsWithBlocks(ctx, logs, blocks)) + lgs, err := th.ORM.SelectLogsByBlockRange(ctx, 1, numBlocks) + require.NoError(t, err) + // Make sure all logs are inserted + require.Equal(t, len(logs), len(lgs)) + dbBlocks, err := th.ORM.GetBlocksRange(ctx, 1, numBlocks) + require.NoError(t, err) + require.Equal(t, numBlocks, len(dbBlocks)) +} + func TestORM_GetBlocks_From_Range(t *testing.T) { th := SetupTH(t, lpOpts) o1 := th.ORM @@ -2116,7 +2142,7 @@ func TestInsertLogsWithBlock(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // clean all logs and blocks between test cases defer func() { _ = o.DeleteLogsAndBlocksAfter(ctx, 0) }() - insertError := o.InsertLogsWithBlock(ctx, tt.logs, tt.block) + insertError := o.InsertLogsWithBlocks(ctx, tt.logs, []logpoller.Block{tt.block}) logs, logsErr := o.SelectLogs(ctx, 0, math.MaxInt, address, event) block, blockErr := o.SelectLatestBlock(ctx) @@ -2209,16 +2235,18 @@ func TestSelectLogsDataWordBetween(t *testing.T) { secondLogData = append(secondLogData, logpoller.EvmWord(5).Bytes()...) secondLogData = append(secondLogData, logpoller.EvmWord(20).Bytes()...) 
- err := th.ORM.InsertLogsWithBlock(ctx, + err := th.ORM.InsertLogsWithBlocks(ctx, []logpoller.Log{ GenLogWithData(th.ChainID, address, eventSig, 1, 1, firstLogData), GenLogWithData(th.ChainID, address, eventSig, 2, 2, secondLogData), }, - logpoller.Block{ - BlockHash: utils.RandomBytes32(), - BlockNumber: 10, - BlockTimestamp: time.Now(), - FinalizedBlockNumber: 1, + []logpoller.Block{ + { + BlockHash: utils.RandomBytes32(), + BlockNumber: 10, + BlockTimestamp: time.Now(), + FinalizedBlockNumber: 1, + }, }, ) require.NoError(t, err) From 519585159ec1348b3febee68770392e508b97ab3 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 13 Feb 2026 17:10:36 +0100 Subject: [PATCH 05/13] switch to batch logs/blocks inserts --- pkg/logpoller/log_poller.go | 117 +++++++++++++++++++++++-------- pkg/logpoller/log_poller_test.go | 8 ++- 2 files changed, 92 insertions(+), 33 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index a43cd35681..868df6b8f1 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1178,52 +1178,109 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int return fmt.Errorf("failed to backfill finalized logs: %w", err) } currentBlockNumber = lastSafeBackfillBlock + 1 + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) + if err != nil { + // If there's an error handling the reorg, we can't be sure what state the db was left in. + // Resume from the latest block saved. + return fmt.Errorf("failed to get current block: %w", err) + } + currentBlockNumber = currentBlock.Number } for { - if currentBlockNumber > currentBlock.Number { - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) - if err != nil { - // If there's an error handling the reorg, we can't be sure what state the db was left in. - // Resume from the latest block saved. - return fmt.Errorf("failed to get current block: %w", err) + + blocks, logs, err := lp.getUnfinalizedLogs(ctx, currentBlock, latestBlockNumber, safeBlockNumber, latestFinalizedBlockNumber, isReplay) + // even if we have an error, we may have partial logs and blocks that can be saved, so we save what we have and then retry. 
+ if len(logs) > 0 || len(blocks) > 0 { + lp.lggr.Debugw("Saving logs", "logs", len(logs), "blocks", len(blocks), "currentBlockNumber", currentBlockNumber) + insertErr := lp.orm.InsertLogsWithBlocks(ctx, logs, blocks) + if insertErr != nil { + lp.lggr.Warnw("Unable to save logs, retrying later", "insertErr", insertErr, "block", currentBlockNumber, "err", err) + return nil } - currentBlockNumber = currentBlock.Number } + if err == nil { + lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber) + return nil + } + + var reorgErr *reorgError + if !errors.As(err, &reorgErr) { + return fmt.Errorf("failed to get unfinalized logs: %w", err) + } + + lp.lggr.Warnw("Reorg detected during unfinalized log processing, handling reorg", "err", err, "currentBlockNumber", currentBlockNumber, "lastKnownMatchingHead", reorgErr.ReorgedAt.Number) + currentBlock, err = lp.handleReorg(ctx, reorgErr.ReorgedAt) + if err != nil { + return fmt.Errorf("failed to handle reorg: %w", err) + } + lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number) + } +} + +type reorgError struct { + ReorgedAt *evmtypes.Head +} + +func newReorgError(reorgedAt *evmtypes.Head) error { + return &reorgError{ReorgedAt: reorgedAt} +} + +func (e *reorgError) Error() string { + return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number) +} + +func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) { + var logs []Log + var blocks []Block + for { h := currentBlock.Hash - var logs []types.Log - logs, err = lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) + rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) if err != nil { - lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber) - return nil + lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number) + return blocks, logs, nil } - lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) + lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) block := Block{ BlockHash: h, - BlockNumber: currentBlockNumber, + BlockNumber: currentBlock.Number, BlockTimestamp: currentBlock.Timestamp, - FinalizedBlockNumber: latestFinalizedBlockNumber, - SafeBlockNumber: safeBlockNumber, - } - err = lp.orm.InsertLogsWithBlocks( - ctx, - convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()), - []Block{block}, - ) + FinalizedBlockNumber: finalized, + SafeBlockNumber: safe, + } + logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...) + blocks = append(blocks, block) + + if currentBlock.Number >= latest { + return blocks, logs, nil + } + + nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1) if err != nil { - lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber) - return nil + lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number) + return blocks, logs, nil } - // Update current block. - // Same reorg detection on unfinalized blocks. 
- currentBlockNumber++ - if currentBlockNumber > latestBlockNumber { - break + + if nextBlock.ParentHash != currentBlock.Hash { + return blocks, logs, newReorgError(nextBlock) } - } - return nil + if isReplay { + // During replay, we also check if the next block matches what we have in the DB to avoid false positives on reorgs due to finality violation. + nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number) + if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { + lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number) + return blocks, logs, nil + } + + if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash { + return blocks, logs, newReorgError(nextBlock) + } + } + + currentBlock = nextBlock + } } // Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 818d97a85c..72cefae5b0 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -690,7 +690,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { owner.GasPrice = big.NewInt(10e9) p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool { // After the set of reorgs, we should have the same canonical blocks that geth does. - t.Log("Starting test", mineOrReorg) + seed := time.Now().UnixNano() + localRand := rand.New(rand.NewSource(seed)) + t.Log("Starting test", mineOrReorg, "seed", seed) chainID := testutils.NewRandomEVMChainID() // Set up a test chain with a log emitting contract deployed. orm := logpoller.NewORM(chainID, db, lggr) @@ -742,7 +744,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { } // Randomly pick to mine or reorg for i := 0; i < numChainInserts; i++ { - if rand.Int63()%2 == 0 { + if localRand.Int63()%2 == 0 { // Mine blocks for j := 0; j < int(mineOrReorg[i]); j++ { backend.Commit() @@ -770,7 +772,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false) currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } From 715d9bad4e67a59beb88726ae87f06b0a6d5c01d Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 17 Feb 2026 18:07:51 +0100 Subject: [PATCH 06/13] nit fixes --- pkg/logpoller/log_poller.go | 6 ++++++ pkg/logpoller/orm.go | 3 +-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 868df6b8f1..ec0d4cfc57 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1232,6 +1232,7 @@ func (e *reorgError) Error() string { } func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) { + const maxUnfinalizedBlocks = 2000 var logs []Log var blocks []Block for { @@ -1256,6 +1257,11 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty return blocks, logs, nil } + if len(blocks) >= maxUnfinalizedBlocks { + lp.lggr.Warnw("Too many unfinalized blocks, stopping log retrieval to avoid OOM", "currentBlockNumber", currentBlock.Number, "latestBlockNumber", latest) + return blocks, logs, nil + } + nextBlock, 
err := lp.headerByNumber(ctx, currentBlock.Number+1) if err != nil { lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index 8098ada382..6a503ace60 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -585,8 +585,7 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D return batchInsert(ctx, tx, q, logs, 4000) } -func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchSize int) error { - batchInsertSize := 4000 +func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchInsertSize int) error { for i := 0; i < len(objs); i += batchInsertSize { start, end := i, i+batchInsertSize if end > len(objs) { From cc16600fbdeccb8298f84dd4367f0e2170c602a8 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 26 Feb 2026 17:13:42 +0100 Subject: [PATCH 07/13] switch ubig to sqlutil big --- pkg/logpoller/orm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index 6a503ace60..012c450f3a 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -119,7 +119,7 @@ func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error { ON CONFLICT DO NOTHING` // maintain behaviour of InsertBlock for i := range blocks { - blocks[i].EVMChainID = ubig.New(o.chainID) + blocks[i].EVMChainID = sqlutil.New(o.chainID) } return batchInsert(ctx, o.ds, q, blocks, 1000) } From 60ed9dab8f7563ee35a17692ab1c0d8fb821f94c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 26 Feb 2026 18:03:04 +0100 Subject: [PATCH 08/13] fix orm_test --- pkg/logpoller/orm_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logpoller/orm_test.go b/pkg/logpoller/orm_test.go index 3318e9bb81..491c913943 100644 --- a/pkg/logpoller/orm_test.go +++ b/pkg/logpoller/orm_test.go @@ -104,7 +104,7 @@ func TestLogPoller_Blocks_Batching(t *testing.T) { for i := 0; i < numBlocks; i++ { blockHash := common.HexToHash(fmt.Sprintf("0x%d", i+1)) blocks = append(blocks, logpoller.Block{ - EVMChainID: ubig.New(th.ChainID), + EVMChainID: sqlutil.New(th.ChainID), BlockHash: blockHash, BlockNumber: int64(i + 1), }) From 7667a4dcca4267e0ac58414f0b1009ee4815da4c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 19 Feb 2026 19:32:04 +0100 Subject: [PATCH 09/13] Refactor tests in preparation for sparse blocks --- pkg/logpoller/helper_test.go | 10 --- pkg/logpoller/log_poller_test.go | 135 ++++++++++++++++++------------- 2 files changed, 79 insertions(+), 66 deletions(-) diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index 24c3b072fd..d9d552460c 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -176,16 +176,6 @@ func (th *TestHarness) assertDontHave(t *testing.T, start, end int) { } } -func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) { - for i := start; i < end; i++ { - blk, err := th.ORM.SelectBlockByNumber(testutils.Context(t), int64(i)) - require.NoError(t, err, "block %v", i) - chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i))) - require.NoError(t, err) - assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i) - } -} - // Simulates an RPC failover event to an alternate rpc server. This can also be used to // simulate switching back to the primary rpc after it recovers. 
func (th *TestHarness) SetActiveClient(backend evmtypes.Backend, chainType chaintype.ChainType) { diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 72cefae5b0..6792db5b31 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -723,23 +723,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) currentBlock, err := lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) - matchesGeth := func() bool { - // Check every block is identical - latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) - require.NoError(t, err1) - for i := 1; i < int(latest.NumberU64()); i++ { - ourBlock, err1 := lp.BlockByNumber(testutils.Context(t), int64(i)) - require.NoError(t, err1) - gethBlock, err1 := ec.BlockByNumber(testutils.Context(t), big.NewInt(int64(i))) - require.NoError(t, err1) - if ourBlock.BlockHash != gethBlock.Hash() { - t.Logf("Initial poll our block differs at height %d got %x want %x\n", i, ourBlock.BlockHash, gethBlock.Hash()) - return false - } - } - return true - } - if !matchesGeth() { + if !checkDBMatchesGeth(t, orm, simulatedClient) { return false } // Randomly pick to mine or reorg @@ -776,7 +760,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } - return matchesGeth() + return checkDBMatchesGeth(t, orm, simulatedClient) }, gen.SliceOfN(numChainInserts, gen.UInt64Range(1, uint64(finalityDepth-1))))) // Max reorg depth is finality depth - 1 p.TestingRun(t) } @@ -841,7 +825,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 1, 1) require.NoError(t, err) assert.Empty(t, lgs) - th.assertHaveCanonical(t, 1, 1) + requireDBMatchesGeth(t, th.ORM, th.Client) // Polling again should be a noop, since we are at the latest. newStart = th.PollAndSaveLogs(testutils.Context(t), newStart) @@ -849,7 +833,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { latest, err := th.ORM.SelectLatestBlock(testutils.Context(t)) require.NoError(t, err) assert.Equal(t, int64(1), latest.BlockNumber) - th.assertHaveCanonical(t, 1, 1) + requireDBMatchesGeth(t, th.ORM, th.Client) // Test scenario: one log 2 block chain. // Chain gen <- 1 <- 2 (L1) @@ -901,7 +885,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { require.NoError(t, err) require.Len(t, lgs, 1) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000002`), lgs[0].Data) - th.assertHaveCanonical(t, 1, 3) + requireDBMatchesGeth(t, th.ORM, th.Client) parent, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1)) require.NoError(t, err) @@ -936,8 +920,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000001`), lgs[0].Data) assert.Equal(t, int64(3), lgs[1].BlockNumber) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000003`), lgs[1].Data) - th.assertHaveCanonical(t, 1, 1) - th.assertHaveCanonical(t, 3, 4) + requireDBMatchesGeth(t, th.ORM, th.Client) th.assertDontHave(t, 2, 2) // 2 gets backfilled // Test scenario: multiple logs per block for many blocks (also after reorg). 
@@ -968,9 +951,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { assert.Equal(t, th.EmitterAddress2, lgs[1].Address) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000006`), lgs[2].Data) assert.Equal(t, th.EmitterAddress1, lgs[2].Address) - th.assertHaveCanonical(t, 1, 1) - th.assertDontHave(t, 2, 2) // 2 gets backfilled - th.assertHaveCanonical(t, 3, 6) + requireDBMatchesGeth(t, th.ORM, th.Client) // Test scenario: node down for exactly finality + 2 blocks // Note we only backfill up to finalized - 1 blocks, because we need to save the @@ -996,7 +977,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { assert.Equal(t, int64(8), lgs[1].BlockNumber) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000009`), lgs[2].Data) assert.Equal(t, int64(9), lgs[2].BlockNumber) - th.assertHaveCanonical(t, 8, 10) + requireDBMatchesGeth(t, th.ORM, th.Client) // Test scenario large backfill (multiple batches) // Chain gen <- 1 <- 2 (L1_1) <- 3' L1_3 <- 4 <- 5 (L1_4, L2_5) <- 6 (L1_6) <- 7 (L1_7) <- 8 (L1_8) <- 9 (L1_9) <- 10..32 @@ -1017,9 +998,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 11, 36) require.NoError(t, err) assert.Len(t, lgs, 25) - th.assertHaveCanonical(t, 32, 36) // Should have last finalized block plus unfinalized blocks - th.assertDontHave(t, 11, 13) // Should not have older finalized blocks - th.assertDontHave(t, 14, 16) // Should not have older finalized blocks + requireDBMatchesGeth(t, th.ORM, th.Client) + th.assertDontHave(t, 11, 13) // Should not have older finalized blocks + th.assertDontHave(t, 14, 16) // Should not have older finalized blocks // Verify that a custom block timestamp will get written to db correctly also b, err = th.Client.BlockByNumber(testutils.Context(t), nil) @@ -1206,9 +1187,8 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { require.NoError(t, err) require.Len(t, lgs, 30) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000002`), lgs[0].Data) - th.assertHaveCanonical(t, 1, 2) - th.assertDontHave(t, 2, 31) // These blocks are backfilled - th.assertHaveCanonical(t, 32, 36) + requireLBBlockIsFinalized(t, th.ORM, 30) + requireDBMatchesGeth(t, th.ORM, th.Client) }) } } @@ -1557,27 +1537,25 @@ func TestTooManyLogResults(t *testing.T) { } var filterLogsCall *mock.Call - head := &evmtypes.Head{} - finalized := &evmtypes.Head{} ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) { if blockNumber == nil { require.FailNow(t, "unexpected call to get current head") } - return &evmtypes.Head{Number: blockNumber.Int64(), ParentHash: common.HexToHash(fmt.Sprintf("0x%x", blockNumber.Int64()-1))}, nil + return newHead(blockNumber.Int64()), nil }) t.Run("halves size until small enough, then succeeds", func(t *testing.T) { // Simulate latestBlock = 300 - head.Number = 300 - head.Hash = common.HexToHash("0x1234") // needed to satisfy validation in fetchBlocks() - finalized.Number = head.Number - lpOpts.FinalityDepth + head := newHead(300) + finalized := newHead(head.Number - lpOpts.FinalityDepth) headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() headByHash := ec.On("HeadByHash", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockHash 
common.Hash) (*evmtypes.Head, error) { - return &evmtypes.Head{Hash: blockHash}, nil + num := new(big.Int).SetBytes(blockHash.Bytes()).Int64() + return newHead(num), nil }) batchCallContext := ec.On("BatchCallContext", mock.Anything, mock.Anything).Return( @@ -1591,11 +1569,7 @@ func TestTooManyLogResults(t *testing.T) { blockNumber, ok := new(big.Int).SetString(blockNumberHex[2:], 16) require.True(t, ok, blockNumberHex) - calls[i].Result = &evmtypes.Head{ - Number: blockNumber.Int64(), - Hash: common.HexToHash(fmt.Sprintf("0x%x", blockNumber.Int64())), - ParentHash: common.HexToHash(fmt.Sprintf("0x%x", blockNumber.Int64()-1)), - } + calls[i].Result = newHead(blockNumber.Int64()) } return nil }, @@ -1629,7 +1603,7 @@ func TestTooManyLogResults(t *testing.T) { lp.PollAndSaveLogs(ctx, 5, false) block, err2 := o.SelectLatestBlock(ctx) require.NoError(t, err2) - assert.Equal(t, int64(298), block.BlockNumber) + assert.Equal(t, int64(300), block.BlockNumber) logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All() // Should have tried again 3 times--first reducing batch size to 10, then 5, then 2 @@ -1647,8 +1621,8 @@ func TestTooManyLogResults(t *testing.T) { // Now jump to block 500, but return error no matter how small the block range gets. // Should exit the loop with a critical error instead of hanging. - head.Number = 500 - finalized.Number = head.Number - lpOpts.FinalityDepth + head := newHead(500) + finalized := newHead(head.Number - lpOpts.FinalityDepth) headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() filterLogsCall = ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { @@ -1663,7 +1637,7 @@ func TestTooManyLogResults(t *testing.T) { if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself } else { - assert.Equal(t, int64(298), block.BlockNumber) + assert.Equal(t, int64(300), block.BlockNumber) } warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All() crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All() @@ -1679,8 +1653,8 @@ func TestTooManyLogResults(t *testing.T) { t.Run("Unrelated error are retried without adjusting size", func(t *testing.T) { unrelatedError := errors.New("Unrelated to the size of the request") - head.Number = 500 - finalized.Number = head.Number - lpOpts.FinalityDepth + head := newHead(500) + finalized := newHead(head.Number - lpOpts.FinalityDepth) obs.TakeAll() filterLogsCall = ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { @@ -1697,7 +1671,7 @@ func TestTooManyLogResults(t *testing.T) { if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself } else { - assert.Equal(t, int64(298), block.BlockNumber) + assert.Equal(t, int64(300), block.BlockNumber) } crit := obs.FilterLevelExact(zapcore.DPanicLevel).All() errors := obs.FilterLevelExact(zapcore.ErrorLevel).All() @@ -2261,10 +2235,59 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { require.Len(t, logs, 1) require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's 
log was properly removed during replay") // Ensure reorged block was replaced by a new one - dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64()) - require.NoError(t, err) - require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber) - require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash) + requireDBMatchesGeth(t, th.ORM, th.Client) }) } } + +func requireDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Client) { + require.True(t, checkDBMatchesGeth(t, orm, client), "DB state does not match geth canonical chain") +} + +func checkDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Client) bool { + // Check every block is identical + latest, err1 := client.HeadByNumber(testutils.Context(t), nil) + require.NoError(t, err1) + dbBlocks, err := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) + require.NoError(t, err) + // ensure all blocks present in db are on geth canonical chain + for _, ourBlock := range dbBlocks { + gethBlock, err1 := client.HeadByNumber(testutils.Context(t), big.NewInt(ourBlock.BlockNumber)) + require.NoError(t, err1) + if ourBlock.BlockHash != gethBlock.Hash { + t.Logf("Initial poll our block differs at height %d got %x want %x\n", ourBlock.BlockNumber, ourBlock.BlockHash, gethBlock.Hash) + return false + } + } + + latestDB, err := orm.SelectLatestBlock(t.Context()) + require.NoError(t, err) + require.Equal(t, latest.Number, latestDB.BlockNumber, "latest block number in db should match geth") + + // ensure all logs present in db are on geth canonical chain + logs, err1 := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) + require.NoError(t, err1) + for _, log := range logs { + gethBlock, err1 := client.HeadByNumber(testutils.Context(t), big.NewInt(log.BlockNumber)) + require.NoError(t, err1) + if log.BlockHash != gethBlock.Hash { + t.Logf("Log present in db, is not present in canonical chain. Log block number %d, Log block Hash: %s, block hash %s\n", log.BlockNumber, log.BlockHash, gethBlock.Hash) + return false + } + } + return true +} + +func requireLBBlockIsFinalized(t *testing.T, orm logpoller.ORM, block int64) { + latest, err := orm.SelectLatestBlock(t.Context()) + require.NoError(t, err) + require.GreaterOrEqual(t, latest.FinalizedBlockNumber, block, "specified block should be finalized") +} + +func newHead(num int64) *evmtypes.Head { + return &evmtypes.Head{ + Number: num, + Hash: common.BigToHash(big.NewInt(num)), + ParentHash: common.BigToHash(big.NewInt(num - 1)), + } +} From c681f60e6e4a035d40d9f3ffbf93d4c151ece4a8 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Wed, 25 Feb 2026 18:50:18 +0100 Subject: [PATCH 10/13] Store only blocks with logs and checkpoints --- pkg/logpoller/log_poller.go | 83 +++++---- pkg/logpoller/log_poller_internal_test.go | 196 ++++++++++++++++++++++ pkg/logpoller/observability.go | 6 + pkg/logpoller/orm.go | 12 ++ 4 files changed, 265 insertions(+), 32 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index ec0d4cfc57..ee6102422f 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1081,7 +1081,7 @@ func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Hea // There can be another reorg while we're finding the LCA. // That is ok, since we'll detect it on the next iteration. // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. 
- blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber) + blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock.Number, latestBlock.FinalizedBlockNumber) if err2 != nil { return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) } @@ -1231,10 +1231,15 @@ func (e *reorgError) Error() string { return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number) } -func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) { +func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) (blocks []Block, logs []Log, err error) { const maxUnfinalizedBlocks = 2000 - var logs []Log - var blocks []Block + var block *Block + defer func() { + // ensure that we always include the last block even if it's empty to use it as check point for next poll. + if block != nil && (len(blocks) == 0 || blocks[len(blocks)-1].BlockNumber != block.BlockNumber) { + blocks = append(blocks, *block) + } + }() for { h := currentBlock.Hash rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) @@ -1243,15 +1248,18 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty return blocks, logs, nil } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) - block := Block{ + block = &Block{ BlockHash: h, BlockNumber: currentBlock.Number, BlockTimestamp: currentBlock.Timestamp, FinalizedBlockNumber: finalized, SafeBlockNumber: safe, } - logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...) - blocks = append(blocks, block) + logs = append(logs, convertLogs(rpcLogs, []Block{*block}, lp.lggr, lp.ec.ConfiguredChainID())...) + // Always save the block with logs, to know an impact of finality violation and for better observability. + if len(rpcLogs) > 0 { + blocks = append(blocks, *block) + } if currentBlock.Number >= latest { return blocks, logs, nil @@ -1328,39 +1336,50 @@ func (lp *logPoller) latestSafeBlock(ctx context.Context, latestFinalizedBlockNu // Find the first place where our chain and their chain have the same block, // that block number is the LCA. Return the block after that, where we want to resume polling. -func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.Head, latestFinalizedBlockNumber int64) (*evmtypes.Head, error) { - // Current is where the mismatch starts. - // Check its parent to see if its the same as ours saved. - parent, err := lp.latencyMonitor.HeadByHash(ctx, current.ParentHash) - if err != nil { - return nil, err +func (lp *logPoller) findBlockAfterLCA(ctx context.Context, currentHeadNumber int64, dbLatestFinalizedBlockNumber int64) (*evmtypes.Head, error) { + if currentHeadNumber < dbLatestFinalizedBlockNumber { + lp.lggr.Criticalw("Unexpected state. 
Current head number is lower than latest finalized block number", "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber) + return nil, fmt.Errorf("current head number %d is lower than latest finalized block number %d: %w", currentHeadNumber, dbLatestFinalizedBlockNumber, commontypes.ErrFinalityViolated) } - blockAfterLCA := current + // We expect reorgs up to the block after latestFinalizedBlock - // We loop via parent instead of current so current always holds the LCA+1. // If the parent block number becomes < the first finalized block our reorg is too deep. - // This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config. - var ourParentBlockHash common.Hash - for parent.Number >= latestFinalizedBlockNumber { - outParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number) + // This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config or chain violates finality guarantees. + for { + // Since we do not store all blocks in the db, it's possible that we do not have the parent block in our db. + // Find the nearest ancestor that we have in our db and check if it still belongs to canonical chain. + ourParent, err := lp.orm.SelectNewestBlock(ctx, currentHeadNumber-1) if err != nil { - return nil, err - } - ourParentBlockHash = outParentBlock.BlockHash - if parent.Hash == ourParentBlockHash { - // If we do have the blockhash, return blockAfterLCA - return blockAfterLCA, nil + if errors.Is(err, sql.ErrNoRows) { + lp.lggr.Warnw("No ancestor block found in db, this means that the reorg is deeper than the number of blocks we have in the db.", "err", err, "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber) + // we should return currentHeadNumber as the block after LCA, to avoid drifting too far back. + return lp.headerByNumber(ctx, currentHeadNumber) + } + return nil, fmt.Errorf("failed to select ancestor for current block %d: %w", currentHeadNumber-1, err) } - // Otherwise get a new parent and update blockAfterLCA. - blockAfterLCA = parent - parent, err = lp.latencyMonitor.HeadByHash(ctx, parent.ParentHash) + + // Since we are looking for block after LCA, fetch child of ourParent. + // If new current points to ourParent, we found the LCA and can return block after it. Otherwise, keep looking for ancestors. 
+ rpcChild, err := lp.headerByNumber(ctx, ourParent.BlockNumber+1) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch block after ancestor %d: %w", ourParent.BlockNumber+1, err) + } + if ourParent.BlockHash == rpcChild.ParentHash { + return rpcChild, nil + } + + if ourParent.BlockNumber <= dbLatestFinalizedBlockNumber { + lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, + "current", rpcChild.Number, + "latestFinalized", dbLatestFinalizedBlockNumber, + "ourParentHash", ourParent.BlockHash, + "expectedParentHash", rpcChild.ParentHash, + "childHash", rpcChild.Hash) + return nil, fmt.Errorf("%w: finalized block with hash %s is not parent of canonical block at height %d, with parent hash %s", commontypes.ErrFinalityViolated, ourParent.BlockHash, rpcChild.Number, rpcChild.ParentHash) } - } - lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber) - return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number) + currentHeadNumber = ourParent.BlockNumber + } } // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block. diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 64c3512107..21e4763d8c 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -958,6 +958,202 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { }) } +func Test_FindBlockAfterLCA(t *testing.T) { + testCases := []struct { + Name string + CurrentBlockNumber int64 + DBLatestFinalized int64 + DBBlocks []int64 + Setup func(t *testing.T, ec *clienttest.Client) + ExpectedError error + ExpectedHead *evmtypes.Head + }{ + { + Name: "current head lower than DB finalized", + CurrentBlockNumber: 3, + DBLatestFinalized: 5, + DBBlocks: nil, + Setup: nil, + ExpectedError: commontypes.ErrFinalityViolated, + ExpectedHead: nil, + }, + { + Name: "no reorg - chains match on first iteration", + CurrentBlockNumber: 5, + DBLatestFinalized: 3, + DBBlocks: []int64{4}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + return newHead(n.Int64()), nil + }) + }, + ExpectedError: nil, + ExpectedHead: newHead(5), + }, + { + Name: "reorg - LCA found after walking back", + CurrentBlockNumber: 5, + DBLatestFinalized: 1, + DBBlocks: []int64{2, 3}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + num := n.Int64() + if num == 4 { + return &evmtypes.Head{Number: 4, Hash: common.BigToHash(big.NewInt(4)), ParentHash: common.HexToHash("0xdead")}, nil + } + return newHead(num), nil + }).Maybe() + }, + ExpectedError: nil, + ExpectedHead: newHead(3), + }, + { + Name: "reorg too deep", + CurrentBlockNumber: 5, + DBLatestFinalized: 2, + DBBlocks: []int64{1}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + num := n.Int64() + return &evmtypes.Head{Number: num, Hash: 
common.BigToHash(big.NewInt(num)), ParentHash: common.HexToHash("0xbeef")}, nil + }) + }, + ExpectedError: commontypes.ErrFinalityViolated, + ExpectedHead: nil, + }, + { + Name: "RPC HeadByNumber returns error", + CurrentBlockNumber: 5, + DBLatestFinalized: 3, + DBBlocks: []int64{4}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + if n.Int64() == 5 { + return nil, errors.New("rpc error") + } + return newHead(n.Int64()), nil + }) + }, + ExpectedError: errors.New("rpc error"), + ExpectedHead: nil, + }, + { + Name: "All blocks in DB are on a different chain", + CurrentBlockNumber: 100, + DBLatestFinalized: 10, + DBBlocks: []int64{90, 80, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() < 20 { + return head, nil + } + head.ParentHash = common.HexToHash("0xdead") + head.Hash = common.HexToHash("0xdead") + return head, nil + }) + }, + ExpectedError: nil, + ExpectedHead: &evmtypes.Head{ + Number: 20, + Hash: common.HexToHash("0xdead"), + ParentHash: common.HexToHash("0xdead"), + }, + }, + { + Name: "Sparse DB blocks - LCA found successfully", + CurrentBlockNumber: 100, + DBLatestFinalized: 10, + DBBlocks: []int64{90, 80, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() > 21 { + head.ParentHash = common.HexToHash("0xdead") + return head, nil + } + return head, nil + }) + }, + ExpectedError: nil, + ExpectedHead: newHead(21), + }, + { + Name: "Child of latest finalized is not canonical - finality violation", + CurrentBlockNumber: 100, + DBLatestFinalized: 80, + DBBlocks: []int64{90, 80, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() > 80 { + head.ParentHash = common.HexToHash("0xdead") + return head, nil + } + return head, nil + }) + }, + ExpectedError: commontypes.ErrFinalityViolated, + ExpectedHead: nil, + }, + { + // Such case is possible, since DBLatestFinalized is defined by FinalizedBlockNumber of the latest block. 
+ Name: "Latest finalized DB block is in canonical but much older than DBLatestFinalized", + CurrentBlockNumber: 100, + DBLatestFinalized: 80, + DBBlocks: []int64{90, 70, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() > 80 { + head.ParentHash = common.HexToHash("0xdead") + return head, nil + } + return head, nil + }) + }, + ExpectedError: nil, + ExpectedHead: newHead(71), + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + db := testutils.NewSqlxDB(t) + lggr, _ := logger.TestObserved(t, zapcore.DebugLevel) + orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr) + headTracker := headstest.NewTracker[*evmtypes.Head, common.Hash](t) + ec := clienttest.NewClient(t) + ctx := testutils.Context(t) + for _, blockNum := range tc.DBBlocks { + hash := common.BigToHash(big.NewInt(blockNum)) + require.NoError(t, orm.InsertBlock(ctx, hash, blockNum, time.Now(), blockNum, blockNum)) + } + if tc.Setup != nil { + tc.Setup(t, ec) + } + lpOpts := Opts{ + PollPeriod: time.Second, + FinalityDepth: 3, + BackfillBatchSize: 3, + RPCBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + BackupPollerBlockDelay: 0, + } + lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) + blockAfterLCA, err := lp.findBlockAfterLCA(ctx, tc.CurrentBlockNumber, tc.DBLatestFinalized) + if tc.ExpectedError != nil { + require.ErrorContains(t, err, tc.ExpectedError.Error()) + require.Nil(t, blockAfterLCA) + } else { + require.NoError(t, err) + require.Equal(t, tc.ExpectedHead, blockAfterLCA) + } + }) + } +} + func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) lpOpts := Opts{ diff --git a/pkg/logpoller/observability.go b/pkg/logpoller/observability.go index ffa1b47456..a1d4fdd3a3 100644 --- a/pkg/logpoller/observability.go +++ b/pkg/logpoller/observability.go @@ -138,6 +138,12 @@ func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumb }) } +func (o *ObservedORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) { + return withObservedQuery(ctx, o, "SelectNewestBlock", func() (*Block, error) { + return o.ORM.SelectNewestBlock(ctx, maxAllowedBlockNumber) + }) +} + func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) { return withObservedQuery(ctx, o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index 012c450f3a..f0792ea309 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -43,6 +43,7 @@ type ORM interface { SelectBlockByHash(ctx context.Context, hash common.Hash) (*Block, error) SelectLatestBlock(ctx context.Context) (*Block, error) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error) + SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) SelectLatestFinalizedBlock(ctx context.Context) (*Block, error) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) @@ -295,6 +296,17 @@ func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int return &b, nil } +func (o *DSORM) 
SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) { + var b Block + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= $2 ORDER BY block_number DESC LIMIT 1`), + sqlutil.New(o.chainID), maxAllowedBlockNumber, + ); err != nil { + return nil, err + } + return &b, nil +} + func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withConfs(confs). From cd25239e9ab96db052a0ceccdb91547d7adc10e2 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 27 Feb 2026 17:15:15 +0100 Subject: [PATCH 11/13] Add feature flag to disable/enable sparse blocks storage --- CONFIG.md | 8 + pkg/chains/legacyevm/chain.go | 1 + pkg/config/chain_scoped.go | 4 + pkg/config/config.go | 1 + pkg/config/mocks/evm.go | 45 ++++++ pkg/config/toml/config.go | 1 + pkg/config/toml/config_test.go | 1 + pkg/config/toml/defaults.go | 3 + pkg/config/toml/defaults/fallback.toml | 1 + pkg/config/toml/docs.toml | 3 + pkg/config/toml/testdata/config-full.toml | 1 + pkg/logpoller/log_poller.go | 17 +- pkg/logpoller/log_poller_internal_test.go | 41 +++-- pkg/logpoller/log_poller_test.go | 183 ++++++++++++---------- 14 files changed, 212 insertions(+), 98 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index b5eaaf1354..627a296ac0 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -28,6 +28,7 @@ FlagsContractAddress = '0xae4E781a6218A8031764928E88d457937A954fC3' # Example LinkContractAddress = '0x538aAaB4ea120b2bC2fe5D296852D948F07D849e' # Example LogBackfillBatchSize = 1000 # Default LogPollInterval = '15s' # Default +LogPollerSkipEmptyBlocks = false # Default LogKeepBlocksDepth = 100000 # Default LogPrunePageSize = 0 # Default BackupLogPollerBlockDelay = 100 # Default @@ -172,6 +173,13 @@ LogPollInterval = '15s' # Default ``` LogPollInterval works in conjunction with Feature.LogPoller. Controls how frequently the log poller polls for logs. Defaults to the block production rate. +### LogPollerSkipEmptyBlocks +:warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._ +```toml +LogPollerSkipEmptyBlocks = false # Default +``` +LogPollerSkipEmptyBlocks works in conjunction with Feature.LogPoller. Controls whether the log poller skips blocks with no logs. Setting this to true can reduce DB load, however it can affect product performance. Consult with specific Chainlink product team before changing this setting. 
+ ### LogKeepBlocksDepth :warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._ ```toml diff --git a/pkg/chains/legacyevm/chain.go b/pkg/chains/legacyevm/chain.go index ac93410a82..59e6d13863 100644 --- a/pkg/chains/legacyevm/chain.go +++ b/pkg/chains/legacyevm/chain.go @@ -244,6 +244,7 @@ func newChain(cfg *config.ChainScoped, nodes []*toml.Node, opts ChainRelayOpts, LogPrunePageSize: int64(cfg.EVM().LogPrunePageSize()), BackupPollerBlockDelay: int64(cfg.EVM().BackupLogPollerBlockDelay()), ClientErrors: cfg.EVM().NodePool().Errors(), + SkipEmptyBlocks: cfg.EVM().LogPollerSkipEmptyBlocks(), } lpORM, err := logpoller.NewObservedORM(chainID, opts.DS, l) diff --git a/pkg/config/chain_scoped.go b/pkg/config/chain_scoped.go index 9abfa057a8..d8705e3258 100644 --- a/pkg/config/chain_scoped.go +++ b/pkg/config/chain_scoped.go @@ -95,6 +95,10 @@ func (e *EVMConfig) LogPollInterval() time.Duration { return e.C.LogPollInterval.Duration() } +func (e *EVMConfig) LogPollerSkipEmptyBlocks() bool { + return *e.C.LogPollerSkipEmptyBlocks +} + func (e *EVMConfig) FinalityDepth() uint32 { return *e.C.FinalityDepth } diff --git a/pkg/config/config.go b/pkg/config/config.go index 9593c96d96..1cc0419bcd 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -42,6 +42,7 @@ type EVM interface { LogKeepBlocksDepth() uint32 BackupLogPollerBlockDelay() uint64 LogPollInterval() time.Duration + LogPollerSkipEmptyBlocks() bool LogPrunePageSize() uint32 MinContractPayment() *commonassets.Link MinIncomingConfirmations() uint32 diff --git a/pkg/config/mocks/evm.go b/pkg/config/mocks/evm.go index e5ae1af545..cf93191433 100644 --- a/pkg/config/mocks/evm.go +++ b/pkg/config/mocks/evm.go @@ -982,6 +982,51 @@ func (_c *EVM_LogPollInterval_Call) RunAndReturn(run func() time.Duration) *EVM_ return _c } +// LogPollerSkipEmptyBlocks provides a mock function with no fields +func (_m *EVM) LogPollerSkipEmptyBlocks() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for LogPollerSkipEmptyBlocks") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// EVM_LogPollerSkipEmptyBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LogPollerSkipEmptyBlocks' +type EVM_LogPollerSkipEmptyBlocks_Call struct { + *mock.Call +} + +// LogPollerSkipEmptyBlocks is a helper method to define mock.On call +func (_e *EVM_Expecter) LogPollerSkipEmptyBlocks() *EVM_LogPollerSkipEmptyBlocks_Call { + return &EVM_LogPollerSkipEmptyBlocks_Call{Call: _e.mock.On("LogPollerSkipEmptyBlocks")} +} + +func (_c *EVM_LogPollerSkipEmptyBlocks_Call) Run(run func()) *EVM_LogPollerSkipEmptyBlocks_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *EVM_LogPollerSkipEmptyBlocks_Call) Return(_a0 bool) *EVM_LogPollerSkipEmptyBlocks_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EVM_LogPollerSkipEmptyBlocks_Call) RunAndReturn(run func() bool) *EVM_LogPollerSkipEmptyBlocks_Call { + _c.Call.Return(run) + return _c +} + // LogPrunePageSize provides a mock function with no fields func (_m *EVM) LogPrunePageSize() uint32 { ret := _m.Called() diff --git a/pkg/config/toml/config.go b/pkg/config/toml/config.go index 6e9cbf4ee3..f3cc3e30b0 100644 --- a/pkg/config/toml/config.go +++ b/pkg/config/toml/config.go @@ -403,6 +403,7 @@ type Chain struct { LinkContractAddress *types.EIP55Address LogBackfillBatchSize *uint32 LogPollInterval 
*commonconfig.Duration + LogPollerSkipEmptyBlocks *bool LogKeepBlocksDepth *uint32 LogPrunePageSize *uint32 BackupLogPollerBlockDelay *uint64 diff --git a/pkg/config/toml/config_test.go b/pkg/config/toml/config_test.go index 47328b9dda..53a0a581a0 100644 --- a/pkg/config/toml/config_test.go +++ b/pkg/config/toml/config_test.go @@ -252,6 +252,7 @@ var fullConfig = EVMConfig{ LinkContractAddress: ptr(types.MustEIP55Address("0x538aAaB4ea120b2bC2fe5D296852D948F07D849e")), LogBackfillBatchSize: ptr[uint32](17), LogPollInterval: config.MustNewDuration(time.Minute), + LogPollerSkipEmptyBlocks: ptr(false), LogKeepBlocksDepth: ptr[uint32](100000), LogPrunePageSize: ptr[uint32](0), BackupLogPollerBlockDelay: ptr[uint64](532), diff --git a/pkg/config/toml/defaults.go b/pkg/config/toml/defaults.go index dced1aed85..835b1b4a6f 100644 --- a/pkg/config/toml/defaults.go +++ b/pkg/config/toml/defaults.go @@ -229,6 +229,9 @@ func (c *Chain) SetFrom(f *Chain) { if v := f.LogPollInterval; v != nil { c.LogPollInterval = v } + if v := f.LogPollerSkipEmptyBlocks; v != nil { + c.LogPollerSkipEmptyBlocks = v + } if v := f.LogKeepBlocksDepth; v != nil { c.LogKeepBlocksDepth = v } diff --git a/pkg/config/toml/defaults/fallback.toml b/pkg/config/toml/defaults/fallback.toml index 7d8ea77644..64afbf8842 100644 --- a/pkg/config/toml/defaults/fallback.toml +++ b/pkg/config/toml/defaults/fallback.toml @@ -7,6 +7,7 @@ FinalityTagEnabled = false SafeTagSupported = true LogBackfillBatchSize = 1000 LogPollInterval = '15s' +LogPollerSkipEmptyBlocks = false LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 BackupLogPollerBlockDelay = 100 diff --git a/pkg/config/toml/docs.toml b/pkg/config/toml/docs.toml index 72947f8394..3633ba28b7 100644 --- a/pkg/config/toml/docs.toml +++ b/pkg/config/toml/docs.toml @@ -71,6 +71,9 @@ LogBackfillBatchSize = 1000 # Default # LogPollInterval works in conjunction with Feature.LogPoller. Controls how frequently the log poller polls for logs. Defaults to the block production rate. LogPollInterval = '15s' # Default # **ADVANCED** +# LogPollerSkipEmptyBlocks works in conjunction with Feature.LogPoller. Controls whether the log poller skips blocks with no logs. Setting this to true can reduce DB load, however it can affect product performance. Consult with specific Chainlink product team before changing this setting. +LogPollerSkipEmptyBlocks = false # Default +# **ADVANCED** # LogKeepBlocksDepth works in conjunction with Feature.LogPoller. Controls how many blocks the poller will keep, must be greater than FinalityDepth+1. 
LogKeepBlocksDepth = 100000 # Default # **ADVANCED** diff --git a/pkg/config/toml/testdata/config-full.toml b/pkg/config/toml/testdata/config-full.toml index 0284665631..b800d4f193 100644 --- a/pkg/config/toml/testdata/config-full.toml +++ b/pkg/config/toml/testdata/config-full.toml @@ -12,6 +12,7 @@ FlagsContractAddress = '0xae4E781a6218A8031764928E88d457937A954fC3' LinkContractAddress = '0x538aAaB4ea120b2bC2fe5D296852D948F07D849e' LogBackfillBatchSize = 17 LogPollInterval = '1m0s' +LogPollerSkipEmptyBlocks = false LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 BackupLogPollerBlockDelay = 532 diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index ee6102422f..5cca0df580 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -114,11 +114,12 @@ type logPoller struct { latencyMonitor LatencyMonitor lggr logger.SugaredLogger pollPeriod time.Duration // poll period set by block production rate - useFinalityTag bool // indicates whether logPoller should use chain's finality or pick a fixed depth for finality - finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized. If `useFinalityTag` is set to true, this value is ignored, because finalityDepth is fetched from chain - keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database - backfillBatchSize int64 // batch size to use when backfilling finalized logs - rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks + skipEmptyBlocks bool + useFinalityTag bool // indicates whether logPoller should use chain's finality or pick a fixed depth for finality + finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized. If `useFinalityTag` is set to true, this value is ignored, because finalityDepth is fetched from chain + keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database + backfillBatchSize int64 // batch size to use when backfilling finalized logs + rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks logPrunePageSize int64 clientErrors config.ClientErrors backupPollerNextBlock int64 // next block to be processed by Backup LogPoller @@ -154,6 +155,7 @@ type Opts struct { BackupPollerBlockDelay int64 LogPrunePageSize int64 ClientErrors config.ClientErrors + SkipEmptyBlocks bool } // NewLogPoller creates a log poller. Note there is an assumption @@ -177,6 +179,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke replayStart: make(chan int64), replayComplete: make(chan error), pollPeriod: opts.PollPeriod, + skipEmptyBlocks: opts.SkipEmptyBlocks, backupPollerBlockDelay: opts.BackupPollerBlockDelay, finalityDepth: opts.FinalityDepth, useFinalityTag: opts.UseFinalityTag, @@ -1256,8 +1259,8 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty SafeBlockNumber: safe, } logs = append(logs, convertLogs(rpcLogs, []Block{*block}, lp.lggr, lp.ec.ConfiguredChainID())...) - // Always save the block with logs, to know an impact of finality violation and for better observability. - if len(rpcLogs) > 0 { + // Skip empty blocks if configured to do so. 
+ if len(rpcLogs) > 0 || !lp.skipEmptyBlocks { blocks = append(blocks, *block) } diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 21e4763d8c..832572b1d3 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -959,6 +959,37 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { } func Test_FindBlockAfterLCA(t *testing.T) { + testCases := []struct { + Name string + SkipEmptyBlocks bool + }{ + { + Name: "Skip empty block", + SkipEmptyBlocks: true, + }, + { + Name: "Don't skip empty block", + SkipEmptyBlocks: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + lpOpts := Opts{ + PollPeriod: time.Second, + FinalityDepth: 3, + BackfillBatchSize: 3, + RPCBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + BackupPollerBlockDelay: 0, + SkipEmptyBlocks: tc.SkipEmptyBlocks, + } + testFindBlockAfterLCA(t, lpOpts) + }) + } +} + +func testFindBlockAfterLCA(t *testing.T, opts Opts) { testCases := []struct { Name string CurrentBlockNumber int64 @@ -1133,15 +1164,7 @@ func Test_FindBlockAfterLCA(t *testing.T) { if tc.Setup != nil { tc.Setup(t, ec) } - lpOpts := Opts{ - PollPeriod: time.Second, - FinalityDepth: 3, - BackfillBatchSize: 3, - RPCBatchSize: 3, - KeepFinalizedBlocksDepth: 20, - BackupPollerBlockDelay: 0, - } - lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) + lp := NewLogPoller(orm, ec, lggr, headTracker, opts) blockAfterLCA, err := lp.findBlockAfterLCA(ctx, tc.CurrentBlockNumber, tc.DBLatestFinalized) if tc.ExpectedError != nil { require.ErrorContains(t, err, tc.ExpectedError.Error()) diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 6792db5b31..27f72987e8 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -676,93 +676,112 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { func TestLogPoller_SynchronizedWithGeth(t *testing.T) { t.Parallel() - // The log poller's blocks table should remain synchronized - // with the canonical chain of geth's despite arbitrary mixes of mining and reorgs. - testParams := gopter.DefaultTestParameters() - testParams.MinSuccessfulTests = 100 - p := gopter.NewProperties(testParams) - numChainInserts := 3 - finalityDepth := 5 - lggr := logger.Test(t) - db := testutils.NewSqlxDB(t) + testCases := []struct { + name string + skipEmptyBlocks bool + }{ + { + name: "with empty blocks", + skipEmptyBlocks: false, + }, + { + name: "without empty blocks", + skipEmptyBlocks: true, + }, + } - owner := testutils.MustNewSimTransactor(t) - owner.GasPrice = big.NewInt(10e9) - p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool { - // After the set of reorgs, we should have the same canonical blocks that geth does. - seed := time.Now().UnixNano() - localRand := rand.New(rand.NewSource(seed)) - t.Log("Starting test", mineOrReorg, "seed", seed) - chainID := testutils.NewRandomEVMChainID() - // Set up a test chain with a log emitting contract deployed. - orm := logpoller.NewORM(chainID, db, lggr) - // Note this property test is run concurrently and the sim is not threadsafe. 
- backend := simulated.NewBackend(types.GenesisAlloc{ - owner.From: { - Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)), - }, - }, simulated.WithBlockGasLimit(10e6)) - ec := backend.Client() - _, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) - require.NoError(t, err) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // The log poller's blocks table should remain synchronized + // with the canonical chain of geth's despite arbitrary mixes of mining and reorgs. + testParams := gopter.DefaultTestParameters() + testParams.MinSuccessfulTests = 100 + p := gopter.NewProperties(testParams) + numChainInserts := 3 + finalityDepth := 5 + lggr := logger.Test(t) + db := testutils.NewSqlxDB(t) + + owner := testutils.MustNewSimTransactor(t) + owner.GasPrice = big.NewInt(10e9) + p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool { + // After the set of reorgs, we should have the same canonical blocks that geth does. + seed := time.Now().UnixNano() + localRand := rand.New(rand.NewSource(seed)) + t.Log("Starting test", mineOrReorg, "seed", seed) + chainID := testutils.NewRandomEVMChainID() + // Set up a test chain with a log emitting contract deployed. + orm := logpoller.NewORM(chainID, db, lggr) + // Note this property test is run concurrently and the sim is not threadsafe. + backend := simulated.NewBackend(types.GenesisAlloc{ + owner.From: { + Balance: big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)), + }, + }, simulated.WithBlockGasLimit(10e6)) + ec := backend.Client() + _, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) + require.NoError(t, err) - lpOpts := logpoller.Opts{ - PollPeriod: 15 * time.Second, - FinalityDepth: int64(finalityDepth), - BackfillBatchSize: 3, - RPCBatchSize: 2, - KeepFinalizedBlocksDepth: 1000, - } - simulatedClient := client.NewSimulatedBackendClient(t, backend, chainID) - ht := headstest.NewSimulatedHeadTracker(simulatedClient, lpOpts.UseFinalityTag, lpOpts.FinalityDepth) - lp := logpoller.NewLogPoller(orm, simulatedClient, lggr, ht, lpOpts) - for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. - backend.Commit() - } - currentBlockNumber := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) - currentBlock, err := lp.LatestBlock(testutils.Context(t)) - require.NoError(t, err) - if !checkDBMatchesGeth(t, orm, simulatedClient) { - return false - } - // Randomly pick to mine or reorg - for i := 0; i < numChainInserts; i++ { - if localRand.Int63()%2 == 0 { - // Mine blocks - for j := 0; j < int(mineOrReorg[i]); j++ { - backend.Commit() - latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) - require.NoError(t, err1) - t.Log("mined block", latest.Hash()) + lpOpts := logpoller.Opts{ + PollPeriod: 15 * time.Second, + FinalityDepth: int64(finalityDepth), + BackfillBatchSize: 3, + RPCBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + SkipEmptyBlocks: tc.skipEmptyBlocks, } - } else { - // Reorg blocks - latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) - require.NoError(t, err1) - reorgedBlock := big.NewInt(0).Sub(latest.Number(), big.NewInt(int64(mineOrReorg[i]))) - reorg, err1 := ec.BlockByNumber(testutils.Context(t), reorgedBlock) - require.NoError(t, err1) - require.NoError(t, backend.Fork(reorg.Hash())) - - t.Logf("Reorging from (%v, %x) back to (%v, %x)\n", latest.NumberU64(), latest.Hash(), reorgedBlock.Uint64(), reorg.Hash()) - // Actually need to change the block here to trigger the reorg. 
- _, err1 = emitter1.EmitLog1(owner, []*big.Int{big.NewInt(1)}) - require.NoError(t, err1) - for j := 0; j < int(mineOrReorg[i]+1); j++ { // Need +1 to make it actually longer height so we detect it. + simulatedClient := client.NewSimulatedBackendClient(t, backend, chainID) + ht := headstest.NewSimulatedHeadTracker(simulatedClient, lpOpts.UseFinalityTag, lpOpts.FinalityDepth) + lp := logpoller.NewLogPoller(orm, simulatedClient, lggr, ht, lpOpts) + for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. backend.Commit() } - latest, err1 = ec.BlockByNumber(testutils.Context(t), nil) - require.NoError(t, err1) - t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) - } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false) - currentBlock, err = lp.LatestBlock(testutils.Context(t)) - require.NoError(t, err) - } - return checkDBMatchesGeth(t, orm, simulatedClient) - }, gen.SliceOfN(numChainInserts, gen.UInt64Range(1, uint64(finalityDepth-1))))) // Max reorg depth is finality depth - 1 - p.TestingRun(t) + currentBlockNumber := int64(1) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) + currentBlock, err := lp.LatestBlock(testutils.Context(t)) + require.NoError(t, err) + if !checkDBMatchesGeth(t, orm, simulatedClient) { + return false + } + // Randomly pick to mine or reorg + for i := 0; i < numChainInserts; i++ { + if localRand.Int63()%2 == 0 { + // Mine blocks + for j := 0; j < int(mineOrReorg[i]); j++ { + backend.Commit() + latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) + require.NoError(t, err1) + t.Log("mined block", latest.Hash()) + } + } else { + // Reorg blocks + latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) + require.NoError(t, err1) + reorgedBlock := big.NewInt(0).Sub(latest.Number(), big.NewInt(int64(mineOrReorg[i]))) + reorg, err1 := ec.BlockByNumber(testutils.Context(t), reorgedBlock) + require.NoError(t, err1) + require.NoError(t, backend.Fork(reorg.Hash())) + + t.Logf("Reorging from (%v, %x) back to (%v, %x)\n", latest.NumberU64(), latest.Hash(), reorgedBlock.Uint64(), reorg.Hash()) + // Actually need to change the block here to trigger the reorg. + _, err1 = emitter1.EmitLog1(owner, []*big.Int{big.NewInt(1)}) + require.NoError(t, err1) + for j := 0; j < int(mineOrReorg[i]+1); j++ { // Need +1 to make it actually longer height so we detect it. 
+ backend.Commit() + } + latest, err1 = ec.BlockByNumber(testutils.Context(t), nil) + require.NoError(t, err1) + t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) + } + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false) + currentBlock, err = lp.LatestBlock(testutils.Context(t)) + require.NoError(t, err) + } + return checkDBMatchesGeth(t, orm, simulatedClient) + }, gen.SliceOfN(numChainInserts, gen.UInt64Range(1, uint64(finalityDepth-1))))) // Max reorg depth is finality depth - 1 + p.TestingRun(t) + }) + } } func TestLogPoller_PollAndSaveLogs(t *testing.T) { From f077932696ea6debdd5f6e44a32dcccf4aa3620d Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 24 Apr 2026 17:53:48 +0200 Subject: [PATCH 12/13] Fix merge issue --- pkg/logpoller/log_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index b209ad5ea8..7af48aa173 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1263,7 +1263,7 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty return blocks, logs, fmt.Errorf("unable to query for logs: %w", err) } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) - block := Block{ + block = &Block{ BlockHash: h, BlockNumber: currentBlock.Number, BlockTimestamp: currentBlock.Timestamp, From 52e95aac8c0bbbc663671cf05ba3616f38137604 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 24 Apr 2026 18:18:04 +0200 Subject: [PATCH 13/13] Fix linter issues --- pkg/logpoller/log_poller_internal_test.go | 1 - pkg/logpoller/log_poller_test.go | 20 +++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 832572b1d3..beddeba3b9 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -1148,7 +1148,6 @@ func testFindBlockAfterLCA(t *testing.T, opts Opts) { }, } for _, tc := range testCases { - tc := tc t.Run(tc.Name, func(t *testing.T) { t.Parallel() db := testutils.NewSqlxDB(t) diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 0c115d8912..c4e39cd165 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -747,7 +747,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { for i := 0; i < numChainInserts; i++ { if localRand.Int63()%2 == 0 { // Mine blocks - for j := 0; j < int(mineOrReorg[i]); j++ { + for j := uint64(0); j < mineOrReorg[i]; j++ { backend.Commit() latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) require.NoError(t, err1) @@ -757,7 +757,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { // Reorg blocks latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) require.NoError(t, err1) - reorgedBlock := big.NewInt(0).Sub(latest.Number(), big.NewInt(int64(mineOrReorg[i]))) + reorgedBlock := big.NewInt(0).Sub(latest.Number(), big.NewInt(0).SetUint64(mineOrReorg[i])) reorg, err1 := ec.BlockByNumber(testutils.Context(t), reorgedBlock) require.NoError(t, err1) require.NoError(t, backend.Fork(reorg.Hash())) @@ -766,7 +766,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { // Actually need to change the block here to trigger the reorg. 
_, err1 = emitter1.EmitLog1(owner, []*big.Int{big.NewInt(1)}) require.NoError(t, err1) - for j := 0; j < int(mineOrReorg[i]+1); j++ { // Need +1 to make it actually longer height so we detect it. + for j := uint64(0); j < mineOrReorg[i]+1; j++ { // Need +1 to make it actually longer height so we detect it. backend.Commit() } latest, err1 = ec.BlockByNumber(testutils.Context(t), nil) @@ -778,6 +778,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err) } return checkDBMatchesGeth(t, orm, simulatedClient) + // #nosec G115 // finalityDepth-1 always >= 0 }, gen.SliceOfN(numChainInserts, gen.UInt64Range(1, uint64(finalityDepth-1))))) // Max reorg depth is finality depth - 1 p.TestingRun(t) }) @@ -2265,14 +2266,15 @@ func requireDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Clie func checkDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Client) bool { // Check every block is identical - latest, err1 := client.HeadByNumber(testutils.Context(t), nil) - require.NoError(t, err1) + latest, err := client.HeadByNumber(testutils.Context(t), nil) + require.NoError(t, err) dbBlocks, err := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) require.NoError(t, err) // ensure all blocks present in db are on geth canonical chain for _, ourBlock := range dbBlocks { - gethBlock, err1 := client.HeadByNumber(testutils.Context(t), big.NewInt(ourBlock.BlockNumber)) - require.NoError(t, err1) + var gethBlock *evmtypes.Head + gethBlock, err = client.HeadByNumber(testutils.Context(t), big.NewInt(ourBlock.BlockNumber)) + require.NoError(t, err) if ourBlock.BlockHash != gethBlock.Hash { t.Logf("Initial poll our block differs at height %d got %x want %x\n", ourBlock.BlockNumber, ourBlock.BlockHash, gethBlock.Hash) return false @@ -2284,8 +2286,8 @@ func checkDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Client require.Equal(t, latest.Number, latestDB.BlockNumber, "latest block number in db should match geth") // ensure all logs present in db are on geth canonical chain - logs, err1 := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) - require.NoError(t, err1) + logs, err := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) + require.NoError(t, err) for _, log := range logs { gethBlock, err1 := client.HeadByNumber(testutils.Context(t), big.NewInt(log.BlockNumber)) require.NoError(t, err1)
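
Reviewer note, not part of the patches above: the heart of patch 10 is the LCA search over a sparse set of stored blocks, and the sketch below restates that walk in a self-contained form for readers skimming the series. The types and helpers here (`Store`, `Chain`, string hashes, `ErrNoRows`) are simplified stand-ins for the real ORM, RPC client, `evmtypes.Head` and `common.Hash`, so treat this as an illustration of the approach under those assumptions, not the code the series adds.

```go
package lcasketch

import (
	"context"
	"errors"
	"fmt"
)

// Head and Block are simplified stand-ins for evmtypes.Head and logpoller.Block.
type Head struct {
	Number     int64
	Hash       string
	ParentHash string
}

type Block struct {
	BlockNumber int64
	BlockHash   string
}

var (
	ErrFinalityViolated = errors.New("finality violated")
	ErrNoRows           = errors.New("no rows")
)

// Store stands in for the ORM: it returns the newest stored block at or below
// maxNumber (stored blocks may be sparse), or ErrNoRows when nothing matches.
type Store interface {
	SelectNewestBlock(ctx context.Context, maxNumber int64) (*Block, error)
}

// Chain stands in for the RPC client and returns the canonical head at a height.
type Chain interface {
	HeaderByNumber(ctx context.Context, number int64) (*Head, error)
}

// findBlockAfterLCA walks back through the (possibly sparse) blocks stored in
// the DB until it finds one that is still on the canonical chain, then returns
// the canonical block right after it, which is where polling should resume.
func findBlockAfterLCA(ctx context.Context, db Store, rpc Chain, currentHead, dbFinalized int64) (*Head, error) {
	if currentHead < dbFinalized {
		return nil, fmt.Errorf("head %d below finalized %d: %w", currentHead, dbFinalized, ErrFinalityViolated)
	}
	for {
		// Nearest stored ancestor below the current head; because blocks are
		// sparse this may jump back more than one block at a time.
		ancestor, err := db.SelectNewestBlock(ctx, currentHead-1)
		if err != nil {
			if errors.Is(err, ErrNoRows) {
				// Reorg deeper than anything stored: resume from the current
				// head rather than drifting further back.
				return rpc.HeaderByNumber(ctx, currentHead)
			}
			return nil, err
		}
		// Canonical child of the ancestor; if it still points at our stored
		// hash, the ancestor is the LCA and the child is where we resume.
		child, err := rpc.HeaderByNumber(ctx, ancestor.BlockNumber+1)
		if err != nil {
			return nil, err
		}
		if child.ParentHash == ancestor.BlockHash {
			return child, nil
		}
		if ancestor.BlockNumber <= dbFinalized {
			// A block at or below the finalized height is no longer canonical.
			return nil, fmt.Errorf("finalized block %d reorged: %w", ancestor.BlockNumber, ErrFinalityViolated)
		}
		currentHead = ancestor.BlockNumber
	}
}
```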
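
A second reviewer note on patches 10 and 11: whether an empty block row is persisted now depends on the SkipEmptyBlocks flag, but the most recently polled block is always kept so the next poll has a checkpoint to resume from. The helper below, reusing the `Block` type from the sketch above, is a hypothetical condensation of that rule rather than the actual loop and defer in `getUnfinalizedLogs`.

```go
// polledBlock pairs a polled block with the number of logs found in it.
type polledBlock struct {
	Block   Block
	NumLogs int
}

// collectBlocks mirrors, in simplified form, which block rows the poller keeps:
// blocks carrying logs are always stored, empty blocks only when
// skipEmptyBlocks is off, and the last polled block is appended even when
// empty so it can serve as the checkpoint for the next poll.
func collectBlocks(polled []polledBlock, skipEmptyBlocks bool) []Block {
	var kept []Block
	for _, p := range polled {
		if p.NumLogs > 0 || !skipEmptyBlocks {
			kept = append(kept, p.Block)
		}
	}
	if n := len(polled); n > 0 {
		last := polled[n-1].Block
		// Avoid a duplicate row when the last block was already kept.
		if len(kept) == 0 || kept[len(kept)-1].BlockNumber != last.BlockNumber {
			kept = append(kept, last)
		}
	}
	return kept
}
```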