Skip to content

Commit bd0f088

Browse files
committed
Revert "PLEX-2473 LogPoller switch to batch inserts (Part 2) (#356)"
This reverts commit ffb14c0.
1 parent a289703 commit bd0f088

7 files changed

Lines changed: 78 additions & 196 deletions

File tree

pkg/logpoller/helper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {
182182
require.NoError(t, err, "block %v", i)
183183
chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i)))
184184
require.NoError(t, err)
185-
assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i)
185+
assert.Equal(t, chainBlk.Hash().Bytes(), blk.BlockHash.Bytes(), "block %v", i)
186186
}
187187
}
188188

pkg/logpoller/log_poller.go

Lines changed: 31 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
965965
}
966966

967967
lp.lggr.Debugw("Inserting backfilled logs with batch endblock", "from", from, "to", to, "logs", len(gethLogs))
968-
err = lp.orm.InsertLogsWithBlocks(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), []Block{endblock})
968+
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
969969
if err != nil {
970970
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
971971
return err
@@ -1187,118 +1187,52 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
11871187
return fmt.Errorf("failed to backfill finalized logs: %w", err)
11881188
}
11891189
currentBlockNumber = lastSafeBackfillBlock + 1
1190-
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
1191-
if err != nil {
1192-
// If there's an error handling the reorg, we can't be sure what state the db was left in.
1193-
// Resume from the latest block saved.
1194-
return fmt.Errorf("failed to get current block: %w", err)
1195-
}
1196-
currentBlockNumber = currentBlock.Number
11971190
}
11981191

11991192
for {
1200-
blocks, logs, err := lp.getUnfinalizedLogs(ctx, currentBlock, latestBlockNumber, safeBlockNumber, latestFinalizedBlockNumber, isReplay)
1201-
// even if we have an error, we may have partial logs and blocks that can be saved, so we save what we have and then retry.
1202-
if len(logs) > 0 || len(blocks) > 0 {
1203-
lp.lggr.Debugw("Saving logs", "logs", len(logs), "blocks", len(blocks), "currentBlockNumber", currentBlockNumber)
1204-
insertErr := lp.orm.InsertLogsWithBlocks(ctx, logs, blocks)
1205-
if insertErr != nil {
1206-
lp.lggr.Warnw("Unable to save logs, retrying later", "insertErr", insertErr, "block", currentBlockNumber, "err", err)
1207-
return nil
1208-
}
1209-
}
1210-
1211-
if err == nil {
1212-
if len(blocks) > 0 {
1213-
latestBlockNumber = blocks[len(blocks)-1].BlockNumber
1193+
if currentBlockNumber > currentBlock.Number {
1194+
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
1195+
if err != nil {
1196+
// If there's an error handling the reorg, we can't be sure what state the db was left in.
1197+
// Resume from the latest block saved.
1198+
return fmt.Errorf("failed to get current block: %w", err)
12141199
}
1215-
lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber)
1216-
return nil
1217-
}
1218-
1219-
var reorgErr *reorgError
1220-
if !errors.As(err, &reorgErr) {
1221-
return fmt.Errorf("failed to get unfinalized logs: %w", err)
1200+
currentBlockNumber = currentBlock.Number
12221201
}
12231202

1224-
lp.lggr.Warnw("Reorg detected during unfinalized log processing, handling reorg", "err", err, "currentBlockNumber", currentBlockNumber, "lastKnownMatchingHead", reorgErr.ReorgedAt.Number)
1225-
currentBlock, err = lp.handleReorg(ctx, reorgErr.ReorgedAt)
1226-
if err != nil {
1227-
return fmt.Errorf("failed to handle reorg: %w", err)
1228-
}
1229-
currentBlockNumber = currentBlock.Number
1230-
lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number)
1231-
}
1232-
}
1233-
1234-
type reorgError struct {
1235-
ReorgedAt *evmtypes.Head
1236-
}
1237-
1238-
func newReorgError(reorgedAt *evmtypes.Head) error {
1239-
return &reorgError{ReorgedAt: reorgedAt}
1240-
}
1241-
1242-
func (e *reorgError) Error() string {
1243-
return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
1244-
}
1245-
1246-
func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
1247-
const maxUnfinalizedBlocks = 2000
1248-
var logs []Log
1249-
var blocks []Block
1250-
for {
12511203
h := currentBlock.Hash
1252-
rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
1204+
var logs []types.Log
1205+
logs, err = lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
12531206
if err != nil {
1254-
lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number)
1255-
return blocks, logs, fmt.Errorf("unable to query for logs: %w", err)
1207+
lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber)
1208+
return nil
12561209
}
1257-
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
1210+
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
12581211
block := Block{
12591212
BlockHash: h,
1260-
BlockNumber: currentBlock.Number,
1213+
BlockNumber: currentBlockNumber,
12611214
BlockTimestamp: currentBlock.Timestamp,
1262-
FinalizedBlockNumber: finalized,
1263-
SafeBlockNumber: safe,
1264-
}
1265-
logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
1266-
blocks = append(blocks, block)
1267-
1268-
if currentBlock.Number >= latest {
1269-
return blocks, logs, nil
1270-
}
1271-
1272-
if len(blocks) >= maxUnfinalizedBlocks {
1273-
lp.lggr.Warnw("Too many unfinalized blocks, stopping log retrieval to avoid OOM", "currentBlockNumber", currentBlock.Number, "latestBlockNumber", latest)
1274-
return blocks, logs, nil
1275-
}
1276-
1277-
nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1)
1215+
FinalizedBlockNumber: latestFinalizedBlockNumber,
1216+
SafeBlockNumber: safeBlockNumber,
1217+
}
1218+
err = lp.orm.InsertLogsWithBlock(
1219+
ctx,
1220+
convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()),
1221+
block,
1222+
)
12781223
if err != nil {
1279-
lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number)
1280-
return blocks, logs, fmt.Errorf("unable to get next block: %w", err)
1281-
}
1282-
1283-
if nextBlock.ParentHash != currentBlock.Hash {
1284-
return blocks, logs, newReorgError(nextBlock)
1224+
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
1225+
return nil
12851226
}
1286-
1287-
if isReplay {
1288-
// During replay, we also check if the next block matches what we have in the DB to avoid false positives on reorgs due to finality violation.
1289-
nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number)
1290-
if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
1291-
lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number)
1292-
return blocks, logs, fmt.Errorf("failed to get next block from DB during replay: %w", err)
1293-
}
1294-
1295-
if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash {
1296-
return blocks, logs, newReorgError(nextBlock)
1297-
}
1227+
// Update current block.
1228+
// Same reorg detection on unfinalized blocks.
1229+
currentBlockNumber++
1230+
if currentBlockNumber > latestBlockNumber {
1231+
break
12981232
}
1299-
1300-
currentBlock = nextBlock
13011233
}
1234+
1235+
return nil
13021236
}
13031237

13041238
// Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker

pkg/logpoller/log_poller_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -690,9 +690,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
690690
owner.GasPrice = big.NewInt(10e9)
691691
p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool {
692692
// After the set of reorgs, we should have the same canonical blocks that geth does.
693-
seed := time.Now().UnixNano()
694-
localRand := rand.New(rand.NewSource(seed))
695-
t.Log("Starting test", mineOrReorg, "seed", seed)
693+
t.Log("Starting test", mineOrReorg)
696694
chainID := testutils.NewRandomEVMChainID()
697695
// Set up a test chain with a log emitting contract deployed.
698696
orm := logpoller.NewORM(chainID, db, lggr)
@@ -744,7 +742,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
744742
}
745743
// Randomly pick to mine or reorg
746744
for i := 0; i < numChainInserts; i++ {
747-
if localRand.Int63()%2 == 0 {
745+
if rand.Int63()%2 == 0 {
748746
// Mine blocks
749747
for j := 0; j < int(mineOrReorg[i]); j++ {
750748
backend.Commit()
@@ -772,7 +770,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
772770
require.NoError(t, err1)
773771
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
774772
}
775-
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false)
773+
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false)
776774
currentBlock, err = lp.LatestBlock(testutils.Context(t))
777775
require.NoError(t, err)
778776
}

pkg/logpoller/observability.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,16 @@ func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error {
5252
err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error {
5353
return o.ORM.InsertLogs(ctx, logs)
5454
})
55-
trackInsertedLogsAndBlocks(ctx, o, logs, nil, err)
55+
trackInsertedLogsAndBlock(ctx, o, logs, nil, err)
5656
trackInsertedBlockLatency(ctx, o, logs, err)
5757
return err
5858
}
5959

60-
func (o *ObservedORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
60+
func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
6161
err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error {
62-
return o.ORM.InsertLogsWithBlocks(ctx, logs, blocks)
62+
return o.ORM.InsertLogsWithBlock(ctx, logs, block)
6363
})
64-
trackInsertedLogsAndBlocks(ctx, o, logs, blocks, err)
64+
trackInsertedLogsAndBlock(ctx, o, logs, &block, err)
6565
trackInsertedBlockLatency(ctx, o, logs, err)
6666
return err
6767
}
@@ -290,15 +290,15 @@ func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryTy
290290
return exec()
291291
}
292292

293-
func trackInsertedLogsAndBlocks(ctx context.Context, o *ObservedORM, logs []Log, blocks []Block, err error) {
293+
func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) {
294294
if err != nil {
295295
return
296296
}
297297
ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout)
298298
defer cancel()
299299
o.metrics.IncrementLogsInserted(ctx, int64(len(logs)))
300-
if len(blocks) > 0 {
301-
o.metrics.IncrementBlocksInserted(ctx, int64(len(blocks)))
300+
if block != nil {
301+
o.metrics.IncrementBlocksInserted(ctx, 1)
302302
}
303303
}
304304

pkg/logpoller/observability_test.go

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,9 @@ func TestMultipleMetricsArePublished(t *testing.T) {
4141
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(ctx, 0, []common.Address{{}}, []common.Hash{{}}, 1)
4242
_, _ = orm.SelectIndexedLogsCreatedAfter(ctx, common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0)
4343
_ = orm.InsertLogs(ctx, []Log{})
44-
_ = orm.InsertLogsWithBlocks(ctx, []Log{}, []Block{
45-
{
46-
BlockNumber: 1,
47-
BlockTimestamp: time.Now(),
48-
},
44+
_ = orm.InsertLogsWithBlock(ctx, []Log{}, Block{
45+
BlockNumber: 1,
46+
BlockTimestamp: time.Now(),
4947
})
5048

5149
require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
@@ -115,25 +113,21 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
115113
assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
116114
assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency))
117115
// Insert 5 more logs with block
118-
require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[10:15], []Block{
119-
{
120-
BlockHash: utils.RandomBytes32(),
121-
BlockNumber: 10,
122-
BlockTimestamp: time.Now(),
123-
FinalizedBlockNumber: 5,
124-
},
116+
require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{
117+
BlockHash: utils.RandomBytes32(),
118+
BlockNumber: 10,
119+
BlockTimestamp: time.Now(),
120+
FinalizedBlockNumber: 5,
125121
}))
126122
assert.Equal(t, 15, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
127123
assert.Equal(t, 1, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
128124

129125
// Insert 5 more logs with block
130-
require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[15:], []Block{
131-
{
132-
BlockHash: utils.RandomBytes32(),
133-
BlockNumber: 15,
134-
BlockTimestamp: time.Now(),
135-
FinalizedBlockNumber: 5,
136-
},
126+
require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[15:], Block{
127+
BlockHash: utils.RandomBytes32(),
128+
BlockNumber: 15,
129+
BlockTimestamp: time.Now(),
130+
FinalizedBlockNumber: 5,
137131
}))
138132
assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
139133
assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
@@ -149,11 +143,9 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
149143
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, network, "420", "DeleteBlocksBefore", "delete"))
150144

151145
// Don't update counters in case of an error
152-
require.Error(t, orm.InsertLogsWithBlocks(ctx, logs, []Block{
153-
{
154-
BlockHash: utils.RandomBytes32(),
155-
BlockTimestamp: time.Now(),
156-
},
146+
require.Error(t, orm.InsertLogsWithBlock(ctx, logs, Block{
147+
BlockHash: utils.RandomBytes32(),
148+
BlockTimestamp: time.Now(),
157149
}))
158150
assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
159151
assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))

pkg/logpoller/orm.go

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
// What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM.
2525
type ORM interface {
2626
InsertLogs(ctx context.Context, logs []Log) error
27-
InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error
27+
InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error
2828
InsertFilter(ctx context.Context, filter Filter) error
2929

3030
LoadFilters(ctx context.Context) (map[string]Filter, error)
@@ -112,18 +112,6 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum
112112
return err
113113
}
114114

115-
func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error {
116-
const q = `INSERT INTO evm.log_poller_blocks
117-
(evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at, safe_block_number)
118-
VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW(), :safe_block_number)
119-
ON CONFLICT DO NOTHING`
120-
// maintain behaviour of InsertBlock
121-
for i := range blocks {
122-
blocks[i].EVMChainID = sqlutil.New(o.chainID)
123-
}
124-
return batchInsert(ctx, o.ds, q, blocks, 1000)
125-
}
126-
127115
// InsertFilter is idempotent.
128116
//
129117
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
@@ -556,10 +544,10 @@ func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
556544
})
557545
}
558546

559-
func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
547+
func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
560548
// Optimization, don't open TX when there is only a block to be persisted
561549
if len(logs) == 0 {
562-
return o.InsertBlocks(ctx, blocks)
550+
return o.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
563551
}
564552

565553
if err := o.validateLogs(logs); err != nil {
@@ -568,7 +556,7 @@ func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []B
568556

569557
// Block and logs goes with the same TX to ensure atomicity
570558
return o.Transact(ctx, func(orm *DSORM) error {
571-
err := orm.InsertBlocks(ctx, blocks)
559+
err := orm.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
572560
if err != nil {
573561
return err
574562
}
@@ -577,22 +565,20 @@ func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []B
577565
}
578566

579567
func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
580-
const q = `INSERT INTO evm.logs
568+
batchInsertSize := 4000
569+
for i := 0; i < len(logs); i += batchInsertSize {
570+
start, end := i, i+batchInsertSize
571+
if end > len(logs) {
572+
end = len(logs)
573+
}
574+
575+
query := `INSERT INTO evm.logs
581576
(evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at)
582577
VALUES
583578
(:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
584579
ON CONFLICT DO NOTHING`
585-
return batchInsert(ctx, tx, q, logs, 4000)
586-
}
587-
588-
func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchInsertSize int) error {
589-
for i := 0; i < len(objs); i += batchInsertSize {
590-
start, end := i, i+batchInsertSize
591-
if end > len(objs) {
592-
end = len(objs)
593-
}
594580

595-
_, err := ds.NamedExecContext(ctx, query, objs[start:end])
581+
_, err := tx.NamedExecContext(ctx, query, logs[start:end])
596582
if err != nil {
597583
if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
598584
// In case of DB timeouts, try to insert again with a smaller batch upto a limit

0 commit comments

Comments (0)