Skip to content

Commit 0907587

Browse files
committed
Use async_fs instead of LocalFileSystem in java bindings for local files
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent c663fa9 commit 0907587

5 files changed

Lines changed: 74 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ private VortexWriter(long pointer) {
4242
}
4343

4444
/**
45-
* Create a writer that streams records into the file at {@code uri}. The Arrow schema describes the exact layout of
46-
* every batch written.
45+
* Create a writer that streams records into the file at {@code uri}. The path may be a full URI or a plain local
46+
* filesystem path. The Arrow schema describes the exact layout of every batch written.
4747
*/
4848
public static VortexWriter create(
4949
Session session, String uri, Schema arrowSchema, Map<String, String> options, BufferAllocator allocator)

java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,22 @@ public void testCreateWriter() throws IOException {
6868
assertTrue(Files.exists(outputPath), "output file should exist");
6969
}
7070

71+
@Test
72+
public void testCreateWriterPlainLocalPath() throws IOException {
73+
Path outputPath = tempDir.resolve("test_create_plain_path.vortex");
74+
String writePath = outputPath.toAbsolutePath().toString();
75+
76+
BufferAllocator allocator = ArrowAllocation.rootAllocator();
77+
Map<String, String> options = new HashMap<>();
78+
79+
Session session = Session.create();
80+
try (VortexWriter writer = VortexWriter.create(session, writePath, personSchema(), options, allocator)) {
81+
assertNotNull(writer);
82+
}
83+
84+
assertTrue(Files.exists(outputPath), "output file should exist");
85+
}
86+
7187
@Test
7288
public void testWriteBatch() throws IOException {
7389
Path outputPath = tempDir.resolve("test_ffi.vortex");

vortex-jni/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ categories = { workspace = true }
1919
[dependencies]
2020
arrow-array = { workspace = true, features = ["ffi"] }
2121
arrow-schema = { workspace = true }
22+
async-fs = { workspace = true }
2223
futures = { workspace = true }
2324
jni = { workspace = true }
2425
object_store = { workspace = true, features = ["aws", "azure", "gcp"] }

vortex-jni/src/writer.rs

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
//! Writes go through an in-flight queue of at most [`WRITE_CHANNEL_CAPACITY`] pending
77
//! batches on the same thread that drives the current-thread runtime.
88
9+
use std::path::PathBuf;
910
use std::sync::Arc;
1011

1112
use arrow_array::RecordBatch;
1213
use arrow_array::StructArray;
1314
use arrow_array::ffi::FFI_ArrowArray;
1415
use arrow_array::ffi::FFI_ArrowSchema;
16+
use async_fs::File;
1517
use futures::SinkExt;
1618
use futures::channel::mpsc;
1719
use jni::EnvUnowned;
@@ -22,7 +24,8 @@ use jni::sys::JNI_FALSE;
2224
use jni::sys::JNI_TRUE;
2325
use jni::sys::jboolean;
2426
use jni::sys::jlong;
25-
use object_store::path::Path;
27+
use object_store::ObjectStore;
28+
use object_store::path::Path as ObjectStorePath;
2629
use url::Url;
2730
use vortex::array::ArrayRef;
2831
use vortex::array::arrow::FromArrowArray;
@@ -52,6 +55,40 @@ use crate::session::session_ref;
5255
/// the writer is felt on the Java thread producing batches.
5356
const WRITE_CHANNEL_CAPACITY: usize = 4;
5457

58+
enum WriteTarget {
59+
Local(PathBuf),
60+
ObjectStore {
61+
store: Arc<dyn ObjectStore>,
62+
path: ObjectStorePath,
63+
},
64+
}
65+
66+
fn resolve_write_target(
67+
uri: &str,
68+
properties: &HashMap<String, String>,
69+
) -> VortexResult<WriteTarget> {
70+
match Url::parse(uri) {
71+
Ok(url) if url.scheme() == "file" => {
72+
let path = url
73+
.to_file_path()
74+
.map_err(|_| vortex_err!("invalid file URL: {uri}"))?;
75+
return Ok(WriteTarget::Local(path));
76+
}
77+
Ok(url) if uri.contains("://") => {
78+
let path = ObjectStorePath::from_url_path(url.path())
79+
.map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?;
80+
let store = make_object_store(&url, properties)?;
81+
return Ok(WriteTarget::ObjectStore { store, path });
82+
}
83+
Err(err) if uri.contains("://") => {
84+
return Err(vortex_err!("invalid URL {uri}: {err}"));
85+
}
86+
_ => {}
87+
}
88+
89+
Ok(WriteTarget::Local(PathBuf::from(uri)))
90+
}
91+
5592
/// Native writer holding a write-task handle and a sender that Java pushes batches into.
5693
pub struct NativeWriter {
5794
handle: Option<Task<VortexResult<WriteSummary>>>,
@@ -136,21 +173,27 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriter_create(
136173
let write_schema = import_dtype_from_arrow(arrow_schema_addr)?;
137174

138175
let file_path: String = uri.try_to_string(env)?;
139-
let url = Url::parse(&file_path)
140-
.map_err(|e| JNIError::Vortex(vortex_err!("invalid URL {file_path}: {e}")))?;
141176
let properties: HashMap<String, String> = extract_properties(env, &options)?;
142-
let path = Path::from_url_path(url.path())
143-
.map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?;
144-
145-
let store = make_object_store(&url, &properties)?;
177+
let target = resolve_write_target(&file_path, &properties)?;
146178
let (tx, rx) = mpsc::channel(WRITE_CHANNEL_CAPACITY);
147179
let stream = ArrayStreamAdapter::new(write_schema.clone(), rx);
148180

149181
let handle = session.handle().spawn(async move {
150-
let mut write = ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?;
151-
let summary = session.write_options().write(&mut write, stream).await?;
152-
write.shutdown().await?;
153-
Ok(summary)
182+
match target {
183+
WriteTarget::Local(path) => {
184+
let mut file = File::create(path).await?;
185+
let summary = session.write_options().write(&mut file, stream).await?;
186+
file.shutdown().await?;
187+
Ok(summary)
188+
}
189+
WriteTarget::ObjectStore { store, path } => {
190+
let mut write =
191+
ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?;
192+
let summary = session.write_options().write(&mut write, stream).await?;
193+
write.shutdown().await?;
194+
Ok(summary)
195+
}
196+
}
154197
});
155198

156199
Ok(Box::new(NativeWriter::new(write_schema, handle, tx)).into_raw())

0 commit comments

Comments
 (0)