Skip to content

Commit f41f0c8

Browse files
wojiaodoubaolijinglun
andauthored
feat(java): add detached flag to commitTransaction (#5626)
We will need this in partitioned namespace. --------- Co-authored-by: lijinglun <lijinglun@bytedance.com>
1 parent 8dbcfd2 commit f41f0c8

8 files changed

Lines changed: 164 additions & 27 deletions

File tree

java/lance-jni/src/blocking_dataset.rs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -283,10 +283,12 @@ impl BlockingDataset {
283283
&mut self,
284284
transaction: Transaction,
285285
store_params: ObjectStoreParams,
286+
detached: bool,
286287
) -> Result<Self> {
287288
let new_dataset = RT.block_on(
288289
CommitBuilder::new(Arc::new(self.clone().inner))
289290
.with_store_params(store_params)
291+
.with_detached(detached)
290292
.execute(transaction),
291293
)?;
292294
Ok(BlockingDataset { inner: new_dataset })
@@ -322,13 +324,14 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
322324
_obj: JObject,
323325
arrow_schema_addr: jlong,
324326
path: JString,
325-
max_rows_per_file: JObject, // Optional<Integer>
326-
max_rows_per_group: JObject, // Optional<Integer>
327-
max_bytes_per_file: JObject, // Optional<Long>
328-
mode: JObject, // Optional<String>
329-
enable_stable_row_ids: JObject, // Optional<Boolean>
330-
data_storage_version: JObject, // Optional<String>
331-
storage_options_obj: JObject, // Map<String, String>
327+
max_rows_per_file: JObject, // Optional<Integer>
328+
max_rows_per_group: JObject, // Optional<Integer>
329+
max_bytes_per_file: JObject, // Optional<Long>
330+
mode: JObject, // Optional<String>
331+
enable_stable_row_ids: JObject, // Optional<Boolean>
332+
data_storage_version: JObject, // Optional<String>
333+
enable_v2_manifest_paths: JObject, // Optional<Boolean>
334+
storage_options_obj: JObject, // Map<String, String>
332335
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
333336
initial_bases: JObject,
334337
target_bases: JObject,
@@ -345,6 +348,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
345348
mode,
346349
enable_stable_row_ids,
347350
data_storage_version,
351+
enable_v2_manifest_paths,
348352
storage_options_obj,
349353
s3_credentials_refresh_offset_seconds_obj,
350354
initial_bases,
@@ -358,13 +362,14 @@ fn inner_create_with_ffi_schema<'local>(
358362
env: &mut JNIEnv<'local>,
359363
arrow_schema_addr: jlong,
360364
path: JString,
361-
max_rows_per_file: JObject, // Optional<Integer>
362-
max_rows_per_group: JObject, // Optional<Integer>
363-
max_bytes_per_file: JObject, // Optional<Long>
364-
mode: JObject, // Optional<String>
365-
enable_stable_row_ids: JObject, // Optional<Boolean>
366-
data_storage_version: JObject, // Optional<String>
367-
storage_options_obj: JObject, // Map<String, String>
365+
max_rows_per_file: JObject, // Optional<Integer>
366+
max_rows_per_group: JObject, // Optional<Integer>
367+
max_bytes_per_file: JObject, // Optional<Long>
368+
mode: JObject, // Optional<String>
369+
enable_stable_row_ids: JObject, // Optional<Boolean>
370+
data_storage_version: JObject, // Optional<String>
371+
enable_v2_manifest_paths: JObject, // Optional<Boolean>
372+
storage_options_obj: JObject, // Map<String, String>
368373
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
369374
initial_bases: JObject,
370375
target_bases: JObject,
@@ -383,6 +388,7 @@ fn inner_create_with_ffi_schema<'local>(
383388
mode,
384389
enable_stable_row_ids,
385390
data_storage_version,
391+
enable_v2_manifest_paths,
386392
storage_options_obj,
387393
JObject::null(), // No provider for schema-only creation
388394
s3_credentials_refresh_offset_seconds_obj,
@@ -412,13 +418,14 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
412418
_obj: JObject,
413419
arrow_array_stream_addr: jlong,
414420
path: JString,
415-
max_rows_per_file: JObject, // Optional<Integer>
416-
max_rows_per_group: JObject, // Optional<Integer>
417-
max_bytes_per_file: JObject, // Optional<Long>
418-
mode: JObject, // Optional<String>
419-
enable_stable_row_ids: JObject, // Optional<Boolean>
420-
data_storage_version: JObject, // Optional<String>
421-
storage_options_obj: JObject, // Map<String, String>
421+
max_rows_per_file: JObject, // Optional<Integer>
422+
max_rows_per_group: JObject, // Optional<Integer>
423+
max_bytes_per_file: JObject, // Optional<Long>
424+
mode: JObject, // Optional<String>
425+
enable_stable_row_ids: JObject, // Optional<Boolean>
426+
data_storage_version: JObject, // Optional<String>
427+
enable_v2_manifest_paths: JObject, // Optional<Boolean>
428+
storage_options_obj: JObject, // Map<String, String>
422429
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
423430
initial_bases: JObject,
424431
target_bases: JObject,
@@ -434,6 +441,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
434441
max_bytes_per_file,
435442
mode,
436443
enable_stable_row_ids,
444+
enable_v2_manifest_paths,
437445
data_storage_version,
438446
storage_options_obj,
439447
JObject::null(),
@@ -456,6 +464,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo
456464
mode: JObject, // Optional<String>
457465
enable_stable_row_ids: JObject, // Optional<Boolean>
458466
data_storage_version: JObject, // Optional<String>
467+
enable_v2_manifest_paths: JObject, // Optional<Boolean>
459468
storage_options_obj: JObject, // Map<String, String>
460469
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
461470
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
@@ -474,6 +483,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo
474483
mode,
475484
enable_stable_row_ids,
476485
data_storage_version,
486+
enable_v2_manifest_paths,
477487
storage_options_obj,
478488
storage_options_provider_obj,
479489
s3_credentials_refresh_offset_seconds_obj,
@@ -494,6 +504,7 @@ fn inner_create_with_ffi_stream<'local>(
494504
mode: JObject, // Optional<String>
495505
enable_stable_row_ids: JObject, // Optional<Boolean>
496506
data_storage_version: JObject, // Optional<String>
507+
enable_v2_manifest_paths: JObject, // Optional<Boolean>
497508
storage_options_obj: JObject, // Map<String, String>
498509
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
499510
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
@@ -511,6 +522,7 @@ fn inner_create_with_ffi_stream<'local>(
511522
mode,
512523
enable_stable_row_ids,
513524
data_storage_version,
525+
enable_v2_manifest_paths,
514526
storage_options_obj,
515527
storage_options_provider_obj,
516528
s3_credentials_refresh_offset_seconds_obj,
@@ -530,6 +542,7 @@ fn create_dataset<'local>(
530542
mode: JObject,
531543
enable_stable_row_ids: JObject,
532544
data_storage_version: JObject,
545+
enable_v2_manifest_paths: JObject,
533546
storage_options_obj: JObject,
534547
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
535548
s3_credentials_refresh_offset_seconds_obj: JObject,
@@ -547,6 +560,7 @@ fn create_dataset<'local>(
547560
&mode,
548561
&enable_stable_row_ids,
549562
&data_storage_version,
563+
Some(&enable_v2_manifest_paths),
550564
&storage_options_obj,
551565
&storage_options_provider_obj,
552566
&s3_credentials_refresh_offset_seconds_obj,

java/lance-jni/src/fragment.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ fn create_fragment<'a>(
254254
&mode,
255255
&enable_stable_row_ids,
256256
&data_storage_version,
257+
None,
257258
&storage_options_obj,
258259
&storage_options_provider_obj,
259260
&s3_credentials_refresh_offset_seconds_obj,

java/lance-jni/src/transaction.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use arrow::datatypes::Schema;
1111
use arrow_schema::ffi::FFI_ArrowSchema;
1212
use chrono::DateTime;
1313
use jni::objects::{JByteArray, JLongArray, JMap, JObject, JString, JValue, JValueGen};
14-
use jni::sys::jbyte;
14+
use jni::sys::{jboolean, jbyte};
1515
use jni::JNIEnv;
1616
use lance::dataset::transaction::{
1717
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder,
@@ -728,17 +728,24 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>(
728728
mut env: JNIEnv<'local>,
729729
java_dataset: JObject,
730730
java_transaction: JObject,
731+
detached_jbool: jboolean,
731732
) -> JObject<'local> {
732733
ok_or_throw!(
733734
env,
734-
inner_commit_transaction(&mut env, java_dataset, java_transaction)
735+
inner_commit_transaction(
736+
&mut env,
737+
java_dataset,
738+
java_transaction,
739+
detached_jbool != 0,
740+
)
735741
)
736742
}
737743

738744
fn inner_commit_transaction<'local>(
739745
env: &mut JNIEnv<'local>,
740746
java_dataset: JObject,
741747
java_transaction: JObject,
748+
detached: bool,
742749
) -> Result<JObject<'local>> {
743750
let write_param_jobj = env
744751
.call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])?
@@ -772,7 +779,7 @@ fn inner_commit_transaction<'local>(
772779
let new_blocking_ds = {
773780
let mut dataset_guard =
774781
unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?;
775-
dataset_guard.commit_transaction(transaction, store_params)?
782+
dataset_guard.commit_transaction(transaction, store_params, detached)?
776783
};
777784
new_blocking_ds.into_java(env)
778785
}

java/lance-jni/src/utils.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub fn extract_write_params(
4747
mode: &JObject,
4848
enable_stable_row_ids: &JObject,
4949
data_storage_version: &JObject,
50+
enable_v2_manifest_paths: Option<&JObject>,
5051
storage_options_obj: &JObject,
5152
storage_options_provider_obj: &JObject, // Optional<StorageOptionsProvider>
5253
s3_credentials_refresh_offset_seconds_obj: &JObject, // Optional<Long>
@@ -75,6 +76,16 @@ pub fn extract_write_params(
7576
data_storage_version_val.as_str(),
7677
)?);
7778
}
79+
80+
// Enable v2 manifest paths by default.
81+
write_params.enable_v2_manifest_paths =
82+
if let Some(enable_v2_manifest_paths) = enable_v2_manifest_paths {
83+
env.get_boolean_opt(enable_v2_manifest_paths)?
84+
.unwrap_or(true)
85+
} else {
86+
true
87+
};
88+
7889
let storage_options: HashMap<String, String> =
7990
extract_storage_options(env, storage_options_obj)?;
8091

java/src/main/java/org/lance/Dataset.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public static Dataset create(
141141
params.getMode(),
142142
params.getEnableStableRowIds(),
143143
params.getDataStorageVersion(),
144+
params.getEnableV2ManifestPaths(),
144145
params.getStorageOptions(),
145146
params.getS3CredentialsRefreshOffsetSeconds(),
146147
params.getInitialBases(),
@@ -201,6 +202,7 @@ static Dataset create(
201202
params.getMode(),
202203
params.getEnableStableRowIds(),
203204
params.getDataStorageVersion(),
205+
params.getEnableV2ManifestPaths(),
204206
params.getStorageOptions(),
205207
Optional.ofNullable(storageOptionsProvider),
206208
params.getS3CredentialsRefreshOffsetSeconds(),
@@ -219,6 +221,7 @@ private static native Dataset createWithFfiSchema(
219221
Optional<String> mode,
220222
Optional<Boolean> enableStableRowIds,
221223
Optional<String> dataStorageVersion,
224+
Optional<Boolean> enableV2ManifestPaths,
222225
Map<String, String> storageOptions,
223226
Optional<Long> s3CredentialsRefreshOffsetSeconds,
224227
Optional<List<BasePath>> initialBases,
@@ -233,6 +236,7 @@ private static native Dataset createWithFfiStream(
233236
Optional<String> mode,
234237
Optional<Boolean> enableStableRowIds,
235238
Optional<String> dataStorageVersion,
239+
Optional<Boolean> enableV2ManifestPaths,
236240
Map<String, String> storageOptions,
237241
Optional<Long> s3CredentialsRefreshOffsetSeconds,
238242
Optional<List<BasePath>> initialBases,
@@ -247,6 +251,7 @@ private static native Dataset createWithFfiStreamAndProvider(
247251
Optional<String> mode,
248252
Optional<Boolean> enableStableRowIds,
249253
Optional<String> dataStorageVersion,
254+
Optional<Boolean> enableV2ManifestPaths,
250255
Map<String, String> storageOptions,
251256
Optional<StorageOptionsProvider> storageOptionsProvider,
252257
Optional<Long> s3CredentialsRefreshOffsetSeconds,
@@ -450,9 +455,21 @@ public Transaction.Builder newTransactionBuilder() {
450455
* @return A new instance of {@link Dataset} linked to committed version.
451456
*/
452457
public Dataset commitTransaction(Transaction transaction) {
458+
return commitTransaction(transaction, false);
459+
}
460+
461+
/**
462+
* Commit a single transaction and return a new Dataset with the new version. Original dataset
463+
* version will not be refreshed.
464+
*
465+
* @param transaction The transaction to commit
466+
* @param detached If true, the commit will not be part of the main dataset lineage.
467+
* @return A new instance of {@link Dataset} linked to committed version.
468+
*/
469+
public Dataset commitTransaction(Transaction transaction, boolean detached) {
453470
Preconditions.checkNotNull(transaction);
454471
try {
455-
Dataset dataset = nativeCommitTransaction(transaction);
472+
Dataset dataset = nativeCommitTransaction(transaction, detached);
456473
if (selfManagedAllocator) {
457474
dataset.allocator = new RootAllocator(Long.MAX_VALUE);
458475
} else {
@@ -464,7 +481,7 @@ public Dataset commitTransaction(Transaction transaction) {
464481
}
465482
}
466483

467-
private native Dataset nativeCommitTransaction(Transaction transaction);
484+
private native Dataset nativeCommitTransaction(Transaction transaction, boolean detached);
468485

469486
/**
470487
* Drop a Dataset.

java/src/main/java/org/lance/WriteParams.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public String getVersionString() {
5656
private final Optional<WriteMode> mode;
5757
private final Optional<Boolean> enableStableRowIds;
5858
private final Optional<LanceFileVersion> dataStorageVersion;
59+
private final Optional<Boolean> enableV2ManifestPaths;
5960
private Map<String, String> storageOptions = new HashMap<>();
6061
private final Optional<Long> s3CredentialsRefreshOffsetSeconds;
6162
private final Optional<List<BasePath>> initialBases;
@@ -68,6 +69,7 @@ private WriteParams(
6869
Optional<WriteMode> mode,
6970
Optional<Boolean> enableStableRowIds,
7071
Optional<LanceFileVersion> dataStorageVersion,
72+
Optional<Boolean> enableV2ManifestPaths,
7173
Map<String, String> storageOptions,
7274
Optional<Long> s3CredentialsRefreshOffsetSeconds,
7375
Optional<List<BasePath>> initialBases,
@@ -78,6 +80,7 @@ private WriteParams(
7880
this.mode = mode;
7981
this.enableStableRowIds = enableStableRowIds;
8082
this.dataStorageVersion = dataStorageVersion;
83+
this.enableV2ManifestPaths = enableV2ManifestPaths;
8184
this.storageOptions = storageOptions;
8285
this.s3CredentialsRefreshOffsetSeconds = s3CredentialsRefreshOffsetSeconds;
8386
this.initialBases = initialBases;
@@ -113,6 +116,10 @@ public Optional<String> getDataStorageVersion() {
113116
return dataStorageVersion.map(LanceFileVersion::getVersionString);
114117
}
115118

119+
public Optional<Boolean> getEnableV2ManifestPaths() {
120+
return enableV2ManifestPaths;
121+
}
122+
116123
public Map<String, String> getStorageOptions() {
117124
return storageOptions;
118125
}
@@ -148,6 +155,7 @@ public static class Builder {
148155
private Optional<WriteMode> mode = Optional.empty();
149156
private Optional<Boolean> enableStableRowIds = Optional.empty();
150157
private Optional<LanceFileVersion> dataStorageVersion = Optional.empty();
158+
private Optional<Boolean> enableV2ManifestPaths;
151159
private Map<String, String> storageOptions = new HashMap<>();
152160
private Optional<Long> s3CredentialsRefreshOffsetSeconds = Optional.empty();
153161
private Optional<List<BasePath>> initialBases = Optional.empty();
@@ -188,6 +196,11 @@ public Builder withEnableStableRowIds(boolean enableStableRowIds) {
188196
return this;
189197
}
190198

199+
public Builder withEnableV2ManifestPaths(boolean enableV2ManifestPaths) {
200+
this.enableV2ManifestPaths = Optional.of(enableV2ManifestPaths);
201+
return this;
202+
}
203+
191204
public Builder withS3CredentialsRefreshOffsetSeconds(long s3CredentialsRefreshOffsetSeconds) {
192205
this.s3CredentialsRefreshOffsetSeconds = Optional.of(s3CredentialsRefreshOffsetSeconds);
193206
return this;
@@ -211,6 +224,7 @@ public WriteParams build() {
211224
mode,
212225
enableStableRowIds,
213226
dataStorageVersion,
227+
enableV2ManifestPaths,
214228
storageOptions,
215229
s3CredentialsRefreshOffsetSeconds,
216230
initialBases,

0 commit comments

Comments
 (0)