Skip to content

Commit d1aec0a

Browse files
lijinglunwojiaodoubao
authored andcommitted
feat: add strict commit isolation flag for dataset commits
1 parent f41f0c8 commit d1aec0a

9 files changed

Lines changed: 120 additions & 9 deletions

File tree

java/lance-jni/src/blocking_dataset.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ impl BlockingDataset {
176176
None,
177177
Default::default(),
178178
false, // TODO: support enable_v2_manifest_paths
179+
false, // serial_commit
179180
))?;
180181
Ok(Self { inner })
181182
}

rust/lance-table/src/io/commit.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,9 @@ impl Debug for ConditionalPutCommitHandler {
11131113
pub struct CommitConfig {
11141114
pub num_retries: u32,
11151115
pub skip_auto_cleanup: bool,
1116+
/// When true, fail immediately if any concurrent transactions have been committed
1117+
/// since the read version of the transaction being committed.
1118+
pub serial_commit: bool,
11161119
// TODO: add isolation_level
11171120
}
11181121

@@ -1121,6 +1124,7 @@ impl Default for CommitConfig {
11211124
Self {
11221125
num_retries: 20,
11231126
skip_auto_cleanup: false,
1127+
serial_commit: false,
11241128
}
11251129
}
11261130
}

rust/lance/src/dataset.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,7 @@ impl Dataset {
12591259
commit_handler: Option<Arc<dyn CommitHandler>>,
12601260
session: Arc<Session>,
12611261
enable_v2_manifest_paths: bool,
1262+
serial_commit: bool,
12621263
detached: bool,
12631264
) -> Result<Self> {
12641265
let read_version = read_version.map_or_else(
@@ -1277,7 +1278,8 @@ impl Dataset {
12771278
let mut builder = CommitBuilder::new(base_uri)
12781279
.enable_v2_manifest_paths(enable_v2_manifest_paths)
12791280
.with_session(session)
1280-
.with_detached(detached);
1281+
.with_detached(detached)
1282+
.with_serial_commit(serial_commit);
12811283

12821284
if let Some(store_params) = store_params {
12831285
builder = builder.with_store_params(store_params);
@@ -1324,6 +1326,10 @@ impl Dataset {
13241326
/// dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning
13251327
/// this on will make the dataset unreadable for older versions of Lance
13261328
/// (prior to 0.17.0). Default is False.
1329+
/// * `serial_commit` - When true, the commit will fail if any transactions
1330+
/// have been committed since `read_version`, enforcing strict serial ordering. When
1331+
/// false (the default), the commit may be automatically rebased on concurrent updates.
1332+
#[allow(clippy::too_many_arguments)]
13271333
pub async fn commit(
13281334
dest: impl Into<WriteDestination<'_>>,
13291335
operation: Operation,
@@ -1332,6 +1338,7 @@ impl Dataset {
13321338
commit_handler: Option<Arc<dyn CommitHandler>>,
13331339
session: Arc<Session>,
13341340
enable_v2_manifest_paths: bool,
1341+
serial_commit: bool,
13351342
) -> Result<Self> {
13361343
Self::do_commit(
13371344
dest.into(),
@@ -1341,6 +1348,7 @@ impl Dataset {
13411348
commit_handler,
13421349
session,
13431350
enable_v2_manifest_paths,
1351+
serial_commit,
13441352
/*detached=*/ false,
13451353
)
13461354
.await
@@ -1371,6 +1379,7 @@ impl Dataset {
13711379
commit_handler,
13721380
session,
13731381
enable_v2_manifest_paths,
1382+
/*serial_commit=*/ false,
13741383
/*detached=*/ true,
13751384
)
13761385
.await

rust/lance/src/dataset/fragment.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2868,6 +2868,7 @@ mod tests {
28682868
None,
28692869
Default::default(),
28702870
true,
2871+
false,
28712872
)
28722873
.await
28732874
.unwrap();
@@ -2941,6 +2942,7 @@ mod tests {
29412942
None,
29422943
Default::default(),
29432944
true,
2945+
false,
29442946
)
29452947
.await
29462948
.unwrap();
@@ -3400,10 +3402,18 @@ mod tests {
34003402
initial_bases: None,
34013403
};
34023404

3403-
let new_dataset =
3404-
Dataset::commit(test_uri, op, None, None, None, Default::default(), false)
3405-
.await
3406-
.unwrap();
3405+
let new_dataset = Dataset::commit(
3406+
test_uri,
3407+
op,
3408+
None,
3409+
None,
3410+
None,
3411+
Default::default(),
3412+
false,
3413+
false,
3414+
)
3415+
.await
3416+
.unwrap();
34073417

34083418
assert_eq!(new_dataset.count_rows(None).await.unwrap(), dataset_rows);
34093419

@@ -3510,10 +3520,18 @@ mod tests {
35103520
initial_bases: None,
35113521
};
35123522

3513-
let dataset =
3514-
Dataset::commit(test_uri, op, None, None, None, Default::default(), false)
3515-
.await
3516-
.unwrap();
3523+
let dataset = Dataset::commit(
3524+
test_uri,
3525+
op,
3526+
None,
3527+
None,
3528+
None,
3529+
Default::default(),
3530+
false,
3531+
false,
3532+
)
3533+
.await
3534+
.unwrap();
35173535

35183536
// We only kept the first fragment of 40 rows
35193537
assert_eq!(
@@ -3750,6 +3768,7 @@ mod tests {
37503768
None,
37513769
Default::default(),
37523770
false,
3771+
false,
37533772
)
37543773
.await?;
37553774

@@ -3885,6 +3904,7 @@ mod tests {
38853904
None,
38863905
Default::default(),
38873906
false,
3907+
false,
38883908
)
38893909
.await
38903910
.unwrap();

rust/lance/src/dataset/tests/dataset_merge_update.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,7 @@ async fn test_datafile_replacement() {
674674
None,
675675
Arc::new(Default::default()),
676676
false,
677+
false,
677678
)
678679
.await
679680
.unwrap();
@@ -703,6 +704,7 @@ async fn test_datafile_replacement() {
703704
None,
704705
Arc::new(Default::default()),
705706
false,
707+
false,
706708
)
707709
.await
708710
.unwrap();
@@ -757,6 +759,7 @@ async fn test_datafile_replacement() {
757759
None,
758760
Arc::new(Default::default()),
759761
false,
762+
false,
760763
)
761764
.await
762765
.unwrap();
@@ -820,6 +823,7 @@ async fn test_datafile_partial_replacement() {
820823
None,
821824
Arc::new(Default::default()),
822825
false,
826+
false,
823827
)
824828
.await
825829
.unwrap();
@@ -873,6 +877,7 @@ async fn test_datafile_partial_replacement() {
873877
None,
874878
Arc::new(Default::default()),
875879
false,
880+
false,
876881
)
877882
.await
878883
.unwrap();
@@ -928,6 +933,7 @@ async fn test_datafile_partial_replacement() {
928933
None,
929934
Arc::new(Default::default()),
930935
false,
936+
false,
931937
)
932938
.await
933939
.unwrap();
@@ -1000,6 +1006,7 @@ async fn test_datafile_replacement_error() {
10001006
None,
10011007
Arc::new(Default::default()),
10021008
false,
1009+
false,
10031010
)
10041011
.await
10051012
.unwrap();
@@ -1032,6 +1039,7 @@ async fn test_datafile_replacement_error() {
10321039
None,
10331040
Arc::new(Default::default()),
10341041
false,
1042+
false,
10351043
)
10361044
.await
10371045
.unwrap_err();

rust/lance/src/dataset/tests/dataset_versioning.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ async fn test_v2_manifest_path_commit() {
116116
None,
117117
Default::default(),
118118
true, // enable_v2_manifest_paths
119+
false,
119120
)
120121
.await
121122
.unwrap();

rust/lance/src/dataset/write/commit.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ impl<'a> CommitBuilder<'a> {
163163
self
164164
}
165165

166+
/// When enabled, fail the commit if any transactions have been committed since
167+
/// the transaction's read version. This enforces strict serial ordering.
168+
pub fn with_serial_commit(mut self, serial_commit: bool) -> Self {
169+
self.commit_config.serial_commit = serial_commit;
170+
self
171+
}
172+
166173
/// Provide the set of row addresses that were deleted or updated. This is
167174
/// used to perform fast conflict resolution.
168175
pub fn with_affected_rows(mut self, affected_rows: RowAddrTreeMap) -> Self {
@@ -725,6 +732,54 @@ mod tests {
725732
assert_io_eq!(io_stats, write_iops, 2); // txn + manifest
726733
}
727734

735+
#[tokio::test]
736+
async fn test_serial_commit() {
737+
// Create a dataset with a single committed version.
738+
let session = Arc::new(Session::default());
739+
let write_params = WriteParams {
740+
session: Some(session.clone()),
741+
..Default::default()
742+
};
743+
let data = RecordBatch::try_new(
744+
Arc::new(ArrowSchema::new(vec![ArrowField::new(
745+
"a",
746+
DataType::Int32,
747+
false,
748+
)])),
749+
vec![Arc::new(Int32Array::from(vec![0; 5]))],
750+
)
751+
.unwrap();
752+
let dataset = InsertBuilder::new("memory://force_fail_any_update")
753+
.with_params(&write_params)
754+
.execute(vec![data])
755+
.await
756+
.unwrap();
757+
let dataset = Arc::new(dataset);
758+
let base_read_version = dataset.manifest().version;
759+
assert_eq!(base_read_version, 1);
760+
761+
// Advance the dataset to version 2 using the same read_version.
762+
let _ = CommitBuilder::new(dataset.clone())
763+
.execute(sample_transaction(base_read_version))
764+
.await
765+
.unwrap();
766+
767+
// Attempt to commit again using the stale read_version and serial_commit.
768+
let result = CommitBuilder::new(dataset.clone())
769+
.with_serial_commit(true)
770+
.execute(sample_transaction(base_read_version))
771+
.await;
772+
773+
assert!(matches!(result, Err(Error::CommitConflict { .. })));
774+
775+
// Successfully commit using the same read_version and non serial_isolation.
776+
CommitBuilder::new(dataset.clone())
777+
.with_serial_commit(false)
778+
.execute(sample_transaction(base_read_version))
779+
.await
780+
.unwrap();
781+
}
782+
728783
#[tokio::test]
729784
async fn test_commit_batch() {
730785
// Create a dataset

rust/lance/src/io/commit.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,18 @@ pub(crate) async fn commit_transaction(
828828
if !strict_overwrite {
829829
(dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?;
830830

831+
// In serial commit mode, we fail immediately if any concurrent transactions have been
832+
// committed since the read version of the transaction being committed.
833+
if commit_config.serial_commit && !other_transactions.is_empty() {
834+
return Err(Error::CommitConflict {
835+
version: target_version,
836+
source:
837+
"Concurrent updates detected since read_version with serial_commit enabled"
838+
.into(),
839+
location: location!(),
840+
});
841+
}
842+
831843
// See if we can retry the commit. Try to account for all
832844
// transactions that have been committed since the read_version.
833845
// Use small amount of backoff to handle transactions that all

rust/lance/src/utils/test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ impl TestDatasetGenerator {
124124
None,
125125
Default::default(),
126126
false,
127+
false,
127128
)
128129
.await
129130
.unwrap()

0 commit comments

Comments
 (0)