diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6b244bcacef..8fc591c6c26 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -95,7 +95,7 @@ use self::fragment::FileFragment; use self::refs::Refs; use self::scanner::{DatasetRecordBatchStream, Scanner}; use self::transaction::{Operation, Transaction, TransactionBuilder, UpdateMapEntry}; -use self::write::write_fragments_internal; +use self::write::{cleanup_data_fragments, write_fragments_internal}; use crate::dataset::branch_location::BranchLocation; use crate::dataset::cleanup::{CleanupPolicy, CleanupPolicyBuilder}; use crate::dataset::refs::{BranchContents, BranchIdentifier, Branches, Tags}; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index c70eb93bbcd..eab33ddc9ee 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -93,7 +93,7 @@ use super::transaction::{ Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder, }; use super::utils::make_rowid_capture_stream; -use super::{WriteMode, WriteParams, write_fragments_internal}; +use super::{WriteMode, WriteParams, cleanup_data_fragments, write_fragments_internal}; use crate::Dataset; use crate::Result; use crate::dataset::utils::CapturedRowIds; @@ -1263,26 +1263,37 @@ async fn rewrite_files( log::info!("Compaction task {}: file written", task_id); - let row_addrs = if let Some(row_ids_rx) = row_ids_rx { - let captured_ids = row_ids_rx - .try_recv() - .map_err(|err| Error::internal(format!("Failed to receive row ids: {}", err)))?; - let row_addrs = captured_ids.row_addrs(None).into_owned(); - let mut serialized = Vec::with_capacity(row_addrs.serialized_size()); - row_addrs.serialize_into(&mut serialized)?; - Some(serialized) - } else { - if dataset.manifest.uses_stable_row_ids() { - log::info!("Compaction task {}: rechunking stable row ids", task_id); - rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?; - recalc_versions_for_rewritten_fragments( - dataset.as_ref(), - &mut new_fragments, - &fragments, - ) - .await?; + let row_addrs_result: Result>> = async { + if let Some(row_ids_rx) = row_ids_rx { + let captured_ids = row_ids_rx + .try_recv() + .map_err(|err| Error::internal(format!("Failed to receive row ids: {}", err)))?; + let row_addrs = captured_ids.row_addrs(None).into_owned(); + let mut serialized = Vec::with_capacity(row_addrs.serialized_size()); + row_addrs.serialize_into(&mut serialized)?; + Ok(Some(serialized)) + } else { + if dataset.manifest.uses_stable_row_ids() { + log::info!("Compaction task {}: rechunking stable row ids", task_id); + rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?; + recalc_versions_for_rewritten_fragments( + dataset.as_ref(), + &mut new_fragments, + &fragments, + ) + .await?; + } + Ok(None) + } + } + .await; + + let row_addrs = match row_addrs_result { + Ok(v) => v, + Err(e) => { + cleanup_data_fragments(&dataset.object_store, &dataset.base, &new_fragments).await; + return Err(e); } - None }; metrics.files_removed = task @@ -1585,6 +1596,13 @@ pub async fn commit_compaction( None }; + // Collect new fragment paths before moving rewrite_groups into the transaction, + // so we can clean them up if the commit fails. + let all_new_fragments: Vec = rewrite_groups + .iter() + .flat_map(|g| g.new_fragments.iter().cloned()) + .collect(); + let transaction = TransactionBuilder::new( dataset.manifest.version, Operation::Rewrite { @@ -1596,9 +1614,13 @@ pub async fn commit_compaction( .transaction_properties(options.transaction_properties.clone()) .build(); - dataset + if let Err(e) = dataset .apply_commit(transaction, &Default::default(), &Default::default()) - .await?; + .await + { + cleanup_data_fragments(&dataset.object_store, &dataset.base, &all_new_fragments).await; + return Err(e); + } Ok(metrics) } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 6b226969b08..e5a8a7ed04a 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -42,6 +42,8 @@ use crate::session::Session; use super::DATA_DIR; use super::fragment::write::generate_random_filename; use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress}; + +pub use super::progress::{WriteProgressFn, WriteStats}; use super::transaction::Transaction; use super::utils::SchemaAdapter; @@ -52,7 +54,6 @@ pub mod merge_insert; mod retry; pub mod update; -pub use super::progress::{WriteProgressFn, WriteStats}; pub use commit::CommitBuilder; pub use delete::{DeleteBuilder, DeleteResult}; pub use insert::InsertBuilder; @@ -178,13 +179,6 @@ pub struct WriteParams { pub progress: Arc, - /// Optional callback invoked after each batch is written. - /// - /// Receives cumulative [`WriteStats`] so callers can render a progress bar - /// or compute throughput. The callback must be cheap and non-blocking; - /// spawn a task if you need async work. - pub write_progress: Option, - /// If present, dataset will use this to update the latest version /// /// If not set, the default will be based on the object store. Generally this will @@ -258,6 +252,13 @@ pub struct WriteParams { /// Allow writing external blob URIs that cannot be mapped to any registered /// non-dataset-root base path. When disabled, such rows are rejected. pub allow_external_blob_outside_bases: bool, + + /// Optional callback invoked after each batch is written. + /// + /// Receives cumulative [`WriteStats`] so callers can render a progress bar or + /// compute throughput. The callback must be cheap and non-blocking; spawn a + /// task if you need async work. + pub write_progress: Option, } impl Default for WriteParams { @@ -271,7 +272,6 @@ impl Default for WriteParams { mode: WriteMode::Create, store_params: None, progress: Arc::new(NoopFragmentWriteProgress::new()), - write_progress: None, commit_handler: None, data_storage_version: None, enable_stable_row_ids: false, @@ -284,6 +284,7 @@ impl Default for WriteParams { target_bases: None, target_base_names_or_paths: None, allow_external_blob_outside_bases: false, + write_progress: None, } } } @@ -428,7 +429,7 @@ pub async fn do_write_fragments( }; let writer_generator = WriterGenerator::new( - object_store, + object_store.clone(), base_dir, schema, storage_version, @@ -438,73 +439,155 @@ pub async fn do_write_fragments( ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; - let mut fragments = Vec::new(); + let mut fragments: Vec = Vec::new(); let mut bytes_completed: u64 = 0; let mut rows_completed: u64 = 0; let mut files_written: u32 = 0; - while let Some(batch_chunk) = buffered_reader.next().await { - let batch_chunk = batch_chunk?; - - if writer.is_none() { - let (new_writer, new_fragment) = writer_generator.new_writer().await?; - params.progress.begin(&new_fragment).await?; - writer = Some(new_writer); - fragments.push(new_fragment); - } - writer.as_mut().unwrap().write(&batch_chunk).await?; - for batch in &batch_chunk { - num_rows_in_current_file += batch.num_rows() as u32; - } + let loop_result: Result<()> = async { + while let Some(batch_chunk) = buffered_reader.next().await { + let batch_chunk = batch_chunk?; - if let Some(cb) = ¶ms.write_progress { - let current_bytes = writer.as_mut().unwrap().tell().await?; - cb.call(WriteStats { - bytes_written: bytes_completed + current_bytes, - rows_written: rows_completed + num_rows_in_current_file as u64, - files_written, - }); - } + if writer.is_none() { + let (new_writer, new_fragment) = writer_generator.new_writer().await?; + params.progress.begin(&new_fragment).await?; + writer = Some(new_writer); + fragments.push(new_fragment); + } - if num_rows_in_current_file >= params.max_rows_per_file as u32 - || writer.as_mut().unwrap().tell().await? >= params.max_bytes_per_file as u64 - { - let (num_rows, data_file) = writer.take().unwrap().finish().await?; - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); - debug_assert_eq!(num_rows, num_rows_in_current_file); - bytes_completed += data_file.file_size_bytes.get().map_or(0, |s| s.get()); - rows_completed += num_rows as u64; - files_written += 1; - params.progress.complete(fragments.last().unwrap()).await?; - let last_fragment = fragments.last_mut().unwrap(); - last_fragment.physical_rows = Some(num_rows as usize); - last_fragment.files.push(data_file); - num_rows_in_current_file = 0; + writer.as_mut().unwrap().write(&batch_chunk).await?; + for batch in &batch_chunk { + num_rows_in_current_file += batch.num_rows() as u32; + } + + if let Some(cb) = ¶ms.write_progress { + let current_bytes = writer.as_mut().unwrap().tell().await?; + cb.call(WriteStats { + bytes_written: bytes_completed + current_bytes, + rows_written: rows_completed + num_rows_in_current_file as u64, + files_written, + }); + } + + if num_rows_in_current_file >= params.max_rows_per_file as u32 + || writer.as_mut().unwrap().tell().await? >= params.max_bytes_per_file as u64 + { + let (num_rows, data_file) = writer.take().unwrap().finish().await?; + info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); + debug_assert_eq!(num_rows, num_rows_in_current_file); + bytes_completed += data_file.file_size_bytes.get().map_or(0, |s| s.get()); + rows_completed += num_rows as u64; + files_written += 1; + let last_fragment = fragments.last_mut().unwrap(); + last_fragment.physical_rows = Some(num_rows as usize); + last_fragment.files.push(data_file); + // Notify after pushing the data file so it's tracked for cleanup + // if the callback fails. + params.progress.complete(fragments.last().unwrap()).await?; + if let Some(cb) = ¶ms.write_progress { + cb.call(WriteStats { + bytes_written: bytes_completed, + rows_written: rows_completed, + files_written, + }); + } + num_rows_in_current_file = 0; + } } + Ok(()) + } + .await; + + if let Err(e) = loop_result { + let in_progress_path = writer + .as_ref() + .filter(|w| w.base_id().is_none()) + .map(|w| w.path().to_owned()); + drop(writer.take()); + cleanup_partial_write(&object_store, base_dir, &fragments, in_progress_path).await; + return Err(e); } // Complete the final writer - if let Some(mut writer) = writer.take() { - let (num_rows, data_file) = writer.finish().await?; - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); - bytes_completed += data_file.file_size_bytes.get().map_or(0, |s| s.get()); - rows_completed += num_rows as u64; - files_written += 1; - if let Some(cb) = ¶ms.write_progress { - cb.call(WriteStats { - bytes_written: bytes_completed, - rows_written: rows_completed, - files_written, - }); + if let Some(mut w) = writer.take() { + let in_progress_path = if w.base_id().is_none() { + Some(w.path().to_owned()) + } else { + None + }; + match w.finish().await { + Ok((num_rows, data_file)) => { + info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); + bytes_completed += data_file.file_size_bytes.get().map_or(0, |s| s.get()); + rows_completed += num_rows as u64; + files_written += 1; + let last_fragment = fragments.last_mut().unwrap(); + last_fragment.physical_rows = Some(num_rows as usize); + last_fragment.files.push(data_file); + if let Some(cb) = ¶ms.write_progress { + cb.call(WriteStats { + bytes_written: bytes_completed, + rows_written: rows_completed, + files_written, + }); + } + } + Err(e) => { + cleanup_partial_write(&object_store, base_dir, &fragments, in_progress_path).await; + return Err(e); + } } - let last_fragment = fragments.last_mut().unwrap(); - last_fragment.physical_rows = Some(num_rows as usize); - last_fragment.files.push(data_file); } Ok(fragments) } +/// Best-effort cleanup of data files for fragments that were written but not committed. +/// +/// Only cleans up files in the dataset's default storage (base_id == None). Files +/// written to external bases are skipped because we don't have their object stores here. +pub(crate) async fn cleanup_data_fragments( + object_store: &ObjectStore, + base_dir: &Path, + fragments: &[Fragment], +) { + let data_dir = base_dir.child(DATA_DIR); + for fragment in fragments { + for file in &fragment.files { + if file.base_id.is_none() { + let path = data_dir.child(file.path.as_str()); + if let Err(e) = object_store.delete(&path).await { + log::warn!("Failed to clean up orphaned data file '{}': {}", path, e); + } + } else { + log::warn!( + "Unable to clean up orphaned data file '{}' in external base {}: \ + cleanup not supported for external bases", + file.path, + file.base_id.unwrap() + ); + } + } + } +} + +/// Best-effort cleanup of a partially-failed write: deletes all completed fragment +/// data files plus an optional in-progress file that was never finished. +async fn cleanup_partial_write( + object_store: &ObjectStore, + base_dir: &Path, + fragments: &[Fragment], + in_progress_filename: Option, +) { + cleanup_data_fragments(object_store, base_dir, fragments).await; + if let Some(filename) = in_progress_filename { + let path = base_dir.child(DATA_DIR).child(filename.as_str()); + if let Err(e) = object_store.delete(&path).await { + log::warn!("Failed to clean up in-progress data file '{}': {}", path, e); + } + } +} + pub async fn validate_and_resolve_target_bases( params: &mut WriteParams, existing_base_paths: Option<&HashMap>, @@ -822,6 +905,11 @@ pub trait GenericWriter: Send { async fn tell(&mut self) -> Result; /// Finish writing the file (flush the remaining data and write footer) async fn finish(&mut self) -> Result<(u32, DataFile)>; + /// Returns the relative filename (without directory) of the file being written. + fn path(&self) -> &str; + /// Returns the base ID if this writer is writing to an external base, or None + /// for the dataset's default storage. + fn base_id(&self) -> Option; } struct V1WriterAdapter @@ -856,6 +944,12 @@ where ), )) } + fn path(&self) -> &str { + &self.path + } + fn base_id(&self) -> Option { + self.base_id + } } struct V2WriterAdapter { @@ -912,6 +1006,12 @@ impl GenericWriter for V2WriterAdapter { ); Ok((num_rows, data_file)) } + fn path(&self) -> &str { + &self.path + } + fn base_id(&self) -> Option { + self.base_id + } } pub async fn open_writer( @@ -1183,11 +1283,13 @@ async fn new_source_iter( struct SpillStreamIter { receiver: SpillReceiver, - _sender_handle: tokio::task::JoinHandle, + #[allow(dead_code)] // Exists to keep the SpillSender alive + sender_handle: tokio::task::JoinHandle, // This temp dir is used to store the spilled data. It is kept alive by // this struct. When this struct is dropped, the Drop implementation of // tempfile::TempDir will delete the temp dir. - _tmp_dir: TempDir, + #[allow(dead_code)] // Exists to keep the temp dir alive + tmp_dir: TempDir, } impl SpillStreamIter { @@ -1231,8 +1333,8 @@ impl SpillStreamIter { Ok(Self { receiver, - _tmp_dir: tmp_dir, - _sender_handle: sender_handle, + tmp_dir, + sender_handle, }) } } @@ -2979,4 +3081,131 @@ mod tests { "All data should be correctly written" ); } + + /// Returns the number of files in `/data/`. + fn count_data_files(base_dir: &str) -> usize { + let data_dir = std::path::Path::new(base_dir).join("data"); + if !data_dir.exists() { + return 0; + } + std::fs::read_dir(data_dir) + .unwrap() + .filter(|e| e.as_ref().unwrap().path().is_file()) + .count() + } + + #[tokio::test] + async fn test_cleanup_data_files_on_failed_write() { + use lance_core::utils::tempfile::TempStrDir; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + let (object_store, base_dir) = + ObjectStore::from_uri_and_params(Default::default(), test_uri, &Default::default()) + .await + .unwrap(); + + let good_batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + // Build a stream: one good batch, then an error. + let items: Vec> = vec![ + Ok(good_batch.clone()), + Err(DataFusionError::External("injected failure".into())), + ]; + let stream = Box::pin(RecordBatchStreamAdapter::new( + arrow_schema.clone(), + futures::stream::iter(items), + )); + + let result = do_write_fragments( + None, + object_store.clone(), + &base_dir, + &schema, + stream, + WriteParams::default(), + LanceFileVersion::V2_1, + None, + ) + .await; + + assert!(result.is_err(), "Expected write to fail"); + assert_eq!( + count_data_files(test_uri), + 0, + "All partial data files should be cleaned up on failure" + ); + } + + #[tokio::test] + async fn test_cleanup_data_files_on_failed_write_multi_file() { + // Verify cleanup when a failure occurs after one file has already been completed + // (i.e., max_rows_per_file causes a file boundary before the error). + use lance_core::utils::tempfile::TempStrDir; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + let (object_store, base_dir) = + ObjectStore::from_uri_and_params(Default::default(), test_uri, &Default::default()) + .await + .unwrap(); + + // 3 rows per file; 2 good batches of 3 rows (fills one file), then error. + let good_batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let items: Vec> = vec![ + Ok(good_batch.clone()), + Ok(good_batch.clone()), + Err(DataFusionError::External("injected failure".into())), + ]; + let stream = Box::pin(RecordBatchStreamAdapter::new( + arrow_schema.clone(), + futures::stream::iter(items), + )); + + let result = do_write_fragments( + None, + object_store.clone(), + &base_dir, + &schema, + stream, + WriteParams { + max_rows_per_file: 3, + ..Default::default() + }, + LanceFileVersion::V2_1, + None, + ) + .await; + + assert!(result.is_err(), "Expected write to fail"); + assert_eq!( + count_data_files(test_uri), + 0, + "All data files (including completed ones) should be cleaned up on failure" + ); + } } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 290a83fb707..7984e21577e 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -24,6 +24,7 @@ pub mod inserted_rows; use assign_action::merge_insert_action; use inserted_rows::KeyExistenceFilter; +use super::cleanup_data_fragments; use super::retry::{RetryConfig, RetryExecutor, execute_with_retry}; use super::{CommitBuilder, WriteParams, write_fragments_internal}; use crate::dataset::rowids::get_row_id_index; @@ -1636,8 +1637,19 @@ impl MergeInsertJob { let removed_row_addrs = RoaringTreemap::from_iter(removed_row_addr_vec.into_iter()); - let (old_fragments, removed_fragment_ids) = - Self::apply_deletions(&self.dataset, &removed_row_addrs).await?; + let deletions_result = Self::apply_deletions(&self.dataset, &removed_row_addrs).await; + let (old_fragments, removed_fragment_ids) = match deletions_result { + Ok(v) => v, + Err(e) => { + cleanup_data_fragments( + &self.dataset.object_store, + &self.dataset.base, + &new_fragments, + ) + .await; + return Err(e); + } + }; // Commit updated and new fragments let operation = Operation::Update { diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index ec34000642d..8934923f42c 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -5,6 +5,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use super::cleanup_data_fragments; use super::retry::{RetryConfig, RetryExecutor, execute_with_retry}; use super::{CommitBuilder, WriteParams, write_fragments_internal}; use crate::dataset::rowids::get_row_id_index; @@ -335,7 +336,19 @@ impl UpdateJob { // Apply deletions let row_id_index = get_row_id_index(&self.dataset).await?; let row_addrs = removed_row_ids.row_addrs(row_id_index.as_deref()); - let (old_fragments, removed_fragment_ids) = self.apply_deletions(&row_addrs).await?; + let deletions_result = self.apply_deletions(&row_addrs).await; + let (old_fragments, removed_fragment_ids) = match deletions_result { + Ok(v) => v, + Err(e) => { + cleanup_data_fragments( + &self.dataset.object_store, + &self.dataset.base, + &new_fragments, + ) + .await; + return Err(e); + } + }; let affected_rows = RowAddrTreeMap::from(row_addrs.as_ref().clone()); let num_updated_rows = new_fragments