Skip to content

Commit dcbb25b

Browse files
authored
Merge branch 'main' into fix-sync-unenforced-pk-fields-with-metadata
2 parents 08600f5 + 9672573 commit dcbb25b

21 files changed

Lines changed: 213 additions & 38 deletions

File tree

java/lance-jni/src/blocking_dataset.rs

Lines changed: 33 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -413,6 +413,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
413413
storage_options_obj: JObject, // Map<String, String>
414414
initial_bases: JObject,
415415
target_bases: JObject,
416+
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
417+
blob_pack_file_size_threshold: JObject, // Optional<Long>
416418
) -> JObject<'local> {
417419
ok_or_throw!(
418420
env,
@@ -430,6 +432,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
430432
storage_options_obj,
431433
initial_bases,
432434
target_bases,
435+
allow_external_blob_outside_bases,
436+
blob_pack_file_size_threshold,
433437
)
434438
)
435439
}
@@ -449,6 +453,8 @@ fn inner_create_with_ffi_schema<'local>(
449453
storage_options_obj: JObject, // Map<String, String>
450454
initial_bases: JObject,
451455
target_bases: JObject,
456+
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
457+
blob_pack_file_size_threshold: JObject, // Optional<Long>
452458
) -> Result<JObject<'local>> {
453459
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
454460
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
@@ -468,6 +474,8 @@ fn inner_create_with_ffi_schema<'local>(
468474
storage_options_obj,
469475
initial_bases,
470476
target_bases,
477+
allow_external_blob_outside_bases,
478+
blob_pack_file_size_threshold,
471479
reader,
472480
None, // No namespace for schema-only creation
473481
false, // No managed versioning for schema-only creation
@@ -523,6 +531,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
523531
storage_options_obj: JObject, // Map<String, String>
524532
initial_bases: JObject, // Optional<List<BasePath>>
525533
target_bases: JObject, // Optional<List<String>>
534+
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
535+
blob_pack_file_size_threshold: JObject, // Optional<Long>
526536
namespace_obj: JObject, // LanceNamespace (can be null)
527537
table_id_obj: JObject, // List<String> (can be null)
528538
namespace_client_managed_versioning: jboolean, // Whether namespace manages versioning
@@ -543,6 +553,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
543553
storage_options_obj,
544554
initial_bases,
545555
target_bases,
556+
allow_external_blob_outside_bases,
557+
blob_pack_file_size_threshold,
546558
namespace_obj,
547559
table_id_obj,
548560
namespace_client_managed_versioning != 0,
@@ -555,19 +567,21 @@ fn inner_create_with_ffi_stream<'local>(
555567
env: &mut JNIEnv<'local>,
556568
arrow_array_stream_addr: jlong,
557569
path: JString,
558-
max_rows_per_file: JObject, // Optional<Integer>
559-
max_rows_per_group: JObject, // Optional<Integer>
560-
max_bytes_per_file: JObject, // Optional<Long>
561-
mode: JObject, // Optional<String>
562-
enable_stable_row_ids: JObject, // Optional<Boolean>
563-
data_storage_version: JObject, // Optional<String>
564-
enable_v2_manifest_paths: JObject, // Optional<Boolean>
565-
storage_options_obj: JObject, // Map<String, String>
566-
initial_bases: JObject, // Optional<List<BasePath>>
567-
target_bases: JObject, // Optional<List<String>>
568-
namespace_obj: JObject, // LanceNamespace (can be null)
569-
table_id_obj: JObject, // List<String> (can be null)
570-
namespace_client_managed_versioning: bool, // Whether namespace manages versioning
570+
max_rows_per_file: JObject, // Optional<Integer>
571+
max_rows_per_group: JObject, // Optional<Integer>
572+
max_bytes_per_file: JObject, // Optional<Long>
573+
mode: JObject, // Optional<String>
574+
enable_stable_row_ids: JObject, // Optional<Boolean>
575+
data_storage_version: JObject, // Optional<String>
576+
enable_v2_manifest_paths: JObject, // Optional<Boolean>
577+
storage_options_obj: JObject, // Map<String, String>
578+
initial_bases: JObject, // Optional<List<BasePath>>
579+
target_bases: JObject, // Optional<List<String>>
580+
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
581+
blob_pack_file_size_threshold: JObject, // Optional<Long>
582+
namespace_obj: JObject, // LanceNamespace (can be null)
583+
table_id_obj: JObject, // List<String> (can be null)
584+
namespace_client_managed_versioning: bool, // Whether namespace manages versioning
571585
) -> Result<JObject<'local>> {
572586
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
573587
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
@@ -588,6 +602,8 @@ fn inner_create_with_ffi_stream<'local>(
588602
storage_options_obj,
589603
initial_bases,
590604
target_bases,
605+
allow_external_blob_outside_bases,
606+
blob_pack_file_size_threshold,
591607
reader,
592608
namespace_info,
593609
namespace_client_managed_versioning,
@@ -613,6 +629,8 @@ fn create_dataset<'local>(
613629
storage_options_obj: JObject,
614630
initial_bases: JObject,
615631
target_bases: JObject,
632+
allow_external_blob_outside_bases: JObject,
633+
blob_pack_file_size_threshold: JObject,
616634
reader: impl RecordBatchReader + Send + 'static,
617635
namespace_info: Option<(Arc<dyn LanceNamespace>, Vec<String>)>,
618636
namespace_client_managed_versioning: bool,
@@ -631,7 +649,8 @@ fn create_dataset<'local>(
631649
&storage_options_obj,
632650
&initial_bases,
633651
&target_bases,
634-
&JObject::null(), // allow_external_blob_outside_bases not used for Dataset.write()
652+
&allow_external_blob_outside_bases,
653+
&blob_pack_file_size_threshold,
635654
)?;
636655

637656
// Set up namespace commit handler and storage options provider if namespace is provided

java/lance-jni/src/fragment.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
9999
namespace_obj: JObject, // LanceNamespace (can be null)
100100
table_id_obj: JObject, // List<String> (can be null)
101101
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
102+
blob_pack_file_size_threshold: JObject, // Optional<Long>
102103
) -> JObject<'local> {
103104
ok_or_throw_with_return!(
104105
env,
@@ -117,6 +118,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
117118
namespace_obj,
118119
table_id_obj,
119120
allow_external_blob_outside_bases,
121+
blob_pack_file_size_threshold,
120122
),
121123
JObject::default()
122124
)
@@ -138,6 +140,7 @@ fn inner_create_with_ffi_array<'local>(
138140
namespace_obj: JObject, // LanceNamespace (can be null)
139141
table_id_obj: JObject, // List<String> (can be null)
140142
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
143+
blob_pack_file_size_threshold: JObject, // Optional<Long>
141144
) -> Result<JObject<'local>> {
142145
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
143146
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
@@ -165,6 +168,7 @@ fn inner_create_with_ffi_array<'local>(
165168
namespace_obj,
166169
table_id_obj,
167170
allow_external_blob_outside_bases,
171+
blob_pack_file_size_threshold,
168172
reader,
169173
)
170174
}
@@ -185,6 +189,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
185189
namespace_obj: JObject, // LanceNamespace (can be null)
186190
table_id_obj: JObject, // List<String> (can be null)
187191
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
192+
blob_pack_file_size_threshold: JObject, // Optional<Long>
188193
) -> JObject<'a> {
189194
ok_or_throw_with_return!(
190195
env,
@@ -202,6 +207,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
202207
namespace_obj,
203208
table_id_obj,
204209
allow_external_blob_outside_bases,
210+
blob_pack_file_size_threshold,
205211
),
206212
JObject::null()
207213
)
@@ -222,6 +228,7 @@ fn inner_create_with_ffi_stream<'local>(
222228
namespace_obj: JObject, // LanceNamespace (can be null)
223229
table_id_obj: JObject, // List<String> (can be null)
224230
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
231+
blob_pack_file_size_threshold: JObject, // Optional<Long>
225232
) -> Result<JObject<'local>> {
226233
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
227234
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
@@ -239,6 +246,7 @@ fn inner_create_with_ffi_stream<'local>(
239246
namespace_obj,
240247
table_id_obj,
241248
allow_external_blob_outside_bases,
249+
blob_pack_file_size_threshold,
242250
reader,
243251
)
244252
}
@@ -257,6 +265,7 @@ fn create_fragment<'a>(
257265
namespace_obj: JObject, // LanceNamespace (can be null)
258266
table_id_obj: JObject, // List<String> (can be null)
259267
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
268+
blob_pack_file_size_threshold: JObject, // Optional<Long>
260269
source: impl StreamingWriteSource,
261270
) -> Result<JObject<'a>> {
262271
let path_str = dataset_uri.extract(env)?;
@@ -274,6 +283,7 @@ fn create_fragment<'a>(
274283
&JObject::null(), // not used when creating fragments
275284
&JObject::null(), // not used when creating fragments
276285
&allow_external_blob_outside_bases,
286+
&blob_pack_file_size_threshold,
277287
)?;
278288

279289
// Set up storage options provider if namespace is provided

java/lance-jni/src/utils.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ pub fn extract_write_params(
5252
initial_bases: &JObject, // Optional<List<BasePath>>
5353
target_bases: &JObject, // Optional<List<String>>
5454
allow_external_blob_outside_bases: &JObject, // Optional<Boolean>
55+
blob_pack_file_size_threshold: &JObject, // Optional<Long>
5556
) -> Result<WriteParams> {
5657
let mut write_params = WriteParams::default();
5758

@@ -101,6 +102,9 @@ pub fn extract_write_params(
101102
if let Some(allow) = env.get_boolean_opt(allow_external_blob_outside_bases)? {
102103
write_params.allow_external_blob_outside_bases = allow;
103104
}
105+
if let Some(max_bytes) = env.get_long_opt(blob_pack_file_size_threshold)? {
106+
write_params.blob_pack_file_size_threshold = Some(max_bytes as usize);
107+
}
104108

105109
// Create storage options accessor from static storage_options
106110
let accessor = if storage_options.is_empty() {

java/src/main/java/org/lance/Dataset.java

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,9 @@ public static Dataset create(
151151
params.getEnableV2ManifestPaths(),
152152
params.getStorageOptions(),
153153
params.getInitialBases(),
154-
params.getTargetBases());
154+
params.getTargetBases(),
155+
params.getAllowExternalBlobOutsideBases(),
156+
params.getBlobPackFileSizeThreshold());
155157
dataset.allocator = allocator;
156158
return dataset;
157159
}
@@ -197,7 +199,9 @@ private static native Dataset createWithFfiSchema(
197199
Optional<Boolean> enableV2ManifestPaths,
198200
Map<String, String> storageOptions,
199201
Optional<List<BasePath>> initialBases,
200-
Optional<List<String>> targetBases);
202+
Optional<List<String>> targetBases,
203+
Optional<Boolean> allowExternalBlobOutsideBases,
204+
Optional<Long> blobPackFileSizeThreshold);
201205

202206
/**
203207
* Creates a dataset from an FFI arrow stream.
@@ -232,6 +236,8 @@ private static native Dataset createWithFfiStream(
232236
Map<String, String> storageOptions,
233237
Optional<List<BasePath>> initialBases,
234238
Optional<List<String>> targetBases,
239+
Optional<Boolean> allowExternalBlobOutsideBases,
240+
Optional<Long> blobPackFileSizeThreshold,
235241
LanceNamespace namespaceClient,
236242
List<String> tableId,
237243
boolean namespaceClientManagedVersioning);
@@ -281,6 +287,8 @@ static Dataset create(
281287
params.getStorageOptions(),
282288
params.getInitialBases(),
283289
params.getTargetBases(),
290+
params.getAllowExternalBlobOutsideBases(),
291+
params.getBlobPackFileSizeThreshold(),
284292
namespaceClient,
285293
tableId,
286294
namespaceClientManagedVersioning);

java/src/main/java/org/lance/Fragment.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -280,7 +280,8 @@ static List<FragmentMetadata> create(
280280
params.getStorageOptions(),
281281
namespaceClient,
282282
tableId,
283-
params.getAllowExternalBlobOutsideBases());
283+
params.getAllowExternalBlobOutsideBases(),
284+
params.getBlobPackFileSizeThreshold());
284285
}
285286
}
286287

@@ -306,7 +307,8 @@ static List<FragmentMetadata> create(
306307
params.getStorageOptions(),
307308
namespaceClient,
308309
tableId,
309-
params.getAllowExternalBlobOutsideBases());
310+
params.getAllowExternalBlobOutsideBases(),
311+
params.getBlobPackFileSizeThreshold());
310312
}
311313

312314
/** Create a fragment from the given arrow array and schema. */
@@ -323,7 +325,8 @@ private static native List<FragmentMetadata> createWithFfiArray(
323325
Map<String, String> storageOptions,
324326
LanceNamespace namespaceClient,
325327
List<String> tableId,
326-
Optional<Boolean> allowExternalBlobOutsideBases);
328+
Optional<Boolean> allowExternalBlobOutsideBases,
329+
Optional<Long> blobPackFileSizeThreshold);
327330

328331
/** Create a fragment from the given arrow stream. */
329332
private static native List<FragmentMetadata> createWithFfiStream(
@@ -338,5 +341,6 @@ private static native List<FragmentMetadata> createWithFfiStream(
338341
Map<String, String> storageOptions,
339342
LanceNamespace namespaceClient,
340343
List<String> tableId,
341-
Optional<Boolean> allowExternalBlobOutsideBases);
344+
Optional<Boolean> allowExternalBlobOutsideBases,
345+
Optional<Long> blobPackFileSizeThreshold);
342346
}

java/src/main/java/org/lance/WriteDatasetBuilder.java

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,8 @@ public class WriteDatasetBuilder {
7878
private Optional<String> dataStorageVersion = Optional.empty();
7979
private Optional<List<BasePath>> initialBases = Optional.empty();
8080
private Optional<List<String>> targetBases = Optional.empty();
81+
private Optional<Boolean> allowExternalBlobOutsideBases = Optional.empty();
82+
private Optional<Long> blobPackFileSizeThreshold = Optional.empty();
8183
private Session session;
8284

8385
/** Creates a new builder instance. Package-private, use Dataset.write() instead. */
@@ -282,6 +284,30 @@ public WriteDatasetBuilder targetBases(List<String> targetBases) {
282284
return this;
283285
}
284286

287+
/**
288+
* Sets whether external blob URIs that fall outside the registered base paths are allowed.
289+
*
290+
* @param allowExternalBlobOutsideBases {@code true} to allow external blob URIs outside bases, {@code false} to disallow them; if this setter is never called the option stays unset and is not forwarded to the write params builder
291+
* @return this builder instance
292+
*/
293+
public WriteDatasetBuilder allowExternalBlobOutsideBases(boolean allowExternalBlobOutsideBases) {
294+
this.allowExternalBlobOutsideBases = Optional.of(allowExternalBlobOutsideBases);
295+
return this;
296+
}
297+
298+
/**
299+
* Sets the maximum size in bytes for blob v2 pack (.blob) sidecar files.
300+
*
301+
* <p>When a pack file reaches this size, a new one is started. If not set, defaults to 1 GiB.
302+
*
303+
* @param blobPackFileSizeThreshold maximum pack file size in bytes; the value is not validated here and the native layer converts it with an unchecked {@code as usize} cast, so pass a positive value (NOTE(review): a negative value presumably wraps to a very large threshold - confirm)
304+
* @return this builder instance
305+
*/
306+
public WriteDatasetBuilder blobPackFileSizeThreshold(long blobPackFileSizeThreshold) {
307+
this.blobPackFileSizeThreshold = Optional.of(blobPackFileSizeThreshold);
308+
return this;
309+
}
310+
285311
/**
286312
* Sets the session to share caches with other datasets.
287313
*
@@ -414,6 +440,8 @@ private Dataset executeWithNamespaceClient() {
414440

415441
initialBases.ifPresent(paramsBuilder::withInitialBases);
416442
targetBases.ifPresent(paramsBuilder::withTargetBases);
443+
allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases);
444+
blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold);
417445

418446
WriteParams params = paramsBuilder.build();
419447

@@ -446,6 +474,8 @@ private Dataset executeWithUri() {
446474
dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion);
447475
initialBases.ifPresent(paramsBuilder::withInitialBases);
448476
targetBases.ifPresent(paramsBuilder::withTargetBases);
477+
allowExternalBlobOutsideBases.ifPresent(paramsBuilder::withAllowExternalBlobOutsideBases);
478+
blobPackFileSizeThreshold.ifPresent(paramsBuilder::withBlobPackFileSizeThreshold);
449479

450480
WriteParams params = paramsBuilder.build();
451481

0 commit comments

Comments
 (0)