diff --git a/Cargo.lock b/Cargo.lock
index 9b9cbd30da..74d36d99ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3337,6 +3337,7 @@ dependencies = [
"futures",
"iceberg_test_utils",
"itertools 0.13.0",
+ "log",
"minijinja",
"mockall",
"moka",
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 41ee771617..b424c448c8 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -74,6 +74,7 @@ serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true, features = ["derive"] }
tokio = { workspace = true, optional = false, features = ["sync"] }
+log = { workspace = true }
typed-builder = { workspace = true }
typetag = { workspace = true }
url = { workspace = true }
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 08d4032409..de7084789f 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -97,7 +97,13 @@ impl TransactionAction for FastAppendAction {
// Checks duplicate files
if self.check_duplicate {
- snapshot_producer.validate_duplicate_files().await?;
+ let start = std::time::Instant::now();
+ let result = snapshot_producer.validate_duplicate_files().await;
+ log::info!(
+ "[FastAppendAction] validate_duplicate_files took {:?}",
+ start.elapsed()
+ );
+ result?;
}
snapshot_producer
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 1cd63e2221..d48203c6ed 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -175,6 +175,9 @@ impl Transaction {
let backoff = Self::build_backoff(table_props)?;
let tx = self;
+ let table_ident = tx.table.identifier().clone();
+
+ log::info!("Starting transaction commit for {table_ident} with backoff {backoff:?}");
(|mut tx: Transaction| async {
let result = tx.do_commit(catalog).await;
(tx, result)
@@ -183,6 +186,11 @@ impl Transaction {
.sleep(tokio::time::sleep)
.context(tx)
.when(|e| e.retryable())
+ .notify(move |err, dur| {
+ log::warn!(
+ "Transaction commit for {table_ident} failed with retryable error, retrying in {dur:?}: {err}"
+ );
+ })
.await
.1
}
@@ -200,37 +208,86 @@ impl Transaction {
}
async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{
+ log::info!(
+ "[do_commit] entering do_commit for table {:?}",
+ self.table.identifier()
+ );
+
+ log::info!("[do_commit] calling catalog.load_table...");
let refreshed = catalog.load_table(self.table.identifier()).await?;
+ log::info!("[do_commit] catalog.load_table returned successfully");
if self.table.metadata() != refreshed.metadata()
|| self.table.metadata_location() != refreshed.metadata_location()
{
- // current base is stale, use refreshed as base and re-apply transaction actions
+ log::info!("[do_commit] base is stale, refreshing table");
self.table = refreshed.clone();
+ } else {
+ log::info!("[do_commit] base is up to date, no refresh needed");
}
let mut current_table = self.table.clone();
let mut existing_updates: Vec = vec![];
let mut existing_requirements: Vec = vec![];
- for action in &self.actions {
+ log::info!("[do_commit] processing {} actions", self.actions.len());
+ for (i, action) in self.actions.iter().enumerate() {
+ log::info!(
+ "[do_commit] committing action {}/{}...",
+ i + 1,
+ self.actions.len()
+ );
let action_commit = Arc::clone(action).commit(¤t_table).await?;
- // apply action commit to current_table
- current_table = Self::apply(
+ log::info!(
+ "[do_commit] action {}/{} commit returned, applying...",
+ i + 1,
+ self.actions.len()
+ );
+ let apply_result = Self::apply(
current_table,
action_commit,
&mut existing_updates,
&mut existing_requirements,
- )?;
+ );
+ match apply_result {
+ Ok(table) => {
+ log::info!(
+ "[do_commit] action {}/{} applied successfully",
+ i + 1,
+ self.actions.len()
+ );
+ current_table = table;
+ }
+ Err(e) => {
+ log::warn!(
+ "[do_commit] action {}/{} apply failed: {}",
+ i + 1,
+ self.actions.len(),
+ e
+ );
+ return Err(e);
+ }
+ }
}
+ log::info!(
+ "[do_commit] building TableCommit with {} updates and {} requirements",
+ existing_updates.len(),
+ existing_requirements.len()
+ );
let table_commit = TableCommit::builder()
.ident(self.table.identifier().to_owned())
.updates(existing_updates)
.requirements(existing_requirements)
.build();
- catalog.update_table(table_commit).await
+ log::info!("[do_commit] calling catalog.update_table...");
+ let result = catalog.update_table(table_commit).await;
+ log::info!(
+ "[do_commit] catalog.update_table returned with success={}",
+ result.is_ok()
+ );
+ result
}
}