diff --git a/Cargo.lock b/Cargo.lock index 9b9cbd30da..74d36d99ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3337,6 +3337,7 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "log", "minijinja", "mockall", "moka", diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 41ee771617..b424c448c8 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -74,6 +74,7 @@ serde_repr = { workspace = true } serde_with = { workspace = true } strum = { workspace = true, features = ["derive"] } tokio = { workspace = true, optional = false, features = ["sync"] } +log = { workspace = true } typed-builder = { workspace = true } typetag = { workspace = true } url = { workspace = true } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..de7084789f 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -97,7 +97,13 @@ impl TransactionAction for FastAppendAction { // Checks duplicate files if self.check_duplicate { - snapshot_producer.validate_duplicate_files().await?; + let start = std::time::Instant::now(); + let result = snapshot_producer.validate_duplicate_files().await; + log::info!( + "[FastAppendAction] validate_duplicate_files took {:?}", + start.elapsed() + ); + result?; } snapshot_producer diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 1cd63e2221..d48203c6ed 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -175,6 +175,9 @@ impl Transaction { let backoff = Self::build_backoff(table_props)?; let tx = self; + let table_ident = tx.table.identifier().clone(); + + log::info!("Starting transaction commit for {table_ident} with backoff {backoff:?}"); (|mut tx: Transaction| async { let result = tx.do_commit(catalog).await; (tx, result) @@ -183,6 +186,11 @@ impl Transaction { .sleep(tokio::time::sleep) .context(tx) 
.when(|e| e.retryable()) + .notify(move |err, dur| { + log::warn!( + "Transaction commit for {table_ident} failed with retryable error, retrying in {dur:?}: {err}" + ); + }) .await .1 } @@ -200,37 +208,86 @@ impl Transaction { } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> { + log::info!( + "[do_commit] entering do_commit for table {:?}", + self.table.identifier() + ); + + log::info!("[do_commit] calling catalog.load_table..."); let refreshed = catalog.load_table(self.table.identifier()).await?; + log::info!("[do_commit] catalog.load_table returned successfully"); if self.table.metadata() != refreshed.metadata() || self.table.metadata_location() != refreshed.metadata_location() { - // current base is stale, use refreshed as base and re-apply transaction actions + log::info!("[do_commit] base is stale, refreshing table"); self.table = refreshed.clone(); + } else { + log::info!("[do_commit] base is up to date, no refresh needed"); } let mut current_table = self.table.clone(); let mut existing_updates: Vec<TableUpdate> = vec![]; let mut existing_requirements: Vec<TableRequirement> = vec![]; - for action in &self.actions { + log::info!("[do_commit] processing {} actions", self.actions.len()); + for (i, action) in self.actions.iter().enumerate() { + log::info!( + "[do_commit] committing action {}/{}...", + i + 1, + self.actions.len() + ); let action_commit = Arc::clone(action).commit(&current_table).await?; - // apply action commit to current_table - current_table = Self::apply( + log::info!( + "[do_commit] action {}/{} commit returned, applying...", + i + 1, + self.actions.len() + ); + let apply_result = Self::apply( current_table, action_commit, &mut existing_updates, &mut existing_requirements, - )?; + ); + match apply_result { + Ok(table) => { + log::info!( + "[do_commit] action {}/{} applied successfully", + i + 1, + self.actions.len() + ); + current_table = table; + } + Err(e) => { + log::warn!( + "[do_commit] action {}/{} apply failed: {}", + i + 1, + self.actions.len(), + e + ); 
return Err(e); + } + } } + log::info!( + "[do_commit] building TableCommit with {} updates and {} requirements", + existing_updates.len(), + existing_requirements.len() + ); let table_commit = TableCommit::builder() .ident(self.table.identifier().to_owned()) .updates(existing_updates) .requirements(existing_requirements) .build(); - catalog.update_table(table_commit).await + log::info!("[do_commit] calling catalog.update_table..."); + let result = catalog.update_table(table_commit).await; + log::info!( + "[do_commit] catalog.update_table returned with success={}", + result.is_ok() + ); + result } }