Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions arbnode/mel/extraction/abis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"github.com/ethereum/go-ethereum/common"

"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/solgen/go/rollupgen"
)

var BatchDeliveredID common.Hash
var InboxMessageDeliveredID common.Hash
var InboxMessageFromOriginID common.Hash
var MELConfigEventID common.Hash
var SeqInboxABI *abi.ABI
var IBridgeABI *abi.ABI
var RollupAdminABI *abi.ABI
var iInboxABI *abi.ABI
var iDelayedMessageProviderABI *abi.ABI

Expand Down Expand Up @@ -45,4 +48,11 @@ func init() {
panic(err)
}
iInboxABI = parsedIInboxABI

parsedRollupAdminABI, err := rollupgen.RollupAdminLogicMetaData.GetAbi()
if err != nil {
panic(err)
}
RollupAdminABI = parsedRollupAdminABI
MELConfigEventID = parsedRollupAdminABI.Events["MELConfigEvent"].ID
}
42 changes: 42 additions & 0 deletions arbnode/mel/extraction/mel_config_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2026, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
package melextraction

import (
"context"

"github.com/ethereum/go-ethereum/core/types"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/solgen/go/rollupgen"
)

// ParseMELConfigFromBlock scans the logs of the given parent chain block for
// a MELConfigEvent. The log prefetcher already filters by rollup address,
// so this function only needs to match the event topic.
// Returns nil if no config event is found in the block.
// ParseMELConfigFromBlock inspects the prefetched logs of the given parent
// chain block and returns the MEL config carried by the first MELConfigEvent
// it finds. The log prefetcher already restricts logs to the rollup address,
// so only the event topic needs to be matched here.
// Returns (nil, nil) when the block contains no config event.
func ParseMELConfigFromBlock(
	ctx context.Context,
	parentChainHeader *types.Header,
	logsFetcher LogsFetcher,
	eventUnpacker EventUnpacker,
) (*mel.MELConfig, error) {
	blockLogs, err := logsFetcher.LogsForBlockHash(ctx, parentChainHeader.Hash())
	if err != nil {
		return nil, err
	}
	for _, candidate := range blockLogs {
		// Skip anything that is not a MELConfigEvent emission.
		if candidate == nil || len(candidate.Topics) == 0 {
			continue
		}
		if candidate.Topics[0] != MELConfigEventID {
			continue
		}
		parsed := new(rollupgen.RollupAdminLogicMELConfigEvent)
		if unpackErr := eventUnpacker.UnpackLogTo(parsed, RollupAdminABI, "MELConfigEvent", *candidate); unpackErr != nil {
			return nil, unpackErr
		}
		// First matching event wins; at most one config event is expected per block.
		return &mel.MELConfig{
			Inbox:          parsed.Inbox,
			SequencerInbox: parsed.SequencerInbox,
		}, nil
	}
	// No config event emitted in this block.
	return nil, nil
}
49 changes: 49 additions & 0 deletions arbnode/mel/extraction/message_extraction_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func ExtractMessages(
messagesFromBatchSegments,
arbstate.ParseSequencerMessage,
arbostypes.ParseBatchPostingReportMessageFields,
ParseMELConfigFromBlock,
)
}

Expand All @@ -101,6 +102,7 @@ func extractMessagesImpl(
extractBatchMessages batchMsgExtractionFunc,
parseSequencerMessage sequencerMessageParserFunc,
parseBatchPostingReport batchPostingReportParserFunc,
lookupMELConfig melConfigLookupFunc,
) (*mel.State, []*arbostypes.MessageWithMetadata, []*mel.DelayedInboxMessage, []*mel.BatchMetadata, error) {

state := inputState.Clone()
Expand All @@ -118,6 +120,7 @@ func extractMessagesImpl(
state.ParentChainBlockHash = parentChainHeader.Hash()
state.ParentChainBlockNumber = parentChainHeader.Number.Uint64()
state.ParentChainPreviousBlockHash = parentChainHeader.ParentHash

// Now, check for any logs emitted by the sequencer inbox by txs
// included in the parent chain block.
batches, batchTxs, err := lookupBatches(
Expand Down Expand Up @@ -267,5 +270,51 @@ func extractMessagesImpl(
return nil, nil, nil, nil, fmt.Errorf("batch AfterDelayedCount: %d and MEL state DelayedMessagesRead: %d mismatch", batch.AfterDelayedCount, state.DelayedMessagesRead)
}
}
// Check for MEL config events in this block.
melConfig, err := lookupMELConfig(
ctx,
parentChainHeader,
logsFetcher,
eventUnpacker,
)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to lookup MEL config event: %w", err)
}
if melConfig != nil {
// MEL consensus getting activated for the first time
if state.Version == 0 {
if err := moveUnreadDelayedMessagesToInboxAcc(state, delayedMsgDatabase); err != nil {
return nil, nil, nil, nil, err
}
}
state.Version += 1
state.DelayedMessagePostingTargetAddress = melConfig.Inbox
state.BatchPostingTargetAddress = melConfig.SequencerInbox
}
return state, messages, delayedMessages, batchMetas, nil
}

// moveUnreadDelayedMessagesToInboxAcc replays every delayed message that has
// been seen but not yet read (indices [DelayedMessagesRead, DelayedMessagesSeen))
// into the state's delayed-message inbox accumulator. It is invoked exactly once,
// when MEL consensus first activates, and therefore requires both delayed-message
// accumulators to still be zero before it folds anything in.
func moveUnreadDelayedMessagesToInboxAcc(state *mel.State, delayedMsgDatabase DelayedMessageDatabase) error {
	// Gather the pending messages first so any read failure surfaces before
	// the state is mutated.
	var pending []*mel.DelayedInboxMessage
	for idx := state.DelayedMessagesRead; idx < state.DelayedMessagesSeen; idx++ {
		msg, readErr := delayedMsgDatabase.ReadDelayedMessage(state, idx)
		if readErr != nil {
			return fmt.Errorf("failed creating delayed msg accumulators during MEL consensus activation: %w", readErr)
		}
		pending = append(pending, msg)
	}
	// Activation must start from empty accumulators on both sides.
	emptyHash := common.Hash{}
	if state.DelayedMessageInboxAcc != emptyHash || state.DelayedMessageOutboxAcc != emptyHash {
		return fmt.Errorf(
			"one of DelayedMessageInboxAcc: %v and DelayedMessageOutboxAcc: %v is non zero after reading all delayed msgs for MEL activation",
			state.DelayedMessageInboxAcc,
			state.DelayedMessageOutboxAcc,
		)
	}
	// Fold the unread messages into the inbox accumulator in order.
	for _, msg := range pending {
		if accErr := state.AccumulateDelayedMessage(msg); accErr != nil {
			return fmt.Errorf("failed creating delayed msg accumulators during MEL consensus activation: %w", accErr)
		}
	}
	return nil
}
110 changes: 110 additions & 0 deletions arbnode/mel/extraction/message_extraction_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"github.com/offchainlabs/nitro/daprovider"
)

// noopMELConfigLookup is a no-op MEL config lookup used in tests that don't need config event handling.
// It satisfies melConfigLookupFunc and always reports "no config event found"
// (nil config, nil error) without touching any of its arguments.
func noopMELConfigLookup(_ context.Context, _ *types.Header, _ LogsFetcher, _ EventUnpacker) (*mel.MELConfig, error) {
	return nil, nil
}

func TestExtractMessages(t *testing.T) {
ctx := context.Background()
prevParentBlockHash := common.HexToHash("0x1234")
Expand Down Expand Up @@ -203,6 +208,7 @@ func TestExtractMessages(t *testing.T) {
tt.extractBatchMessages,
tt.parseSequencerMsg,
tt.parseReport,
noopMELConfigLookup,
)
}

Expand Down Expand Up @@ -458,3 +464,107 @@ func parseReportForSecondBatch(
) (*big.Int, common.Address, common.Hash, uint64, *big.Int, uint64, error) {
return nil, common.Address{}, crypto.Keccak256Hash([]byte("batch1")), 0, nil, 0, nil
}

// makeDelayedMsg builds a deterministic DelayedInboxMessage for tests.
// Both the request ID and the one-byte L2 payload are derived from i, so two
// calls with the same index always yield identical messages.
func makeDelayedMsg(i uint64) *mel.DelayedInboxMessage {
	// SetUint64 handles the full uint64 range directly, avoiding the
	// unchecked uint64->int64 conversion (gosec G115) that big.NewInt(int64(i))
	// would require suppressing.
	reqID := common.BigToHash(new(big.Int).SetUint64(i))
	return &mel.DelayedInboxMessage{
		Message: &arbostypes.L1IncomingMessage{
			Header: &arbostypes.L1IncomingMessageHeader{
				RequestId: &reqID,
			},
			L2msg: []byte{byte(i)},
		},
	}
}

// TestMoveUnreadDelayedMessagesToInboxAcc covers moveUnreadDelayedMessagesToInboxAcc,
// which replays delayed messages in [DelayedMessagesRead, DelayedMessagesSeen)
// into the state's delayed-message inbox accumulator during MEL activation.
func TestMoveUnreadDelayedMessagesToInboxAcc(t *testing.T) {
	t.Run("happy path rebuilds inbox accumulator", func(t *testing.T) {
		// Three unread messages: indices 2, 3 and 4.
		state := &mel.State{
			DelayedMessagesRead: 2,
			DelayedMessagesSeen: 5,
		}
		mockDB := &mockDelayedMessageDB{
			DelayedMessages: map[uint64]*mel.DelayedInboxMessage{
				2: makeDelayedMsg(2),
				3: makeDelayedMsg(3),
				4: makeDelayedMsg(4),
			},
		}
		require.NoError(t, moveUnreadDelayedMessagesToInboxAcc(state, mockDB))

		// Build the expected accumulator independently by running the same
		// sequence of AccumulateDelayedMessage calls on a fresh state.
		expected := &mel.State{
			DelayedMessagesRead: 2,
			DelayedMessagesSeen: 5,
		}
		for i := uint64(2); i < 5; i++ {
			require.NoError(t, expected.AccumulateDelayedMessage(makeDelayedMsg(i)))
		}
		require.Equal(t, expected.DelayedMessageInboxAcc, state.DelayedMessageInboxAcc)
		// Sanity check: the accumulator actually changed from its zero value.
		require.NotEqual(t, common.Hash{}, state.DelayedMessageInboxAcc)
		// Outbox is not touched by AccumulateDelayedMessage.
		require.Equal(t, common.Hash{}, state.DelayedMessageOutboxAcc)
	})

	t.Run("no unread messages leaves state unchanged", func(t *testing.T) {
		// Read == Seen: nothing pending, so neither accumulator should move.
		state := &mel.State{
			DelayedMessagesRead: 3,
			DelayedMessagesSeen: 3,
		}
		mockDB := &mockDelayedMessageDB{
			DelayedMessages: map[uint64]*mel.DelayedInboxMessage{},
		}
		require.NoError(t, moveUnreadDelayedMessagesToInboxAcc(state, mockDB))
		require.Equal(t, common.Hash{}, state.DelayedMessageInboxAcc)
		require.Equal(t, common.Hash{}, state.DelayedMessageOutboxAcc)
	})

	t.Run("errors when inbox accumulator is non-zero", func(t *testing.T) {
		// Activation requires a zero inbox accumulator; seed it to trigger the error.
		preExisting := common.HexToHash("0xdeadbeef")
		state := &mel.State{
			DelayedMessagesRead:    0,
			DelayedMessagesSeen:    1,
			DelayedMessageInboxAcc: preExisting,
		}
		mockDB := &mockDelayedMessageDB{
			DelayedMessages: map[uint64]*mel.DelayedInboxMessage{
				0: makeDelayedMsg(0),
			},
		}
		err := moveUnreadDelayedMessagesToInboxAcc(state, mockDB)
		require.ErrorContains(t, err, "non zero")
		// State must be unchanged on error.
		require.Equal(t, preExisting, state.DelayedMessageInboxAcc)
	})

	t.Run("errors when outbox accumulator is non-zero", func(t *testing.T) {
		// Same check as above but for the outbox accumulator.
		preExisting := common.HexToHash("0xfeedface")
		state := &mel.State{
			DelayedMessagesRead:     0,
			DelayedMessagesSeen:     1,
			DelayedMessageOutboxAcc: preExisting,
		}
		mockDB := &mockDelayedMessageDB{
			DelayedMessages: map[uint64]*mel.DelayedInboxMessage{
				0: makeDelayedMsg(0),
			},
		}
		err := moveUnreadDelayedMessagesToInboxAcc(state, mockDB)
		require.ErrorContains(t, err, "non zero")
		require.Equal(t, preExisting, state.DelayedMessageOutboxAcc)
	})

	t.Run("propagates DB read errors", func(t *testing.T) {
		// The DB error should be wrapped (ErrorIs) with the activation context message.
		state := &mel.State{
			DelayedMessagesRead: 0,
			DelayedMessagesSeen: 2,
		}
		dbErr := errors.New("db read boom")
		mockDB := &mockDelayedMessageDB{err: dbErr}
		err := moveUnreadDelayedMessagesToInboxAcc(state, mockDB)
		require.ErrorIs(t, err, dbErr)
		require.ErrorContains(t, err, "failed creating delayed msg accumulators")
	})
}
9 changes: 9 additions & 0 deletions arbnode/mel/extraction/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,12 @@ type batchMsgExtractionFunc func(
type batchPostingReportParserFunc func(
rd io.Reader,
) (*big.Int, common.Address, common.Hash, uint64, *big.Int, uint64, error)

// Defines a function that can lookup a MEL config event from a parent chain block.
// See: ParseMELConfigFromBlock.
type melConfigLookupFunc func(
ctx context.Context,
parentChainHeader *types.Header,
logsFetcher LogsFetcher,
eventUnpacker EventUnpacker,
) (*mel.MELConfig, error)
5 changes: 5 additions & 0 deletions arbnode/mel/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,8 @@ type MessageSyncProgress struct {
BatchProcessed uint64
MsgCount arbutil.MessageIndex
}

type MELConfig struct {
Inbox common.Address
SequencerInbox common.Address
}
2 changes: 1 addition & 1 deletion arbnode/mel/runner/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *MessageExtractor) initialize(ctx context.Context, current *fsm.CurrentS
return m.config.RetryInterval, fmt.Errorf("failed to get start parent chain block: %d corresponding to head mel state from parent chain: %w", melState.ParentChainBlockNumber, err)
}
// Initialize logsPreFetcher
m.logsAndHeadersPreFetcher = newLogsAndHeadersFetcher(m.parentChainReader, m.config.BlocksToPrefetch)
m.logsAndHeadersPreFetcher = newLogsAndHeadersFetcher(m.parentChainReader, m.config.BlocksToPrefetch, m.addrs.Rollup)
// We check if our head mel state's parentChainBlockHash matches the one on-chain, if it doesnt then we detected a reorg
if melState.ParentChainBlockHash != startBlock.Hash() {
log.Info("MEL detected L1 reorg at the start", "block", melState.ParentChainBlockNumber, "parentChainBlockHash", melState.ParentChainBlockHash, "onchainParentChainBlockHash", startBlock.Hash()) // Log level is Info because L1 reorgs are a common occurrence
Expand Down
28 changes: 27 additions & 1 deletion arbnode/mel/runner/logs_and_headers_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ type logsAndHeadersFetcher struct {
toBlock uint64
blocksToFetch uint64
chainHeight uint64
rollupAddr common.Address
headers []*types.Header
logsByTxIndex map[common.Hash]map[uint][]*types.Log
logsByBlockHash map[common.Hash][]*types.Log
}

func newLogsAndHeadersFetcher(parentChainReader ParentChainReader, blocksToFetch uint64) *logsAndHeadersFetcher {
func newLogsAndHeadersFetcher(parentChainReader ParentChainReader, blocksToFetch uint64, rollupAddr common.Address) *logsAndHeadersFetcher {
return &logsAndHeadersFetcher{
parentChainReader: parentChainReader,
blocksToFetch: blocksToFetch,
rollupAddr: rollupAddr,
logsByTxIndex: make(map[common.Hash]map[uint][]*types.Log),
logsByBlockHash: make(map[common.Hash][]*types.Log),
}
Expand Down Expand Up @@ -81,6 +83,9 @@ func (f *logsAndHeadersFetcher) fetch(ctx context.Context, preState *mel.State)
if fetchLogsErr == nil {
fetchLogsErr = f.fetchDelayedMessageLogs(ctx, parentChainBlockNumber, toBlock, preState.DelayedMessagePostingTargetAddress)
}
if fetchLogsErr == nil && f.rollupAddr != (common.Address{}) {
fetchLogsErr = f.fetchMELConfigUpdateLogs(ctx, parentChainBlockNumber, toBlock)
}
wg.Done()
}()
wg.Wait()
Expand Down Expand Up @@ -165,6 +170,27 @@ func (f *logsAndHeadersFetcher) fetchDelayedMessageLogs(ctx context.Context, fro
return conditionalFetch(nil, [][]common.Hash{{melextraction.InboxMessageDeliveredID, melextraction.InboxMessageFromOriginID}})
}

// fetchMELConfigUpdateLogs fetches MELConfigEvent logs emitted by the rollup
// contract in the [from, to] parent chain block range and indexes them both by
// block hash and by (block hash, tx index) so later per-block processing can
// find them alongside the batch and delayed-message logs.
func (f *logsAndHeadersFetcher) fetchMELConfigUpdateLogs(ctx context.Context, from, to uint64) error {
	query := ethereum.FilterQuery{
		FromBlock: new(big.Int).SetUint64(from),
		ToBlock:   new(big.Int).SetUint64(to),
		Addresses: []common.Address{f.rollupAddr},
		Topics:    [][]common.Hash{{melextraction.MELConfigEventID}},
	}
	logs, err := f.parentChainReader.FilterLogs(ctx, query)
	if err != nil {
		return err
	}
	for i := range logs {
		// Take the address of the slice element rather than of a range loop
		// variable: prior to Go 1.22, &log would alias one shared variable
		// across all iterations, leaving every stored pointer referring to
		// the last log. Indexing is correct on every Go version.
		log := &logs[i]
		f.logsByBlockHash[log.BlockHash] = append(f.logsByBlockHash[log.BlockHash], log)
		if _, ok := f.logsByTxIndex[log.BlockHash]; !ok {
			f.logsByTxIndex[log.BlockHash] = make(map[uint][]*types.Log)
		}
		f.logsByTxIndex[log.BlockHash][log.TxIndex] = append(f.logsByTxIndex[log.BlockHash][log.TxIndex], log)
	}
	return nil
}

func (f *logsAndHeadersFetcher) getHeaderByNumber(ctx context.Context, number uint64) (*types.Header, error) {
if len(f.headers) == 0 || number < f.fromBlock || number > f.toBlock { // uninitialized or out of range queries should directly be forwarded to parentChainReader
return f.parentChainReader.HeaderByNumber(ctx, new(big.Int).SetUint64(number))
Expand Down
2 changes: 1 addition & 1 deletion arbnode/mel/runner/logs_and_headers_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestLogsFetcher(t *testing.T) {
}

parentChainReader := &mockParentChainReader{logs: append(batchTxLogs, delayedMsgTxLogs...)}
fetcher := newLogsAndHeadersFetcher(parentChainReader, 10)
fetcher := newLogsAndHeadersFetcher(parentChainReader, 10, common.Address{})
fetcher.chainHeight = 100
melState := &mel.State{
ParentChainBlockNumber: 1,
Expand Down
2 changes: 1 addition & 1 deletion arbnode/mel/runner/mel.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (m *MessageExtractor) GetSequencerMessageBytes(ctx context.Context, seqNum

func (m *MessageExtractor) GetSequencerMessageBytesForParentBlock(ctx context.Context, seqNum uint64, parentChainBlock uint64) ([]byte, common.Hash, error) {
// No need to specify a max headers to fetch, as we are using the logs fetcher only, so we can pass in a 0.
logsFetcher := newLogsAndHeadersFetcher(m.parentChainReader, 0)
logsFetcher := newLogsAndHeadersFetcher(m.parentChainReader, 0, m.addrs.Rollup)
if err := logsFetcher.fetchSequencerBatchLogs(ctx, parentChainBlock, parentChainBlock); err != nil {
return nil, common.Hash{}, err
}
Expand Down
3 changes: 3 additions & 0 deletions changelog/ganeshvanahalli-nit-4779_and_4720.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added
- Made the message extractor able to pick up MEL config upgrade events
- Handle transitioning of node consensus to MEL at the MEL-activation parent chain block number
Loading
Loading