-
Notifications
You must be signed in to change notification settings - Fork 721
Report filtered delayed transactions to filtering-report service - NIT-4644 #4632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
5d91439
2a36d7a
da4fbe2
50934a3
a615380
a19760a
0fd2625
110d981
01a3b5d
08f6070
53017e1
177a108
b5e169d
3ec0fd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| ### Added | ||
| - Report filtered delayed transactions to filtering-report service with structured FilteredTxReport |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,6 +50,7 @@ import ( | |
| "github.com/offchainlabs/nitro/arbutil" | ||
| "github.com/offchainlabs/nitro/consensus" | ||
| "github.com/offchainlabs/nitro/execution" | ||
| "github.com/offchainlabs/nitro/execution/gethexec/addressfilter" | ||
| "github.com/offchainlabs/nitro/execution/gethexec/eventfilter" | ||
| "github.com/offchainlabs/nitro/util/arbmath" | ||
| "github.com/offchainlabs/nitro/util/containers" | ||
|
|
@@ -94,13 +95,14 @@ func (e *ErrFilteredDelayedMessage) Error() string { | |
| var ErrDelayedTxFiltered = errors.New("delayed transaction filtered") | ||
|
|
||
| // DelayedFilteringSequencingHooks extends NoopSequencingHooks with address filtering | ||
| // for delayed message processing. Collects all tx hashes that touch filtered addresses | ||
| // and are not in the onchain filter. After block production, the caller checks if any | ||
| // hashes were collected and returns ErrFilteredDelayedMessage if so. | ||
| // for delayed message processing. Builds FilteredTxReport entries for txs that touch | ||
| // filtered addresses and are not in the onchain filter. After block production, the | ||
| // caller checks pendingFilteredTxReports and returns ErrFilteredDelayedMessage if any. | ||
|
diegoximenes marked this conversation as resolved.
|
||
| type DelayedFilteringSequencingHooks struct { | ||
| arbos.NoopSequencingHooks | ||
| FilteredTxHashes []common.Hash | ||
| eventFilter *eventfilter.EventFilter | ||
| pendingFilteredTxReports []addressfilter.FilteredTxReport | ||
| eventFilter *eventfilter.EventFilter | ||
| pendingCascadingRedeemAddresses []filter.FilteredAddressRecord | ||
| } | ||
|
|
||
| func NewDelayedFilteringSequencingHooks(txes types.Transactions, ef *eventfilter.EventFilter) *DelayedFilteringSequencingHooks { | ||
|
|
@@ -127,19 +129,25 @@ func touchAddresses(db *state.StateDB, tx *types.Transaction, sender common.Addr | |
| } | ||
|
|
||
| // PostTxFilter touches To/From addresses and checks IsAddressFiltered. | ||
| // Collects tx hashes that touch filtered addresses but are not in the onchain filter. | ||
| // For redeems, returns ErrArbTxFilter to trigger group rollback. | ||
| // Builds a FilteredTxReport and returns ErrArbTxFilter for filtered txs. | ||
| // For redeems, returns ErrArbTxFilter without a report (originating tx is | ||
| // collected in TxFailed after group rollback). | ||
| func (f *DelayedFilteringSequencingHooks) PostTxFilter(header *types.Header, db *state.StateDB, a *arbosState.ArbosState, tx *types.Transaction, sender common.Address, dataGas uint64, result *core.ExecutionResult) error { | ||
|
diegoximenes marked this conversation as resolved.
Outdated
|
||
| if tx.Type() == types.ArbitrumInternalTxType { | ||
| return nil | ||
| } | ||
| touchAddresses(db, tx, sender) | ||
| applyEventFilter(f.eventFilter, db) | ||
|
|
||
| if filtered, _ := db.IsAddressFiltered(); filtered { | ||
| if filtered, filteredAddresses := db.IsAddressFiltered(); filtered { | ||
| // For redeems, return the filter error so the block processor can | ||
| // trigger a group rollback. | ||
| // trigger a group rollback. The originating tx hash will be | ||
| // collected in TxFailed. | ||
| if tx.Type() == types.ArbitrumRetryTxType { | ||
| // Stash filtered addresses so TxFailed can populate them on | ||
| // the cascading redeem report — the statedb will be reverted | ||
| // before TxFailed runs. | ||
| f.pendingCascadingRedeemAddresses = filteredAddresses | ||
| return state.ErrArbTxFilter | ||
| } | ||
| // If the STF already handled this tx via the onchain filter mechanism, | ||
|
|
@@ -148,23 +156,59 @@ func (f *DelayedFilteringSequencingHooks) PostTxFilter(header *types.Header, db | |
| if errors.As(result.Err, &filteredErr) { | ||
| return nil | ||
| } | ||
| // Otherwise, this tx touched a filtered address but wasn't in the | ||
| // onchain filter - collect it so the caller can halt. | ||
| f.FilteredTxHashes = append(f.FilteredTxHashes, tx.Hash()) | ||
| // Build a report for this filtered tx. InboxRequestId is set later | ||
| // in createBlockFromNextMessage where msg metadata is available. | ||
| txRLP, err := tx.MarshalBinary() | ||
| if err != nil { | ||
| log.Error("error marshalling filtered delayed tx to RLP", "txHash", tx.Hash(), "err", err) | ||
| txRLP = nil | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| } | ||
| report := addressfilter.FilteredTxReport{ | ||
| ID: uuid.NewString(), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as in #4627 (comment) |
||
| TxHash: tx.Hash(), | ||
| TxRLP: txRLP, | ||
| FilteredAddresses: filteredAddresses, | ||
| BlockNumber: header.Number.Uint64(), | ||
| ParentBlockHash: header.ParentHash, | ||
| PositionInBlock: db.TxIndex(), | ||
| FilteredAt: time.Now(), | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| IsDelayed: true, | ||
| DelayedReportData: nil, // populated later in createBlockFromNextMessage when msg metadata is available | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea of having a ProduceBlockAdvanced can provide RequestId to PreTxFilter/PostTxFilter, so you have everything necessary to build FilteredTxReport here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The principled boundary is: per-tx metadata flows through PostTxFilter, per-message metadata flows through the constructor or the callsite (createBlockFromNextMessage). The The lint still catches us — if a new field is added to FilteredTxReport, we'll get a compile error here and handle it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check #4632 (comment) |
||
| } | ||
| f.pendingFilteredTxReports = append(f.pendingFilteredTxReports, report) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (f *DelayedFilteringSequencingHooks) SupportsGroupRollback() bool { return true } | ||
|
|
||
| // TxFailed extracts the originating tx hash from ErrFilteredCascadingRedeem | ||
| // and appends it to FilteredTxHashes. After ProduceBlockAdvanced returns, the | ||
| // existing check fires ErrFilteredDelayedMessage, causing the delayed sequencer | ||
| // to halt and the transaction-filterer to add the hash to the onchain filter. | ||
| // and builds a minimal FilteredTxReport. After ProduceBlockAdvanced returns, | ||
| // the existing check fires ErrFilteredDelayedMessage, causing the delayed | ||
| // sequencer to halt and the transaction-filterer to add the hash to the | ||
| // onchain filter. | ||
| func (f *DelayedFilteringSequencingHooks) TxFailed(err error) { | ||
| var cascadingErr *arbos.ErrFilteredCascadingRedeem | ||
| if errors.As(err, &cascadingErr) { | ||
| f.FilteredTxHashes = append(f.FilteredTxHashes, cascadingErr.OriginatingTxHash) | ||
| // FilteredAddresses were stashed in PostTxFilter before the | ||
| // statedb was reverted. TxRLP and block metadata are populated | ||
| // later in createBlockFromNextMessage. | ||
| report := addressfilter.FilteredTxReport{ | ||
| ID: uuid.NewString(), | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| TxHash: cascadingErr.OriginatingTxHash, | ||
| FilteredAddresses: f.pendingCascadingRedeemAddresses, | ||
| FilteredAt: time.Now(), | ||
| IsDelayed: true, | ||
| PositionInBlock: 0, | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| DelayedReportData: nil, // populated later in createBlockFromNextMessage when msg metadata is available | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
|
|
||
| // Populated later in createBlockFromNextMessage where txes and block are available | ||
| TxRLP: nil, | ||
| BlockNumber: 0, | ||
| ParentBlockHash: common.Hash{}, | ||
| } | ||
| f.pendingFilteredTxReports = append(f.pendingFilteredTxReports, report) | ||
| f.pendingCascadingRedeemAddresses = nil | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -936,10 +980,50 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith | |
| return nil, nil, nil, err | ||
| } | ||
| // Check if any txs touched filtered addresses but are not in the onchain filter | ||
| if len(filteringHooks.FilteredTxHashes) > 0 { | ||
| if len(filteringHooks.pendingFilteredTxReports) > 0 { | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| // Build tx lookup for enriching minimal TxFailed reports | ||
| txByHash := make(map[common.Hash]*types.Transaction, len(txes)) | ||
| for _, tx := range txes { | ||
| txByHash[tx.Hash()] = tx | ||
| } | ||
|
|
||
| var inboxRequestId common.Hash | ||
| if msg.Message.Header.RequestId != nil { | ||
| inboxRequestId = *msg.Message.Header.RequestId | ||
| } | ||
|
|
||
| filteredTxHashes := make([]common.Hash, len(filteringHooks.pendingFilteredTxReports)) | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| for i := range filteringHooks.pendingFilteredTxReports { | ||
| report := &filteringHooks.pendingFilteredTxReports[i] | ||
|
|
||
| // Enrich TxFailed reports (identified by BlockNumber = 0) with | ||
| // data not available after group rollback. | ||
| if report.BlockNumber == 0 { | ||
| if tx, ok := txByHash[report.TxHash]; ok { | ||
| txRLP, err := tx.MarshalBinary() | ||
| if err != nil { | ||
| log.Error("error marshalling filtered delayed tx to RLP", "txHash", report.TxHash, "err", err) | ||
| } else { | ||
| report.TxRLP = txRLP | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Add a log when the hash is not found
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this else branch, tx.MarshalBinary() succeeded. So we won't need logging. the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant the case where txByHash does not contain report.TxHash (the outer if tx, ok := txByHash[report.TxHash]; ok check). Even if that path is unreachable with the current code, the logic is spread across components, so adding a log in the else branch would make it less fragile and easier to debug |
||
| if block != nil { | ||
| report.BlockNumber = block.NumberU64() | ||
| report.ParentBlockHash = block.ParentHash() | ||
| } | ||
| } | ||
|
|
||
| // Set InboxRequestId (not available in PostTxFilter/TxFailed) | ||
| report.DelayedReportData = &addressfilter.DelayedReportData{ | ||
| InboxRequestId: inboxRequestId, | ||
| } | ||
|
|
||
| filteredTxHashes[i] = report.TxHash | ||
| } | ||
|
|
||
| if s.transactionFiltererRPCClient != nil { | ||
| s.LaunchThread(func(ctx context.Context) { | ||
| for _, filteredTxHash := range filteringHooks.FilteredTxHashes { | ||
| for _, filteredTxHash := range filteredTxHashes { | ||
| _, err := s.transactionFiltererRPCClient.Filter(filteredTxHash).Await(ctx) | ||
| if err != nil { | ||
| log.Error("error reporting filtered tx to transaction-filterer", "filteredTxHash", filteredTxHash, "err", err) | ||
|
|
@@ -948,8 +1032,18 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith | |
| }) | ||
| } | ||
|
|
||
| // Report structured reports to filtering-report service (non-blocking) | ||
| if s.filteringReportRPCClient != nil { | ||
| reports := filteringHooks.pendingFilteredTxReports | ||
|
diegoximenes marked this conversation as resolved.
|
||
| s.LaunchThread(func(ctx context.Context) { | ||
| if _, err := s.filteringReportRPCClient.ReportFilteredTransactions(reports).Await(ctx); err != nil { | ||
| log.Warn("error reporting filtered delayed txs to filtering-report", "count", len(reports), "err", err) | ||
|
mahdy-nasr marked this conversation as resolved.
Outdated
|
||
| } | ||
| }) | ||
| } | ||
|
|
||
| return nil, nil, nil, &ErrFilteredDelayedMessage{ | ||
| TxHashes: filteringHooks.FilteredTxHashes, | ||
| TxHashes: filteredTxHashes, | ||
| DelayedMsgIdx: msg.DelayedMessagesRead - 1, | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.