Skip to content

Commit d0910de

Browse files
authored
feat: add storage options for LanceFileWriter (lance-format#3900)
Enhance the Java `LanceFileWriter` so that it can access object stores (such as S3).
1 parent 0181992 commit d0910de

4 files changed

Lines changed: 65 additions & 33 deletions

File tree

java/core/lance-jni/src/blocking_dataset.rs

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use crate::error::{Error, Result};
1616
use crate::ffi::JNIEnvExt;
1717
use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, FromJString};
18-
use crate::utils::{extract_storage_options, extract_write_params, get_index_params};
18+
use crate::utils::{extract_storage_options, extract_write_params, get_index_params, to_rust_map};
1919
use crate::{traits::IntoJava, RT};
2020
use arrow::array::RecordBatchReader;
2121
use arrow::datatypes::Schema;
@@ -439,19 +439,7 @@ pub fn inner_commit_overwrite<'local>(
439439
let path_str = path.extract(env)?;
440440
let read_version = env.get_u64_opt(&read_version_obj)?;
441441
let jmap = JMap::from_env(env, &storage_options_obj)?;
442-
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
443-
let mut map = HashMap::new();
444-
let mut iter = jmap.iter(env)?;
445-
while let Some((key, value)) = iter.next(env)? {
446-
let key_jstring = JString::from(key);
447-
let value_jstring = JString::from(value);
448-
let key_string: String = env.get_string(&key_jstring)?.into();
449-
let value_string: String = env.get_string(&value_jstring)?.into();
450-
map.insert(key_string, value_string);
451-
}
452-
Ok::<_, Error>(map)
453-
})?;
454-
442+
let storage_options = to_rust_map(env, &jmap)?;
455443
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?;
456444
dataset.into_java(env)
457445
}
@@ -556,20 +544,7 @@ fn inner_open_native<'local>(
556544
let version = env.get_int_opt(&version_obj)?;
557545
let block_size = env.get_int_opt(&block_size_obj)?;
558546
let jmap = JMap::from_env(env, &storage_options_obj)?;
559-
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
560-
let mut map = HashMap::new();
561-
let mut iter = jmap.iter(env)?;
562-
563-
while let Some((key, value)) = iter.next(env)? {
564-
let key_jstring = JString::from(key);
565-
let value_jstring = JString::from(value);
566-
let key_string: String = env.get_string(&key_jstring)?.into();
567-
let value_string: String = env.get_string(&value_jstring)?.into();
568-
map.insert(key_string, value_string);
569-
}
570-
571-
Ok::<_, Error>(map)
572-
})?;
547+
let storage_options = to_rust_map(env, &jmap)?;
573548
let dataset = BlockingDataset::open(
574549
&path_str,
575550
version,

java/core/lance-jni/src/file_writer.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::{Arc, Mutex};
22

3+
use crate::utils::to_rust_map;
34
use crate::{
45
error::{Error, Result},
56
traits::IntoJava,
@@ -10,6 +11,7 @@ use arrow::{
1011
ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema},
1112
};
1213
use arrow_schema::DataType;
14+
use jni::objects::JMap;
1315
use jni::{
1416
objects::{JObject, JString},
1517
sys::jlong,
@@ -20,6 +22,7 @@ use lance_file::{
2022
v2::writer::{FileWriter, FileWriterOptions},
2123
version::LanceFileVersion,
2224
};
25+
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry};
2326

2427
pub const NATIVE_WRITER: &str = "nativeFileWriterHandle";
2528

@@ -61,15 +64,31 @@ pub extern "system" fn Java_com_lancedb_lance_file_LanceFileWriter_openNative<'l
6164
mut env: JNIEnv<'local>,
6265
_writer_class: JObject,
6366
file_uri: JString,
67+
storage_options_obj: JObject, // Map<String, String>
6468
) -> JObject<'local> {
65-
ok_or_throw!(env, inner_open(&mut env, file_uri,))
69+
ok_or_throw!(env, inner_open(&mut env, file_uri, storage_options_obj))
6670
}
6771

68-
fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result<JObject<'local>> {
72+
fn inner_open<'local>(
73+
env: &mut JNIEnv<'local>,
74+
file_uri: JString,
75+
storage_options_obj: JObject,
76+
) -> Result<JObject<'local>> {
6977
let file_uri_str: String = env.get_string(&file_uri)?.into();
78+
let jmap = JMap::from_env(env, &storage_options_obj)?;
79+
let storage_options = to_rust_map(env, &jmap)?;
7080

7181
let writer = RT.block_on(async move {
72-
let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?;
82+
let object_params = ObjectStoreParams {
83+
storage_options: Some(storage_options),
84+
..Default::default()
85+
};
86+
let (obj_store, path) = ObjectStore::from_uri_and_params(
87+
Arc::new(ObjectStoreRegistry::default()),
88+
&file_uri_str,
89+
&object_params,
90+
)
91+
.await?;
7392
let obj_store = Arc::new(obj_store);
7493
let obj_writer = obj_store.create(&path).await?;
7594

java/core/lance-jni/src/utils.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,3 +287,20 @@ pub fn get_index_params(
287287
)),
288288
}
289289
}
290+
291+
pub fn to_rust_map(env: &mut JNIEnv, jmap: &JMap) -> Result<HashMap<String, String>> {
292+
env.with_local_frame(16, |env| {
293+
let mut map = HashMap::new();
294+
let mut iter = jmap.iter(env)?;
295+
296+
while let Some((key, value)) = iter.next(env)? {
297+
let key_jstring = JString::from(key);
298+
let value_jstring = JString::from(value);
299+
let key_string: String = env.get_string(&key_jstring)?.into();
300+
let value_string: String = env.get_string(&value_jstring)?.into();
301+
map.insert(key_string, value_string);
302+
}
303+
304+
Ok::<_, Error>(map)
305+
})
306+
}

java/core/src/main/java/com/lancedb/lance/file/LanceFileWriter.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.arrow.vector.dictionary.DictionaryProvider;
2424

2525
import java.io.IOException;
26+
import java.util.Collections;
27+
import java.util.Map;
2628

2729
public class LanceFileWriter implements AutoCloseable {
2830

@@ -34,7 +36,8 @@ public class LanceFileWriter implements AutoCloseable {
3436
private BufferAllocator allocator;
3537
private DictionaryProvider dictionaryProvider;
3638

37-
private static native LanceFileWriter openNative(String fileUri) throws IOException;
39+
private static native LanceFileWriter openNative(
40+
String fileUri, Map<String, String> storageOptions) throws IOException;
3841

3942
private native void closeNative(long nativeLanceFileReaderHandle) throws IOException;
4043

@@ -54,7 +57,25 @@ private LanceFileWriter() {}
5457
public static LanceFileWriter open(
5558
String path, BufferAllocator allocator, DictionaryProvider dictionaryProvider)
5659
throws IOException {
57-
LanceFileWriter writer = openNative(path);
60+
return open(path, allocator, dictionaryProvider, Collections.emptyMap());
61+
}
62+
63+
/**
64+
* Open a LanceFileWriter to write to a given file URI
65+
*
66+
* @param path the URI of the file to write to
67+
* @param allocator the BufferAllocator to use for the writer
68+
* @param dictionaryProvider the DictionaryProvider to use for the writer
69+
* @param storageOptions additional storage options for the writer
70+
* @return a new LanceFileWriter
71+
*/
72+
public static LanceFileWriter open(
73+
String path,
74+
BufferAllocator allocator,
75+
DictionaryProvider dictionaryProvider,
76+
Map<String, String> storageOptions)
77+
throws IOException {
78+
LanceFileWriter writer = openNative(path, storageOptions);
5879
writer.allocator = allocator;
5980
writer.dictionaryProvider = dictionaryProvider;
6081
return writer;

0 commit comments

Comments
 (0)