From 6388f3fe76704e3c4cf30eab8a61cfb4929d50e7 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Thu, 26 Feb 2026 11:30:49 +0200 Subject: [PATCH 1/8] add logs --- Cargo.lock | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/transaction/mod.rs | 11 +++++++++++ 3 files changed, 13 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 9b9cbd30da..74d36d99ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3337,6 +3337,7 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "log", "minijinja", "mockall", "moka", diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 41ee771617..b424c448c8 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -74,6 +74,7 @@ serde_repr = { workspace = true } serde_with = { workspace = true } strum = { workspace = true, features = ["derive"] } tokio = { workspace = true, optional = false, features = ["sync"] } +log = { workspace = true } typed-builder = { workspace = true } typetag = { workspace = true } url = { workspace = true } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 1cd63e2221..b0f6330bc0 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -175,6 +175,9 @@ impl Transaction { let backoff = Self::build_backoff(table_props)?; let tx = self; + let table_ident = tx.table.identifier().clone(); + + log::info!("Starting transaction commit for {}", table_ident.clone()); (|mut tx: Transaction| async { let result = tx.do_commit(catalog).await; (tx, result) @@ -183,6 +186,14 @@ impl Transaction { .sleep(tokio::time::sleep) .context(tx) .when(|e| e.retryable()) + .notify(move |err, dur| { + log::warn!( + "Transaction commit for {} failed with retryable error, retrying in {:?}: {}", + table_ident, + dur, + err + ); + }) .await .1 } From 86181f9f6d1972b107146fa55eb18c461165f138 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Thu, 26 Feb 2026 11:57:32 +0200 Subject: [PATCH 2/8] fix clippy --- crates/iceberg/src/transaction/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index b0f6330bc0..3c318cb42a 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -188,10 +188,7 @@ impl Transaction { .when(|e| e.retryable()) .notify(move |err, dur| { log::warn!( - "Transaction commit for {} failed with retryable error, retrying in {:?}: {}", - table_ident, - dur, - err + "Transaction commit for {table_ident} failed with retryable error, retrying in {dur:?}: {err}" ); }) .await From 1ad65457388d33ecf411e4d6ebd00fb0a9650916 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Thu, 26 Feb 2026 11:09:08 -0500 Subject: [PATCH 3/8] add way more logs --- crates/iceberg/src/transaction/mod.rs | 35 ++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 3c318cb42a..51d9bdeb09 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -208,37 +208,60 @@ impl Transaction { } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result { + log::info!("[do_commit] entering do_commit for table {:?}", self.table.identifier()); + + log::info!("[do_commit] calling catalog.load_table..."); let refreshed = catalog.load_table(self.table.identifier()).await?; + log::info!("[do_commit] catalog.load_table returned successfully"); if self.table.metadata() != refreshed.metadata() || self.table.metadata_location() != refreshed.metadata_location() { - // current base is stale, use refreshed as base and re-apply transaction actions + log::info!("[do_commit] base is stale, refreshing table"); self.table = refreshed.clone(); + } else { + log::info!("[do_commit] base is up to date, no refresh needed"); } let mut current_table = self.table.clone(); let mut existing_updates: Vec = vec![]; let mut existing_requirements: Vec = vec![]; - for action in &self.actions { + log::info!("[do_commit] processing {} actions", self.actions.len()); + for (i, action) in self.actions.iter().enumerate() { + log::info!("[do_commit] committing action {}/{}...", i + 1, self.actions.len()); let action_commit = Arc::clone(action).commit(¤t_table).await?; - // apply action commit to current_table - current_table = Self::apply( + log::info!("[do_commit] action {}/{} commit returned, applying...", i + 1, self.actions.len()); + let apply_result = Self::apply( current_table, action_commit, &mut existing_updates, &mut existing_requirements, - )?; + ); + match apply_result { + Ok(table) => { + log::info!("[do_commit] action {}/{} applied successfully", i + 1, self.actions.len()); + current_table = table; + } + Err(e) => { + log::info!("[do_commit] action {}/{} apply failed: {}", i + 1, self.actions.len(), e); + return Err(e); + } + } } + log::info!("[do_commit] building TableCommit with {} updates and {} requirements", + existing_updates.len(), existing_requirements.len()); let table_commit = TableCommit::builder() .ident(self.table.identifier().to_owned()) .updates(existing_updates) .requirements(existing_requirements) .build(); - catalog.update_table(table_commit).await + log::info!("[do_commit] calling catalog.update_table..."); + let result = catalog.update_table(table_commit).await; + log::info!("[do_commit] catalog.update_table returned with success={}", result.is_ok()); + result } } From b02fd0bdedd0668c5e64a31259c38862456c7d87 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Fri, 27 Feb 2026 10:55:42 +0200 Subject: [PATCH 4/8] linter --- crates/iceberg/src/transaction/mod.rs | 42 ++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 51d9bdeb09..f69f6dd81b 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -208,7 +208,10 @@ impl Transaction { } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{ - log::info!("[do_commit] entering do_commit for table {:?}", self.table.identifier()); + log::info!( + "[do_commit] entering do_commit for table {:?}", + self.table.identifier() + ); log::info!("[do_commit] calling catalog.load_table..."); let refreshed = catalog.load_table(self.table.identifier()).await?; @@ -229,9 +232,17 @@ impl Transaction { log::info!("[do_commit] processing {} actions", self.actions.len()); for (i, action) in self.actions.iter().enumerate() { - log::info!("[do_commit] committing action {}/{}...", i + 1, self.actions.len()); + log::info!( + "[do_commit] committing action {}/{}...", + i + 1, + self.actions.len() + ); let action_commit = Arc::clone(action).commit(¤t_table).await?; - log::info!("[do_commit] action {}/{} commit returned, applying...", i + 1, self.actions.len()); + log::info!( + "[do_commit] action {}/{} commit returned, applying...", + i + 1, + self.actions.len() + ); let apply_result = Self::apply( current_table, action_commit, @@ -240,18 +251,30 @@ impl Transaction { ); match apply_result { Ok(table) => { - log::info!("[do_commit] action {}/{} applied successfully", i + 1, self.actions.len()); + log::info!( + "[do_commit] action {}/{} applied successfully", + i + 1, + self.actions.len() + ); current_table = table; } Err(e) => { - log::info!("[do_commit] action {}/{} apply failed: {}", i + 1, self.actions.len(), e); + log::info!( + "[do_commit] action {}/{} apply failed: {}", + i + 1, + self.actions.len(), + e + ); return Err(e); } } } - log::info!("[do_commit] building TableCommit with {} updates and {} requirements", - existing_updates.len(), existing_requirements.len()); + log::info!( + "[do_commit] building TableCommit with {} updates and {} requirements", + existing_updates.len(), + existing_requirements.len() + ); let table_commit = TableCommit::builder() .ident(self.table.identifier().to_owned()) .updates(existing_updates) @@ -260,7 +283,10 @@ impl Transaction { log::info!("[do_commit] calling catalog.update_table..."); let result = catalog.update_table(table_commit).await; - log::info!("[do_commit] catalog.update_table returned with success={}", result.is_ok()); + log::info!( + "[do_commit] catalog.update_table returned with success={}", + result.is_ok() + ); result } } From e7d39919a8fd9de67c007601e271042204c39336 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Fri, 27 Feb 2026 11:23:05 +0200 Subject: [PATCH 5/8] log backoff --- crates/iceberg/src/transaction/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index f69f6dd81b..b2964dad77 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -177,7 +177,11 @@ impl Transaction { let table_ident = tx.table.identifier().clone(); - log::info!("Starting transaction commit for {}", table_ident.clone()); + log::info!( + "Starting transaction commit for {} with backoff {:?}", + table_ident, + backoff + ); (|mut tx: Transaction| async { let result = tx.do_commit(catalog).await; (tx, result) @@ -259,7 +263,7 @@ impl Transaction { current_table = table; } Err(e) => { - log::info!( + log::warn!( "[do_commit] action {}/{} apply failed: {}", i + 1, self.actions.len(), From 79516045cf5d282f7db56b03dbb651ae9c1ac3c5 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Fri, 27 Feb 2026 11:30:14 +0200 Subject: [PATCH 6/8] fix clippy --- crates/iceberg/src/transaction/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index b2964dad77..aff57254f9 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -178,9 +178,7 @@ impl Transaction { let table_ident = tx.table.identifier().clone(); log::info!( - "Starting transaction commit for {} with backoff {:?}", - table_ident, - backoff + "Starting transaction commit for {table_ident} with backoff {backoff:?}" ); (|mut tx: Transaction| async { let result = tx.do_commit(catalog).await; From 8b5e95641bfaac1fe59550a177f909a5e5b561e0 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Fri, 27 Feb 2026 11:31:42 +0200 Subject: [PATCH 7/8] cargo fmt --- crates/iceberg/src/transaction/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index aff57254f9..d48203c6ed 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -177,9 +177,7 @@ impl Transaction { let table_ident = tx.table.identifier().clone(); - log::info!( - "Starting transaction commit for {table_ident} with backoff {backoff:?}" - ); + log::info!("Starting transaction commit for {table_ident} with backoff {backoff:?}"); (|mut tx: Transaction| async { let result = tx.do_commit(catalog).await; (tx, result) From 8e343db59b1e9ce676b756237cbabd918f46aa63 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Mon, 9 Mar 2026 14:35:39 +0200 Subject: [PATCH 8/8] add timing log to duplicate check --- crates/iceberg/src/transaction/append.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..de7084789f 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -97,7 +97,13 @@ impl TransactionAction for FastAppendAction { // Checks duplicate files if self.check_duplicate { - snapshot_producer.validate_duplicate_files().await?; + let start = std::time::Instant::now(); + let result = snapshot_producer.validate_duplicate_files().await; + log::info!( + "[FastAppendAction] validate_duplicate_files took {:?}", + start.elapsed() + ); + result?; } snapshot_producer