Skip to content

Commit f49c049

Browse files
authored
feat: strict overwrite operation (#3712)
Modifies the handling of the Overwrite operation to respect `read_version` when `num_retries` is set to 0 (strict Overwrite). A strict Overwrite fails if new commits have been written after `read_version`. This allows users to more easily build custom transaction and retry logic if needed. The first commit refactors only for code deduplication with no behavioral changes. The remainder makes two behavioral changes: 1. Each commit will be tried at least once (previously, commits would always fail when `num_retries` is set to 0) 2. If the operation is Overwrite and `num_retries` is 0, the commit will be attempted based on `read_version` instead of attempting to check out the latest version of the dataset.
1 parent 674cc10 commit f49c049

2 files changed

Lines changed: 82 additions & 63 deletions

File tree

rust/lance/src/dataset.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3167,6 +3167,41 @@ mod tests {
31673167
assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2);
31683168
}
31693169

3170+
#[tokio::test]
3171+
async fn test_strict_overwrite() {
3172+
let schema = Schema::try_from(&ArrowSchema::new(vec![ArrowField::new(
3173+
"x",
3174+
DataType::Int32,
3175+
false,
3176+
)]))
3177+
.unwrap();
3178+
let operation = Operation::Overwrite {
3179+
fragments: vec![],
3180+
schema,
3181+
config_upsert_values: None,
3182+
};
3183+
let test_dir = tempdir().unwrap();
3184+
let test_uri = test_dir.path().to_str().unwrap();
3185+
let read_version_0_transaction = Transaction::new(0, operation, None, None);
3186+
let strict_builder = CommitBuilder::new(test_uri).with_max_retries(0);
3187+
let unstrict_builder = CommitBuilder::new(test_uri).with_max_retries(1);
3188+
strict_builder
3189+
.clone()
3190+
.execute(read_version_0_transaction.clone())
3191+
.await
3192+
.expect("Strict overwrite should succeed when writing a new dataset");
3193+
strict_builder
3194+
.clone()
3195+
.execute(read_version_0_transaction.clone())
3196+
.await
3197+
.expect_err("Strict overwrite should fail when committing to a stale version");
3198+
unstrict_builder
3199+
.clone()
3200+
.execute(read_version_0_transaction.clone())
3201+
.await
3202+
.expect("Unstrict overwrite should succeed when committing to a stale version");
3203+
}
3204+
31703205
#[rstest]
31713206
#[tokio::test]
31723207
async fn test_merge(

rust/lance/src/io/commit.rs

Lines changed: 47 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -732,46 +732,51 @@ pub(crate) async fn commit_transaction(
732732
// Note: object_store has been configured with WriteParams, but dataset.object_store()
733733
// has not necessarily. So for anything involving writing, use `object_store`.
734734
let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;
735-
736-
// First, get all transactions since read_version
737735
let read_version = transaction.read_version;
736+
let mut target_version = read_version + 1;
738737
let mut dataset = dataset.clone();
739-
// We need to checkout the latest version, because any fixes we apply
740-
// (like computing the new row ids) needs to be done based on the most
741-
// recent manifest.
742-
dataset.checkout_latest().await?;
743-
let latest_version = dataset.manifest.version;
744-
let other_transactions = futures::stream::iter((read_version + 1)..=latest_version)
745-
.map(|version| {
746-
read_dataset_transaction_file(&dataset, version)
747-
.map(move |res| res.map(|tx| (version, tx)))
748-
})
749-
.buffer_unordered(dataset.object_store().io_parallelism())
750-
.take_while(|res| {
751-
futures::future::ready(!matches!(
752-
res,
753-
Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. })
754-
))
755-
})
756-
.try_collect::<Vec<_>>()
757-
.await?;
758-
759-
let mut target_version = latest_version + 1;
760-
761-
if is_detached_version(target_version) {
762-
return Err(Error::Internal { message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(), location: location!() });
763-
}
764-
765-
// If any of them conflict with the transaction, return an error
766-
for (other_version, other_transaction) in other_transactions.iter() {
767-
check_transaction(
768-
transaction,
769-
*other_version,
770-
Some(other_transaction.as_ref()),
771-
)?;
738+
if matches!(transaction.operation, Operation::Overwrite { .. })
739+
&& commit_config.num_retries == 0
740+
{
741+
dataset.checkout_version(transaction.read_version).await?;
742+
} else {
743+
// We need to checkout the latest version, because any fixes we apply
744+
// (like computing the new row ids) needs to be done based on the most
745+
// recent manifest.
746+
dataset.checkout_latest().await?;
772747
}
773-
774-
for attempt_i in 0..commit_config.num_retries {
748+
let num_attempts = std::cmp::max(commit_config.num_retries, 1);
749+
for attempt_i in 0..num_attempts {
750+
// See if we can retry the commit. Try to account for all
751+
// transactions that have been committed since the read_version.
752+
// Use small amount of backoff to handle transactions that all
753+
// started at exact same time better.
754+
futures::stream::iter(target_version..=dataset.manifest.version)
755+
.map(|version| {
756+
read_dataset_transaction_file(&dataset, version)
757+
.map(move |res| res.map(|tx| (version, tx)))
758+
})
759+
.buffer_unordered(dataset.object_store().io_parallelism())
760+
.take_while(|res| {
761+
futures::future::ready(
762+
attempt_i > 0
763+
|| !matches!(
764+
res,
765+
Err(crate::Error::NotFound { .. })
766+
| Err(crate::Error::DatasetNotFound { .. })
767+
),
768+
)
769+
})
770+
.try_for_each(|(other_version, other_transaction)| {
771+
let res =
772+
check_transaction(transaction, other_version, Some(other_transaction.as_ref()));
773+
futures::future::ready(res)
774+
})
775+
.await?;
776+
target_version = dataset.manifest.version + 1;
777+
if is_detached_version(target_version) {
778+
return Err(Error::Internal { message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(), location: location!() });
779+
}
775780
// Build an up-to-date manifest from the transaction and current manifest
776781
let (mut manifest, mut indices) = match transaction.operation {
777782
Operation::Restore { version } => {
@@ -843,32 +848,11 @@ pub(crate) async fn commit_transaction(
843848
return Ok((manifest, manifest_location.path, manifest_location.e_tag));
844849
}
845850
Err(CommitError::CommitConflict) => {
846-
// See if we can retry the commit. Try to account for all
847-
// transactions that have been committed since the read_version.
848-
// Use small amount of backoff to handle transactions that all
849-
// started at exact same time better.
850-
851-
let backoff_time = backoff_time(attempt_i);
852-
tokio::time::sleep(backoff_time).await;
853-
854-
dataset.checkout_latest().await?;
855-
let latest_version = dataset.manifest.version;
856-
futures::stream::iter(target_version..=latest_version)
857-
.map(|version| {
858-
read_dataset_transaction_file(&dataset, version)
859-
.map(move |res| res.map(|tx| (version, tx)))
860-
})
861-
.buffer_unordered(dataset.object_store().io_parallelism())
862-
.try_for_each(|(version, other_transaction)| {
863-
let res = check_transaction(
864-
transaction,
865-
version,
866-
Some(other_transaction.as_ref()),
867-
);
868-
futures::future::ready(res)
869-
})
870-
.await?;
871-
target_version = latest_version + 1;
851+
let next_attempt_i = attempt_i + 1;
852+
if next_attempt_i < num_attempts {
853+
tokio::time::sleep(backoff_time(next_attempt_i)).await;
854+
dataset.checkout_latest().await?;
855+
}
872856
}
873857
Err(CommitError::OtherError(err)) => {
874858
// If other error, return

0 commit comments

Comments
 (0)