Skip to content
2 changes: 1 addition & 1 deletion pkg/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {
require.NoError(t, err, "block %v", i)
chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i)))
require.NoError(t, err)
assert.Equal(t, chainBlk.Hash().Bytes(), blk.BlockHash.Bytes(), "block %v", i)
assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i)
}
}

Expand Down
128 changes: 97 additions & 31 deletions pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Inserting backfilled logs with batch endblock", "from", from, "to", to, "logs", len(gethLogs))
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
err = lp.orm.InsertLogsWithBlocks(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), []Block{endblock})
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -1187,52 +1187,118 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
return fmt.Errorf("failed to backfill finalized logs: %w", err)
}
currentBlockNumber = lastSafeBackfillBlock + 1
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
return fmt.Errorf("failed to get current block: %w", err)
}
currentBlockNumber = currentBlock.Number
}

for {
if currentBlockNumber > currentBlock.Number {
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
return fmt.Errorf("failed to get current block: %w", err)
blocks, logs, err := lp.getUnfinalizedLogs(ctx, currentBlock, latestBlockNumber, safeBlockNumber, latestFinalizedBlockNumber, isReplay)
Comment thread
dhaidashenko marked this conversation as resolved.
// even if we have an error, we may have partial logs and blocks that can be saved, so we save what we have and then retry.
if len(logs) > 0 || len(blocks) > 0 {
lp.lggr.Debugw("Saving logs", "logs", len(logs), "blocks", len(blocks), "currentBlockNumber", currentBlockNumber)
insertErr := lp.orm.InsertLogsWithBlocks(ctx, logs, blocks)
if insertErr != nil {
lp.lggr.Warnw("Unable to save logs, retrying later", "insertErr", insertErr, "block", currentBlockNumber, "err", err)
return nil
}
}

if err == nil {
if len(blocks) > 0 {
latestBlockNumber = blocks[len(blocks)-1].BlockNumber
}
currentBlockNumber = currentBlock.Number
lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber)
return nil
}

var reorgErr *reorgError
if !errors.As(err, &reorgErr) {
Comment thread
dhaidashenko marked this conversation as resolved.
return fmt.Errorf("failed to get unfinalized logs: %w", err)
}

lp.lggr.Warnw("Reorg detected during unfinalized log processing, handling reorg", "err", err, "currentBlockNumber", currentBlockNumber, "lastKnownMatchingHead", reorgErr.ReorgedAt.Number)
currentBlock, err = lp.handleReorg(ctx, reorgErr.ReorgedAt)
if err != nil {
return fmt.Errorf("failed to handle reorg: %w", err)
}
currentBlockNumber = currentBlock.Number
lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number)
Comment thread
dhaidashenko marked this conversation as resolved.
}
}

// reorgError signals that a chain reorg was detected while walking
// unfinalized blocks. ReorgedAt holds the first fetched head whose
// parent hash (or stored DB counterpart) no longer matched the
// previously known chain.
type reorgError struct {
	ReorgedAt *evmtypes.Head
}

// newReorgError wraps the offending head in a *reorgError, returned as a
// plain error so callers can match it with errors.As.
func newReorgError(head *evmtypes.Head) error {
	return &reorgError{ReorgedAt: head}
}

// Error implements the error interface.
func (e *reorgError) Error() string {
	return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
}

func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
const maxUnfinalizedBlocks = 2000
var logs []Log
var blocks []Block
for {
h := currentBlock.Hash
var logs []types.Log
logs, err = lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
if err != nil {
lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber)
return nil
lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number)
return blocks, logs, fmt.Errorf("unable to query for logs: %w", err)
}
Comment thread
dhaidashenko marked this conversation as resolved.
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
block := Block{
BlockHash: h,
BlockNumber: currentBlockNumber,
BlockNumber: currentBlock.Number,
BlockTimestamp: currentBlock.Timestamp,
FinalizedBlockNumber: latestFinalizedBlockNumber,
SafeBlockNumber: safeBlockNumber,
}
err = lp.orm.InsertLogsWithBlock(
ctx,
convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()),
block,
)
FinalizedBlockNumber: finalized,
SafeBlockNumber: safe,
}
logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
blocks = append(blocks, block)

if currentBlock.Number >= latest {
return blocks, logs, nil
}

if len(blocks) >= maxUnfinalizedBlocks {
lp.lggr.Warnw("Too many unfinalized blocks, stopping log retrieval to avoid OOM", "currentBlockNumber", currentBlock.Number, "latestBlockNumber", latest)
return blocks, logs, nil
}

nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1)
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return nil
lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number)
return blocks, logs, fmt.Errorf("unable to get next block: %w", err)
}
// Update current block.
// Same reorg detection on unfinalized blocks.
currentBlockNumber++
if currentBlockNumber > latestBlockNumber {
break

if nextBlock.ParentHash != currentBlock.Hash {
return blocks, logs, newReorgError(nextBlock)
}
}

return nil
if isReplay {
// During replay, we also check if the next block matches what we have in the DB to avoid false positives on reorgs due to finality violation.
nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number)
if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number)
return blocks, logs, fmt.Errorf("failed to get next block from DB during replay: %w", err)
}

if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash {
return blocks, logs, newReorgError(nextBlock)
}
}

currentBlock = nextBlock
}
}

// Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker
Expand Down
8 changes: 5 additions & 3 deletions pkg/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
owner.GasPrice = big.NewInt(10e9)
p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool {
// After the set of reorgs, we should have the same canonical blocks that geth does.
t.Log("Starting test", mineOrReorg)
seed := time.Now().UnixNano()
Comment thread
dhaidashenko marked this conversation as resolved.
localRand := rand.New(rand.NewSource(seed))
t.Log("Starting test", mineOrReorg, "seed", seed)
chainID := testutils.NewRandomEVMChainID()
// Set up a test chain with a log emitting contract deployed.
orm := logpoller.NewORM(chainID, db, lggr)
Expand Down Expand Up @@ -742,7 +744,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
}
// Randomly pick to mine or reorg
for i := 0; i < numChainInserts; i++ {
if rand.Int63()%2 == 0 {
if localRand.Int63()%2 == 0 {
// Mine blocks
for j := 0; j < int(mineOrReorg[i]); j++ {
backend.Commit()
Expand Down Expand Up @@ -770,7 +772,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
require.NoError(t, err1)
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
}
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false)
currentBlock, err = lp.LatestBlock(testutils.Context(t))
require.NoError(t, err)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error {
err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error {
return o.ORM.InsertLogs(ctx, logs)
})
trackInsertedLogsAndBlock(ctx, o, logs, nil, err)
trackInsertedLogsAndBlocks(ctx, o, logs, nil, err)
trackInsertedBlockLatency(ctx, o, logs, err)
return err
}

func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
func (o *ObservedORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error {
Comment thread
dhaidashenko marked this conversation as resolved.
return o.ORM.InsertLogsWithBlock(ctx, logs, block)
return o.ORM.InsertLogsWithBlocks(ctx, logs, blocks)
})
trackInsertedLogsAndBlock(ctx, o, logs, &block, err)
trackInsertedLogsAndBlocks(ctx, o, logs, blocks, err)
trackInsertedBlockLatency(ctx, o, logs, err)
return err
}
Expand Down Expand Up @@ -290,15 +290,15 @@ func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryTy
return exec()
}

func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) {
func trackInsertedLogsAndBlocks(ctx context.Context, o *ObservedORM, logs []Log, blocks []Block, err error) {
if err != nil {
return
}
ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout)
defer cancel()
o.metrics.IncrementLogsInserted(ctx, int64(len(logs)))
if block != nil {
o.metrics.IncrementBlocksInserted(ctx, 1)
if len(blocks) > 0 {
o.metrics.IncrementBlocksInserted(ctx, int64(len(blocks)))
}
}

Expand Down
40 changes: 24 additions & 16 deletions pkg/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(ctx, 0, []common.Address{{}}, []common.Hash{{}}, 1)
_, _ = orm.SelectIndexedLogsCreatedAfter(ctx, common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0)
_ = orm.InsertLogs(ctx, []Log{})
_ = orm.InsertLogsWithBlock(ctx, []Log{}, Block{
BlockNumber: 1,
BlockTimestamp: time.Now(),
_ = orm.InsertLogsWithBlocks(ctx, []Log{}, []Block{
{
BlockNumber: 1,
BlockTimestamp: time.Now(),
},
})

require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
Expand Down Expand Up @@ -113,21 +115,25 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency))
// Insert 5 more logs with block
require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{
BlockHash: utils.RandomBytes32(),
BlockNumber: 10,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[10:15], []Block{
{
BlockHash: utils.RandomBytes32(),
BlockNumber: 10,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
},
}))
assert.Equal(t, 15, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 1, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))

// Insert 5 more logs with block
require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[15:], Block{
BlockHash: utils.RandomBytes32(),
BlockNumber: 15,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[15:], []Block{
{
BlockHash: utils.RandomBytes32(),
BlockNumber: 15,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
},
}))
assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
Expand All @@ -143,9 +149,11 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, network, "420", "DeleteBlocksBefore", "delete"))

// Don't update counters in case of an error
require.Error(t, orm.InsertLogsWithBlock(ctx, logs, Block{
BlockHash: utils.RandomBytes32(),
BlockTimestamp: time.Now(),
require.Error(t, orm.InsertLogsWithBlocks(ctx, logs, []Block{
{
BlockHash: utils.RandomBytes32(),
BlockTimestamp: time.Now(),
},
}))
assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
Expand Down
40 changes: 27 additions & 13 deletions pkg/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM.
type ORM interface {
InsertLogs(ctx context.Context, logs []Log) error
InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error
InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error
InsertFilter(ctx context.Context, filter Filter) error

LoadFilters(ctx context.Context) (map[string]Filter, error)
Expand Down Expand Up @@ -112,6 +112,18 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum
return err
}

func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error {
const q = `INSERT INTO evm.log_poller_blocks
(evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at, safe_block_number)
VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW(), :safe_block_number)
ON CONFLICT DO NOTHING`
// maintain behaviour of InsertBlock
for i := range blocks {
blocks[i].EVMChainID = sqlutil.New(o.chainID)
}
return batchInsert(ctx, o.ds, q, blocks, 1000)
}

// InsertFilter is idempotent.
//
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
Expand Down Expand Up @@ -544,10 +556,10 @@ func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
})
}

func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
// Optimization, don't open TX when there is only a block to be persisted
if len(logs) == 0 {
return o.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
return o.InsertBlocks(ctx, blocks)
}

if err := o.validateLogs(logs); err != nil {
Expand All @@ -556,7 +568,7 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block

// Block and logs goes with the same TX to ensure atomicity
return o.Transact(ctx, func(orm *DSORM) error {
err := orm.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
err := orm.InsertBlocks(ctx, blocks)
if err != nil {
return err
}
Expand All @@ -565,20 +577,22 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block
}

func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

query := `INSERT INTO evm.logs
const q = `INSERT INTO evm.logs
(evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at)
VALUES
(:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
ON CONFLICT DO NOTHING`
return batchInsert(ctx, tx, q, logs, 4000)
}

func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchInsertSize int) error {
for i := 0; i < len(objs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(objs) {
end = len(objs)
}

_, err := tx.NamedExecContext(ctx, query, logs[start:end])
_, err := ds.NamedExecContext(ctx, query, objs[start:end])
if err != nil {
if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
// In case of DB timeouts, try to insert again with a smaller batch upto a limit
Expand Down
Loading
Loading