Skip to content

Commit 6db7525

Browse files
wjones127claude
andauthored
feat: clean up transaction files on failed commits (#6319)
Previously, when a manifest commit failed (conflict, error, or retry exhaustion), the `.txn` file written to `_transactions/` was left orphaned. These files would accumulate until the GC cleanup interval (7+ days by default). This PR adds `cleanup_transaction_file()` — a best-effort delete helper — and calls it from all three commit failure paths (`do_commit_new_dataset`, `do_commit_detached_transaction`, `commit_transaction`). Failures to delete are logged as warnings and do not surface to the caller. In the retry loop of `commit_transaction`, the previous iteration's transaction file is cleaned up before each retry attempt, since a new transaction file is written on each iteration. Fixes #6125 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5aca703 commit 6db7525

2 files changed

Lines changed: 123 additions & 8 deletions

File tree

rust/lance/src/dataset/cleanup.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2396,7 +2396,8 @@ mod tests {
23962396
// deposited a data file
23972397
assert_eq!(before_count.num_data_files, 2);
23982398
assert_eq!(before_count.num_manifest_files, 1);
2399-
assert_eq!(before_count.num_tx_files, 2);
2399+
// Only 1 txn file: the failed commit's txn file was already cleaned up.
2400+
assert_eq!(before_count.num_tx_files, 1);
24002401

24012402
// All of our manifests are newer than the threshold but temp files
24022403
// should still be deleted.

rust/lance/src/io/commit.rs

Lines changed: 121 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,29 @@ pub(crate) async fn read_transaction_file(
8686
transaction.try_into()
8787
}
8888

89+
/// Best-effort delete of a transaction file that is no longer needed.
90+
///
91+
/// Logs a warning on failure rather than propagating the error, since the
92+
/// primary operation has already failed and the orphaned file will eventually
93+
/// be removed by GC.
94+
async fn cleanup_transaction_file(
95+
object_store: &ObjectStore,
96+
base_path: &Path,
97+
transaction_file: &str,
98+
) {
99+
if transaction_file.is_empty() {
100+
return;
101+
}
102+
let path = base_path.child(TRANSACTIONS_DIR).child(transaction_file);
103+
if let Err(e) = object_store.delete(&path).await {
104+
log::warn!(
105+
"Failed to clean up orphaned transaction file '{}': {}",
106+
transaction_file,
107+
e
108+
);
109+
}
110+
}
111+
89112
/// Write a transaction to a file and return the relative path.
90113
pub(crate) async fn write_transaction_file(
91114
object_store: &ObjectStore,
@@ -153,7 +176,7 @@ async fn do_commit_new_dataset(
153176
ref_path.clone(),
154177
new_base_id,
155178
branch_name.clone(),
156-
transaction_file,
179+
transaction_file.clone(),
157180
);
158181

159182
let updated_indices = if let Some(index_section_pos) = source_manifest.index_section {
@@ -252,9 +275,13 @@ async fn do_commit_new_dataset(
252275
Ok((manifest, manifest_location))
253276
}
254277
Err(CommitError::CommitConflict) => {
278+
cleanup_transaction_file(object_store, base_path, &transaction_file).await;
255279
Err(crate::Error::dataset_already_exists(base_path.to_string()))
256280
}
257-
Err(CommitError::OtherError(err)) => Err(err),
281+
Err(CommitError::OtherError(err)) => {
282+
cleanup_transaction_file(object_store, base_path, &transaction_file).await;
283+
Err(err)
284+
}
258285
}
259286
}
260287

@@ -812,13 +839,15 @@ pub(crate) async fn do_commit_detached_transaction(
812839
}
813840
Err(CommitError::OtherError(err)) => {
814841
// If other error, return
842+
cleanup_transaction_file(object_store, &dataset.base, &transaction_file).await;
815843
return Err(err);
816844
}
817845
}
818846
}
819847

820848
// This should be extremely unlikely. There should not be *that* many detached commits. If
821849
// this happens then it seems more likely there is a bug in our random u64 generation.
850+
cleanup_transaction_file(object_store, &dataset.base, &transaction_file).await;
822851
Err(crate::Error::commit_conflict_source(
823852
0,
824853
format!(
@@ -904,6 +933,9 @@ pub(crate) async fn commit_transaction(
904933
// Other transactions that may have been committed since the read_version.
905934
// We keep pair of (version, transaction). No other transactions to check initially
906935
let mut other_transactions: Vec<(u64, Arc<Transaction>)>;
936+
// Track the transaction file written in the current loop iteration so we can
937+
// delete it if the commit ultimately fails.
938+
let mut current_transaction_file = String::new();
907939

908940
while backoff.attempt() < num_attempts {
909941
// We are pessimistic here and assume there may be other transactions
@@ -931,11 +963,12 @@ pub(crate) async fn commit_transaction(
931963
transaction = rebase.finish(&dataset).await?;
932964
}
933965

934-
let transaction_file = if !write_config.disable_transaction_file() {
966+
current_transaction_file = if !write_config.disable_transaction_file() {
935967
write_transaction_file(object_store, &dataset.base, &transaction).await?
936968
} else {
937969
String::new()
938970
};
971+
let transaction_file = current_transaction_file.as_str();
939972

940973
target_version = dataset.manifest.version + 1;
941974
if is_detached_version(target_version) {
@@ -952,15 +985,15 @@ pub(crate) async fn commit_transaction(
952985
&dataset.base,
953986
version,
954987
write_config,
955-
&transaction_file,
988+
transaction_file,
956989
&dataset.manifest,
957990
)
958991
.await?
959992
}
960993
_ => transaction.build_manifest(
961994
Some(dataset.manifest.as_ref()),
962995
dataset.load_indices().await?.as_ref().clone(),
963-
&transaction_file,
996+
transaction_file,
964997
write_config,
965998
)?,
966999
};
@@ -1053,19 +1086,29 @@ pub(crate) async fn commit_transaction(
10531086
}
10541087

10551088
if next_attempt_i < num_attempts {
1089+
// The transaction file from this attempt is now stale; clean it up
1090+
// before the next attempt writes a new one (possibly rebased).
1091+
cleanup_transaction_file(
1092+
object_store,
1093+
&dataset.base,
1094+
&current_transaction_file,
1095+
)
1096+
.await;
10561097
tokio::time::sleep(backoff.next_backoff()).await;
10571098
continue;
10581099
} else {
10591100
break;
10601101
}
10611102
}
10621103
Err(CommitError::OtherError(err)) => {
1063-
// If other error, return
1104+
cleanup_transaction_file(object_store, &dataset.base, &current_transaction_file)
1105+
.await;
10641106
return Err(err);
10651107
}
10661108
}
10671109
}
10681110

1111+
cleanup_transaction_file(object_store, &dataset.base, &current_transaction_file).await;
10691112
Err(crate::Error::commit_conflict_source(
10701113
target_version,
10711114
format!(
@@ -1092,7 +1135,7 @@ mod tests {
10921135
use lance_linalg::distance::MetricType;
10931136
use lance_table::format::{DataFile, DataStorageFormat};
10941137
use lance_table::io::commit::{
1095-
CommitLease, CommitLock, RenameCommitHandler, UnsafeCommitHandler,
1138+
CommitLease, CommitLock, ManifestWriter, RenameCommitHandler, UnsafeCommitHandler,
10961139
};
10971140
use lance_testing::datagen::generate_random_array;
10981141

@@ -1700,6 +1743,77 @@ mod tests {
17001743
assert_eq!(manifest.fragments.as_ref(), &expected_fragments);
17011744
}
17021745

1746+
/// A CommitHandler that always fails with OtherError, used to simulate
1747+
/// a manifest write failure so we can verify orphaned transaction files
1748+
/// are cleaned up.
1749+
#[derive(Debug)]
1750+
struct FailingCommitHandler;
1751+
1752+
#[async_trait::async_trait]
1753+
impl CommitHandler for FailingCommitHandler {
1754+
async fn commit(
1755+
&self,
1756+
_manifest: &mut Manifest,
1757+
_indices: Option<Vec<IndexMetadata>>,
1758+
_base_path: &Path,
1759+
_object_store: &ObjectStore,
1760+
_manifest_writer: ManifestWriter,
1761+
_naming_scheme: ManifestNamingScheme,
1762+
_transaction: Option<lance_table::format::Transaction>,
1763+
) -> std::result::Result<ManifestLocation, CommitError> {
1764+
Err(CommitError::OtherError(lance_core::Error::io(
1765+
"simulated commit failure",
1766+
)))
1767+
}
1768+
}
1769+
1770+
fn count_txn_files(uri: &str) -> usize {
1771+
let tx_dir = std::path::Path::new(uri).join("_transactions");
1772+
std::fs::read_dir(&tx_dir)
1773+
.map(|rd| rd.filter_map(|e| e.ok()).count())
1774+
.unwrap_or(0)
1775+
}
1776+
1777+
#[tokio::test]
1778+
async fn test_transaction_file_cleanup_on_commit_failure() {
1779+
let tmp = TempStrDir::default();
1780+
let uri = tmp.as_str();
1781+
1782+
// Create initial dataset with a normal commit handler.
1783+
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1784+
"x",
1785+
DataType::Int32,
1786+
false,
1787+
)]));
1788+
let batch = RecordBatch::try_new(
1789+
schema.clone(),
1790+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1791+
)
1792+
.unwrap();
1793+
let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone());
1794+
Dataset::write(reader, uri, None).await.unwrap();
1795+
1796+
let txn_files_before = count_txn_files(uri);
1797+
1798+
// Attempt to append with a commit handler that always fails.
1799+
let params = WriteParams {
1800+
mode: WriteMode::Append,
1801+
commit_handler: Some(Arc::new(FailingCommitHandler)),
1802+
..Default::default()
1803+
};
1804+
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
1805+
let result = Dataset::write(reader, uri, Some(params)).await;
1806+
assert!(result.is_err(), "expected commit to fail");
1807+
1808+
// The failed commit must not leave any extra transaction files behind.
1809+
let txn_files_after = count_txn_files(uri);
1810+
assert_eq!(
1811+
txn_files_after,
1812+
txn_files_before,
1813+
"failed commit left {extra} orphaned transaction file(s)",
1814+
extra = txn_files_after.saturating_sub(txn_files_before),
1815+
);
1816+
}
17031817
/// Helper to build a simple manifest for check_column_indices tests.
17041818
fn make_manifest_with_file(
17051819
schema: Schema,

0 commit comments

Comments
 (0)