Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ fn create_dataset<'local>(
&storage_options_obj,
&initial_bases,
&target_bases,
&JObject::null(), // allow_external_blob_outside_bases not used for Dataset.write()
)?;

// Set up namespace commit handler and storage options provider if namespace is provided
Expand Down
100 changes: 55 additions & 45 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,16 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
) -> JObject<'local> {
ok_or_throw_with_return!(
env,
Expand All @@ -115,6 +116,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiArray<'local>(
storage_options_obj,
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
),
JObject::default()
)
Expand All @@ -126,15 +128,16 @@ fn inner_create_with_ffi_array<'local>(
dataset_uri: JString,
arrow_array_addr: jlong,
arrow_schema_addr: jlong,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
) -> Result<JObject<'local>> {
let c_array_ptr = arrow_array_addr as *mut FFI_ArrowArray;
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
Expand All @@ -161,6 +164,7 @@ fn inner_create_with_ffi_array<'local>(
storage_options_obj,
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
reader,
)
}
Expand All @@ -171,15 +175,16 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
_obj: JObject,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
) -> JObject<'a> {
ok_or_throw_with_return!(
env,
Expand All @@ -196,6 +201,7 @@ pub extern "system" fn Java_org_lance_Fragment_createWithFfiStream<'a>(
storage_options_obj,
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
),
JObject::null()
)
Expand All @@ -206,15 +212,16 @@ fn inner_create_with_ffi_stream<'local>(
env: &mut JNIEnv<'local>,
dataset_uri: JString,
arrow_array_stream_addr: jlong,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -231,6 +238,7 @@ fn inner_create_with_ffi_stream<'local>(
storage_options_obj,
namespace_obj,
table_id_obj,
allow_external_blob_outside_bases,
reader,
)
}
Expand All @@ -239,15 +247,16 @@ fn inner_create_with_ffi_stream<'local>(
fn create_fragment<'a>(
env: &mut JNIEnv<'a>,
dataset_uri: JString,
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
max_rows_per_file: JObject, // Optional<Integer>
max_rows_per_group: JObject, // Optional<Integer>
max_bytes_per_file: JObject, // Optional<Long>
mode: JObject, // Optional<String>
enable_stable_row_ids: JObject, // Optional<Boolean>
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
namespace_obj: JObject, // LanceNamespace (can be null)
table_id_obj: JObject, // List<String> (can be null)
allow_external_blob_outside_bases: JObject, // Optional<Boolean>
source: impl StreamingWriteSource,
) -> Result<JObject<'a>> {
let path_str = dataset_uri.extract(env)?;
Expand All @@ -264,6 +273,7 @@ fn create_fragment<'a>(
&storage_options_obj,
&JObject::null(), // not used when creating fragments
&JObject::null(), // not used when creating fragments
&allow_external_blob_outside_bases,
)?;

// Set up storage options provider if namespace is provided
Expand Down
9 changes: 7 additions & 2 deletions java/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ pub fn extract_write_params(
data_storage_version: &JObject,
enable_v2_manifest_paths: Option<&JObject>,
storage_options_obj: &JObject,
initial_bases: &JObject, // Optional<BasePath>
target_bases: &JObject, // Optional<String>
initial_bases: &JObject, // Optional<BasePath>
target_bases: &JObject, // Optional<String>
allow_external_blob_outside_bases: &JObject, // Optional<Boolean>
) -> Result<WriteParams> {
let mut write_params = WriteParams::default();

Expand Down Expand Up @@ -97,6 +98,10 @@ pub fn extract_write_params(
write_params.target_base_names_or_paths = Some(names);
}

if let Some(allow) = env.get_boolean_opt(allow_external_blob_outside_bases)? {
write_params.allow_external_blob_outside_bases = allow;
}

// Create storage options accessor from static storage_options
let accessor = if storage_options.is_empty() {
None
Expand Down
12 changes: 8 additions & 4 deletions java/src/main/java/org/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ static List<FragmentMetadata> create(
params.getDataStorageVersion(),
params.getStorageOptions(),
namespaceClient,
tableId);
tableId,
params.getAllowExternalBlobOutsideBases());
}
}

Expand All @@ -304,7 +305,8 @@ static List<FragmentMetadata> create(
params.getDataStorageVersion(),
params.getStorageOptions(),
namespaceClient,
tableId);
tableId,
params.getAllowExternalBlobOutsideBases());
}

/** Create a fragment from the given arrow array and schema. */
Expand All @@ -320,7 +322,8 @@ private static native List<FragmentMetadata> createWithFfiArray(
Optional<String> dataStorageVersion,
Map<String, String> storageOptions,
LanceNamespace namespaceClient,
List<String> tableId);
List<String> tableId,
Optional<Boolean> allowExternalBlobOutsideBases);

/** Create a fragment from the given arrow stream. */
private static native List<FragmentMetadata> createWithFfiStream(
Expand All @@ -334,5 +337,6 @@ private static native List<FragmentMetadata> createWithFfiStream(
Optional<String> dataStorageVersion,
Map<String, String> storageOptions,
LanceNamespace namespaceClient,
List<String> tableId);
List<String> tableId,
Optional<Boolean> allowExternalBlobOutsideBases);
}
36 changes: 34 additions & 2 deletions java/src/main/java/org/lance/WriteParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum WriteMode {
private Map<String, String> storageOptions = new HashMap<>();
private final Optional<List<BasePath>> initialBases;
private final Optional<List<String>> targetBases;
private final Optional<Boolean> allowExternalBlobOutsideBases;

private WriteParams(
Optional<Integer> maxRowsPerFile,
Expand All @@ -51,7 +52,8 @@ private WriteParams(
Optional<Boolean> enableV2ManifestPaths,
Map<String, String> storageOptions,
Optional<List<BasePath>> initialBases,
Optional<List<String>> targetBases) {
Optional<List<String>> targetBases,
Optional<Boolean> allowExternalBlobOutsideBases) {
this.maxRowsPerFile = maxRowsPerFile;
this.maxRowsPerGroup = maxRowsPerGroup;
this.maxBytesPerFile = maxBytesPerFile;
Expand All @@ -62,6 +64,7 @@ private WriteParams(
this.storageOptions = storageOptions;
this.initialBases = initialBases;
this.targetBases = targetBases;
this.allowExternalBlobOutsideBases = allowExternalBlobOutsideBases;
}

public Optional<Integer> getMaxRowsPerFile() {
Expand Down Expand Up @@ -109,6 +112,18 @@ public Optional<List<String>> getTargetBases() {
return targetBases;
}

/**
 * Reports whether blob URIs located outside every registered base are permitted.
 *
 * <p>As described for this write parameter, a value of {@code true} lets blob v2 columns point
 * at external URIs that do not sit under any registered base path; such a URI is recorded as an
 * absolute external reference with base_id=0.
 *
 * @return the configured flag wrapped in an Optional, or an empty Optional if it was never set
 */
public Optional<Boolean> getAllowExternalBlobOutsideBases() {
  return this.allowExternalBlobOutsideBases;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -132,6 +147,7 @@ public static class Builder {
private Map<String, String> storageOptions = new HashMap<>();
private Optional<List<BasePath>> initialBases = Optional.empty();
private Optional<List<String>> targetBases = Optional.empty();
private Optional<Boolean> allowExternalBlobOutsideBases = Optional.empty();

public Builder withMaxRowsPerFile(int maxRowsPerFile) {
this.maxRowsPerFile = Optional.of(maxRowsPerFile);
Expand Down Expand Up @@ -183,6 +199,21 @@ public Builder withTargetBases(List<String> targetBases) {
return this;
}

/**
 * Opts in to (or out of) blob URIs that live outside the registered bases.
 *
 * <p>With {@code true}, blob v2 columns may reference external URIs (for example, blob files
 * belonging to another Lance dataset) that are not under any registered base path. Such a URI
 * is stored as an absolute external reference with base_id=0.
 *
 * @param allow {@code true} to permit external blob URIs outside the registered bases
 * @return this builder, so calls can be chained
 */
public Builder withAllowExternalBlobOutsideBases(boolean allow) {
  // Record the explicit choice; an unset flag stays Optional.empty() until this is called.
  final Optional<Boolean> setting = Optional.of(allow);
  this.allowExternalBlobOutsideBases = setting;
  return this;
}

public WriteParams build() {
return new WriteParams(
maxRowsPerFile,
Expand All @@ -194,7 +225,8 @@ public WriteParams build() {
enableV2ManifestPaths,
storageOptions,
initialBases,
targetBases);
targetBases,
allowExternalBlobOutsideBases);
}
}
}
Loading