From b65f4ab228d8fe5848e6219c87c4a9f77d2680ee Mon Sep 17 00:00:00 2001
From: Dmytro Haidashenko
Date: Thu, 12 Feb 2026 13:51:16 +0100
Subject: [PATCH 01/10] Test: reorg during replay

---
 pkg/logpoller/log_poller_test.go | 110 +++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)

diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go
index 9e3571e904..739ac64ea2 100644
--- a/pkg/logpoller/log_poller_test.go
+++ b/pkg/logpoller/log_poller_test.go
@@ -2155,3 +2155,113 @@ func TestWhere(t *testing.T) {
 		assert.Equal(t, []query.Expression{}, result)
 	})
 }
+
+func TestLogPoller_Reorg_On_Replay(t *testing.T) {
+	// TestCase:
+	// 1. LogPoller processes blocks to block 11
+	// 2. Reorg replaces block 11 with a new block (some additional blocks may be added on top of it)
+	// 3. Replay is initiated from a block below 11.
+	// Expected behaviour:
+	// 1. LogPoller should replace reorged block 11 with new data.
+	// 2. At no point must the DB contain logs from both the old and the new block 11.
+	// 3. Finality Violation must not occur, since chain did not violate finality depth. 
+	t.Parallel()
+	const reorgedBlockNumber = 11
+	testCases := []struct {
+		name                     string
+		numberOfBlocksAfterReorg int
+	}{
+		{
+			name:                     "Replay start right after reorg",
+			numberOfBlocksAfterReorg: 0,
+		},
+		{
+			name:                     "Replay start a few blocks after reorg",
+			numberOfBlocksAfterReorg: 1,
+		},
+		{
+			name:                     "Replay start once reorged block is finalized",
+			numberOfBlocksAfterReorg: 5,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			lpOpts := logpoller.Opts{
+				PollPeriod:               24 * time.Hour, // effectively disable automatic polling, so we can control when we poll in the test
+				UseFinalityTag:           false,
+				FinalityDepth:            3,
+				BackfillBatchSize:        3,
+				RPCBatchSize:             2,
+				KeepFinalizedBlocksDepth: 1000,
+			}
+			th := SetupTH(t, lpOpts)
+
+			// Set up a log poller listening for log emitter logs.
+			err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{
+				Name:      "Test Emitter 1",
+				EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
+				Addresses: []common.Address{th.EmitterAddress1},
+			})
+			require.NoError(t, err)
+
+			// populate chain with data
+			for range reorgedBlockNumber - 1 {
+				_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(1))})
+				require.NoError(t, err)
+				th.Backend.Commit()
+			}
+
+			// start LogPoller and wait for it to complete first poll. Second poll won't happen until we call PollAndSaveLogs again, since poll period is very long.
+			require.NoError(t, th.LogPoller.Start(t.Context()))
+			defer func() {
+				require.NoError(t, th.LogPoller.Close())
+			}()
+			testutils.RequireEventually(t, func() bool {
+				latest, err := th.LogPoller.LatestBlock(t.Context())
+				return err == nil && latest.BlockNumber == reorgedBlockNumber
+			})
+
+			reorgedBlock, err := th.Client.BlockByNumber(t.Context(), nil)
+			require.NoError(t, err)
+			require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64())
+
+			// Replace block 11 with a new block and burry it under 1 new block
+			require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash()))
+			const newLogData = int64(123)
+			// Commit reorgedBlock and numberOfBlocksAfterReorg blocks on top of it
+			for range tc.numberOfBlocksAfterReorg + 1 {
+				// emit log that is not tracked by LP to ensure that tracked log has a different index. 
+ // So if reorg is not properly handled and both logs end up in the database + _, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) + require.NoError(t, err) + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + th.Backend.Commit() + } + + newReorgedBlock, err := th.Client.BlockByNumber(t.Context(), big.NewInt(reorgedBlockNumber)) + require.NoError(t, err) + require.NotEqual(t, reorgedBlock.Hash().String(), newReorgedBlock.Hash().String()) + + latest, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), latest.Number().Int64()) + + // Trigger replay, which should gracefully handle the reorg and end up on the new latest block + err = th.LogPoller.Replay(t.Context(), 5) + require.NoError(t, err) + // LP should be on latest block now + lpLatest, err := th.LogPoller.LatestBlock(t.Context()) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), lpLatest.BlockNumber) + logs, err := th.ORM.SelectLogsByBlockRange(t.Context(), reorgedBlockNumber, reorgedBlockNumber) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's log was properly removed during replay") + // Ensure reorged block was replaced by a new one + dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64()) + require.NoError(t, err) + require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber) + require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash) + }) + } +} From 64e2c6375f91e599d40d64c730c81de076fee568 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 14:37:28 +0100 Subject: [PATCH 02/10] Fix reorg handling during replay --- pkg/logpoller/helper_test.go | 2 +- pkg/logpoller/log_poller.go | 163 +++++++++++++++------- pkg/logpoller/log_poller_internal_test.go | 12 +- pkg/logpoller/log_poller_test.go | 14 +- 4 files changed, 130 insertions(+), 61 deletions(-) diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index e4f6a0f745..cb79fb721c 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -164,7 +164,7 @@ func (th *TestHarness) AdjustTime(t *testing.T, d time.Duration) { } func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 { - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) latest, _ := th.LogPoller.LatestBlock(ctx) return latest.BlockNumber + 1 } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 91d7dfde68..47fb6fc12c 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -77,7 +77,7 @@ type LogPoller interface { type LogPollerTest interface { LogPoller - PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) + PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) BackupPollAndSaveLogs(ctx context.Context) error Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) @@ -673,7 +673,7 @@ func (lp *logPoller) run() { } else { start = lastProcessed.BlockNumber + 1 } - lp.PollAndSaveLogs(ctx, start) + lp.PollAndSaveLogs(ctx, start, false) case <-backupLogPollTicker.C: if lp.backupPollerBlockDelay == 0 { 
continue // backup poller is disabled @@ -783,7 +783,7 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64 if err == nil { // Serially process replay requests. lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) - lp.PollAndSaveLogs(ctx, fromBlock) + lp.PollAndSaveLogs(ctx, fromBlock, true) lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) } } else { @@ -965,6 +965,26 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { return nil } +func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { + // If we don't have the current block already, lets get it. + header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber)) + if err != nil { + lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", blockNumber, err) + } + // Additional sanity checks, don't necessarily trust the RPC. + if header == nil { + lp.lggr.Errorw("Unexpected nil block from RPC", "blockNumber", blockNumber) + return nil, fmt.Errorf("got nil block for %d", blockNumber) + } + if header.Number != blockNumber { + lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "blockNumber", blockNumber, "got", header.Number) + return nil, fmt.Errorf("block mismatch have %d want %d", header.Number, blockNumber) + } + + return header, nil +} + // getCurrentBlockMaybeHandleReorg accepts a block number // and will return that block if its parent points to our last saved block. // One can optionally pass the block header if it has already been queried to avoid an extra RPC call. @@ -973,23 +993,12 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { // 1. Find the LCA by following parent hashes. // 2. Delete all logs and blocks after the LCA // 3. Return the LCA+1, i.e. our new current (unprocessed) block. -func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) { +func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) { var err1 error if currentBlock == nil { - // If we don't have the current block already, lets get it. - currentBlock, err1 = lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(currentBlockNumber)) + currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber) if err1 != nil { - lp.lggr.Warnw("Unable to get currentBlock", "err", err1, "currentBlockNumber", currentBlockNumber) - return nil, err1 - } - // Additional sanity checks, don't necessarily trust the RPC. - if currentBlock == nil { - lp.lggr.Errorw("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber) - return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber) - } - if currentBlock.Number != currentBlockNumber { - lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number) - return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1) } } // Does this currentBlock point to the same parent that we have saved? 
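The hunk above extracts headerByNumber and threads an isReplay flag into getCurrentBlockMaybeHandleReorg; the hunk below builds its reorg checks on one unchanged invariant: a freshly fetched head must extend the last block saved to the DB, meaning its ParentHash must equal the saved block's hash, and any mismatch is unwound back to the last common ancestor (LCA). A minimal, runnable sketch of that parent-hash check, using an illustrative head type rather than the poller's actual evmtypes.Head:

package main

import "fmt"

// head is an illustrative stand-in for the poller's head type; only the
// fields the reorg check needs are included.
type head struct {
	Number     int64
	Hash       string
	ParentHash string
}

// extendsSaved reports whether the incoming block builds on the block we last
// persisted. A false result is the reorg signal: the poller then walks parent
// hashes backwards to find the LCA and deletes everything saved after it.
func extendsSaved(saved, incoming head) bool {
	return incoming.Number == saved.Number+1 && incoming.ParentHash == saved.Hash
}

func main() {
	saved := head{Number: 10, Hash: "0xaa"}
	incoming := head{Number: 11, Hash: "0xcc", ParentHash: "0xbb"} // parent is not 0xaa
	fmt.Println(extendsSaved(saved, incoming))                     // false -> handle reorg
}
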
@@ -1008,39 +1017,99 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren } // Check for reorg. if currentBlock.ParentHash != expectedParent.BlockHash { - // There can be another reorg while we're finding the LCA. - // That is ok, since we'll detect it on the next iteration. - // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. - blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber) - if err2 != nil { - return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) - } - - lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber) - // We truncate all the blocks and logs after the LCA. - // We could preserve the logs for forensics, since its possible - // that applications see them and take action upon it, however that - // results in significantly slower reads since we must then compute - // the canonical set per read. Typically, if an application took action on a log - // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. - // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) - if err2 != nil { - // If we error on db commit, we can't know if the tx went through or not. - // We return an error here which will cause us to restart polling from lastBlockSaved + 1 - return nil, err2 - } - return blockAfterLCA, nil - } - // No reorg, return current block. + return lp.handleReorg(ctx, currentBlock) + } + + if !isReplay { + // During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock. + return currentBlock, nil + } + + // Ensure that if DB contains current block it matches the current block from RPC. + currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber) + if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to get current block from DB %d: %w", currentBlockNumber, err) + } + + if currentBlockDB != nil && currentBlock.Hash != currentBlockDB.BlockHash { + return lp.handleReorg(ctx, currentBlock) + } + + // No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation. + latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx) + if err1 != nil { + if pkgerrors.Is(err1, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. 
Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1) + } + return nil, pkgerrors.Wrap(err1, "unable to get latest block") + } + + if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber { + // currentBlock is newest, nothing more to check + return currentBlock, nil + } + + latestBlockRPC, err := lp.headerByNumber(ctx, latestBlockDB.BlockNumber) + if err != nil { + return nil, fmt.Errorf("unable to get latest block header for block number %d: %w", latestBlockDB.BlockNumber, err) + } + + if latestBlockRPC.Hash != latestBlockDB.BlockHash { + // Reorg detected, handle it + lca, err := lp.handleReorg(ctx, latestBlockRPC) + if err != nil { + return nil, fmt.Errorf("failed to handle reorg: %w", err) + } + + if lca.Number < currentBlock.BlockNumber() { + // LCA is older than current block, we need to get the new current block after reorg + return lca, nil + } + } + return currentBlock, nil } +func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Head) (*evmtypes.Head, error) { + // during replay currentBlock may be older than the latest block, thus it's possible to miss finality violation, + // if we use its view on latest finalized block. To be safe, we get the latest block from the db. + latestBlock, err := lp.orm.SelectLatestBlock(ctx) + if err != nil { + if pkgerrors.Is(err, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when handling reorg, but got no rows", "currentBlockNumber", currentBlock.Number, "err", err) + } + return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db") + } + // There can be another reorg while we're finding the LCA. + // That is ok, since we'll detect it on the next iteration. + // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. + blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber) + if err2 != nil { + return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) + } + + lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlock.Number) + // We truncate all the blocks and logs after the LCA. + // We could preserve the logs for forensics, since its possible + // that applications see them and take action upon it, however that + // results in significantly slower reads since we must then compute + // the canonical set per read. Typically, if an application took action on a log + // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. + // Its also nicely analogous to reading from the chain itself. + err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) + if err2 != nil { + // If we error on db commit, we can't know if the tx went through or not. + // We return an error here which will cause us to restart polling from lastBlockSaved + 1 + return nil, err2 + } + return blockAfterLCA, nil +} + // PollAndSaveLogs On startup/crash current is the first block after the last processed block. // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal // conditions this would be equal to lastProcessed.BlockNumber + 1. 
-func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) { - err := lp.pollAndSaveLogs(ctx, currentBlockNumber) +func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) { + err := lp.pollAndSaveLogs(ctx, currentBlockNumber, isReplay) if errors.Is(err, commontypes.ErrFinalityViolated) { lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err) lp.finalityViolated.Store(true) @@ -1059,7 +1128,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int } } -func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) { +func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) { lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber) // Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks() // latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration @@ -1089,7 +1158,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int } // Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks. // Returns (currentBlock || LCA+1 if reorg detected, error) - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved and retry. @@ -1115,7 +1184,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int for { if currentBlockNumber > currentBlock.Number { - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved. 
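The isReplay paths above exist because replay is the one case where the DB already holds blocks at and beyond the height being polled, so the poller additionally compares stored hashes against what the RPC currently returns, both at the current height and at the latest stored height, instead of treating every mismatch as a finality violation. A sketch of that stored-versus-fresh comparison under assumed simplified types, with the ORM's SelectBlockByNumber-style lookup replaced by a plain map:

package main

import "fmt"

// dbBlock is an illustrative stand-in for a persisted log_poller_blocks row.
type dbBlock struct {
	Number int64
	Hash   string
}

// replayReorged reports whether a height we already persisted now resolves to
// a different hash on the RPC. During normal polling the store has no entry at
// or above the poll height, so the lookup misses and no extra check runs.
func replayReorged(store map[int64]dbBlock, rpcNumber int64, rpcHash string) bool {
	stored, ok := store[rpcNumber]
	return ok && stored.Hash != rpcHash
}

func main() {
	store := map[int64]dbBlock{11: {Number: 11, Hash: "0xold"}}
	fmt.Println(replayReorged(store, 11, "0xnew")) // true -> unwind to the LCA before re-inserting logs
	fmt.Println(replayReorged(store, 12, "0xabc")) // false -> nothing stored at 12 yet
}
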
diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 1ff3daf6bc..64c3512107 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -380,7 +380,7 @@ func assertBackupPollerStartup(t *testing.T, head *evmtypes.Head, finalizedHead assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) - lp.PollAndSaveLogs(ctx, head.Number) + lp.PollAndSaveLogs(ctx, head.Number, false) lastProcessed, err := lp.orm.SelectLatestBlock(ctx) require.NoError(t, err) @@ -489,7 +489,7 @@ func TestLogPoller_Replay(t *testing.T) { { ctx := testutils.Context(t) // process 1 log in block 3 - lp.PollAndSaveLogs(ctx, 4) + lp.PollAndSaveLogs(ctx, 4, false) latest, err := lp.LatestBlock(ctx) require.NoError(t, err) require.Equal(t, int64(4), latest.BlockNumber) @@ -894,7 +894,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { // insert finalized block with different hash than in RPC require.NoError(t, orm.InsertBlock(t.Context(), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2, 2)) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) { @@ -915,7 +915,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { return evmtypes.Head{Number: num, Hash: utils.NewHash()} }) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Log's hash does not match block's", func(t *testing.T) { @@ -933,7 +933,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Happy path", func(t *testing.T) { @@ -953,7 +953,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.NoError(t, lp.HealthReport()[lp.Name()]) }) } diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 739ac64ea2..a3aa3b44cb 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -377,7 +377,7 @@ func Test_BackupLogPoller(t *testing.T) { th.finalizeThroughBlock(t, 64) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) require.NoError(t, th.LogPoller.BackupPollAndSaveLogs(ctx)) currentBlock, err := th.LogPoller.LatestBlock(ctx) require.NoError(t, err) @@ -718,7 +718,7 
@@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { backend.Commit() } currentBlockNumber := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) currentBlock, err := lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) matchesGeth := func() bool { @@ -770,7 +770,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false) currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } @@ -1370,7 +1370,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber)) // after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB - th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1) + th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1, false) block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3) require.NoError(t, err) assert.Equal(t, 3, int(block.BlockNumber)) @@ -1624,7 +1624,7 @@ func TestTooManyLogResults(t *testing.T) { Addresses: []common.Address{addr}, }) require.NoError(t, err) - lp.PollAndSaveLogs(ctx, 5) + lp.PollAndSaveLogs(ctx, 5, false) block, err2 := o.SelectLatestBlock(ctx) require.NoError(t, err2) assert.Equal(t, int64(298), block.BlockNumber) @@ -1656,7 +1656,7 @@ func TestTooManyLogResults(t *testing.T) { return []types.Log{}, tooLargeErr // return "too many results" error if block range spans 4 or more blocks }) - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself @@ -1690,7 +1690,7 @@ func TestTooManyLogResults(t *testing.T) { headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself From ccc00f846a3d24dd26044555b816d2be3551ca5c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 17 Feb 2026 17:57:35 +0100 Subject: [PATCH 03/10] Fix nits --- pkg/logpoller/log_poller.go | 8 +++----- pkg/logpoller/log_poller_test.go | 9 +++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 47fb6fc12c..a64584d969 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -966,7 +966,6 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { - // If we don't have the current block already, lets get it. 
 	header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber))
 	if err != nil {
 		lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber)
@@ -1056,14 +1055,13 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 	if latestBlockRPC.Hash != latestBlockDB.BlockHash {
 		// Reorg detected, handle it
-		lca, err := lp.handleReorg(ctx, latestBlockRPC)
+		blockAfterLCA, err := lp.handleReorg(ctx, latestBlockRPC)
 		if err != nil {
 			return nil, fmt.Errorf("failed to handle reorg: %w", err)
 		}
 
-		if lca.Number < currentBlock.BlockNumber() {
-			// LCA is older than current block, we need to get the new current block after reorg
-			return lca, nil
+		if blockAfterLCA.Number < currentBlock.BlockNumber() {
+			return blockAfterLCA, nil
 		}
 	}
 
diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go
index a3aa3b44cb..818d97a85c 100644
--- a/pkg/logpoller/log_poller_test.go
+++ b/pkg/logpoller/log_poller_test.go
@@ -2164,7 +2164,7 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) {
 	// Expected behaviour:
 	// 1. LogPoller should replace reorged block 11 with new data.
 	// 2. At no point must the DB contain logs from both the old and the new block 11.
-	// 3. Finality Violation must not occur, since chain did not violate finality depth. 
+	// 3. Finality Violation must not occur, since chain did not violate finality depth.
 	t.Parallel()
 	const reorgedBlockNumber = 11
 	testCases := []struct {
@@ -2211,7 +2211,7 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) {
 				th.Backend.Commit()
 			}
 
-			// start LogPoller and wait for it to complete first poll. Second poll won't happen until we call PollAndSaveLogs again, since poll period is very long.
+			// Start LogPoller and wait for it to complete the first poll. The second poll won't happen until we call Replay, since the poll period is very long.
 			require.NoError(t, th.LogPoller.Start(t.Context()))
 			defer func() {
 				require.NoError(t, th.LogPoller.Close())
@@ -2225,16 +2225,17 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) {
 			require.NoError(t, err)
 			require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64())
 
-			// Replace block 11 with a new block and burry it under 1 new block
+			// Replace block 11 with a new block and bury it under 1 new block
 			require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash()))
 			const newLogData = int64(123)
 			// Commit reorgedBlock and numberOfBlocksAfterReorg blocks on top of it
 			for range tc.numberOfBlocksAfterReorg + 1 {
 				// emit log that is not tracked by LP to ensure that tracked log has a different index. 
-				// So if reorg is not properly handled and both logs end up in the database
+				// So if reorg is not properly handled, both logs end up in the database and the test fails. 
_, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) require.NoError(t, err) _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + require.NoError(t, err) th.Backend.Commit() } From a6119cfddb5182e3ea3a1549626abfd2b27dd591 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 19:52:51 +0100 Subject: [PATCH 04/10] ORM methods to batch insert --- pkg/logpoller/helper_test.go | 2 +- pkg/logpoller/log_poller.go | 6 ++--- pkg/logpoller/observability.go | 14 +++++----- pkg/logpoller/observability_test.go | 40 ++++++++++++++++----------- pkg/logpoller/orm.go | 41 +++++++++++++++++++--------- pkg/logpoller/orm_test.go | 42 ++++++++++++++++++++++++----- 6 files changed, 98 insertions(+), 47 deletions(-) diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index cb79fb721c..24c3b072fd 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -182,7 +182,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) { require.NoError(t, err, "block %v", i) chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i))) require.NoError(t, err) - assert.Equal(t, chainBlk.Hash().Bytes(), blk.BlockHash.Bytes(), "block %v", i) + assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i) } } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index a64584d969..a43cd35681 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -956,7 +956,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } lp.lggr.Debugw("Inserting backfilled logs with batch endblock", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks) - err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock) + err = lp.orm.InsertLogsWithBlocks(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), []Block{endblock}) if err != nil { lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to) return err @@ -1206,10 +1206,10 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int FinalizedBlockNumber: latestFinalizedBlockNumber, SafeBlockNumber: safeBlockNumber, } - err = lp.orm.InsertLogsWithBlock( + err = lp.orm.InsertLogsWithBlocks( ctx, convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()), - block, + []Block{block}, ) if err != nil { lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber) diff --git a/pkg/logpoller/observability.go b/pkg/logpoller/observability.go index 0cdd040df0..ffa1b47456 100644 --- a/pkg/logpoller/observability.go +++ b/pkg/logpoller/observability.go @@ -52,16 +52,16 @@ func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error { err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error { return o.ORM.InsertLogs(ctx, logs) }) - trackInsertedLogsAndBlock(ctx, o, logs, nil, err) + trackInsertedLogsAndBlocks(ctx, o, logs, nil, err) trackInsertedBlockLatency(ctx, o, logs, err) return err } -func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error { +func (o *ObservedORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error { err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error { - return o.ORM.InsertLogsWithBlock(ctx, logs, block) + return o.ORM.InsertLogsWithBlocks(ctx, 
logs, blocks) }) - trackInsertedLogsAndBlock(ctx, o, logs, &block, err) + trackInsertedLogsAndBlocks(ctx, o, logs, blocks, err) trackInsertedBlockLatency(ctx, o, logs, err) return err } @@ -290,15 +290,15 @@ func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryTy return exec() } -func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) { +func trackInsertedLogsAndBlocks(ctx context.Context, o *ObservedORM, logs []Log, blocks []Block, err error) { if err != nil { return } ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout) defer cancel() o.metrics.IncrementLogsInserted(ctx, int64(len(logs))) - if block != nil { - o.metrics.IncrementBlocksInserted(ctx, 1) + if len(blocks) > 0 { + o.metrics.IncrementBlocksInserted(ctx, int64(len(blocks))) } } diff --git a/pkg/logpoller/observability_test.go b/pkg/logpoller/observability_test.go index 3945d52424..7a82906e9f 100644 --- a/pkg/logpoller/observability_test.go +++ b/pkg/logpoller/observability_test.go @@ -41,9 +41,11 @@ func TestMultipleMetricsArePublished(t *testing.T) { _, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(ctx, 0, []common.Address{{}}, []common.Hash{{}}, 1) _, _ = orm.SelectIndexedLogsCreatedAfter(ctx, common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0) _ = orm.InsertLogs(ctx, []Log{}) - _ = orm.InsertLogsWithBlock(ctx, []Log{}, Block{ - BlockNumber: 1, - BlockTimestamp: time.Now(), + _ = orm.InsertLogsWithBlocks(ctx, []Log{}, []Block{ + { + BlockNumber: 1, + BlockTimestamp: time.Now(), + }, }) require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration)) @@ -113,21 +115,25 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency)) // Insert 5 more logs with block - require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{ - BlockHash: utils.RandomBytes32(), - BlockNumber: 10, - BlockTimestamp: time.Now(), - FinalizedBlockNumber: 5, + require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[10:15], []Block{ + { + BlockHash: utils.RandomBytes32(), + BlockNumber: 10, + BlockTimestamp: time.Now(), + FinalizedBlockNumber: 5, + }, })) assert.Equal(t, 15, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 1, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420")))) // Insert 5 more logs with block - require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[15:], Block{ - BlockHash: utils.RandomBytes32(), - BlockNumber: 15, - BlockTimestamp: time.Now(), - FinalizedBlockNumber: 5, + require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[15:], []Block{ + { + BlockHash: utils.RandomBytes32(), + BlockNumber: 15, + BlockTimestamp: time.Now(), + FinalizedBlockNumber: 5, + }, })) assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420")))) @@ -143,9 +149,11 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, network, "420", "DeleteBlocksBefore", "delete")) // Don't update counters in case of an error - require.Error(t, orm.InsertLogsWithBlock(ctx, logs, Block{ - BlockHash: utils.RandomBytes32(), - BlockTimestamp: time.Now(), + require.Error(t, orm.InsertLogsWithBlocks(ctx, logs, []Block{ + { + BlockHash: 
utils.RandomBytes32(), + BlockTimestamp: time.Now(), + }, })) assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420")))) assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420")))) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index eaf489ed32..8098ada382 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -24,7 +24,7 @@ import ( // What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM. type ORM interface { InsertLogs(ctx context.Context, logs []Log) error - InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error + InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error InsertFilter(ctx context.Context, filter Filter) error LoadFilters(ctx context.Context) (map[string]Filter, error) @@ -112,6 +112,18 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum return err } +func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error { + const q = `INSERT INTO evm.log_poller_blocks + (evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at, safe_block_number) + VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW(), :safe_block_number) + ON CONFLICT DO NOTHING` + // maintain behaviour of InsertBlock + for i := range blocks { + blocks[i].EVMChainID = ubig.New(o.chainID) + } + return batchInsert(ctx, o.ds, q, blocks, 1000) +} + // InsertFilter is idempotent. // // Each address/event pair must have a unique job id, so it may be removed when the job is deleted. @@ -544,10 +556,10 @@ func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error { }) } -func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error { +func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error { // Optimization, don't open TX when there is only a block to be persisted if len(logs) == 0 { - return o.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber) + return o.InsertBlocks(ctx, blocks) } if err := o.validateLogs(logs); err != nil { @@ -556,7 +568,7 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block // Block and logs goes with the same TX to ensure atomicity return o.Transact(ctx, func(orm *DSORM) error { - err := orm.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber) + err := orm.InsertBlocks(ctx, blocks) if err != nil { return err } @@ -565,20 +577,23 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block } func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error { - batchInsertSize := 4000 - for i := 0; i < len(logs); i += batchInsertSize { - start, end := i, i+batchInsertSize - if end > len(logs) { - end = len(logs) - } - - query := `INSERT INTO evm.logs + const q = `INSERT INTO evm.logs (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) VALUES (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) ON CONFLICT DO NOTHING` + return batchInsert(ctx, tx, q, logs, 4000) +} + +func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query 
string, objs []T, batchSize int) error { + batchInsertSize := 4000 + for i := 0; i < len(objs); i += batchInsertSize { + start, end := i, i+batchInsertSize + if end > len(objs) { + end = len(objs) + } - _, err := tx.NamedExecContext(ctx, query, logs[start:end]) + _, err := ds.NamedExecContext(ctx, query, objs[start:end]) if err != nil { if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 { // In case of DB timeouts, try to insert again with a smaller batch upto a limit diff --git a/pkg/logpoller/orm_test.go b/pkg/logpoller/orm_test.go index d4f25c9027..3318e9bb81 100644 --- a/pkg/logpoller/orm_test.go +++ b/pkg/logpoller/orm_test.go @@ -94,6 +94,32 @@ func TestLogPoller_Batching(t *testing.T) { require.Equal(t, len(logs), len(lgs)) } +func TestLogPoller_Blocks_Batching(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + th := SetupTH(t, lpOpts) + var blocks []logpoller.Block + var logs []logpoller.Log + const numBlocks = 2000 + for i := 0; i < numBlocks; i++ { + blockHash := common.HexToHash(fmt.Sprintf("0x%d", i+1)) + blocks = append(blocks, logpoller.Block{ + EVMChainID: ubig.New(th.ChainID), + BlockHash: blockHash, + BlockNumber: int64(i + 1), + }) + logs = append(logs, GenLog(th.ChainID, int64(i), int64(i+1), blockHash.String(), EmitterABI.Events["Log1"].ID.Bytes(), th.EmitterAddress1)) + } + require.NoError(t, th.ORM.InsertLogsWithBlocks(ctx, logs, blocks)) + lgs, err := th.ORM.SelectLogsByBlockRange(ctx, 1, numBlocks) + require.NoError(t, err) + // Make sure all logs are inserted + require.Equal(t, len(logs), len(lgs)) + dbBlocks, err := th.ORM.GetBlocksRange(ctx, 1, numBlocks) + require.NoError(t, err) + require.Equal(t, numBlocks, len(dbBlocks)) +} + func TestORM_GetBlocks_From_Range(t *testing.T) { th := SetupTH(t, lpOpts) o1 := th.ORM @@ -2116,7 +2142,7 @@ func TestInsertLogsWithBlock(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // clean all logs and blocks between test cases defer func() { _ = o.DeleteLogsAndBlocksAfter(ctx, 0) }() - insertError := o.InsertLogsWithBlock(ctx, tt.logs, tt.block) + insertError := o.InsertLogsWithBlocks(ctx, tt.logs, []logpoller.Block{tt.block}) logs, logsErr := o.SelectLogs(ctx, 0, math.MaxInt, address, event) block, blockErr := o.SelectLatestBlock(ctx) @@ -2209,16 +2235,18 @@ func TestSelectLogsDataWordBetween(t *testing.T) { secondLogData = append(secondLogData, logpoller.EvmWord(5).Bytes()...) secondLogData = append(secondLogData, logpoller.EvmWord(20).Bytes()...) 
- err := th.ORM.InsertLogsWithBlock(ctx, + err := th.ORM.InsertLogsWithBlocks(ctx, []logpoller.Log{ GenLogWithData(th.ChainID, address, eventSig, 1, 1, firstLogData), GenLogWithData(th.ChainID, address, eventSig, 2, 2, secondLogData), }, - logpoller.Block{ - BlockHash: utils.RandomBytes32(), - BlockNumber: 10, - BlockTimestamp: time.Now(), - FinalizedBlockNumber: 1, + []logpoller.Block{ + { + BlockHash: utils.RandomBytes32(), + BlockNumber: 10, + BlockTimestamp: time.Now(), + FinalizedBlockNumber: 1, + }, }, ) require.NoError(t, err) From 519585159ec1348b3febee68770392e508b97ab3 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 13 Feb 2026 17:10:36 +0100 Subject: [PATCH 05/10] switch to batch logs/blocks inserts --- pkg/logpoller/log_poller.go | 117 +++++++++++++++++++++++-------- pkg/logpoller/log_poller_test.go | 8 ++- 2 files changed, 92 insertions(+), 33 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index a43cd35681..868df6b8f1 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1178,52 +1178,109 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int return fmt.Errorf("failed to backfill finalized logs: %w", err) } currentBlockNumber = lastSafeBackfillBlock + 1 + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) + if err != nil { + // If there's an error handling the reorg, we can't be sure what state the db was left in. + // Resume from the latest block saved. + return fmt.Errorf("failed to get current block: %w", err) + } + currentBlockNumber = currentBlock.Number } for { - if currentBlockNumber > currentBlock.Number { - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) - if err != nil { - // If there's an error handling the reorg, we can't be sure what state the db was left in. - // Resume from the latest block saved. - return fmt.Errorf("failed to get current block: %w", err) + + blocks, logs, err := lp.getUnfinalizedLogs(ctx, currentBlock, latestBlockNumber, safeBlockNumber, latestFinalizedBlockNumber, isReplay) + // even if we have an error, we may have partial logs and blocks that can be saved, so we save what we have and then retry. 
+ if len(logs) > 0 || len(blocks) > 0 { + lp.lggr.Debugw("Saving logs", "logs", len(logs), "blocks", len(blocks), "currentBlockNumber", currentBlockNumber) + insertErr := lp.orm.InsertLogsWithBlocks(ctx, logs, blocks) + if insertErr != nil { + lp.lggr.Warnw("Unable to save logs, retrying later", "insertErr", insertErr, "block", currentBlockNumber, "err", err) + return nil } - currentBlockNumber = currentBlock.Number } + if err == nil { + lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber) + return nil + } + + var reorgErr *reorgError + if !errors.As(err, &reorgErr) { + return fmt.Errorf("failed to get unfinalized logs: %w", err) + } + + lp.lggr.Warnw("Reorg detected during unfinalized log processing, handling reorg", "err", err, "currentBlockNumber", currentBlockNumber, "lastKnownMatchingHead", reorgErr.ReorgedAt.Number) + currentBlock, err = lp.handleReorg(ctx, reorgErr.ReorgedAt) + if err != nil { + return fmt.Errorf("failed to handle reorg: %w", err) + } + lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number) + } +} + +type reorgError struct { + ReorgedAt *evmtypes.Head +} + +func newReorgError(reorgedAt *evmtypes.Head) error { + return &reorgError{ReorgedAt: reorgedAt} +} + +func (e *reorgError) Error() string { + return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number) +} + +func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) { + var logs []Log + var blocks []Block + for { h := currentBlock.Hash - var logs []types.Log - logs, err = lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) + rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) if err != nil { - lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber) - return nil + lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number) + return blocks, logs, nil } - lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) + lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) block := Block{ BlockHash: h, - BlockNumber: currentBlockNumber, + BlockNumber: currentBlock.Number, BlockTimestamp: currentBlock.Timestamp, - FinalizedBlockNumber: latestFinalizedBlockNumber, - SafeBlockNumber: safeBlockNumber, - } - err = lp.orm.InsertLogsWithBlocks( - ctx, - convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()), - []Block{block}, - ) + FinalizedBlockNumber: finalized, + SafeBlockNumber: safe, + } + logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...) + blocks = append(blocks, block) + + if currentBlock.Number >= latest { + return blocks, logs, nil + } + + nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1) if err != nil { - lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber) - return nil + lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number) + return blocks, logs, nil } - // Update current block. - // Same reorg detection on unfinalized blocks. 
- currentBlockNumber++ - if currentBlockNumber > latestBlockNumber { - break + + if nextBlock.ParentHash != currentBlock.Hash { + return blocks, logs, newReorgError(nextBlock) } - } - return nil + if isReplay { + // During replay, we also check if the next block matches what we have in the DB to avoid false positives on reorgs due to finality violation. + nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number) + if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { + lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number) + return blocks, logs, nil + } + + if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash { + return blocks, logs, newReorgError(nextBlock) + } + } + + currentBlock = nextBlock + } } // Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 818d97a85c..72cefae5b0 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -690,7 +690,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { owner.GasPrice = big.NewInt(10e9) p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool { // After the set of reorgs, we should have the same canonical blocks that geth does. - t.Log("Starting test", mineOrReorg) + seed := time.Now().UnixNano() + localRand := rand.New(rand.NewSource(seed)) + t.Log("Starting test", mineOrReorg, "seed", seed) chainID := testutils.NewRandomEVMChainID() // Set up a test chain with a log emitting contract deployed. orm := logpoller.NewORM(chainID, db, lggr) @@ -742,7 +744,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { } // Randomly pick to mine or reorg for i := 0; i < numChainInserts; i++ { - if rand.Int63()%2 == 0 { + if localRand.Int63()%2 == 0 { // Mine blocks for j := 0; j < int(mineOrReorg[i]); j++ { backend.Commit() @@ -770,7 +772,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false) currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } From 715d9bad4e67a59beb88726ae87f06b0a6d5c01d Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 17 Feb 2026 18:07:51 +0100 Subject: [PATCH 06/10] nit fixes --- pkg/logpoller/log_poller.go | 6 ++++++ pkg/logpoller/orm.go | 3 +-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 868df6b8f1..ec0d4cfc57 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1232,6 +1232,7 @@ func (e *reorgError) Error() string { } func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) { + const maxUnfinalizedBlocks = 2000 var logs []Log var blocks []Block for { @@ -1256,6 +1257,11 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty return blocks, logs, nil } + if len(blocks) >= maxUnfinalizedBlocks { + lp.lggr.Warnw("Too many unfinalized blocks, stopping log retrieval to avoid OOM", "currentBlockNumber", currentBlock.Number, "latestBlockNumber", latest) + return blocks, logs, nil + } + nextBlock, 
err := lp.headerByNumber(ctx, currentBlock.Number+1) if err != nil { lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index 8098ada382..6a503ace60 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -585,8 +585,7 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D return batchInsert(ctx, tx, q, logs, 4000) } -func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchSize int) error { - batchInsertSize := 4000 +func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchInsertSize int) error { for i := 0; i < len(objs); i += batchInsertSize { start, end := i, i+batchInsertSize if end > len(objs) { From cc16600fbdeccb8298f84dd4367f0e2170c602a8 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 26 Feb 2026 17:13:42 +0100 Subject: [PATCH 07/10] switch ubig to sqlutil big --- pkg/logpoller/orm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index 6a503ace60..012c450f3a 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -119,7 +119,7 @@ func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error { ON CONFLICT DO NOTHING` // maintain behaviour of InsertBlock for i := range blocks { - blocks[i].EVMChainID = ubig.New(o.chainID) + blocks[i].EVMChainID = sqlutil.New(o.chainID) } return batchInsert(ctx, o.ds, q, blocks, 1000) } From 60ed9dab8f7563ee35a17692ab1c0d8fb821f94c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 26 Feb 2026 18:03:04 +0100 Subject: [PATCH 08/10] fix orm_test --- pkg/logpoller/orm_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logpoller/orm_test.go b/pkg/logpoller/orm_test.go index 3318e9bb81..491c913943 100644 --- a/pkg/logpoller/orm_test.go +++ b/pkg/logpoller/orm_test.go @@ -104,7 +104,7 @@ func TestLogPoller_Blocks_Batching(t *testing.T) { for i := 0; i < numBlocks; i++ { blockHash := common.HexToHash(fmt.Sprintf("0x%d", i+1)) blocks = append(blocks, logpoller.Block{ - EVMChainID: ubig.New(th.ChainID), + EVMChainID: sqlutil.New(th.ChainID), BlockHash: blockHash, BlockNumber: int64(i + 1), }) From bf5a3d3b2d0b053c0769bce23ac7aa44c578e6b4 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 24 Apr 2026 15:21:37 +0200 Subject: [PATCH 09/10] Fix linter --- pkg/logpoller/orm_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/logpoller/orm_test.go b/pkg/logpoller/orm_test.go index 74ff530ba7..ac0ee97113 100644 --- a/pkg/logpoller/orm_test.go +++ b/pkg/logpoller/orm_test.go @@ -113,13 +113,13 @@ func TestLogPoller_Blocks_Batching(t *testing.T) { logs = append(logs, GenLog(th.ChainID, int64(i), int64(i+1), blockHash.String(), EmitterABI.Events["Log1"].ID.Bytes(), th.EmitterAddress1)) } require.NoError(t, th.ORM.InsertLogsWithBlocks(ctx, logs, blocks)) - lgs, err := th.ORM.SelectLogsByBlockRange(ctx, 1, numBlocks) + dbLogs, err := th.ORM.SelectLogsByBlockRange(ctx, 1, numBlocks) require.NoError(t, err) // Make sure all logs are inserted - require.Equal(t, len(logs), len(lgs)) + require.Len(t, dbLogs, len(logs)) dbBlocks, err := th.ORM.GetBlocksRange(ctx, 1, numBlocks) require.NoError(t, err) - require.Equal(t, numBlocks, len(dbBlocks)) + require.Len(t, dbBlocks, numBlocks) } func TestORM_GetBlocks_From_Range(t *testing.T) { From 
920419ddb070f4da4a943cf7e71655dac3a6d2fc Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Fri, 24 Apr 2026 16:00:35 +0200 Subject: [PATCH 10/10] Improve logging --- pkg/logpoller/log_poller.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 079c23eb8f..8772503bf7 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1209,6 +1209,9 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int } if err == nil { + if len(blocks) > 0 { + latestBlockNumber = blocks[len(blocks)-1].BlockNumber + } lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber) return nil } @@ -1223,6 +1226,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int if err != nil { return fmt.Errorf("failed to handle reorg: %w", err) } + currentBlockNumber = currentBlock.Number lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number) } } @@ -1248,7 +1252,7 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) if err != nil { lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number) - return blocks, logs, nil + return blocks, logs, fmt.Errorf("unable to query for logs: %w", err) } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) block := Block{ @@ -1273,7 +1277,7 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1) if err != nil { lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number) - return blocks, logs, nil + return blocks, logs, fmt.Errorf("unable to get next block: %w", err) } if nextBlock.ParentHash != currentBlock.Hash { @@ -1285,7 +1289,7 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number) if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number) - return blocks, logs, nil + return blocks, logs, fmt.Errorf("failed to get next block from DB during replay: %w", err) } if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash {
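
Patches 05 and 10 together settle on a typed error for carrying reorg detection out of getUnfinalizedLogs: the caller first persists whatever blocks and logs were accumulated, then uses errors.As to route a reorg to handleReorg, while any other failure is surfaced and retried on the next poll. A compact, runnable sketch of that wrapped-sentinel pattern, with illustrative names rather than the patch's exact API:

package main

import (
	"errors"
	"fmt"
)

// reorgSignal plays the role of the patch's reorgError: it carries the first
// head whose parent hash failed to match, so the caller knows where to unwind.
type reorgSignal struct{ at int64 }

func (e *reorgSignal) Error() string { return fmt.Sprintf("reorg detected at block %d", e.at) }

// fetchUnfinalized stands in for getUnfinalizedLogs; here it always reports a
// mismatch at height 11, wrapped with %w so it survives fmt.Errorf chains.
func fetchUnfinalized() error {
	return fmt.Errorf("failed to get unfinalized logs: %w", &reorgSignal{at: 11})
}

func main() {
	err := fetchUnfinalized()
	var sig *reorgSignal
	switch {
	case errors.As(err, &sig):
		fmt.Println("unwind from block", sig.at) // handleReorg path: find the LCA, delete blocks/logs after it
	case err != nil:
		fmt.Println("transient failure, retry on next poll:", err)
	}
}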