Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions arbos/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math"
"math/big"

"github.com/ethereum/go-ethereum/arbitrum/filter"
"github.com/ethereum/go-ethereum/arbitrum_types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand Down Expand Up @@ -42,13 +43,19 @@ var EmitTicketCreatedEvent func(*vm.EVM, [32]byte) error

// ErrFilteredCascadingRedeem is returned via TxFailed when a redeem's
// inner execution touches a filtered address, requiring the entire tx group
// (originating user tx + all its redeems) to be reverted.
// (originating user tx + all its redeems) to be reverted. All fields are
// captured before the group rollback so TxFailed can build a fully populated
// FilteredTxReport without late-filling.
type ErrFilteredCascadingRedeem struct {
OriginatingTxHash common.Hash
OriginatingTx *types.Transaction
FilteredAddresses []filter.FilteredAddressRecord
BlockNumber uint64
ParentBlockHash common.Hash
PositionInBlock int // receipt index of the originating user tx
}

func (e *ErrFilteredCascadingRedeem) Error() string {
return fmt.Sprintf("cascading redeem filtered (originating tx: %s)", e.OriginatingTxHash.Hex())
return fmt.Sprintf("cascading redeem filtered (originating tx: %s)", e.OriginatingTx.Hash().Hex())
}

// A helper struct that implements String() by marshalling to JSON.
Expand Down Expand Up @@ -95,14 +102,14 @@ type groupCheckpoint struct {
userTxsProcessed int
completeLen int
receiptsLen int
userTxHash common.Hash
userTx *types.Transaction
}

// saveGroupCheckpoint snapshots the loop state so the entire tx group can be
// rolled back if a descendant redeem is filtered. header is passed separately
// because only GasUsed is checkpointed; the rest of the header is immutable
// during the loop.
func (s *blockBuildState) saveGroupCheckpoint(header *types.Header, snap int, userTxHash common.Hash) error {
func (s *blockBuildState) saveGroupCheckpoint(header *types.Header, snap int, userTx *types.Transaction) error {
if len(s.redeems) != 0 {
return errors.New("saveGroupCheckpoint called with pending redeems")
}
Expand All @@ -115,7 +122,7 @@ func (s *blockBuildState) saveGroupCheckpoint(header *types.Header, snap int, us
userTxsProcessed: s.userTxsProcessed,
completeLen: len(s.complete),
receiptsLen: len(s.receipts),
userTxHash: userTxHash,
userTx: userTx,
}
return nil
}
Expand Down Expand Up @@ -213,10 +220,10 @@ type SequencingHooks interface {
// SupportsGroupRollback returns whether the hooks support checkpointing and
// rolling back a group of transactions (user tx + its scheduled redeems).
SupportsGroupRollback() bool
// PreTxFilter rejects a tx before execution.
PreTxFilter(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info) error
// PostTxFilter rejects a tx after execution.
PostTxFilter(*types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, common.Address, uint64, *core.ExecutionResult) error
// PreTxFilter rejects a tx before execution. positionInBlock is len(receipts) at call time.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to remove this positionInBlock comment.
Arguments are not named here.

PreTxFilter(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info, int) error
// PostTxFilter rejects a tx after execution. positionInBlock is len(receipts) at call time.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to remove this positionInBlock comment.
Arguments are not named here.

PostTxFilter(*types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, common.Address, uint64, *core.ExecutionResult, int) error
// BlockFilter rejects an entire block after all txs have been applied.
BlockFilter(*types.Header, *state.StateDB, types.Transactions, types.Receipts) error
// TxSucceeded records that the last user tx from NextTxToSequence executed successfully.
Expand Down Expand Up @@ -244,11 +251,11 @@ func (n *NoopSequencingHooks) NextTxToSequence() (*types.Transaction, *arbitrum_

func (n *NoopSequencingHooks) CanDiscardTx() bool { return false }

func (n *NoopSequencingHooks) PreTxFilter(config *params.ChainConfig, header *types.Header, db *state.StateDB, a *arbosState.ArbosState, transaction *types.Transaction, options *arbitrum_types.ConditionalOptions, address common.Address, info *L1Info) error {
func (n *NoopSequencingHooks) PreTxFilter(config *params.ChainConfig, header *types.Header, db *state.StateDB, a *arbosState.ArbosState, transaction *types.Transaction, options *arbitrum_types.ConditionalOptions, address common.Address, info *L1Info, positionInBlock int) error {
return nil
}

func (n *NoopSequencingHooks) PostTxFilter(header *types.Header, db *state.StateDB, a *arbosState.ArbosState, transaction *types.Transaction, address common.Address, u uint64, result *core.ExecutionResult) error {
func (n *NoopSequencingHooks) PostTxFilter(header *types.Header, db *state.StateDB, a *arbosState.ArbosState, transaction *types.Transaction, address common.Address, u uint64, result *core.ExecutionResult, positionInBlock int) error {
return nil
}

Expand Down Expand Up @@ -426,7 +433,7 @@ func ProduceBlockAdvanced(

// Writes to statedb object should be avoided to prevent invalid state from permeating as statedb snapshot is not taken
if isUserTx {
if err = sequencingHooks.PreTxFilter(chainConfig, header, buildState.statedb, buildState.arbState, tx, options, sender, l1Info); err != nil {
if err = sequencingHooks.PreTxFilter(chainConfig, header, buildState.statedb, buildState.arbState, tx, options, sender, l1Info, len(buildState.receipts)); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -489,15 +496,15 @@ func ProduceBlockAdvanced(
&header.GasUsed,
runCtx,
func(result *core.ExecutionResult) error {
if err := sequencingHooks.PostTxFilter(header, buildState.statedb, buildState.arbState, tx, sender, dataGas, result); err != nil {
if err := sequencingHooks.PostTxFilter(header, buildState.statedb, buildState.arbState, tx, sender, dataGas, result, len(buildState.receipts)); err != nil {
return err
}
// Additional post-transaction validity check
if err = extraPostTxFilter(chainConfig, header, buildState.statedb, buildState.arbState, tx, options, sender, l1Info, result); err != nil {
return err
}
if isUserTx && len(result.ScheduledTxes) > 0 && sequencingHooks.SupportsGroupRollback() {
if err := buildState.saveGroupCheckpoint(header, snap, tx.Hash()); err != nil {
if err := buildState.saveGroupCheckpoint(header, snap, tx); err != nil {
return err
}
}
Expand All @@ -519,11 +526,19 @@ func ProduceBlockAdvanced(
// active group checkpoint, roll back the entire group (user tx + all
// redeems) to the pre-group state.
if !isUserTx && buildState.activeGroupCP != nil && errors.Is(err, state.ErrArbTxFilter) {
userTxHash := buildState.activeGroupCP.userTxHash
// Capture everything before rollback — addressCheckerStateß
cp := buildState.activeGroupCP
_, filteredAddresses := buildState.statedb.IsAddressFiltered()
if err := buildState.rollbackToGroupCheckpoint(header); err != nil {
return nil, nil, nil, err
}
sequencingHooks.TxFailed(&ErrFilteredCascadingRedeem{OriginatingTxHash: userTxHash})
sequencingHooks.TxFailed(&ErrFilteredCascadingRedeem{
OriginatingTx: cp.userTx,
FilteredAddresses: filteredAddresses,
BlockNumber: header.Number.Uint64(),
ParentBlockHash: header.ParentHash,
PositionInBlock: cp.receiptsLen,
})
continue
}
if isUserTx {
Expand Down
2 changes: 2 additions & 0 deletions changelog/mnasr-nit-4644.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### Added
- Report filtered delayed transactions to filtering-report service with structured FilteredTxReport
108 changes: 84 additions & 24 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/consensus"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec/addressfilter"
"github.com/offchainlabs/nitro/execution/gethexec/eventfilter"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/containers"
Expand Down Expand Up @@ -94,19 +95,22 @@ func (e *ErrFilteredDelayedMessage) Error() string {
var ErrDelayedTxFiltered = errors.New("delayed transaction filtered")

// DelayedFilteringSequencingHooks extends NoopSequencingHooks with address filtering
// for delayed message processing. Collects all tx hashes that touch filtered addresses
// and are not in the onchain filter. After block production, the caller checks if any
// hashes were collected and returns ErrFilteredDelayedMessage if so.
// for delayed message processing. Builds FilteredTxReport entries for txs that touch
// filtered addresses and are not in the onchain filter. After block production, the
// caller checks pendingFilteredTxReports and returns ErrFilteredDelayedMessage if any.
Comment thread
diegoximenes marked this conversation as resolved.
type DelayedFilteringSequencingHooks struct {
arbos.NoopSequencingHooks
FilteredTxHashes []common.Hash
eventFilter *eventfilter.EventFilter
filteredTxHashes []common.Hash
pendingFilteredTxReports []addressfilter.FilteredTxReport
eventFilter *eventfilter.EventFilter
inboxRequestId common.Hash
}

func NewDelayedFilteringSequencingHooks(txes types.Transactions, ef *eventfilter.EventFilter) *DelayedFilteringSequencingHooks {
func NewDelayedFilteringSequencingHooks(txes types.Transactions, ef *eventfilter.EventFilter, inboxRequestId common.Hash) *DelayedFilteringSequencingHooks {
return &DelayedFilteringSequencingHooks{
NoopSequencingHooks: *arbos.NewNoopSequencingHooks(txes),
eventFilter: ef,
inboxRequestId: inboxRequestId,
}
}

Expand All @@ -127,18 +131,20 @@ func touchAddresses(db *state.StateDB, tx *types.Transaction, sender common.Addr
}

// PostTxFilter touches To/From addresses and checks IsAddressFiltered.
// Collects tx hashes that touch filtered addresses but are not in the onchain filter.
// For redeems, returns ErrArbTxFilter to trigger group rollback.
func (f *DelayedFilteringSequencingHooks) PostTxFilter(header *types.Header, db *state.StateDB, a *arbosState.ArbosState, tx *types.Transaction, sender common.Address, dataGas uint64, result *core.ExecutionResult) error {
// Builds a FilteredTxReport and returns ErrArbTxFilter for filtered txs.
// For redeems, returns ErrArbTxFilter without a report (originating tx is
// collected in TxFailed after group rollback).
func (f *DelayedFilteringSequencingHooks) PostTxFilter(header *types.Header, db *state.StateDB, a *arbosState.ArbosState, tx *types.Transaction, sender common.Address, dataGas uint64, result *core.ExecutionResult, positionInBlock int) error {
if tx.Type() == types.ArbitrumInternalTxType {
return nil
}
touchAddresses(db, tx, sender)
applyEventFilter(f.eventFilter, db)

if filtered, _ := db.IsAddressFiltered(); filtered {
if filtered, filteredAddresses := db.IsAddressFiltered(); filtered {
// For redeems, return the filter error so the block processor can
// trigger a group rollback.
// trigger a group rollback. The block processor captures all report
// data before rollback and passes it through ErrFilteredCascadingRedeem.
if tx.Type() == types.ArbitrumRetryTxType {
return state.ErrArbTxFilter
}
Expand All @@ -148,24 +154,63 @@ func (f *DelayedFilteringSequencingHooks) PostTxFilter(header *types.Header, db
if errors.As(result.Err, &filteredErr) {
return nil
}
// Otherwise, this tx touched a filtered address but wasn't in the
// onchain filter - collect it so the caller can halt.
f.FilteredTxHashes = append(f.FilteredTxHashes, tx.Hash())
f.filteredTxHashes = append(f.filteredTxHashes, tx.Hash())

txRLP, err := tx.MarshalBinary()
if err != nil {
log.Error("error marshalling filtered delayed tx to RLP", "txHash", tx.Hash(), "err", err)
} else {
report := addressfilter.FilteredTxReport{
ID: uuid.Must(uuid.NewV7()).String(),
TxHash: tx.Hash(),
TxRLP: txRLP,
FilteredAddresses: filteredAddresses,
BlockNumber: header.Number.Uint64(),
ParentBlockHash: header.ParentHash,
PositionInBlock: uint64(positionInBlock), // #nosec G115
FilteredAt: time.Now().UTC(),
IsDelayed: true,
DelayedReportData: &addressfilter.DelayedReportData{InboxRequestId: f.inboxRequestId},
}
f.pendingFilteredTxReports = append(f.pendingFilteredTxReports, report)
}

}
return nil
}

func (f *DelayedFilteringSequencingHooks) SupportsGroupRollback() bool { return true }

// TxFailed extracts the originating tx hash from ErrFilteredCascadingRedeem
// and appends it to FilteredTxHashes. After ProduceBlockAdvanced returns, the
// existing check fires ErrFilteredDelayedMessage, causing the delayed sequencer
// to halt and the transaction-filterer to add the hash to the onchain filter.
// TxFailed builds a fully populated FilteredTxReport from
// ErrFilteredCascadingRedeem. The block processor captures all needed data
// (originating tx, filtered addresses, block metadata, user tx position)
// before the group rollback and passes it through the error.
func (f *DelayedFilteringSequencingHooks) TxFailed(err error) {
var cascadingErr *arbos.ErrFilteredCascadingRedeem
if errors.As(err, &cascadingErr) {
f.FilteredTxHashes = append(f.FilteredTxHashes, cascadingErr.OriginatingTxHash)
if !errors.As(err, &cascadingErr) {
return
}
originatingTxHash := cascadingErr.OriginatingTx.Hash()
f.filteredTxHashes = append(f.filteredTxHashes, originatingTxHash)

txRLP, marshalErr := cascadingErr.OriginatingTx.MarshalBinary()
if marshalErr != nil {
log.Error("error marshalling originating tx RLP", "txHash", originatingTxHash, "err", marshalErr)
return
}
report := addressfilter.FilteredTxReport{
ID: uuid.Must(uuid.NewV7()).String(),
TxHash: originatingTxHash,
TxRLP: txRLP,
FilteredAddresses: cascadingErr.FilteredAddresses,
BlockNumber: cascadingErr.BlockNumber,
ParentBlockHash: cascadingErr.ParentBlockHash,
PositionInBlock: uint64(cascadingErr.PositionInBlock), // #nosec G115
FilteredAt: time.Now().UTC(),
IsDelayed: true,
DelayedReportData: &addressfilter.DelayedReportData{InboxRequestId: f.inboxRequestId},
}
f.pendingFilteredTxReports = append(f.pendingFilteredTxReports, report)
}

func applyEventFilter(ef *eventfilter.EventFilter, db *state.StateDB) {
Expand Down Expand Up @@ -934,7 +979,11 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith
log.Warn("error parsing incoming message for filtering", "err", err)
txes = types.Transactions{}
}
filteringHooks := NewDelayedFilteringSequencingHooks(txes, s.eventFilter)
var inboxRequestId common.Hash
if msg.Message.Header.RequestId != nil {
inboxRequestId = *msg.Message.Header.RequestId
}
filteringHooks := NewDelayedFilteringSequencingHooks(txes, s.eventFilter, inboxRequestId)

block, statedb, receipts, err := arbos.ProduceBlockAdvanced(
msg.Message.Header,
Expand All @@ -951,10 +1000,11 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith
return nil, nil, nil, err
}
// Check if any txs touched filtered addresses but are not in the onchain filter
if len(filteringHooks.FilteredTxHashes) > 0 {
if len(filteringHooks.filteredTxHashes) > 0 {
if s.transactionFiltererRPCClient != nil {
filteredTxHashes := filteringHooks.filteredTxHashes
s.LaunchThread(func(ctx context.Context) {
for _, filteredTxHash := range filteringHooks.FilteredTxHashes {
for _, filteredTxHash := range filteredTxHashes {
_, err := s.transactionFiltererRPCClient.Filter(filteredTxHash).Await(ctx)
if err != nil {
log.Error("error reporting filtered tx to transaction-filterer", "filteredTxHash", filteredTxHash, "err", err)
Expand All @@ -963,8 +1013,18 @@ func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWith
})
}

// Report structured reports to filtering-report service (non-blocking)
if s.filteringReportRPCClient != nil && len(filteringHooks.pendingFilteredTxReports) > 0 {
reports := filteringHooks.pendingFilteredTxReports
Comment thread
diegoximenes marked this conversation as resolved.
s.LaunchThread(func(ctx context.Context) {
if _, err := s.filteringReportRPCClient.ReportFilteredTransactions(reports).Await(ctx); err != nil {
log.Error("error reporting filtered delayed txs to filtering-report", "count", len(reports), "err", err)
}
})
}

return nil, nil, nil, &ErrFilteredDelayedMessage{
TxHashes: filteringHooks.FilteredTxHashes,
TxHashes: filteringHooks.filteredTxHashes,
DelayedMsgIdx: msg.DelayedMessagesRead - 1,
}
}
Expand Down
Loading
Loading