Skip to content

Commit a804c5a

Browse files
wjones127claude
andcommitted
fix: push data_file before progress callback, extract cleanup helper
Move fragment.files.push(data_file) before params.progress.complete() so that if the progress callback fails, the completed file is still tracked for cleanup. Extract the duplicated in-progress file cleanup into cleanup_partial_write(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ceb02f9 commit a804c5a

1 file changed

Lines changed: 23 additions & 24 deletions

File tree

rust/lance/src/dataset/write.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ pub async fn do_write_fragments(
481481
let last_fragment = fragments.last_mut().unwrap();
482482
last_fragment.physical_rows = Some(num_rows as usize);
483483
last_fragment.files.push(data_file);
484+
// Notify after pushing the data file so it's tracked for cleanup
485+
// if the callback fails.
484486
params.progress.complete(fragments.last().unwrap()).await?;
485487
if let Some(cb) = &params.write_progress {
486488
cb.call(WriteStats {
@@ -497,28 +499,18 @@ pub async fn do_write_fragments(
497499
.await;
498500

499501
if let Err(e) = loop_result {
500-
let in_progress_filename = writer
502+
let in_progress_path = writer
501503
.as_ref()
502504
.filter(|w| w.base_id().is_none())
503505
.map(|w| w.path().to_owned());
504506
drop(writer.take());
505-
cleanup_data_fragments(&object_store, base_dir, &fragments).await;
506-
if let Some(filename) = in_progress_filename {
507-
let path = base_dir.child(DATA_DIR).child(filename.as_str());
508-
if let Err(del_e) = object_store.delete(&path).await {
509-
log::warn!(
510-
"Failed to clean up in-progress data file '{}': {}",
511-
path,
512-
del_e
513-
);
514-
}
515-
}
507+
cleanup_partial_write(&object_store, base_dir, &fragments, in_progress_path).await;
516508
return Err(e);
517509
}
518510

519511
// Complete the final writer
520512
if let Some(mut w) = writer.take() {
521-
let in_progress_filename = if w.base_id().is_none() {
513+
let in_progress_path = if w.base_id().is_none() {
522514
Some(w.path().to_owned())
523515
} else {
524516
None
@@ -541,17 +533,7 @@ pub async fn do_write_fragments(
541533
}
542534
}
543535
Err(e) => {
544-
cleanup_data_fragments(&object_store, base_dir, &fragments).await;
545-
if let Some(filename) = in_progress_filename {
546-
let path = base_dir.child(DATA_DIR).child(filename.as_str());
547-
if let Err(del_e) = object_store.delete(&path).await {
548-
log::warn!(
549-
"Failed to clean up in-progress data file '{}': {}",
550-
path,
551-
del_e
552-
);
553-
}
554-
}
536+
cleanup_partial_write(&object_store, base_dir, &fragments, in_progress_path).await;
555537
return Err(e);
556538
}
557539
}
@@ -589,6 +571,23 @@ pub(crate) async fn cleanup_data_fragments(
589571
}
590572
}
591573

574+
/// Best-effort cleanup of a partially-failed write: deletes all completed fragment
575+
/// data files plus an optional in-progress file that was never finished.
576+
async fn cleanup_partial_write(
577+
object_store: &ObjectStore,
578+
base_dir: &Path,
579+
fragments: &[Fragment],
580+
in_progress_filename: Option<String>,
581+
) {
582+
cleanup_data_fragments(object_store, base_dir, fragments).await;
583+
if let Some(filename) = in_progress_filename {
584+
let path = base_dir.child(DATA_DIR).child(filename.as_str());
585+
if let Err(e) = object_store.delete(&path).await {
586+
log::warn!("Failed to clean up in-progress data file '{}': {}", path, e);
587+
}
588+
}
589+
}
590+
592591
pub async fn validate_and_resolve_target_bases(
593592
params: &mut WriteParams,
594593
existing_base_paths: Option<&HashMap<u32, BasePath>>,

0 commit comments

Comments
 (0)