diff --git a/examples/bridge-demo/setup.sh b/examples/bridge-demo/setup.sh index bf6fb72be314..aba062d9e44e 100755 --- a/examples/bridge-demo/setup.sh +++ b/examples/bridge-demo/setup.sh @@ -299,6 +299,7 @@ for attempt in 1 2 3; do --json-parameters "$WRAPPED_PARAMS" \ --json-argument '{"accounts":{}}' 2>&1) && break echo " Attempt $attempt failed, retrying..." >&2 + echo "$WRAPPED_APP_OUTPUT" >&2 sleep 2 done [[ -z "$WRAPPED_APP_OUTPUT" ]] && { echo "ERROR: publish-and-create wrapped-fungible failed after retries" >&2; exit 1; } diff --git a/linera-bridge/contracts/evm-bridge/Cargo.lock b/linera-bridge/contracts/evm-bridge/Cargo.lock index c07d39ceef64..4bb3c30747be 100644 --- a/linera-bridge/contracts/evm-bridge/Cargo.lock +++ b/linera-bridge/contracts/evm-bridge/Cargo.lock @@ -3473,11 +3473,14 @@ dependencies = [ name = "linera-storage" version = "0.15.17" dependencies = [ + "anyhow", "async-trait", "bcs", "cfg-if", "cfg_aliases", + "clap", "futures", + "hex", "itertools 0.14.0", "linera-base", "linera-cache", diff --git a/linera-bridge/src/evm/microchain.rs b/linera-bridge/src/evm/microchain.rs index ccade5f40a45..3a958d7c2c90 100644 --- a/linera-bridge/src/evm/microchain.rs +++ b/linera-bridge/src/evm/microchain.rs @@ -68,7 +68,7 @@ mod tests { } #[test] - fn test_microchain_rejects_duplicate_block() { + fn test_microchain_accepts_duplicate_block() { let mut microchain = TestMicrochain::new(); microchain.add_block(BlockHeight(1)); @@ -76,8 +76,8 @@ mod tests { assert!( microchain .try_add_block(microchain.chain_id, BlockHeight(1)) - .is_err(), - "should reject duplicate block" + .is_ok(), + "Microchain must accept duplicate addBlock calls; subclass owns dedup" ); } diff --git a/linera-bridge/src/monitor/db.rs b/linera-bridge/src/monitor/db.rs index bbefcccf0f02..2062fb033947 100644 --- a/linera-bridge/src/monitor/db.rs +++ b/linera-bridge/src/monitor/db.rs @@ -100,7 +100,10 @@ impl BridgeDb { sqlx::query( "CREATE TABLE IF NOT EXISTS pending_burns ( linera_height INTEGER NOT NULL, - event_index INTEGER NOT NULL, + block_hash TEXT NOT NULL, + tx_index INTEGER NOT NULL, + event_pos_in_tx INTEGER NOT NULL, + event_index INTEGER NOT NULL, evm_recipient TEXT NOT NULL, amount TEXT NOT NULL, raw_cert BLOB, @@ -114,7 +117,10 @@ impl BridgeDb { sqlx::query( "CREATE TABLE IF NOT EXISTS finished_burns ( linera_height INTEGER NOT NULL, - event_index INTEGER NOT NULL, + block_hash TEXT NOT NULL, + tx_index INTEGER NOT NULL, + event_pos_in_tx INTEGER NOT NULL, + event_index INTEGER NOT NULL, evm_recipient TEXT NOT NULL, amount TEXT NOT NULL, raw_cert BLOB, @@ -217,10 +223,14 @@ impl BridgeDb { /// Inserts a new pending burn. Ignores duplicates (idempotent). pub async fn insert_burn(&self, burn: &PendingBurn) -> Result<()> { sqlx::query( - "INSERT OR IGNORE INTO pending_burns (linera_height, event_index, evm_recipient, amount) - VALUES (?, ?, ?, ?)", + "INSERT OR IGNORE INTO pending_burns + (linera_height, block_hash, tx_index, event_pos_in_tx, event_index, evm_recipient, amount) + VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(burn.height.0 as i64) + .bind(burn.block_hash.to_string()) + .bind(burn.tx_index as i64) + .bind(burn.event_pos_in_tx as i64) .bind(burn.event_index as i64) .bind(format!("{:#x}", burn.evm_recipient)) .bind(burn.amount.to_string()) @@ -244,10 +254,10 @@ impl BridgeDb { let mut tx = self.pool.begin().await?; let inserted = sqlx::query( "INSERT OR IGNORE INTO finished_burns - (linera_height, event_index, evm_recipient, amount, raw_cert, - status, created_at) - SELECT linera_height, event_index, evm_recipient, amount, raw_cert, - ?, created_at + (linera_height, block_hash, tx_index, event_pos_in_tx, event_index, + evm_recipient, amount, raw_cert, status, created_at) + SELECT linera_height, block_hash, tx_index, event_pos_in_tx, event_index, + evm_recipient, amount, raw_cert, ?, created_at FROM pending_burns WHERE linera_height = ? AND event_index = ?", ) @@ -321,7 +331,7 @@ impl BridgeDb { /// in-memory `MonitorState`. pub async fn load_pending_burns(&self) -> Result> { let rows = sqlx::query( - "SELECT linera_height, event_index, evm_recipient, amount + "SELECT linera_height, block_hash, tx_index, event_pos_in_tx, event_index, evm_recipient, amount FROM pending_burns", ) .fetch_all(&self.pool) @@ -330,12 +340,20 @@ impl BridgeDb { let mut out = Vec::with_capacity(rows.len()); for row in rows { let height: i64 = row.get(0); - let event_index: i64 = row.get(1); - let evm_recipient: String = row.get(2); - let amount: String = row.get(3); + let block_hash: String = row.get(1); + let tx_index: i64 = row.get(2); + let event_pos_in_tx: i64 = row.get(3); + let event_index: i64 = row.get(4); + let evm_recipient: String = row.get(5); + let amount: String = row.get(6); out.push(PendingBurn { height: BlockHeight(height as u64), + block_hash: block_hash + .parse() + .context("invalid block_hash in burns row")?, + tx_index: tx_index as u32, + event_pos_in_tx: event_pos_in_tx as u32, event_index: event_index as u32, evm_recipient: evm_recipient .parse() @@ -382,7 +400,7 @@ mod tests { use std::sync::atomic::{AtomicU32, Ordering}; use alloy::primitives::{Address, B256, U256}; - use linera_base::data_types::Amount; + use linera_base::{crypto::CryptoHash, data_types::Amount}; use test_case::test_case; use super::*; @@ -422,6 +440,9 @@ mod tests { fn test_burn() -> PendingBurn { PendingBurn { height: BlockHeight(100), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, event_index: 0, evm_recipient: "0xabcdef1234567890abcdef1234567890abcdef12" .parse() diff --git a/linera-bridge/src/monitor/evm.rs b/linera-bridge/src/monitor/evm.rs index 805ab0e167eb..fa6229d8bc8f 100644 --- a/linera-bridge/src/monitor/evm.rs +++ b/linera-bridge/src/monitor/evm.rs @@ -94,8 +94,8 @@ pub(crate) async fn process_pending_deposits { - tracing::warn!(%tx_hash, "Proof generation failed: {e:#}"); + Err(error) => { + tracing::warn!(%tx_hash, ?error, "Proof generation failed"); } } - monitor.write().await.mark_deposit_retried(&pending.key); + monitor + .write() + .await + .mark_deposit_retried(&pending.key, max_retries) + .await; } } @@ -163,8 +167,8 @@ async fn evm_scan_iteration( }; let deposit = match parse_deposit_event(&receipt_log, bridge_addr) { Ok(d) => d, - Err(e) => { - tracing::warn!(%tx_hash, "Failed to parse DepositInitiated log: {e:#}"); + Err(error) => { + tracing::warn!(%tx_hash, ?error, "Failed to parse DepositInitiated log"); continue; } }; diff --git a/linera-bridge/src/monitor/linera.rs b/linera-bridge/src/monitor/linera.rs index f8118fdf333a..99f6a249520a 100644 --- a/linera-bridge/src/monitor/linera.rs +++ b/linera-bridge/src/monitor/linera.rs @@ -16,6 +16,7 @@ use crate::relay::{ self, evm::EvmClient, linera::{find_burn_events, LineraClient}, + settlement::estimate_fits, }; /// Background task that scans Linera block history for BurnEvent stream @@ -53,8 +54,9 @@ pub async fn linera_scan_loop( monitor: &RwLock, @@ -65,8 +67,11 @@ pub(crate) async fn process_pending_burns anyhow::Result<()> { loop { - let pending = monitor.read().await.next_burn_for_retry(max_retries); - let Some(pending) = pending else { + let groups = monitor + .read() + .await + .pending_burns_by_height_and_tx(max_retries); + if groups.is_empty() { tracing::trace!( ?poll_interval, "Linera burns processor sleeping until notified or poll interval elapses" @@ -78,86 +83,247 @@ pub(crate) async fn process_pending_burns {} } continue; - }; + } + for super::PendingBurnsAtHeight { + height, + block_hash, + event_indices, + by_tx, + } in groups + { + // Infrastructure-level failures (cert fetch, gas estimate) + // skip the height without consuming any burn's retry budget. + let cert = match linera_client.read_certificate(block_hash).await { + Ok(cert) => cert, + Err(error) => { + tracing::warn!(?height, ?block_hash, ?error, "Failed to read certificate"); + continue; + } + }; - let credit_height = pending.height; - let event_index = pending.event_index; - tracing::info!(?credit_height, event_index, "Processing burn..."); + persist_cert_bytes(monitor, height, &event_indices, &cert).await; - // Read the certificate at the burn's block height (already contains the auto-burn). - let cert = match async { - linera_client.sync().await?; - let info = linera_client.chain_info().await?; - let mut hash = info.block_hash; - loop { - let Some(h) = hash else { - anyhow::bail!("Block at height {credit_height} not found"); - }; - let c = linera_client.read_certificate(h).await?; - if c.block().header.height == credit_height { - break Ok(c); + match estimate_fits(evm_client.estimate_add_block_gas(&cert).await) { + Ok(true) => { + submit_addblock( + monitor, + evm_client, + &cert, + height, + &event_indices, + max_retries, + ) + .await; + relay::update_balance_metrics(evm_client, linera_client).await; + } + Ok(false) => { + submit_chunked(monitor, evm_client, &cert, height, &by_tx, max_retries).await; + relay::update_balance_metrics(evm_client, linera_client).await; + } + Err(error) => { + // Bumping the retry counter is critical: an `eth_estimateGas` + // revert is typically deterministic (the cert is invalid for + // the configured bridge), and without this the burn would be + // re-polled every scan interval forever. After `max_retries` + // the burns at this height transition to `failed` and stop + // being yielded by `pending_burns_by_height_and_tx`. + tracing::warn!(?height, ?error, "estimate_add_block_gas failed"); + let mut state = monitor.write().await; + for ei in &event_indices { + state.mark_burn_retried(height, *ei, max_retries).await; + } } - hash = c.block().header.previous_block_hash; } } - .await - { - Ok(cert) => cert, - Err(e) => { - tracing::warn!( - ?credit_height, - event_index, - "Failed to read certificate: {e:#}" - ); - monitor - .write() - .await - .mark_burn_retried(credit_height, event_index); - continue; - } - }; + } +} - // Persist raw BCS cert bytes so burns can be replayed without the relayer. - let cert_bytes = - bcs::to_bytes(&cert).expect("failed to BCS-serialize ConfirmedBlockCertificate"); - if let Some(db) = monitor.read().await.db() { - if let Err(e) = db - .store_burn_raw(credit_height, event_index, &cert_bytes) - .await - { - tracing::warn!( - ?credit_height, - event_index, - "Failed to store burn raw bytes: {e:#}" - ); +/// Submits the cert via `addBlock`. On success, leaves retry counts alone +/// (completion is observed asynchronously by `check_burn_completion`). On +/// failure, bumps retry once for every burn at the height — addBlock +/// attempted all of them. +async fn submit_addblock( + monitor: &RwLock, + evm_client: &EvmClient

, + cert: &linera_chain::types::ConfirmedBlockCertificate, + height: BlockHeight, + event_indices: &[u32], + max_retries: u32, +) { + match evm_client.forward_cert(cert).await { + Ok(()) => { + tracing::info!( + ?height, + count = event_indices.len(), + "Burns forwarded via addBlock" + ); + } + Err(error) => { + tracing::warn!(?height, ?error, "addBlock submission failed"); + let mut state = monitor.write().await; + for ei in event_indices { + state.mark_burn_retried(height, *ei, max_retries).await; } } + } +} - // Forward cert to EVM. `addBlock` returning Ok only proves the - // EVM tx didn't revert; it does NOT prove that this specific - // burn's `token.transfer` ran inside `_onBlock` (e.g. if the - // event match silently rejects the burn event). Per-burn - // completion is decided by `check_burn_completion` polling - // on-chain state. Here we only forward and retry. - match evm_client.forward_cert(&cert).await { - Ok(()) => { - tracing::info!(?credit_height, event_index, "Burn forwarded to EVM"); - relay::update_balance_metrics(evm_client, linera_client).await; - } - Err(e) => { - let msg = format!("{e:#}"); - if msg.contains("already verified") { - tracing::trace!(?credit_height, event_index, "Block already verified on EVM"); - } else { - tracing::warn!(?credit_height, event_index, "EVM forwarding failed: {e:#}"); - } - } +/// Per-tx chunked fallback: split each tx group's positions to fit under +/// the block gas limit, mark any individually-oversized burn as `failed`, +/// then submit each fitting chunk as an independent `processBurns` tx. +async fn submit_chunked( + monitor: &RwLock, + evm_client: &EvmClient

, + cert: &linera_chain::types::ConfirmedBlockCertificate, + height: BlockHeight, + by_tx: &[(u32, Vec)], + max_retries: u32, +) { + for (tx_index, positions) in by_tx { + let (chunks, oversized) = split_to_fit(evm_client, cert, *tx_index, positions).await; + mark_oversized_failed(monitor, height, *tx_index, &oversized).await; + submit_chunks_with_retry( + monitor, + evm_client, + cert, + height, + *tx_index, + chunks, + max_retries, + ) + .await; + } +} + +/// Iterative LIFO binary search: keep halving slices that don't fit until +/// each one either fits as a chunk or shrinks to a single oversized +/// position. Inlined as a free fn rather than factored behind an async +/// predicate so it doesn't need an `AsyncFn` closure capturing +/// `&EvmClient` across awaits (which trips an HRTB Send bound under +/// `tokio::spawn`). +/// +/// Returns `(chunks, oversized)`. Chunks are in input order because each +/// split pushes right then left. +async fn split_to_fit( + evm_client: &EvmClient

, + cert: &linera_chain::types::ConfirmedBlockCertificate, + tx_index: u32, + positions: &[u32], +) -> (Vec>, Vec) { + let mut stack: Vec> = vec![positions.to_vec()]; + let mut chunks: Vec> = Vec::new(); + let mut oversized: Vec = Vec::new(); + while let Some(slice) = stack.pop() { + let est = evm_client + .estimate_process_burns_gas(cert, tx_index, &slice) + .await; + let fits = estimate_fits(est).unwrap_or(false); + if fits { + chunks.push(slice); + continue; } + if slice.len() == 1 { + oversized.push(slice[0]); + continue; + } + let mid = slice.len() / 2; + let (left, right) = slice.split_at(mid); + stack.push(right.to_vec()); + stack.push(left.to_vec()); + } + (chunks, oversized) +} - monitor - .write() +/// Marks oversized positions `failed` so they drop out of subsequent +/// `pending_burns_by_height_and_tx` snapshots instead of poisoning their +/// tx group on every retry pass. +async fn mark_oversized_failed( + monitor: &RwLock, + height: BlockHeight, + tx_index: u32, + oversized: &[u32], +) { + if oversized.is_empty() { + return; + } + let to_fail = { + let state = monitor.read().await; + oversized + .iter() + .filter_map(|&pos| state.event_index_for_pos(height, tx_index, pos)) + .collect::>() + }; + let mut state = monitor.write().await; + for event_index in to_fail { + tracing::error!( + ?height, + tx_index, + event_index, + "single burn does not fit under the EVM block gas limit; marking failed" + ); + state.mark_burn_failed(height, event_index).await; + } +} + +/// Submits each chunk as its own `processBurns` tx. A chunk's failure only +/// consumes that chunk's retry budget — remaining chunks still attempt +/// submission, because each is independent on-chain. +async fn submit_chunks_with_retry( + monitor: &RwLock, + evm_client: &EvmClient

, + cert: &linera_chain::types::ConfirmedBlockCertificate, + height: BlockHeight, + tx_index: u32, + events_chunks: Vec>, + max_retries: u32, +) { + for events_chunk in events_chunks { + if let Err(error) = evm_client + .process_burns(cert, tx_index, &events_chunk) .await - .mark_burn_retried(credit_height, event_index); + { + tracing::warn!( + tx_index, + ?events_chunk, + ?error, + "processBurns submission failed" + ); + let to_bump = { + let state = monitor.read().await; + events_chunk + .iter() + .filter_map(|&pos| state.event_index_for_pos(height, tx_index, pos)) + .collect::>() + }; + let mut state = monitor.write().await; + for event_index in to_bump { + state + .mark_burn_retried(height, event_index, max_retries) + .await; + } + } + } +} + +/// Stores BCS cert bytes for every pending burn at `height`. +async fn persist_cert_bytes( + monitor: &RwLock, + height: BlockHeight, + event_indices: &[u32], + cert: &linera_chain::types::ConfirmedBlockCertificate, +) { + let cert_bytes = bcs::to_bytes(cert).expect("BCS-serialize cert"); + let state = monitor.read().await; + let Some(db) = state.db() else { return }; + for event_index in event_indices { + if let Err(error) = db.store_burn_raw(height, *event_index, &cert_bytes).await { + tracing::warn!( + ?height, + ?event_index, + ?error, + "Failed to store burn raw bytes" + ); + } } } @@ -186,17 +352,20 @@ async fn linera_scan_iteration( break; } hash = block.block().header.previous_block_hash; - blocks.push(block); + blocks.push((h, block)); } blocks.reverse(); let mut new_burns = Vec::new(); - for block in &blocks { + for (block_hash, block) in &blocks { let height = block.block().header.height; let burn_events = find_burn_events(&block.block().body.events, fungible_app_id); - for (event_index, burn_event) in burn_events { + for (tx_index, event_pos_in_tx, event_index, burn_event) in burn_events { new_burns.push(( height, + *block_hash, + tx_index, + event_pos_in_tx, event_index, Address::from(burn_event.target), burn_event.amount, @@ -205,13 +374,18 @@ async fn linera_scan_iteration( } let mut tracked_any = false; - for (height, event_index, recipient, amount) in &new_burns { - tracing::info!(?height, event_index, %recipient, %amount, "Discovered burn"); + for (height, block_hash, tx_index, event_pos_in_tx, event_index, recipient, amount) in + &new_burns + { + tracing::info!(?height, ?block_hash, tx_index, event_pos_in_tx, event_index, %recipient, %amount, "Discovered burn"); let was_new = monitor .write() .await .track_burn(PendingBurn { height: *height, + block_hash: *block_hash, + tx_index: *tx_index, + event_pos_in_tx: *event_pos_in_tx, event_index: *event_index, evm_recipient: *recipient, amount: *amount, @@ -261,11 +435,12 @@ async fn check_burn_completion( .await; } Ok(false) => {} - Err(e) => { + Err(error) => { tracing::warn!( ?height, event_index, - "is_burn_processed query failed: {e:#}" + ?error, + "is_burn_processed query failed" ); } } diff --git a/linera-bridge/src/monitor/mod.rs b/linera-bridge/src/monitor/mod.rs index 221155484f56..917ce4da4bd0 100644 --- a/linera-bridge/src/monitor/mod.rs +++ b/linera-bridge/src/monitor/mod.rs @@ -13,13 +13,14 @@ pub mod evm; pub mod linera; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap}, sync::Arc, time::{Duration, Instant}, }; use alloy::primitives::{Address, B256, U256}; use linera_base::{ + crypto::CryptoHash, data_types::{Amount, BlockHeight}, identifiers::ApplicationId, }; @@ -65,6 +66,21 @@ pub struct PendingDeposit { #[derive(Debug, Clone, serde::Serialize)] pub struct PendingBurn { pub height: BlockHeight, + /// Hash of the Linera block that produced this burn. Lets the relayer + /// fetch the certificate via a direct `linera_client.read_certificate` + /// call instead of walking the chain backwards from head. + pub block_hash: CryptoHash, + /// Position of this burn's transaction within `body.events`. + /// Used by `processBurns(cert, tx_index, ...)`. + pub tx_index: u32, + /// Position of this burn within `body.events[tx_index]`. + /// Used by `processBurns(cert, tx_index, [event_pos_in_tx, ...])`. + pub event_pos_in_tx: u32, + /// `Event.index` — sequential position of this burn within its stream + /// (the "burns" stream of the configured fungible app on the configured + /// Linera chain). Unique for the lifetime of that stream, so unique + /// across all heights within the relayer's scope. Off-chain and on-chain + /// dedup key. pub event_index: u32, pub evm_recipient: Address, pub amount: Amount, @@ -98,6 +114,50 @@ impl Tracked { pub type TrackedDeposit = Tracked; pub type TrackedBurn = Tracked; +/// One height's slice of `pending_burns_by_height_and_tx`. The two views +/// (`event_indices` and `by_tx`) describe the same set of burns under one +/// retry-filter snapshot. +/// +/// `Ord` is keyed solely on `height`, so a `BTreeSet` +/// is naturally height-sorted and structurally rejects a second entry for +/// the same height (which never happens in practice — there is at most one +/// entry per height). +#[derive(Debug, Clone)] +pub struct PendingBurnsAtHeight { + pub height: BlockHeight, + /// Hash of the Linera block at `height` — lets `process_pending_burns` + /// pull the certificate via a direct `read_certificate` call. + pub block_hash: CryptoHash, + /// Stream indices (`Event.index`) of every pending burn at this height, + /// sorted ascending. Used for retry accounting and cert persistence. + pub event_indices: Vec, + /// Pending burns grouped by `tx_index`, in ascending `tx_index` order; + /// the `Vec` inside each entry is the sorted `event_pos_in_tx` + /// positions for that tx — input to the chunked `processBurns` + /// fallback when `addBlock` would not fit. + pub by_tx: Vec<(u32, Vec)>, +} + +impl PartialEq for PendingBurnsAtHeight { + fn eq(&self, other: &Self) -> bool { + self.height == other.height + } +} + +impl Eq for PendingBurnsAtHeight {} + +impl Ord for PendingBurnsAtHeight { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.height.cmp(&other.height) + } +} + +impl PartialOrd for PendingBurnsAtHeight { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// In-memory monitoring state shared across scan loops and HTTP handlers. pub struct MonitorState { pub(crate) deposits: HashMap, @@ -136,8 +196,8 @@ impl MonitorState { Entry::Occupied(_) => false, Entry::Vacant(e) => { if let Some(db) = &self.db { - if let Err(e) = db.insert_deposit(&pending).await { - tracing::warn!("Failed to persist deposit to SQLite: {e:#}"); + if let Err(error) = db.insert_deposit(&pending).await { + tracing::warn!(?error, "Failed to persist deposit to SQLite"); } } e.insert(Tracked::new(pending)); @@ -152,8 +212,8 @@ impl MonitorState { d.forwarded = true; crate::relay::metrics::deposit_completed(); if let Some(db) = &self.db { - if let Err(e) = db.update_deposit_status(key, "completed").await { - tracing::warn!(?key, "Failed to update deposit status in SQLite: {e:#}"); + if let Err(error) = db.update_deposit_status(key, "completed").await { + tracing::warn!(?key, ?error, "Failed to update deposit status in SQLite"); } } } else { @@ -170,8 +230,8 @@ impl MonitorState { Entry::Occupied(_) => false, Entry::Vacant(e) => { if let Some(db) = &self.db { - if let Err(err) = db.insert_burn(&pending).await { - tracing::warn!("Failed to persist burn to SQLite: {err:#}"); + if let Err(error) = db.insert_burn(&pending).await { + tracing::warn!(?error, "Failed to persist burn to SQLite"); } } e.insert(Tracked::new(pending)); @@ -186,14 +246,15 @@ impl MonitorState { b.forwarded = true; crate::relay::metrics::burn_completed(); if let Some(db) = &self.db { - if let Err(e) = db + if let Err(error) = db .update_burn_status(height, event_index, "completed") .await { tracing::warn!( ?height, event_index, - "Failed to update burn status in SQLite: {e:#}" + ?error, + "Failed to update burn status in SQLite" ); } } @@ -226,6 +287,55 @@ impl MonitorState { self.burns.values().filter(|b| b.forwarded).collect() } + /// Returns one `PendingBurnsAtHeight` per height with pending burns, + /// in ascending height order. Burns are skipped if they are `failed` + /// (e.g. permanently oversized) or have exceeded `max_retries`. + pub fn pending_burns_by_height_and_tx( + &self, + max_retries: u32, + ) -> BTreeSet { + struct HeightAccum { + block_hash: CryptoHash, + by_tx: BTreeMap>, + event_indices: Vec, + } + let mut tree: BTreeMap = BTreeMap::new(); + for tracked in self.pending_burns() { + // Failed burns (e.g. permanently oversized) and burns past the + // retry budget are no longer eligible for processing. + if tracked.failed || tracked.retry_count >= max_retries { + continue; + } + // All burns at a given height share the same block (cert) hash, + // so the first one populates it and the rest just append. + let entry = tree.entry(tracked.value.height).or_insert(HeightAccum { + block_hash: tracked.value.block_hash, + by_tx: BTreeMap::new(), + event_indices: Vec::new(), + }); + entry + .by_tx + .entry(tracked.value.tx_index) + .or_default() + .push(tracked.value.event_pos_in_tx); + entry.event_indices.push(tracked.value.event_index); + } + tree.into_iter() + .map(|(h, mut accum)| { + for positions in accum.by_tx.values_mut() { + positions.sort_unstable(); + } + accum.event_indices.sort_unstable(); + PendingBurnsAtHeight { + height: h, + block_hash: accum.block_hash, + event_indices: accum.event_indices, + by_tx: accum.by_tx.into_iter().collect(), + } + }) + .collect() + } + pub fn deposits_ready_for_retry(&self, max_retries: u32) -> Vec<&TrackedDeposit> { self.deposits .values() @@ -301,10 +411,20 @@ impl MonitorState { Ok(()) } - pub fn mark_deposit_retried(&mut self, key: &DepositKey) { - if let Some(d) = self.deposits.get_mut(key) { + /// Bumps the deposit's retry counter; if the bump exhausts `max_retries`, + /// the deposit is marked `failed` (moved to `finished_deposits` in + /// SQLite) so it does not get loaded back as a pending item on the next + /// relayer start. + pub async fn mark_deposit_retried(&mut self, key: &DepositKey, max_retries: u32) { + let exhausted = if let Some(d) = self.deposits.get_mut(key) { d.retry_count += 1; d.last_retry_at = Some(Instant::now()); + d.retry_count >= max_retries + } else { + false + }; + if exhausted { + self.mark_deposit_failed(key).await; } } @@ -313,17 +433,50 @@ impl MonitorState { d.failed = true; crate::relay::metrics::deposit_failed(); if let Some(db) = &self.db { - if let Err(e) = db.update_deposit_status(key, "failed").await { - tracing::warn!(?key, "Failed to update deposit status in SQLite: {e:#}"); + if let Err(error) = db.update_deposit_status(key, "failed").await { + tracing::warn!(?key, ?error, "Failed to update deposit status in SQLite"); } } } } - pub fn mark_burn_retried(&mut self, height: BlockHeight, event_index: u32) { - if let Some(b) = self.burns.get_mut(&(height, event_index)) { + /// Looks up the `event_index` of the pending burn at + /// `(height, tx_index, pos_in_tx)`. Used by `process_pending_burns` + /// to map per-chunk positions back to the stream-index keys that + /// `mark_burn_retried` / `mark_burn_failed` expect. + pub fn event_index_for_pos( + &self, + height: BlockHeight, + tx_index: u32, + pos_in_tx: u32, + ) -> Option { + self.burns.values().find_map(|b| { + (b.value.height == height + && b.value.tx_index == tx_index + && b.value.event_pos_in_tx == pos_in_tx) + .then_some(b.value.event_index) + }) + } + + /// Bumps the burn's retry counter; if the bump exhausts `max_retries`, + /// the burn is marked `failed` (moved to `finished_burns` in SQLite) so + /// it does not get loaded back as a pending item on the next relayer + /// start. + pub async fn mark_burn_retried( + &mut self, + height: BlockHeight, + event_index: u32, + max_retries: u32, + ) { + let exhausted = if let Some(b) = self.burns.get_mut(&(height, event_index)) { b.retry_count += 1; b.last_retry_at = Some(Instant::now()); + b.retry_count >= max_retries + } else { + false + }; + if exhausted { + self.mark_burn_failed(height, event_index).await; } } @@ -332,11 +485,12 @@ impl MonitorState { b.failed = true; crate::relay::metrics::burn_failed(); if let Some(db) = &self.db { - if let Err(e) = db.update_burn_status(height, event_index, "failed").await { + if let Err(error) = db.update_burn_status(height, event_index, "failed").await { tracing::warn!( ?height, event_index, - "Failed to update burn status in SQLite: {e:#}" + ?error, + "Failed to update burn status in SQLite" ); } } @@ -482,6 +636,9 @@ mod tests { state .track_burn(PendingBurn { height: BlockHeight(10), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, event_index: 0, evm_recipient: Address::from([0xab; 20]), amount: Amount::from_attos(500), @@ -519,6 +676,9 @@ mod tests { state .track_burn(PendingBurn { height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, event_index: 0, evm_recipient: Address::from([0x12; 20]), amount: Amount::from_attos(100), @@ -576,7 +736,7 @@ mod tests { assert_eq!(state.deposits_ready_for_retry(10).len(), 1); - state.mark_deposit_retried(&key); + state.mark_deposit_retried(&key, 10).await; assert_eq!(state.deposits_ready_for_retry(10).len(), 0); state.mark_deposit_failed(&key).await; @@ -609,7 +769,7 @@ mod tests { let next = state.next_deposit_for_retry(10); assert!(matches!(next, Some(p) if p.key == key)); - state.mark_deposit_retried(&key); + state.mark_deposit_retried(&key, 10).await; assert!(state.next_deposit_for_retry(10).is_none()); state.complete_deposit(&key).await; @@ -671,6 +831,9 @@ mod tests { .unwrap(); db.insert_burn(&PendingBurn { height: BlockHeight(99), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, event_index: 2, evm_recipient: Address::from([0xDD; 20]), amount: Amount::from_attos(7), @@ -697,6 +860,9 @@ mod tests { state .track_burn(PendingBurn { height, + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, event_index: 0, evm_recipient: Address::from([0xab; 20]), amount: Amount::from_attos(500), @@ -704,11 +870,147 @@ mod tests { .await; assert!(state.next_burn_for_retry(10).is_some()); - state.mark_burn_retried(height, 0); + state.mark_burn_retried(height, 0, 10).await; assert!(state.next_burn_for_retry(10).is_none()); // Once forwarded, the item is no longer offered for retry. state.complete_burn(height, 0).await; assert!(state.next_burn_for_retry(10).is_none()); } + + #[tokio::test] + async fn pending_burns_by_height_and_tx_groups_and_sorts() { + let mut state = MonitorState::new(0); + let burns = [ + // Two burns at height 5: tx 0 has positions 1 then 0 (out of + // order so the helper's sort is tested); tx 1 has one burn. + PendingBurn { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 1, + event_index: 11, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }, + PendingBurn { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, + event_index: 10, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }, + PendingBurn { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 1, + event_pos_in_tx: 0, + event_index: 12, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }, + // One burn at a later height. + PendingBurn { + height: BlockHeight(7), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, + event_index: 0, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }, + ]; + for b in burns { + state.track_burn(b).await; + } + + let groups = state.pending_burns_by_height_and_tx(/* max_retries */ 10); + // Same retry filter applies to both views — event_indices is the + // sorted list of `event_index` values at each height, used by + // `process_pending_burns` for retry accounting and cert persistence. + let expected: BTreeSet = [ + PendingBurnsAtHeight { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + event_indices: vec![10u32, 11, 12], + by_tx: vec![(0u32, vec![0u32, 1]), (1u32, vec![0u32])], + }, + PendingBurnsAtHeight { + height: BlockHeight(7), + block_hash: CryptoHash::from([0u8; 32]), + event_indices: vec![0u32], + by_tx: vec![(0u32, vec![0u32])], + }, + ] + .into_iter() + .collect(); + assert_eq!(groups, expected); + } + + #[tokio::test] + async fn pending_burns_by_height_and_tx_excludes_failed_burns() { + // After a burn is marked `failed` (e.g. oversized in the chunked + // `processBurns` path), it must not reappear in subsequent retry + // snapshots — otherwise the chunking loop would keep re-discovering + // it as oversized and burn estimate-RPC budget on every pass. + let mut state = MonitorState::new(0); + state + .track_burn(PendingBurn { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 0, + event_index: 10, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }) + .await; + state + .track_burn(PendingBurn { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 0, + event_pos_in_tx: 1, + event_index: 11, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }) + .await; + + state.mark_burn_failed(BlockHeight(5), 10).await; + + let groups = state.pending_burns_by_height_and_tx(/* max_retries */ 10); + let expected: BTreeSet = [PendingBurnsAtHeight { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + event_indices: vec![11u32], + by_tx: vec![(0u32, vec![1u32])], + }] + .into_iter() + .collect(); + assert_eq!(groups, expected); + } + + #[tokio::test] + async fn event_index_for_pos_matches_tracked_burn() { + let mut state = MonitorState::new(0); + state + .track_burn(PendingBurn { + height: BlockHeight(5), + block_hash: CryptoHash::from([0u8; 32]), + tx_index: 2, + event_pos_in_tx: 1, + event_index: 42, + evm_recipient: Address::ZERO, + amount: Amount::ZERO, + }) + .await; + + assert_eq!(state.event_index_for_pos(BlockHeight(5), 2, 1), Some(42)); + assert_eq!(state.event_index_for_pos(BlockHeight(5), 2, 0), None); + assert_eq!(state.event_index_for_pos(BlockHeight(5), 0, 1), None); + assert_eq!(state.event_index_for_pos(BlockHeight(6), 2, 1), None); + } } diff --git a/linera-bridge/src/relay/committee.rs b/linera-bridge/src/relay/committee.rs index 79ff4d503c29..f5f58178cf9e 100644 --- a/linera-bridge/src/relay/committee.rs +++ b/linera-bridge/src/relay/committee.rs @@ -69,8 +69,8 @@ where { let current_epoch = match evm_client.get_current_epoch().await { Ok(epoch) => epoch, - Err(e) => { - tracing::info!("LightClient not initialized yet, skipping catch-up: {e:#}"); + Err(error) => { + tracing::info!(?error, "LightClient not initialized yet, skipping catch-up"); return Ok(()); } }; diff --git a/linera-bridge/src/relay/evm.rs b/linera-bridge/src/relay/evm.rs index 971122dfd43a..b4bdd2926e01 100644 --- a/linera-bridge/src/relay/evm.rs +++ b/linera-bridge/src/relay/evm.rs @@ -19,6 +19,7 @@ sol! { #[sol(rpc)] interface IFungibleBridge { function addBlock(bytes calldata data) external; + function processBurns(bytes calldata data, uint32 txIndex, uint32[] calldata eventPositionsInTx) external; function lightClient() external view returns (address); function isBurnProcessed(uint64 height, uint32 eventIndex) external view returns (bool); } @@ -137,6 +138,72 @@ impl EvmClient

{ Ok(()) } + /// Dry-runs `addBlock(cert)` against the EVM to estimate the gas it + /// would consume. `Ok(g)` means the call would fit under the node's + /// current block gas limit (the value is the estimate); a gas-exceeded + /// RPC error indicates the call would not fit. Other RPC errors bubble + /// up. Classification is done by `relay::settlement::estimate_fits`. + pub async fn estimate_add_block_gas( + &self, + cert: &linera_chain::types::ConfirmedBlockCertificate, + ) -> alloy::contract::Result { + let cert_bytes = bcs::to_bytes(cert).expect("BCS-serialize cert"); + tracing::trace!(size = cert_bytes.len(), "Estimating gas for addBlock"); + let bridge = IFungibleBridge::new(self.bridge_addr, &self.provider); + bridge.addBlock(cert_bytes.into()).estimate_gas().await + } + + /// Same as `estimate_add_block_gas` but for + /// `processBurns(cert, tx_index, positions_in_tx)`. + pub async fn estimate_process_burns_gas( + &self, + cert: &linera_chain::types::ConfirmedBlockCertificate, + tx_index: u32, + positions_in_tx: &[u32], + ) -> alloy::contract::Result { + let cert_bytes = bcs::to_bytes(cert).expect("BCS-serialize cert"); + tracing::trace!( + tx_index, + count = positions_in_tx.len(), + size = cert_bytes.len(), + "Estimating gas for processBurns" + ); + let bridge = IFungibleBridge::new(self.bridge_addr, &self.provider); + bridge + .processBurns(cert_bytes.into(), tx_index, positions_in_tx.to_vec()) + .estimate_gas() + .await + } + + /// Submits `processBurns(cert, tx_index, positions_in_tx)` and waits + /// for the receipt. Used after `split_to_fit` returns a chunk. + pub async fn process_burns( + &self, + cert: &linera_chain::types::ConfirmedBlockCertificate, + tx_index: u32, + positions_in_tx: &[u32], + ) -> Result<()> { + let cert_bytes = bcs::to_bytes(cert).expect("BCS-serialize cert"); + let bridge = IFungibleBridge::new(self.bridge_addr, &self.provider); + tracing::info!( + tx_index, + count = positions_in_tx.len(), + size = cert_bytes.len(), + "Calling processBurns on FungibleBridge..." + ); + let pending_tx = bridge + .processBurns(cert_bytes.into(), tx_index, positions_in_tx.to_vec()) + .send() + .await + .context("processBurns send failed")?; + let receipt = pending_tx + .get_receipt() + .await + .context("processBurns receipt failed")?; + tracing::info!(tx = ?receipt.transaction_hash, "processBurns transaction confirmed"); + Ok(()) + } + /// Discovers the LightClient contract address from the FungibleBridge. pub async fn get_light_client_address(&self) -> Result

{ self.light_client_addr diff --git a/linera-bridge/src/relay/linera.rs b/linera-bridge/src/relay/linera.rs index fe51a997d5a6..1f67193e07fd 100644 --- a/linera-bridge/src/relay/linera.rs +++ b/linera-bridge/src/relay/linera.rs @@ -148,17 +148,19 @@ impl Clone for LineraClient { } } -/// Finds all `BurnEvent`s in a block's event streams for a given application, -/// returning each burn paired with the underlying `Event.index` — the -/// position of the event within its stream. That index is what the -/// `FungibleBridge` contract keys its per-burn dedup mapping on. +/// Finds all `BurnEvent`s in a block's event streams for a given application. +/// +/// Returns `(tx_index, event_pos_in_tx, event_index, BurnEvent)` for each burn: +/// - `tx_index`: position of the transaction within `body.events` (outer index) +/// - `event_pos_in_tx`: position of the event within `body.events[tx_index]` (inner index) +/// - `event_index`: `Event.index` — the stream index, used as the on-chain dedup key pub(crate) fn find_burn_events( events: &[Vec], fungible_app_id: ApplicationId, -) -> Vec<(u32, wrapped_fungible::BurnEvent)> { +) -> Vec<(u32, u32, u32, wrapped_fungible::BurnEvent)> { let mut result = Vec::new(); - for tx_events in events { - for event in tx_events { + for (tx_index, tx_events) in (0u32..).zip(events) { + for (event_pos, event) in (0u32..).zip(tx_events) { if event.stream_id.application_id != GenericApplicationId::User(fungible_app_id) { continue; } @@ -166,7 +168,7 @@ pub(crate) fn find_burn_events( continue; } if let Ok(burn) = bcs::from_bytes::(&event.value) { - result.push((event.index, burn)); + result.push((tx_index, event_pos, event.index, burn)); } } } diff --git a/linera-bridge/src/relay/mod.rs b/linera-bridge/src/relay/mod.rs index 49fbb2b41dba..9bad1e7f901d 100644 --- a/linera-bridge/src/relay/mod.rs +++ b/linera-bridge/src/relay/mod.rs @@ -17,6 +17,7 @@ mod committee; pub mod evm; pub mod linera; pub(crate) mod metrics; +pub(crate) mod settlement; use std::{path::Path, sync::Arc, time::Duration}; @@ -62,7 +63,7 @@ pub(crate) async fn update_evm_balance_metric( let wei: u128 = balance.to(); metrics::set_relayer_evm_balance(wei as f64); } - Err(e) => tracing::warn!("Failed to query EVM relayer balance: {e:#}"), + Err(error) => tracing::warn!(?error, "Failed to query EVM relayer balance"), } } @@ -71,7 +72,7 @@ pub(crate) async fn update_linera_balance_metric metrics::set_relayer_linera_balance(u128::from(balance) as f64), - Err(e) => tracing::warn!("Failed to query Linera chain balance: {e:#}"), + Err(error) => tracing::warn!(?error, "Failed to query Linera chain balance"), } } diff --git a/linera-bridge/src/relay/settlement.rs b/linera-bridge/src/relay/settlement.rs new file mode 100644 index 000000000000..0852ee4d9973 --- /dev/null +++ b/linera-bridge/src/relay/settlement.rs @@ -0,0 +1,137 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Pure routing helpers for the per-burn fallback path. +//! +//! `estimate_fits` turns a raw `eth_estimateGas` result into a fits / +//! doesn't-fit decision. The actual chunking algorithm is inlined in +//! `monitor::linera::process_pending_burns` to avoid an `AsyncFn` +//! predicate's HRTB Send issue when the future captures `&EvmClient` +//! across await points. + +/// Turns a raw `eth_estimateGas` result into a fits/doesn't-fit decision. +/// `Ok(_)` means the node already accepted the estimate. A gas-exceeded +/// RPC error means the call wouldn't fit. Other errors bubble up. +pub fn estimate_fits(r: Result) -> anyhow::Result { + match r { + Ok(_) => Ok(true), + Err(e) if is_gas_exceeded_error(&e) => Ok(false), + Err(e) => Err(e.into()), + } +} + +/// Returns true if the error is a JSON-RPC error reporting that the call +/// would not fit under the node's block gas limit — i.e. the node refused to +/// estimate because the work required more gas than a single block can hold. +/// +/// Structurally pattern-matches `RpcError::ErrorResp` and substring-checks +/// the `ErrorPayload`'s `message` against node-specific wordings observed +/// from geth, erigon, alchemy, reth, and anvil (1.6, both calldata-too-big +/// and infinite-loop constructors). Transport-level failures (HTTP, +/// timeouts) and non-RPC errors return `false`. +/// +/// We deliberately do NOT match a bare `"out of gas"` — a regular tx can OOG +/// for reasons unrelated to the block gas limit (e.g. a too-low tx gas cap, or +/// contract state consuming more gas than estimated). Treating those as +/// "doesn't fit" would mask real misconfigurations behind retry churn down +/// the chunking path. +fn is_gas_exceeded_error(error: &alloy::contract::Error) -> bool { + let alloy::contract::Error::TransportError(transport_err) = error else { + return false; + }; + let Some(payload) = transport_err.as_error_resp() else { + return false; + }; + let msg = payload.message.to_lowercase(); + msg.contains("gas required exceeds") || msg.contains("exceeds block gas limit") +} + +#[cfg(test)] +mod tests { + use alloy::{ + contract::Error as ContractError, rpc::json_rpc::ErrorPayload, transports::RpcError, + }; + use serde_json::value::RawValue; + + use super::*; + + /// Builds an `Err(ContractError)` whose underlying transport carries a + /// JSON-RPC `ErrorResp` — the same shape geth, reth, anvil, alchemy, and + /// other nodes return for validation/estimate failures. + fn rpc_error(message: &str, data: Option>) -> ContractError { + let payload: ErrorPayload = ErrorPayload { + code: -32000, + message: message.to_string().into(), + data, + }; + ContractError::TransportError(RpcError::ErrorResp(payload)) + } + + #[test] + fn estimate_fits_ok_returns_true() { + assert!(estimate_fits(Ok(123_456)).unwrap()); + assert!(estimate_fits(Ok(0)).unwrap()); + } + + #[test] + fn estimate_fits_gas_required_exceeds_returns_false() { + // Geth / Erigon / Alchemy wording for an estimate that would not fit. + let err = rpc_error("gas required exceeds allowance (30000000)", None); + assert!(!estimate_fits(Err(err)).unwrap()); + } + + #[test] + fn estimate_fits_exceeds_block_gas_limit_returns_false() { + // Reth / foundry forks wording. + let err = rpc_error("call exceeds block gas limit", None); + assert!(!estimate_fits(Err(err)).unwrap()); + } + + #[test] + fn estimate_fits_anvil_out_of_gas_returns_false() { + // Anvil 1.6's eth_estimateGas wording for a block-gas-limit hit + // (verified empirically against both calldata-too-large and + // infinite-loop constructors). Substring "gas required exceeds" + // matches the geth/anvil/alchemy branch. + let err = rpc_error("Out of gas: gas required exceeds allowance: 100000", None); + assert!(!estimate_fits(Err(err)).unwrap()); + } + + #[test] + fn estimate_fits_real_contract_revert_bubbles_up() { + // `execution reverted` is a real on-chain revert (REVERT opcode), + // not a block-gas-limit signal — verified on anvil 1.6, which + // returns this message with `data: "0x"` (i.e. `Some`, not `None`). + let revert_data: Box = serde_json::from_str(r#""0x""#).unwrap(); + let err = rpc_error("execution reverted", Some(revert_data)); + assert!(estimate_fits(Err(err)).is_err()); + } + + /// A bare "out of gas" message is NOT a fits/doesn't-fit signal — it can + /// also be raised when the relayer's tx gas cap is too low or the + /// contract state consumes more gas than estimated. Treating it as + /// "doesn't fit" would mask those real misconfigurations behind retry + /// churn down the chunking path. + #[test] + fn estimate_fits_bare_out_of_gas_bubbles_up() { + let err = rpc_error("transaction reverted: out of gas", None); + assert!(estimate_fits(Err(err)).is_err()); + } + + #[test] + fn estimate_fits_unrelated_rpc_error_bubbles_up() { + let err = rpc_error("nonce too low", None); + assert!(estimate_fits(Err(err)).is_err()); + } + + #[test] + fn estimate_fits_transport_layer_error_bubbles_up() { + // HTTP-level / connection failure — not a JSON-RPC ErrorResp; must + // not be classified as gas-exceeded. + use alloy::transports::TransportErrorKind; + let err = ContractError::TransportError(TransportErrorKind::custom_str( + "connection reset by peer", + )); + assert!(estimate_fits(Err(err)).is_err()); + } +} diff --git a/linera-bridge/src/solidity/FungibleBridge.sol b/linera-bridge/src/solidity/FungibleBridge.sol index 9433ade043ad..86cf31a7837a 100644 --- a/linera-bridge/src/solidity/FungibleBridge.sol +++ b/linera-bridge/src/solidity/FungibleBridge.sol @@ -55,7 +55,7 @@ contract FungibleBridge is Microchain { /// `Event.index` from the Linera block body — the same value the /// off-chain relayer pulls from the certificate. function isBurnProcessed(uint64 height, uint32 eventIndex) external view returns (bool) { - return processedBurns[keccak256(abi.encode(height, eventIndex))]; + return processedBurns[_burnKey(height, eventIndex)]; } /// Locks ERC-20 tokens in the bridge and emits a DepositInitiated event. @@ -103,28 +103,87 @@ contract FungibleBridge is Microchain { BridgeTypes.Event[] memory txEvents = blockValue.body.events[i]; for (uint256 j = 0; j < txEvents.length; j++) { BridgeTypes.Event memory evt = txEvents[j]; + if (!_isMatchingBurn(evt, burnsHash)) continue; - // choice==1 is User application - if (evt.stream_id.application_id.choice != 1) continue; - if (evt.stream_id.application_id.user.application_description_hash.value != fungibleApplicationId) { - continue; - } - - // Check stream name is "burns" - if (keccak256(evt.stream_id.stream_name.value) != burnsHash) continue; - - bytes32 key = keccak256(abi.encode(height, evt.index)); + bytes32 key = _burnKey(height, evt.index); if (processedBurns[key]) continue; - WrappedFungibleTypes.BurnEvent memory burnEvt = - WrappedFungibleTypes.bcs_deserialize_BurnEvent(evt.value); - address target = address(burnEvt.target); - require(token.transfer(target, burnEvt.amount.value), "token transfer failed"); - processedBurns[key] = true; + _releaseBurn(evt, key); } } } + /// Processes burns at the requested `eventPositionsInTx` positions + /// within transaction `txIndex` of `cert`. Verifies the cert once + /// and uses direct array access (`body.events[txIndex][pos]`) for + /// every burn — no nested-loop scan. The off-chain relayer uses + /// this when `addBlock(cert)` would not fit in a single EVM tx, + /// chunking burns per-tx-then-by-gas. + /// + /// Idempotent like `_onBlock`: positions already in `processedBurns` are + /// skipped silently rather than reverted. Lets the relayer recover from + /// overlap with a prior `addBlock` (or a racing/retrying `processBurns`) + /// instead of losing the whole chunk to a single duplicate. + /// + /// Reverts (atomically — no `processedBurns` flag is set if the call + /// reverts) on: + /// - empty `eventPositionsInTx` (`"empty positions"`) + /// - `txIndex` out of range (`"txIndex out of range"`) + /// - any position out of range (`"eventPos out of range"`) + /// - any position whose event is not a matching burn for this app + /// (`"not a matching burn"`) + /// - any failed `token.transfer` (`"token transfer failed"`) + function processBurns(bytes calldata data, uint32 txIndex, uint32[] calldata eventPositionsInTx) external { + require(eventPositionsInTx.length > 0, "empty positions"); + (BridgeTypes.Block memory blockValue,) = lightClient.verifyBlock(data); + require(blockValue.header.chain_id.value.value == chainId, "chain id mismatch"); + require(txIndex < blockValue.body.events.length, "txIndex out of range"); + + uint64 height = blockValue.header.height.value; + bytes32 burnsHash = keccak256("burns"); + BridgeTypes.Event[] memory txEvents = blockValue.body.events[txIndex]; + + for (uint256 k = 0; k < eventPositionsInTx.length; k++) { + uint32 pos = eventPositionsInTx[k]; + require(pos < txEvents.length, "eventPos out of range"); + BridgeTypes.Event memory evt = txEvents[pos]; + require(_isMatchingBurn(evt, burnsHash), "not a matching burn"); + + bytes32 key = _burnKey(height, evt.index); + if (processedBurns[key]) continue; + + _releaseBurn(evt, key); + } + } + + /// Dedup key for a burn at `(height, eventIndex)`. `eventIndex` is the + /// underlying Linera `Event.index`. + function _burnKey(uint64 height, uint32 eventIndex) private pure returns (bytes32) { + return keccak256(abi.encode(height, eventIndex)); + } + + /// Returns true if `evt` belongs to the configured wrapped-fungible + /// application's "burns" stream. + function _isMatchingBurn(BridgeTypes.Event memory evt, bytes32 burnsHash) private view returns (bool) { + // choice == 1 is User application + if (evt.stream_id.application_id.choice != 1) return false; + if (evt.stream_id.application_id.user.application_description_hash.value != fungibleApplicationId) { + return false; + } + if (keccak256(evt.stream_id.stream_name.value) != burnsHash) return false; + return true; + } + + /// Releases the ERC-20 tokens for the burn described by `evt`. Sets + /// the dedup flag BEFORE the external `token.transfer` call + /// (checks-effects-interactions) so a malicious token that re-enters + /// `addBlock` / `processBurns` cannot trigger a second release. + function _releaseBurn(BridgeTypes.Event memory evt, bytes32 key) private { + WrappedFungibleTypes.BurnEvent memory burnEvt = WrappedFungibleTypes.bcs_deserialize_BurnEvent(evt.value); + processedBurns[key] = true; + require(token.transfer(address(burnEvt.target), burnEvt.amount.value), "token transfer failed"); + } + /// @dev Calls transferFrom and handles tokens that don't return a boolean. function _safeTransferFrom(address from, address to, uint256 amount_) internal { (bool success, bytes memory data) = diff --git a/linera-bridge/src/solidity/Microchain.sol b/linera-bridge/src/solidity/Microchain.sol index 5f5226dcb666..e7371ac91e3c 100644 --- a/linera-bridge/src/solidity/Microchain.sol +++ b/linera-bridge/src/solidity/Microchain.sol @@ -7,32 +7,24 @@ import "LightClient.sol"; abstract contract Microchain { LightClient public immutable lightClient; bytes32 public immutable chainId; - mapping(bytes32 => bool) public verifiedBlocks; constructor(address _lightClient, bytes32 _chainId) { lightClient = LightClient(_lightClient); chainId = _chainId; } - /// Verifies a certificate and accepts the block if it matches this chain. - /// - /// Note: this contract does NOT check `previous_block_hash` or enforce - /// sequential block heights. This is safe because `ConfirmedBlockCertificate` - /// implies BFT-finalized canonicality — a quorum of validators signed this - /// specific block at this height, so no conflicting block can exist. - /// Blocks can be submitted in any order; the `verifiedBlocks` mapping - /// prevents duplicate processing. + /// Verifies a certificate and dispatches to the subclass. Subclasses + /// MUST be idempotent under repeated calls for the same block: this + /// contract does not gate on `signedHash`. The off-chain relayer + /// relies on that idempotency to safely re-submit `addBlock(cert)` + /// after partial settlement. function addBlock(bytes calldata data) external { - (BridgeTypes.Block memory blockValue, bytes32 signedHash) = lightClient.verifyBlock(data); - - require(!verifiedBlocks[signedHash], "block already verified"); + (BridgeTypes.Block memory blockValue,) = lightClient.verifyBlock(data); require(blockValue.header.chain_id.value.value == chainId, "chain id mismatch"); - - verifiedBlocks[signedHash] = true; _onBlock(blockValue); } - /// Called after a block has been verified and accepted. Subcontracts implement - /// this to extract and store application-specific data from the block. + /// Called after a block has been verified and accepted. Subcontracts + /// implement this to extract and store application-specific data. function _onBlock(BridgeTypes.Block memory blockValue) internal virtual; } diff --git a/linera-bridge/src/solidity/test/FungibleBridge.t.sol b/linera-bridge/src/solidity/test/FungibleBridge.t.sol new file mode 100644 index 000000000000..678bc299b1fe --- /dev/null +++ b/linera-bridge/src/solidity/test/FungibleBridge.t.sol @@ -0,0 +1,263 @@ +// SPDX-License-Identifier: Apache-2.0 +pragma solidity ^0.8.30; + +import {Test} from "forge-std/Test.sol"; +import {FungibleBridge} from "../FungibleBridge.sol"; +import {BridgeTypes} from "../BridgeTypes.sol"; +import {WrappedFungibleTypes} from "../WrappedFungibleTypes.sol"; +import {LineraToken} from "../LineraToken.sol"; + +// ------------------------------------------------------------------ +// Constants +// ------------------------------------------------------------------ + +bytes32 constant CHAIN_ID = bytes32(uint256(0xC1)); +uint64 constant HEIGHT = 42; +uint32 constant TX = 0; +uint128 constant AMOUNT = 1_000_000_000_000_000_000; // 1e18 +address constant RECIP_0 = address(0xA0); +address constant RECIP_1 = address(0xA1); +address constant RECIP_2 = address(0xA2); +bytes32 constant APP_ID = bytes32(uint256(0xF00D)); + +// ------------------------------------------------------------------ +// MockLightClientForBurns +// +// Returns a Block that has `numBurns` matching burn events at +// tx-slot `txIndexUsed` (preceding tx-slots are empty). +// Stream indices for the burns are 5, 6, ..., 4+numBurns. +// ------------------------------------------------------------------ +contract MockLightClientForBurns { + bytes32 public immutable chainIdRet; + uint64 public immutable heightRet; + uint32 public immutable txIndexUsed; + bytes32 public immutable fungibleAppIdRet; + uint32 public immutable numBurns; + uint128 public immutable amountPerBurn; + address public immutable recipBase; + + constructor( + bytes32 _chainId, + uint64 _height, + uint32 _txIndex, + bytes32 _fungibleAppId, + uint32 _numBurns, + uint128 _amountPerBurn, + address _recipBase + ) { + chainIdRet = _chainId; + heightRet = _height; + txIndexUsed = _txIndex; + fungibleAppIdRet = _fungibleAppId; + numBurns = _numBurns; + amountPerBurn = _amountPerBurn; + recipBase = _recipBase; + } + + function verifyBlock(bytes calldata) external view returns (BridgeTypes.Block memory b, bytes32 sigHash) { + b.header.chain_id.value.value = chainIdRet; + b.header.height.value = heightRet; + + // Allocate txIndexUsed + 1 tx-slots; all before txIndexUsed are empty. + b.body.events = new BridgeTypes.Event[][](uint256(txIndexUsed) + 1); + b.body.events[txIndexUsed] = new BridgeTypes.Event[](numBurns); + + for (uint32 i = 0; i < numBurns; i++) { + BridgeTypes.Event memory evt; + evt.stream_id.application_id.choice = 1; // User + evt.stream_id.application_id.user.application_description_hash.value = fungibleAppIdRet; + evt.stream_id.stream_name.value = bytes("burns"); + evt.index = 5 + i; // stream index differs from positional index + evt.value = _encodeBurn(address(uint160(recipBase) + i), amountPerBurn); + b.body.events[txIndexUsed][i] = evt; + } + + sigHash = bytes32(uint256(0x1234)); + } + + function _encodeBurn(address target, uint128 amount) private pure returns (bytes memory) { + WrappedFungibleTypes.BurnEvent memory burnEvt; + burnEvt.target = bytes20(target); + burnEvt.amount = BridgeTypes.Amount(amount); + return WrappedFungibleTypes.bcs_serialize_BurnEvent(burnEvt); + } +} + +// ------------------------------------------------------------------ +// MockLightClientForNonBurn +// +// Returns a Block whose single event has stream_name == "deposits" +// (not "burns"), so FungibleBridge.processBurns must reject it. +// ------------------------------------------------------------------ +contract MockLightClientForNonBurn { + bytes32 public immutable chainIdRet; + uint64 public immutable heightRet; + bytes32 public immutable fungibleAppIdRet; + uint128 public immutable amountPerBurn; + address public immutable recipBase; + + constructor(bytes32 _chainId, uint64 _height, bytes32 _fungibleAppId, uint128 _amountPerBurn, address _recipBase) { + chainIdRet = _chainId; + heightRet = _height; + fungibleAppIdRet = _fungibleAppId; + amountPerBurn = _amountPerBurn; + recipBase = _recipBase; + } + + function verifyBlock(bytes calldata) external view returns (BridgeTypes.Block memory b, bytes32 sigHash) { + b.header.chain_id.value.value = chainIdRet; + b.header.height.value = heightRet; + + b.body.events = new BridgeTypes.Event[][](1); + b.body.events[0] = new BridgeTypes.Event[](1); + + BridgeTypes.Event memory evt; + evt.stream_id.application_id.choice = 1; + evt.stream_id.application_id.user.application_description_hash.value = fungibleAppIdRet; + // Wrong stream name — should cause "not a matching burn" + evt.stream_id.stream_name.value = bytes("deposits"); + evt.index = 5; + WrappedFungibleTypes.BurnEvent memory burnEvt; + burnEvt.target = bytes20(recipBase); + burnEvt.amount = BridgeTypes.Amount(amountPerBurn); + evt.value = WrappedFungibleTypes.bcs_serialize_BurnEvent(burnEvt); + b.body.events[0][0] = evt; + + sigHash = bytes32(uint256(0x1234)); + } +} + +// ------------------------------------------------------------------ +// Helpers +// ------------------------------------------------------------------ + +function _u32s(uint32 a, uint32 b) pure returns (uint32[] memory) { + uint32[] memory arr = new uint32[](2); + arr[0] = a; + arr[1] = b; + return arr; +} + +function _u32s_single(uint32 a) pure returns (uint32[] memory) { + uint32[] memory arr = new uint32[](1); + arr[0] = a; + return arr; +} + +// ------------------------------------------------------------------ +// Test contract +// ------------------------------------------------------------------ + +contract FungibleBridgeProcessBurnsTest is Test { + // Deploy a bridge backed by `lc`, with a LineraToken that has + // `supply` tokens pre-minted to the bridge. + function _deployBridge(address lc, uint256 supply) internal returns (FungibleBridge bridge, LineraToken tok) { + tok = new LineraToken("Test", "TST", supply); + bridge = new FungibleBridge(lc, CHAIN_ID, address(tok), APP_ID); + // Send all tokens to the bridge so transfer() calls succeed. + tok.transfer(address(bridge), supply); + } + + // ------------------------------------------------------------------ + + function test_processBurns_single_position_marks_processed() public { + // 2 burns in tx TX at positions 0 and 1 with stream indices 5 and 6. + // Settle only position 0; assert (HEIGHT, 5) is flipped, (HEIGHT, 6) stays false. + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 2, AMOUNT, RECIP_0); + (FungibleBridge bridge,) = _deployBridge(address(lc), AMOUNT * 10); + + bridge.processBurns(hex"deadbeef", TX, _u32s_single(0)); + + assertTrue(bridge.isBurnProcessed(HEIGHT, 5), "stream index 5 should be processed"); + assertFalse(bridge.isBurnProcessed(HEIGHT, 6), "stream index 6 should not be processed yet"); + } + + function test_processBurns_multi_position_marks_both_processed() public { + // 2 burns; settle both positions; both flags true. + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 2, AMOUNT, RECIP_0); + (FungibleBridge bridge,) = _deployBridge(address(lc), AMOUNT * 10); + + bridge.processBurns(hex"deadbeef", TX, _u32s(0, 1)); + + assertTrue(bridge.isBurnProcessed(HEIGHT, 5), "stream index 5 should be processed"); + assertTrue(bridge.isBurnProcessed(HEIGHT, 6), "stream index 6 should be processed"); + } + + function test_processBurns_already_processed_skips() public { + // Idempotent like `_onBlock`: re-processing the same burn must be a + // no-op, not a revert. Keeps the relayer robust to overlap between + // an addBlock-path settlement and a racing/retrying processBurns + // call covering the same (height, tx, pos). + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 1, AMOUNT, RECIP_0); + (FungibleBridge bridge, LineraToken tok) = _deployBridge(address(lc), AMOUNT * 10); + + bridge.processBurns(hex"deadbeef", TX, _u32s_single(0)); + uint256 firstBal = tok.balanceOf(RECIP_0); + assertEq(firstBal, AMOUNT, "first call should have released to recipient"); + + // Second call: must not revert and must not double-release. + bridge.processBurns(hex"deadbeef", TX, _u32s_single(0)); + assertEq(tok.balanceOf(RECIP_0), firstBal, "second call must not double-release"); + assertTrue(bridge.isBurnProcessed(HEIGHT, 5), "burn stays marked processed"); + } + + function test_processBurns_tx_index_out_of_range_reverts() public { + // Block has 1 tx; processBurns with txIndex=99 → revert "txIndex out of range". + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 1, AMOUNT, RECIP_0); + (FungibleBridge bridge,) = _deployBridge(address(lc), AMOUNT * 10); + + vm.expectRevert(bytes("txIndex out of range")); + bridge.processBurns(hex"deadbeef", 99, _u32s_single(0)); + } + + function test_processBurns_event_pos_out_of_range_reverts() public { + // 2 burns at positions 0,1; processBurns with position=99 → revert "eventPos out of range". + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 2, AMOUNT, RECIP_0); + (FungibleBridge bridge,) = _deployBridge(address(lc), AMOUNT * 10); + + vm.expectRevert(bytes("eventPos out of range")); + bridge.processBurns(hex"deadbeef", TX, _u32s_single(99)); + } + + function test_processBurns_non_burn_event_reverts() public { + // MockLightClient returns a Block whose only event has the wrong + // stream_name ("deposits") → processBurns(tx=0, [0]) → revert "not a matching burn". + MockLightClientForNonBurn lc = new MockLightClientForNonBurn(CHAIN_ID, HEIGHT, APP_ID, AMOUNT, RECIP_0); + (FungibleBridge bridge,) = _deployBridge(address(lc), AMOUNT * 10); + + vm.expectRevert(bytes("not a matching burn")); + bridge.processBurns(hex"deadbeef", 0, _u32s_single(0)); + } + + function test_processBurns_empty_positions_reverts() public { + // An empty positions array would silently pay for cert verification + // with no work to do. Reject it eagerly so caller bugs surface. + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 1, AMOUNT, RECIP_0); + (FungibleBridge bridge,) = _deployBridge(address(lc), AMOUNT * 10); + + uint32[] memory empty = new uint32[](0); + vm.expectRevert(bytes("empty positions")); + bridge.processBurns(hex"deadbeef", TX, empty); + } + + function test_processBurns_partial_overlap_releases_remaining() public { + // 2 burns at positions 0,1. Settle pos 1 first; then call + // processBurns([0, 1]). Under skip-on-duplicate semantics pos 0 must + // be released and pos 1 silently skipped — no revert, no double-release. + MockLightClientForBurns lc = new MockLightClientForBurns(CHAIN_ID, HEIGHT, TX, APP_ID, 2, AMOUNT, RECIP_0); + (FungibleBridge bridge, LineraToken tok) = _deployBridge(address(lc), AMOUNT * 10); + + bridge.processBurns(hex"deadbeef", TX, _u32s_single(1)); + assertTrue(bridge.isBurnProcessed(HEIGHT, 6), "pos 1 should now be processed"); + address recip1 = address(uint160(RECIP_0) + 1); + assertEq(tok.balanceOf(recip1), AMOUNT, "pos 1 recipient should hold released amount"); + + // Overlapping call — pos 0 settles, pos 1 silently skipped. + bridge.processBurns(hex"deadbeef", TX, _u32s(0, 1)); + + assertTrue(bridge.isBurnProcessed(HEIGHT, 5), "pos 0 should now be processed"); + assertTrue(bridge.isBurnProcessed(HEIGHT, 6), "pos 1 stays processed"); + assertEq(tok.balanceOf(RECIP_0), AMOUNT, "pos 0 released once to its recipient"); + assertEq(tok.balanceOf(recip1), AMOUNT, "pos 1 not double-released"); + } +} diff --git a/linera-bridge/src/solidity/test/Microchain.t.sol b/linera-bridge/src/solidity/test/Microchain.t.sol new file mode 100644 index 000000000000..c61b6cd79ed6 --- /dev/null +++ b/linera-bridge/src/solidity/test/Microchain.t.sol @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +pragma solidity ^0.8.30; + +import {Test} from "forge-std/Test.sol"; +import {Microchain} from "../Microchain.sol"; +import {BridgeTypes} from "../BridgeTypes.sol"; + +/// Returns a hand-built `BridgeTypes.Block` directly from `verifyBlock` +/// without going through `vm.mockCall(...abi.encode(Block)...)`, which +/// triggers a solc 0.8.30 stack-too-deep on the `BridgeTypes.Block` +/// struct's via_ir codegen. +contract MockLightClient { + bytes32 public immutable expectedChainId; + + constructor(bytes32 _chainId) { + expectedChainId = _chainId; + } + + function verifyBlock(bytes calldata) external view returns (BridgeTypes.Block memory b, bytes32 sigHash) { + b.header.chain_id.value.value = expectedChainId; + sigHash = bytes32(uint256(0x1234)); + } +} + +/// Counts `_onBlock` invocations so the test can assert re-entry is allowed. +contract CountingMicrochain is Microchain { + uint256 public onBlockCalls; + + constructor(address _lc, bytes32 _cid) Microchain(_lc, _cid) {} + + function _onBlock(BridgeTypes.Block memory) internal override { + onBlockCalls += 1; + } +} + +contract MicrochainIdempotencyTest is Test { + bytes32 constant CHAIN_ID = bytes32(uint256(0xC1)); + + function test_addBlock_can_be_called_repeatedly_for_same_cert() public { + MockLightClient lc = new MockLightClient(CHAIN_ID); + CountingMicrochain mc = new CountingMicrochain(address(lc), CHAIN_ID); + + bytes memory cert = hex"deadbeef"; + mc.addBlock(cert); + mc.addBlock(cert); + + assertEq(mc.onBlockCalls(), 2, "addBlock must accept repeated calls; subclass owns dedup"); + } +} diff --git a/linera-bridge/tests/e2e/src/lib.rs b/linera-bridge/tests/e2e/src/lib.rs index df30e647b770..0c99124dbeb7 100644 --- a/linera-bridge/tests/e2e/src/lib.rs +++ b/linera-bridge/tests/e2e/src/lib.rs @@ -196,7 +196,13 @@ pub async fn deploy_linera_token( compose_file, ) .await; - parse_broadcast_address(compose, project_name, compose_file, "DeployLineraToken.s.sol").await + parse_broadcast_address( + compose, + project_name, + compose_file, + "DeployLineraToken.s.sol", + ) + .await } /// Same as [`deploy_linera_token`] but overrides the initial token supply @@ -224,7 +230,13 @@ pub async fn deploy_linera_token_with_supply( compose_file, ) .await; - parse_broadcast_address(compose, project_name, compose_file, "DeployLineraToken.s.sol").await + parse_broadcast_address( + compose, + project_name, + compose_file, + "DeployLineraToken.s.sol", + ) + .await } /// Deploys FungibleBridge via the `DeployFungibleBridge.s.sol` forge @@ -256,8 +268,13 @@ pub async fn deploy_fungible_bridge( compose_file, ) .await; - parse_broadcast_address(compose, project_name, compose_file, "DeployFungibleBridge.s.sol") - .await + parse_broadcast_address( + compose, + project_name, + compose_file, + "DeployFungibleBridge.s.sol", + ) + .await } /// Queries the evm-bridge app to check whether a deposit has been processed. diff --git a/linera-bridge/tests/e2e/tests/auto_deposit_scan.rs b/linera-bridge/tests/e2e/tests/auto_deposit_scan.rs index ce981d9459b1..3a4adc08d73f 100644 --- a/linera-bridge/tests/e2e/tests/auto_deposit_scan.rs +++ b/linera-bridge/tests/e2e/tests/auto_deposit_scan.rs @@ -153,11 +153,13 @@ async fn test_auto_deposit_scan() -> anyhow::Result<()> { .nth(3) .context("manifest dir has fewer than 3 ancestors")? .to_path_buf(); - let evm_bridge_wasm_dir = repo_root.join("linera-bridge/contracts/evm-bridge/target/wasm32-unknown-unknown/release"); + let evm_bridge_wasm_dir = + repo_root.join("linera-bridge/contracts/evm-bridge/target/wasm32-unknown-unknown/release"); let wasm_dir = repo_root.join("examples/target/wasm32-unknown-unknown/release"); tracing::info!("Publishing evm-bridge module..."); - let eb_contract = Bytecode::load_from_file(evm_bridge_wasm_dir.join("evm_bridge_contract.wasm"))?; + let eb_contract = + Bytecode::load_from_file(evm_bridge_wasm_dir.join("evm_bridge_contract.wasm"))?; let eb_service = Bytecode::load_from_file(evm_bridge_wasm_dir.join("evm_bridge_service.wasm"))?; let (eb_module_id, _) = cc_a .publish_module(eb_contract, eb_service, VmRuntime::Wasm) @@ -316,9 +318,9 @@ async fn test_auto_deposit_scan() -> anyhow::Result<()> { None, relay_port, &linera_storage_runtime::CommonStorageOptions::with_defaults(), - std::time::Duration::from_secs(5), // monitor_scan_interval - 0, // monitor_start_block - 5, // max_retries + std::time::Duration::from_secs(5), // monitor_scan_interval + 0, // monitor_start_block + 5, // max_retries None, )) .await @@ -401,7 +403,9 @@ async fn test_auto_deposit_scan() -> anyhow::Result<()> { let deposit_key = linera_bridge::proof::DepositKey { source_chain_id: 31337, block_hash: deposit_receipt.block_hash.unwrap(), - tx_index: deposit_receipt.transaction_index.expect("transaction_index missing"), + tx_index: deposit_receipt + .transaction_index + .expect("transaction_index missing"), log_index, }; diff --git a/linera-bridge/tests/e2e/tests/burn_completion_requires_on_chain_evidence.rs b/linera-bridge/tests/e2e/tests/burn_completion_requires_on_chain_evidence.rs index f2b8b3f4e010..dc0cd664e3cb 100644 --- a/linera-bridge/tests/e2e/tests/burn_completion_requires_on_chain_evidence.rs +++ b/linera-bridge/tests/e2e/tests/burn_completion_requires_on_chain_evidence.rs @@ -16,11 +16,7 @@ #![recursion_limit = "512"] -use std::{ - collections::BTreeMap, - path::PathBuf, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use alloy::{ primitives::U256, @@ -28,10 +24,7 @@ use alloy::{ signers::local::PrivateKeySigner, sol, }; -use anyhow::Context as _; -use linera_base::{ - crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner, -}; +use linera_base::{crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner}; use linera_bridge_e2e::{ compose_file_path, deploy_fungible_bridge, deploy_linera_token, fund_bridge_erc20, light_client_address, parse_metric_value, publish_and_create_wrapped_fungible, start_compose, diff --git a/linera-bridge/tests/e2e/tests/burns_per_evm_tx.rs b/linera-bridge/tests/e2e/tests/burns_per_evm_tx.rs index 86371438b351..e3090981498d 100644 --- a/linera-bridge/tests/e2e/tests/burns_per_evm_tx.rs +++ b/linera-bridge/tests/e2e/tests/burns_per_evm_tx.rs @@ -200,7 +200,11 @@ async fn burns_per_evm_tx( tracing::info!(burn_ops = next_hi, gas, "search: found upper bound"); break; } - tracing::info!(burn_ops = next_hi, gas, "search: doubling, still under limit"); + tracing::info!( + burn_ops = next_hi, + gas, + "search: doubling, still under limit" + ); if hi == MAX_SEARCH_N { break; } diff --git a/linera-bridge/tests/e2e/tests/committee_rotation.rs b/linera-bridge/tests/e2e/tests/committee_rotation.rs index c53e0a7ec078..a8c15a68d9eb 100644 --- a/linera-bridge/tests/e2e/tests/committee_rotation.rs +++ b/linera-bridge/tests/e2e/tests/committee_rotation.rs @@ -97,9 +97,9 @@ async fn test_committee_rotation_updates_evm_light_client() -> anyhow::Result<() Some(&light_client.to_string()), relay_port, &linera_storage_runtime::CommonStorageOptions::with_defaults(), - std::time::Duration::from_secs(5), // monitor_scan_interval - 0, // monitor_start_block - 5, // max_retries + std::time::Duration::from_secs(5), // monitor_scan_interval + 0, // monitor_start_block + 5, // max_retries None, )) .await diff --git a/linera-bridge/tests/e2e/tests/evm_to_linera_bridge.rs b/linera-bridge/tests/e2e/tests/evm_to_linera_bridge.rs index a30d5e00d153..bce26cf2dd95 100644 --- a/linera-bridge/tests/e2e/tests/evm_to_linera_bridge.rs +++ b/linera-bridge/tests/e2e/tests/evm_to_linera_bridge.rs @@ -27,8 +27,7 @@ use linera_bridge::{ }; use linera_bridge_e2e::{ compose_file_path, deploy_fungible_bridge, deploy_linera_token, light_client_address, - start_compose, wait_for_light_client, - ANVIL_PRIVATE_KEY, + start_compose, wait_for_light_client, ANVIL_PRIVATE_KEY, }; use linera_client::{chain_listener::ClientContext as _, client_context::ClientContext}; use linera_core::environment::wallet::Memory; @@ -133,7 +132,8 @@ async fn test_evm_to_linera_bridge() -> anyhow::Result<()> { .nth(3) .context("manifest dir has fewer than 3 ancestors")? .to_path_buf(); - let evm_bridge_wasm_dir = repo_root.join("linera-bridge/contracts/evm-bridge/target/wasm32-unknown-unknown/release"); + let evm_bridge_wasm_dir = + repo_root.join("linera-bridge/contracts/evm-bridge/target/wasm32-unknown-unknown/release"); let wasm_dir = repo_root.join("examples/target/wasm32-unknown-unknown/release"); // 4a. Publish and create wrapped-fungible app @@ -151,7 +151,8 @@ async fn test_evm_to_linera_bridge() -> anyhow::Result<()> { // 4b. Publish and create evm-bridge app first (so wrapped-fungible can reference it) tracing::info!("Publishing evm-bridge module..."); - let eb_contract = Bytecode::load_from_file(evm_bridge_wasm_dir.join("evm_bridge_contract.wasm"))?; + let eb_contract = + Bytecode::load_from_file(evm_bridge_wasm_dir.join("evm_bridge_contract.wasm"))?; let eb_service = Bytecode::load_from_file(evm_bridge_wasm_dir.join("evm_bridge_service.wasm"))?; let (eb_module_id, _) = cc diff --git a/linera-bridge/tests/e2e/tests/multi_tx_burn_chunking.rs b/linera-bridge/tests/e2e/tests/multi_tx_burn_chunking.rs new file mode 100644 index 000000000000..18e0c8feeb42 --- /dev/null +++ b/linera-bridge/tests/e2e/tests/multi_tx_burn_chunking.rs @@ -0,0 +1,359 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Verifies that when a Linera block's `addBlock(cert)` would exceed the +//! EVM block gas limit, the relayer falls back to chunked +//! `processBurns(cert, txIndex, positionsInTx)` calls and every burn +//! still settles correctly. +//! +//! Setup: anvil's block gas limit is dialled down (via +//! `evm_setBlockGasLimit`) before the bridge is deployed, so `addBlock` +//! at `NUM_BURNS = 8` does not fit and the relayer must take the +//! `processBurns` chunk path. Chain B submits one block with N +//! `Transfer` operations to N distinct recipients on chain A. After the +//! relayer is spawned, every recipient must hold the correct ERC-20 +//! balance and `linera_bridge_burns_completed` must reach N. + +#![recursion_limit = "512"] + +use std::time::Duration; + +use std::collections::HashSet; + +use alloy::{ + primitives::{B256, U256}, + providers::{Provider, ProviderBuilder}, + rpc::types::Filter, + sol, +}; +use linera_base::{crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner}; +use linera_bridge_e2e::{ + compose_file_path, deploy_fungible_bridge, deploy_linera_token, fund_bridge_erc20, + light_client_address, parse_metric_value, publish_and_create_wrapped_fungible, + set_anvil_block_gas_limit, start_compose, wait_for_light_client, wait_for_relay_http_ready, + wait_for_relay_metrics, ANVIL_PRIVATE_KEY, +}; +use linera_client::{chain_listener::ClientContext as _, client_context::ClientContext}; +use linera_core::environment::wallet::Memory; +use linera_execution::{Operation, WasmRuntime}; +use linera_faucet_client::Faucet; +use linera_storage::{DbStorage, StorageCacheConfig}; +use linera_views::backends::memory::{MemoryDatabase, MemoryStoreConfig}; +use wrapped_fungible::{Account, WrappedFungibleOperation}; + +sol! { + #[sol(rpc)] + interface IERC20 { + function balanceOf(address account) external view returns (uint256); + } +} + +const NUM_BURNS: usize = 8; +const BURN_AMOUNT_TOKENS: u128 = 1; +/// Per-block gas ceiling sized to live between `processBurns(cert, tx, [single])` +/// (which is dominated by cert verification, ~2–2.5M gas) and `addBlock(cert)` +/// for `NUM_BURNS` burns (~3.3M gas observed in `burns_per_evm_tx`). The +/// relayer must therefore route through chunked `processBurns` per tx. +const ANVIL_BLOCK_GAS_LIMIT: u64 = 3_000_000; + +#[tokio::test] +#[ignore] // Requires pre-built docker images, Wasm, and relay binary. +async fn relayer_falls_back_to_chunked_process_burns() -> anyhow::Result<()> { + tracing_subscriber::fmt().with_test_writer().try_init().ok(); + linera_bridge_e2e::ensure_rustls_provider(); + let compose_file = compose_file_path(); + let project_name = "linera-multi-tx-burn-chunking-test"; + + let compose = start_compose(&compose_file, project_name).await; + wait_for_light_client(&compose, project_name, &compose_file).await; + + // Provider is held for the post-deploy gas-limit drop and for the + // final balance reads. Deploy txs need anvil's default (high) limit; + // we tighten it once the bridge is live. + let rpc_url = "http://localhost:8545".parse()?; + let provider = ProviderBuilder::new().connect_http(rpc_url); + + let faucet = Faucet::new("http://localhost:8080".to_string()); + let genesis_config = faucet.genesis_config().await?; + let relay_genesis_config = genesis_config.clone(); + + let store_config = MemoryStoreConfig { + max_stream_queries: 10, + kill_on_drop: true, + }; + let mut storage = DbStorage::::maybe_create_and_connect( + &store_config, + "multi-tx-burn-chunking-e2e", + Some(WasmRuntime::default()), + StorageCacheConfig { + blob_cache_size: 1000, + confirmed_block_cache_size: 1000, + certificate_cache_size: 1000, + certificate_raw_cache_size: 1000, + event_cache_size: 1000, + cache_cleanup_interval_secs: linera_storage::DEFAULT_CLEANUP_INTERVAL_SECS, + }, + ) + .await?; + genesis_config.initialize_storage(&mut storage).await?; + + let mut signer = InMemorySigner::new(None); + let mut ctx = ClientContext::new( + storage, + Memory::default(), + signer.clone(), + &Default::default(), + None, + genesis_config, + linera_core::worker::DEFAULT_BLOCK_CACHE_SIZE, + linera_core::worker::DEFAULT_EXECUTION_STATE_CACHE_SIZE, + ) + .await?; + + let owner_a = AccountOwner::from(signer.generate_new()); + let chain_a_desc = faucet.claim(&owner_a).await?; + let chain_a = chain_a_desc.id(); + ctx.extend_with_chain(chain_a_desc, Some(owner_a)).await?; + let cc_a = ctx.make_chain_client(chain_a).await?; + cc_a.synchronize_from_validators().await?; + + let owner_b = AccountOwner::from(signer.generate_new()); + let chain_b_desc = faucet.claim(&owner_b).await?; + let chain_b = chain_b_desc.id(); + ctx.extend_with_chain(chain_b_desc, Some(owner_b)).await?; + let cc_b = ctx.make_chain_client(chain_b).await?; + cc_b.synchronize_from_validators().await?; + + let erc20_addr = deploy_linera_token(&compose, project_name, &compose_file).await?; + let fungible_app_id = + publish_and_create_wrapped_fungible(&cc_b, owner_b, chain_a, erc20_addr, 1_000).await?; + + let app_id_bytes32 = format!("0x{}", fungible_app_id.application_description_hash); + let chain_a_bytes32 = format!("0x{chain_a}"); + let bridge_addr = deploy_fungible_bridge( + &compose, + project_name, + &compose_file, + light_client_address(), + &chain_a_bytes32, + erc20_addr, + &app_id_bytes32, + ) + .await?; + + fund_bridge_erc20( + &compose, + project_name, + &compose_file, + erc20_addr, + bridge_addr, + 1_000_000_000_000_000_000_000u128, + ) + .await; + + // Now tighten the block gas limit so `addBlock(cert)` for `NUM_BURNS` + // doesn't fit and the relayer is forced through the chunked + // `processBurns` path. Deploy + funding txs already landed. + set_anvil_block_gas_limit(&provider, ANVIL_BLOCK_GAS_LIMIT).await?; + + // Distinct recipients so each burn produces a distinct ERC-20 balance. + let recipients: [alloy::primitives::Address; NUM_BURNS] = [ + "0x70997970C51812dc3A010C7d01b50e0d17dc79C8".parse()?, + "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC".parse()?, + "0x90F79bf6EB2c4f870365E785982E1f101E93b906".parse()?, + "0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65".parse()?, + "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc".parse()?, + "0x976EA74026E726554dB657fA54763abd0C3a0aa9".parse()?, + "0x14dC79964da2C08b23698B3D3cc7Ca32193d9955".parse()?, + "0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f".parse()?, + ]; + let burn_amount = Amount::from_tokens(BURN_AMOUNT_TOKENS); + + // Bundle every Transfer into one chain-B block; chain-A's + // process_inbox produces a single chain-A block with N BurnEvents. + let operations = recipients + .iter() + .map(|recipient| { + let owner = AccountOwner::Address20(recipient.0 .0); + let withdraw_bytes = bcs::to_bytes(&WrappedFungibleOperation::Transfer { + owner: owner_b, + amount: burn_amount, + target_account: Account { + chain_id: chain_a, + owner, + }, + }) + .expect("BCS serialization"); + Operation::User { + application_id: fungible_app_id, + bytes: withdraw_bytes, + } + }) + .collect(); + + cc_b.synchronize_from_validators().await?; + cc_b.execute_operations(operations, vec![]) + .await? + .expect("multi-burn block committed on chain B"); + + cc_a.synchronize_from_validators().await?; + let height_before_inbox = cc_a.chain_info().await?.next_block_height; + cc_a.process_inbox().await?; + let height_after_inbox = cc_a.chain_info().await?.next_block_height; + assert_eq!( + height_after_inbox.0, + height_before_inbox.0 + 1, + "chain A must produce exactly ONE block carrying all {NUM_BURNS} BurnEvents \ + (before={height_before_inbox}, after={height_after_inbox})", + ); + + let relay_dir = tempfile::tempdir()?; + let wallet_path = relay_dir.path().join("wallet.json"); + let keystore_path = relay_dir.path().join("keystore.json"); + let storage_config = format!("rocksdb:{}", relay_dir.path().join("client.db").display()); + let sqlite_path = relay_dir.path().join("relay.sqlite3"); + + { + use linera_persistent::Persist; + let mut ks = linera_persistent::File::new(&keystore_path, signer.clone())?; + ks.persist().await?; + } + linera_wallet_json::PersistentWallet::create(&wallet_path, relay_genesis_config)?; + + let relay_port = 3009u16; + let bridge_addr_str = format!("{bridge_addr}"); + let bridge_app_str = format!("{fungible_app_id}"); + let fungible_app_str = format!("{fungible_app_id}"); + let sqlite_path_for_relay = sqlite_path.clone(); + let relay_handle = tokio::spawn(async move { + Box::pin(linera_bridge::relay::run( + "http://localhost:8545", + Some(wallet_path.as_path()), + Some(keystore_path.as_path()), + Some(&storage_config), + chain_a, + owner_a, + &bridge_addr_str, + &bridge_app_str, + &fungible_app_str, + ANVIL_PRIVATE_KEY, + None, + relay_port, + &linera_storage_runtime::CommonStorageOptions::with_defaults(), + Duration::from_secs(2), + 0, + 5, + Some(sqlite_path_for_relay.as_path()), + )) + .await + }); + + let relay_url = format!("http://localhost:{relay_port}"); + let http = reqwest::Client::new(); + if let Err(error) = wait_for_relay_http_ready(&http, &relay_url, Duration::from_secs(60)).await + { + relay_handle.abort(); + return Err(error); + } + + let num_burns_i64 = i64::try_from(NUM_BURNS).unwrap(); + if let Err(error) = wait_for_relay_metrics( + &http, + &relay_url, + |detected, _completed, _pending, _failed| detected >= num_burns_i64, + Duration::from_secs(60), + ) + .await + { + relay_handle.abort(); + return Err(error); + } + // Wait for the chunked-processBurns path to settle every burn. + // Allow more wall time than the single-addBlock test because each + // chunk is its own EVM tx + receipt round-trip. + if let Err(error) = wait_for_relay_metrics( + &http, + &relay_url, + |_detected, completed, _pending, _failed| completed >= num_burns_i64, + Duration::from_secs(180), + ) + .await + { + relay_handle.abort(); + return Err(error); + } + + let token = IERC20::new(erc20_addr, &provider); + let one_burn = U256::from(BURN_AMOUNT_TOKENS) * U256::from(10u128.pow(18)); + let mut observed_balances = Vec::with_capacity(recipients.len()); + for recipient in &recipients { + let balance = token.balanceOf(*recipient).call().await?; + observed_balances.push((*recipient, balance)); + } + + let final_metrics = http + .get(format!("{relay_url}/metrics")) + .send() + .await? + .text() + .await?; + let burns_detected = parse_metric_value(&final_metrics, "linera_bridge_burns_detected"); + let burns_completed = parse_metric_value(&final_metrics, "linera_bridge_burns_completed"); + + relay_handle.abort(); + + tracing::info!( + ?observed_balances, + burns_detected, + burns_completed, + "Final state" + ); + + assert_eq!( + burns_detected, num_burns_i64, + "relayer must have detected all {NUM_BURNS} burns; got {burns_detected}" + ); + assert_eq!( + burns_completed, num_burns_i64, + "relayer must have completed all {NUM_BURNS} burns; got {burns_completed}" + ); + + for (recipient, balance) in &observed_balances { + assert_eq!( + *balance, one_burn, + "recipient {recipient:?} balance {balance}, expected {one_burn}" + ); + } + + // The whole point of this test is the chunked-processBurns path, which + // splits the cert across multiple EVM transactions. Each chunk lands in + // its own EVM block. Verify by counting distinct block hashes among + // ERC-20 `Transfer` events emitted by the bridge contract: if `addBlock` + // had fit, all 8 transfers would share one EVM block. + // + // `Transfer(address,address,uint256)` topic0. + let transfer_sig = B256::from(alloy::primitives::keccak256( + "Transfer(address,address,uint256)", + )); + let bridge_topic = B256::left_padding_from(bridge_addr.as_slice()); + let transfer_logs = provider + .get_logs( + &Filter::new() + .address(erc20_addr) + .event_signature(transfer_sig) + .topic1(bridge_topic) + .from_block(0u64), + ) + .await?; + let transfer_blocks: HashSet = + transfer_logs.iter().filter_map(|l| l.block_hash).collect(); + assert!( + transfer_blocks.len() > 1, + "expected chunked processBurns path to span multiple EVM blocks; \ + saw {} distinct block(s) across {} bridge-originated Transfer logs", + transfer_blocks.len(), + transfer_logs.len(), + ); + + Ok(()) +} diff --git a/linera-bridge/tests/e2e/tests/multiple_burns_same_block.rs b/linera-bridge/tests/e2e/tests/multiple_burns_same_block.rs index f3d5459d7570..9e6a243be239 100644 --- a/linera-bridge/tests/e2e/tests/multiple_burns_same_block.rs +++ b/linera-bridge/tests/e2e/tests/multiple_burns_same_block.rs @@ -17,9 +17,7 @@ use std::time::Duration; use alloy::{primitives::U256, providers::ProviderBuilder, sol}; -use linera_base::{ - crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner, -}; +use linera_base::{crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner}; use linera_bridge_e2e::{ compose_file_path, deploy_fungible_bridge, deploy_linera_token, fund_bridge_erc20, light_client_address, parse_metric_value, publish_and_create_wrapped_fungible, start_compose, @@ -274,13 +272,13 @@ async fn relayer_processes_every_burn_in_one_block() -> anyhow::Result<()> { // Each occurrence in `recipients` is one expected transfer. A // recipient listed twice should accumulate two transfers, etc. - let mut expected_per_recipient: std::collections::BTreeMap< - alloy::primitives::Address, - U256, - > = std::collections::BTreeMap::new(); + let mut expected_per_recipient = + std::collections::BTreeMap::::new(); let one_burn = U256::from(BURN_AMOUNT_TOKENS) * U256::from(10u128.pow(18)); for recipient in &recipients { - *expected_per_recipient.entry(*recipient).or_insert(U256::ZERO) += one_burn; + *expected_per_recipient + .entry(*recipient) + .or_insert(U256::ZERO) += one_burn; } let mut observed_balances = Vec::with_capacity(expected_per_recipient.len()); diff --git a/linera-bridge/tests/e2e/tests/multiple_burns_same_recipient_across_blocks.rs b/linera-bridge/tests/e2e/tests/multiple_burns_same_recipient_across_blocks.rs index 9f03f9bd0c55..a9615c22e815 100644 --- a/linera-bridge/tests/e2e/tests/multiple_burns_same_recipient_across_blocks.rs +++ b/linera-bridge/tests/e2e/tests/multiple_burns_same_recipient_across_blocks.rs @@ -25,9 +25,7 @@ use std::time::Duration; use alloy::{primitives::U256, providers::ProviderBuilder, sol}; -use linera_base::{ - crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner, -}; +use linera_base::{crypto::InMemorySigner, data_types::Amount, identifiers::AccountOwner}; use linera_bridge_e2e::{ compose_file_path, deploy_fungible_bridge, deploy_linera_token, fund_bridge_erc20, light_client_address, parse_metric_value, publish_and_create_wrapped_fungible, start_compose, @@ -230,9 +228,7 @@ async fn relayer_processes_every_burn_to_same_recipient() -> anyhow::Result<()> let settle_result = wait_for_relay_metrics( &http, &relay_url, - |_detected, completed, pending, _failed| { - pending == 0 && completed >= i64::from(NUM_BURNS) - }, + |_detected, completed, pending, _failed| pending == 0 && completed >= i64::from(NUM_BURNS), Duration::from_secs(240), ) .await; @@ -282,8 +278,7 @@ async fn relayer_processes_every_burn_to_same_recipient() -> anyhow::Result<()> // means at least one `PendingBurn` was marked complete without // its `token.transfer` actually landing on-chain — the UI-demo bug. assert_eq!( - token_balance, - expected_balance, + token_balance, expected_balance, "recipient must accumulate every burn; got {token_balance}, expected {expected_balance}" );