Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/merry-wolves-dry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
pympp: minor
---

Added fee payer policy enforcement for sponsored Tempo transactions, validating gas limits, fee caps, total fee budgets, validity windows, and access lists against per-chain policy defaults. Added call pattern validation to restrict sponsored transactions to approved selectors (transfers, and optionally an approve+swap prefix via the stablecoin DEX).
56 changes: 56 additions & 0 deletions src/mpp/methods/tempo/fee_payer_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Fee payer policy defaults for sponsored Tempo transactions."""

from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass

from mpp.methods.tempo._defaults import CHAIN_ID, TESTNET_CHAIN_ID


@dataclass(frozen=True, slots=True)
class Policy:
max_gas: int
max_fee_per_gas: int
max_priority_fee_per_gas: int
max_total_fee: int
max_validity_window_seconds: int


DEFAULT_POLICY = Policy(
max_gas=2_000_000,
max_fee_per_gas=100_000_000_000,
max_priority_fee_per_gas=10_000_000_000,
max_total_fee=50_000_000_000_000_000,
max_validity_window_seconds=15 * 60,
)


POLICY_BY_CHAIN_ID: dict[int, Policy] = {
CHAIN_ID: DEFAULT_POLICY,
TESTNET_CHAIN_ID: Policy(
max_gas=DEFAULT_POLICY.max_gas,
max_fee_per_gas=DEFAULT_POLICY.max_fee_per_gas,
max_priority_fee_per_gas=50_000_000_000,
max_total_fee=DEFAULT_POLICY.max_total_fee,
max_validity_window_seconds=DEFAULT_POLICY.max_validity_window_seconds,
),
}


def get_policy(chain_id: int, overrides: Mapping[str, int] | None = None) -> Policy:
base = POLICY_BY_CHAIN_ID.get(chain_id, DEFAULT_POLICY)
if not overrides:
return base

return Policy(
max_gas=overrides.get("max_gas", base.max_gas),
max_fee_per_gas=overrides.get("max_fee_per_gas", base.max_fee_per_gas),
max_priority_fee_per_gas=overrides.get(
"max_priority_fee_per_gas", base.max_priority_fee_per_gas
),
max_total_fee=overrides.get("max_total_fee", base.max_total_fee),
max_validity_window_seconds=overrides.get(
"max_validity_window_seconds", base.max_validity_window_seconds
),
)
213 changes: 148 additions & 65 deletions src/mpp/methods/tempo/intents.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from mpp._defaults import DEFAULT_TIMEOUT
from mpp.errors import VerificationError
from mpp.methods.tempo._defaults import PATH_USD, rpc_url_for_chain
from mpp.methods.tempo.fee_payer_policy import get_policy
from mpp.methods.tempo.schemas import (
ChargeRequest,
CredentialPayload,
Expand All @@ -32,13 +33,17 @@

TRANSFER_SELECTOR = "a9059cbb"
TRANSFER_WITH_MEMO_SELECTOR = "95777d59"
APPROVE_SELECTOR = "095ea7b3"
SWAP_EXACT_AMOUNT_OUT_SELECTOR = "b30d91d5"

TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
TRANSFER_WITH_MEMO_TOPIC = "0x57bc7354aa85aed339e000bccffabbc529466af35f0772c8f8ee1145927de7f0"

ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
STABLECOIN_DEX = "0xdec0000000000000000000000000000000000000"

MAX_SPLITS = 10
MAX_TRANSFERS = MAX_SPLITS + 1


def _parse_memo_bytes(memo: str | None) -> bytes | None:
Expand Down Expand Up @@ -217,6 +222,97 @@ def _match_transfer_calldata(call_data_hex: str, request: ChargeRequest) -> bool
return True


def _decode_call_address_arg(call_data_hex: str, arg_index: int) -> str:
start = 8 + (arg_index * 64)
end = start + 64
if len(call_data_hex) < end:
raise VerificationError("Invalid transaction: malformed call data")
return "0x" + call_data_hex[start + 24 : end]


def _validate_call_scope(calls: list[tuple[str, int, str]]) -> int:
if not calls:
raise VerificationError("Transaction contains no calls")

selectors = [call_data[:8].lower() for _, _, call_data in calls]
has_swap_prefix = selectors[0] == APPROVE_SELECTOR

if has_swap_prefix:
if len(selectors) < 3 or selectors[1] != SWAP_EXACT_AMOUNT_OUT_SELECTOR:
raise VerificationError("Invalid transaction: disallowed call pattern")
transfer_selectors = selectors[2:]
else:
if selectors[0] == SWAP_EXACT_AMOUNT_OUT_SELECTOR:
raise VerificationError("Invalid transaction: disallowed call pattern")
transfer_selectors = selectors

if (
not transfer_selectors
or len(transfer_selectors) > MAX_TRANSFERS
or any(
selector not in (TRANSFER_SELECTOR, TRANSFER_WITH_MEMO_SELECTOR)
for selector in transfer_selectors
)
):
raise VerificationError("Invalid transaction: disallowed call pattern")

if has_swap_prefix:
approve_to, _, approve_data = calls[0]
swap_to, _, swap_data = calls[1]
approve_spender = _decode_call_address_arg(approve_data, 0)
swap_token_in = _decode_call_address_arg(swap_data, 0)

if approve_to.lower() != swap_token_in.lower():
raise VerificationError("Invalid transaction: approve target does not match swap token")
if approve_spender.lower() != STABLECOIN_DEX.lower():
raise VerificationError(
"Invalid transaction: approve spender is not the stablecoin DEX"
)
if swap_to.lower() != STABLECOIN_DEX.lower():
raise VerificationError("Invalid transaction: swap target is not the stablecoin DEX")

return 2 if has_swap_prefix else 0


def _validate_normalized_calls(calls: list[tuple[str, int, str]], request: ChargeRequest) -> None:
prefix_len = _validate_call_scope(calls)
payment_calls = calls[prefix_len:]

expected = get_transfers(
int(request.amount),
request.recipient,
request.methodDetails.memo,
request.methodDetails.splits,
)

if len(payment_calls) != len(expected):
raise VerificationError("Invalid transaction: contains unauthorized extra calls")

sorted_expected = sorted(expected, key=lambda t: 0 if t.memo else 1)
used_calls: set[int] = set()

for transfer in sorted_expected:
found = False
for call_idx, (call_to, call_value, call_data) in enumerate(payment_calls):
if call_idx in used_calls:
continue
if call_to.lower() != request.currency.lower():
continue
if call_value:
continue
if _match_single_transfer_calldata(
call_data, transfer.recipient, transfer.amount, transfer.memo
):
used_calls.add(call_idx)
found = True
break
if not found:
raise VerificationError("Invalid transaction: no matching payment call found")

if len(used_calls) != len(payment_calls):
raise VerificationError("Invalid transaction: contains unauthorized extra calls")


# ──────────────────────────────────────────────────────────────────
# Charge intent
# ──────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -744,16 +840,43 @@ def _int(b: bytes) -> int:
f"Fee payer envelope expired: valid_before ({valid_before}) is not in the future"
)

chain_id = _int(decoded[0])
policy = get_policy(chain_id)

gas_limit = _int(decoded[3])
if gas_limit > policy.max_gas:
raise VerificationError("Invalid transaction: gas limit exceeds sponsor policy")

max_priority_fee_per_gas = _int(decoded[1])
max_fee_per_gas = _int(decoded[2])
if max_fee_per_gas > policy.max_fee_per_gas:
raise VerificationError("Invalid transaction: max fee per gas exceeds sponsor policy")
if max_priority_fee_per_gas > max_fee_per_gas:
raise VerificationError(
"Invalid transaction: max priority fee per gas exceeds max fee per gas"
)
if max_priority_fee_per_gas > policy.max_priority_fee_per_gas:
raise VerificationError(
"Invalid transaction: max priority fee per gas exceeds sponsor policy"
)
if gas_limit * max_fee_per_gas > policy.max_total_fee:
raise VerificationError("Invalid transaction: total fee budget exceeds sponsor policy")
if valid_before > int(time.time()) + policy.max_validity_window_seconds:
raise VerificationError("Invalid transaction: validity window exceeds sponsor policy")

if decoded[5]:
raise VerificationError("Invalid transaction: access list is not allowed")

calls = tuple(Call(to=c[0], value=_int(c[1]), data=c[2]) for c in decoded[4])

if request is not None:
self._validate_calls(calls, request)

tx_for_recovery = TempoTransaction(
chain_id=_int(decoded[0]),
max_priority_fee_per_gas=_int(decoded[1]),
max_fee_per_gas=_int(decoded[2]),
gas_limit=_int(decoded[3]),
chain_id=chain_id,
max_priority_fee_per_gas=max_priority_fee_per_gas,
max_fee_per_gas=max_fee_per_gas,
gas_limit=gas_limit,
calls=calls,
access_list=(),
nonce_key=_int(decoded[6]),
Expand Down Expand Up @@ -794,34 +917,10 @@ def _int(b: bytes) -> int:

def _validate_calls(self, calls: tuple, request: ChargeRequest) -> None:
"""Validate that calls match all expected transfers."""
expected = get_transfers(
int(request.amount),
request.recipient,
request.methodDetails.memo,
request.methodDetails.splits,
)

sorted_expected = sorted(expected, key=lambda t: 0 if t.memo else 1)
used_calls: set[int] = set()

for transfer in sorted_expected:
found = False
for call_idx, call in enumerate(calls):
if call_idx in used_calls:
continue
call_to = "0x" + bytes(call.to).hex()
if call_to.lower() != request.currency.lower():
continue
if call.value:
continue
if _match_single_transfer_calldata(
call.data.hex(), transfer.recipient, transfer.amount, transfer.memo
):
used_calls.add(call_idx)
found = True
break
if not found:
raise VerificationError("Invalid transaction: no matching payment call found")
normalized_calls = [
("0x" + bytes(call.to).hex(), int(call.value), call.data.hex()) for call in calls
]
_validate_normalized_calls(normalized_calls, request)

def _validate_transaction_payload(self, signature: str, request: ChargeRequest) -> None:
"""Best-effort pre-broadcast check. Silently skips if decoding fails."""
Expand All @@ -846,38 +945,22 @@ def _validate_transaction_payload(self, signature: str, request: ChargeRequest)
if not calls_data:
raise VerificationError("Transaction contains no calls")

expected = get_transfers(
int(request.amount),
request.recipient,
request.methodDetails.memo,
request.methodDetails.splits,
)
normalized_calls: list[tuple[str, int, str]] = []
for call_item in calls_data:
if not isinstance(call_item, (list, tuple)) or len(call_item) < 3:
raise VerificationError("Invalid transaction: malformed call data")

sorted_expected = sorted(expected, key=lambda t: 0 if t.memo else 1)
used_calls: set[int] = set()
call_to_raw, call_value_raw, call_data_raw = call_item[0], call_item[1], call_item[2]
if not isinstance(call_to_raw, bytes) or not isinstance(call_data_raw, bytes):
raise VerificationError("Invalid transaction: malformed call data")

for transfer in sorted_expected:
found = False
for call_idx, call_item in enumerate(calls_data):
if call_idx in used_calls:
continue
if not isinstance(call_item, (list, tuple)) or len(call_item) < 3:
continue
call_to_bytes, call_data_bytes = call_item[0], call_item[2]
if not call_to_bytes or not call_data_bytes:
continue
to_hex = (
call_to_bytes.hex() if isinstance(call_to_bytes, bytes) else str(call_to_bytes)
)
if ("0x" + to_hex).lower() != request.currency.lower():
continue
raw = call_data_bytes
data_hex = raw.hex() if isinstance(raw, bytes) else str(raw)
if _match_single_transfer_calldata(
data_hex, transfer.recipient, transfer.amount, transfer.memo
):
used_calls.add(call_idx)
found = True
break
if not found:
raise VerificationError("Invalid transaction: no matching payment call found")
if isinstance(call_value_raw, bytes):
call_value = int.from_bytes(call_value_raw, "big") if call_value_raw else 0
elif isinstance(call_value_raw, int):
call_value = call_value_raw
else:
raise VerificationError("Invalid transaction: malformed call data")

normalized_calls.append(("0x" + call_to_raw.hex(), call_value, call_data_raw.hex()))

_validate_normalized_calls(normalized_calls, request)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Gate strict call-scope checks to sponsored transactions

_validate_transaction_payload is called for every charge flow (_verify_transaction, line 721), but it now always invokes _validate_normalized_calls, which enforces the sponsored-only approve/swap/transfer pattern and rejects any extra calls. This introduces false negatives for non-fee-payer (methodDetails.feePayer == false) transactions that still include a valid payment transfer, because they are now rejected before broadcast even though downstream receipt verification could succeed. The new scope restriction should be conditioned on sponsored requests to avoid breaking existing unsponsored transaction patterns.

Useful? React with 👍 / 👎.

Loading