From 214dfb1b6acf552ef017b2cc2dda824778f48dfb Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 12 May 2026 13:59:24 +0100 Subject: [PATCH] Use async_fs instead of LocalFileSystem for local disk writes Signed-off-by: Robert Kruszewski --- Cargo.lock | 1 + .../java/dev/vortex/api/VortexWriter.java | 4 +- .../java/dev/vortex/jni/JNIWriterTest.java | 32 ++++++++++ vortex-jni/Cargo.toml | 1 + vortex-jni/src/writer.rs | 60 +++++++++++++++---- vortex-python/src/object_store/resolve.rs | 14 +++++ 6 files changed, 99 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 467d9347e25..c4815925cc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10876,6 +10876,7 @@ version = "0.1.0" dependencies = [ "arrow-array 58.2.0", "arrow-schema 58.2.0", + "async-fs", "futures", "jni", "object_store 0.13.2", diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java b/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java index 2f452526348..7761be59cdc 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java @@ -42,8 +42,8 @@ private VortexWriter(long pointer) { } /** - * Create a writer that streams records into the file at {@code uri}. The Arrow schema describes the exact layout of - * every batch written. + * Create a writer that streams records into the file at {@code uri}. The path may be a full URI or a plain local + * filesystem path. The Arrow schema describes the exact layout of every batch written. */ public static VortexWriter create( Session session, String uri, Schema arrowSchema, Map options, BufferAllocator allocator) diff --git a/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java b/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java index d7411e38c07..58cdbbf5315 100644 --- a/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java +++ b/java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java @@ -68,6 +68,38 @@ public void testCreateWriter() throws IOException { assertTrue(Files.exists(outputPath), "output file should exist"); } + @Test + public void testCreateWriterPlainLocalPath() throws IOException { + Path outputPath = tempDir.resolve("test_create_plain_path.vortex"); + String writePath = outputPath.toAbsolutePath().toString(); + + BufferAllocator allocator = ArrowAllocation.rootAllocator(); + Map options = new HashMap<>(); + + Session session = Session.create(); + try (VortexWriter writer = VortexWriter.create(session, writePath, personSchema(), options, allocator)) { + assertNotNull(writer); + } + + assertTrue(Files.exists(outputPath), "output file should exist"); + } + + @Test + public void testCreateWriterCreatesParentDirectories() throws IOException { + Path outputPath = tempDir.resolve("nested/sub/dir/test_create_nested.vortex"); + String writePath = outputPath.toAbsolutePath().toUri().toString(); + + BufferAllocator allocator = ArrowAllocation.rootAllocator(); + Map options = new HashMap<>(); + + Session session = Session.create(); + try (VortexWriter writer = VortexWriter.create(session, writePath, personSchema(), options, allocator)) { + assertNotNull(writer); + } + + assertTrue(Files.exists(outputPath), "output file should exist"); + } + @Test public void testWriteBatch() throws IOException { Path outputPath = tempDir.resolve("test_ffi.vortex"); diff --git a/vortex-jni/Cargo.toml b/vortex-jni/Cargo.toml index 66dd5f6254b..628c2c89d43 100644 --- a/vortex-jni/Cargo.toml +++ b/vortex-jni/Cargo.toml @@ -19,6 +19,7 @@ categories = { workspace = true } [dependencies] arrow-array = { workspace = true, features = ["ffi"] } arrow-schema = { workspace = true } +async-fs = { workspace = true } futures = { workspace = true } jni = { workspace = true } object_store = { workspace = true, features = ["aws", "azure", "gcp"] } diff --git a/vortex-jni/src/writer.rs b/vortex-jni/src/writer.rs index e8ee86434c1..da30952d87b 100644 --- a/vortex-jni/src/writer.rs +++ b/vortex-jni/src/writer.rs @@ -6,12 +6,14 @@ //! Writes go through an in-flight queue of at most [`WRITE_CHANNEL_CAPACITY`] pending //! batches on the same thread that drives the current-thread runtime. +use std::path::PathBuf; use std::sync::Arc; use arrow_array::RecordBatch; use arrow_array::StructArray; use arrow_array::ffi::FFI_ArrowArray; use arrow_array::ffi::FFI_ArrowSchema; +use async_fs::File; use futures::SinkExt; use futures::channel::mpsc; use jni::EnvUnowned; @@ -22,7 +24,8 @@ use jni::sys::JNI_FALSE; use jni::sys::JNI_TRUE; use jni::sys::jboolean; use jni::sys::jlong; -use object_store::path::Path; +use object_store::ObjectStore; +use object_store::path::Path as ObjectStorePath; use url::Url; use vortex::array::ArrayRef; use vortex::array::arrow::FromArrowArray; @@ -52,6 +55,32 @@ use crate::session::session_ref; /// the writer is felt on the Java thread producing batches. const WRITE_CHANNEL_CAPACITY: usize = 4; +enum ResolvedStore { + ObjectStore(Arc, ObjectStorePath), + Path(PathBuf), +} + +fn resolve_store( + url_or_path: &str, + properties: &HashMap, +) -> VortexResult { + match Url::parse(url_or_path) { + Ok(url) if url.scheme() == "file" => { + let path = url + .to_file_path() + .map_err(|_| vortex_err!("invalid file URL: {url_or_path}"))?; + Ok(ResolvedStore::Path(path)) + } + Ok(url) => { + let path = ObjectStorePath::from_url_path(url.path()) + .map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?; + let store = make_object_store(&url, properties)?; + Ok(ResolvedStore::ObjectStore(store, path)) + } + Err(_) => Ok(ResolvedStore::Path(PathBuf::from(url_or_path))), + } +} + /// Native writer holding a write-task handle and a sender that Java pushes batches into. pub struct NativeWriter { handle: Option>>, @@ -136,21 +165,30 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriter_create( let write_schema = import_dtype_from_arrow(arrow_schema_addr)?; let file_path: String = uri.try_to_string(env)?; - let url = Url::parse(&file_path) - .map_err(|e| JNIError::Vortex(vortex_err!("invalid URL {file_path}: {e}")))?; let properties: HashMap = extract_properties(env, &options)?; - let path = Path::from_url_path(url.path()) - .map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?; - - let store = make_object_store(&url, &properties)?; + let resolved = resolve_store(&file_path, &properties)?; let (tx, rx) = mpsc::channel(WRITE_CHANNEL_CAPACITY); let stream = ArrayStreamAdapter::new(write_schema.clone(), rx); let handle = session.handle().spawn(async move { - let mut write = ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?; - let summary = session.write_options().write(&mut write, stream).await?; - write.shutdown().await?; - Ok(summary) + match resolved { + ResolvedStore::Path(path) => { + if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) { + async_fs::create_dir_all(parent).await?; + } + let mut file = File::create(path).await?; + let summary = session.write_options().write(&mut file, stream).await?; + file.shutdown().await?; + Ok(summary) + } + ResolvedStore::ObjectStore(store, path) => { + let mut write = + ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?; + let summary = session.write_options().write(&mut write, stream).await?; + write.shutdown().await?; + Ok(summary) + } + } }); Ok(Box::new(NativeWriter::new(write_schema, handle, tx)).into_raw()) diff --git a/vortex-python/src/object_store/resolve.rs b/vortex-python/src/object_store/resolve.rs index b61d2ecd685..4ec6d321601 100644 --- a/vortex-python/src/object_store/resolve.rs +++ b/vortex-python/src/object_store/resolve.rs @@ -10,6 +10,7 @@ use object_store::path::Path; use object_store::registry::ObjectStoreRegistry; use url::Url; use vortex::error::VortexResult; +use vortex::error::vortex_err; use crate::object_store::registry::Registry; @@ -32,6 +33,12 @@ pub(crate) fn resolve_store( None => { // If the URL does not parse match Url::parse(url_or_path) { + Ok(url) if url.scheme() == "file" => { + let path = url + .to_file_path() + .map_err(|_| vortex_err!("invalid file URL: {url_or_path}"))?; + Ok(ResolvedStore::Path(path)) + } Ok(url) => { let (store, path) = REGISTRY.resolve(&url)?; Ok(ResolvedStore::ObjectStore(store, path)) @@ -92,6 +99,13 @@ mod test { PathBuf::from("/my/absolute/path") ); + assert_eq!( + resolve_store("file:///my/absolute/path", None) + .unwrap() + .unwrap_path(), + PathBuf::from("/my/absolute/path") + ); + let (_store, path) = resolve_store("s3://my-bucket/first/second/third/", None) .unwrap() .unwrap_store();