Skip to content

Commit 214dfb1

Browse files
committed
Use async_fs instead of LocalFileSystem for local disk writes
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 7349cd6 commit 214dfb1

6 files changed

Lines changed: 99 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/vortex-jni/src/main/java/dev/vortex/api/VortexWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ private VortexWriter(long pointer) {
4242
}
4343

4444
/**
45-
* Create a writer that streams records into the file at {@code uri}. The Arrow schema describes the exact layout of
46-
* every batch written.
45+
* Create a writer that streams records into the file at {@code uri}. The path may be a full URI or a plain local
46+
* filesystem path. The Arrow schema describes the exact layout of every batch written.
4747
*/
4848
public static VortexWriter create(
4949
Session session, String uri, Schema arrowSchema, Map<String, String> options, BufferAllocator allocator)

java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,38 @@ public void testCreateWriter() throws IOException {
6868
assertTrue(Files.exists(outputPath), "output file should exist");
6969
}
7070

71+
@Test
72+
public void testCreateWriterPlainLocalPath() throws IOException {
73+
Path outputPath = tempDir.resolve("test_create_plain_path.vortex");
74+
String writePath = outputPath.toAbsolutePath().toString();
75+
76+
BufferAllocator allocator = ArrowAllocation.rootAllocator();
77+
Map<String, String> options = new HashMap<>();
78+
79+
Session session = Session.create();
80+
try (VortexWriter writer = VortexWriter.create(session, writePath, personSchema(), options, allocator)) {
81+
assertNotNull(writer);
82+
}
83+
84+
assertTrue(Files.exists(outputPath), "output file should exist");
85+
}
86+
87+
@Test
88+
public void testCreateWriterCreatesParentDirectories() throws IOException {
89+
Path outputPath = tempDir.resolve("nested/sub/dir/test_create_nested.vortex");
90+
String writePath = outputPath.toAbsolutePath().toUri().toString();
91+
92+
BufferAllocator allocator = ArrowAllocation.rootAllocator();
93+
Map<String, String> options = new HashMap<>();
94+
95+
Session session = Session.create();
96+
try (VortexWriter writer = VortexWriter.create(session, writePath, personSchema(), options, allocator)) {
97+
assertNotNull(writer);
98+
}
99+
100+
assertTrue(Files.exists(outputPath), "output file should exist");
101+
}
102+
71103
@Test
72104
public void testWriteBatch() throws IOException {
73105
Path outputPath = tempDir.resolve("test_ffi.vortex");

vortex-jni/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ categories = { workspace = true }
1919
[dependencies]
2020
arrow-array = { workspace = true, features = ["ffi"] }
2121
arrow-schema = { workspace = true }
22+
async-fs = { workspace = true }
2223
futures = { workspace = true }
2324
jni = { workspace = true }
2425
object_store = { workspace = true, features = ["aws", "azure", "gcp"] }

vortex-jni/src/writer.rs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
//! Writes go through an in-flight queue of at most [`WRITE_CHANNEL_CAPACITY`] pending
77
//! batches on the same thread that drives the current-thread runtime.
88
9+
use std::path::PathBuf;
910
use std::sync::Arc;
1011

1112
use arrow_array::RecordBatch;
1213
use arrow_array::StructArray;
1314
use arrow_array::ffi::FFI_ArrowArray;
1415
use arrow_array::ffi::FFI_ArrowSchema;
16+
use async_fs::File;
1517
use futures::SinkExt;
1618
use futures::channel::mpsc;
1719
use jni::EnvUnowned;
@@ -22,7 +24,8 @@ use jni::sys::JNI_FALSE;
2224
use jni::sys::JNI_TRUE;
2325
use jni::sys::jboolean;
2426
use jni::sys::jlong;
25-
use object_store::path::Path;
27+
use object_store::ObjectStore;
28+
use object_store::path::Path as ObjectStorePath;
2629
use url::Url;
2730
use vortex::array::ArrayRef;
2831
use vortex::array::arrow::FromArrowArray;
@@ -52,6 +55,32 @@ use crate::session::session_ref;
5255
/// the writer is felt on the Java thread producing batches.
5356
const WRITE_CHANNEL_CAPACITY: usize = 4;
5457

58+
enum ResolvedStore {
59+
ObjectStore(Arc<dyn ObjectStore>, ObjectStorePath),
60+
Path(PathBuf),
61+
}
62+
63+
fn resolve_store(
64+
url_or_path: &str,
65+
properties: &HashMap<String, String>,
66+
) -> VortexResult<ResolvedStore> {
67+
match Url::parse(url_or_path) {
68+
Ok(url) if url.scheme() == "file" => {
69+
let path = url
70+
.to_file_path()
71+
.map_err(|_| vortex_err!("invalid file URL: {url_or_path}"))?;
72+
Ok(ResolvedStore::Path(path))
73+
}
74+
Ok(url) => {
75+
let path = ObjectStorePath::from_url_path(url.path())
76+
.map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?;
77+
let store = make_object_store(&url, properties)?;
78+
Ok(ResolvedStore::ObjectStore(store, path))
79+
}
80+
Err(_) => Ok(ResolvedStore::Path(PathBuf::from(url_or_path))),
81+
}
82+
}
83+
5584
/// Native writer holding a write-task handle and a sender that Java pushes batches into.
5685
pub struct NativeWriter {
5786
handle: Option<Task<VortexResult<WriteSummary>>>,
@@ -136,21 +165,30 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriter_create(
136165
let write_schema = import_dtype_from_arrow(arrow_schema_addr)?;
137166

138167
let file_path: String = uri.try_to_string(env)?;
139-
let url = Url::parse(&file_path)
140-
.map_err(|e| JNIError::Vortex(vortex_err!("invalid URL {file_path}: {e}")))?;
141168
let properties: HashMap<String, String> = extract_properties(env, &options)?;
142-
let path = Path::from_url_path(url.path())
143-
.map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?;
144-
145-
let store = make_object_store(&url, &properties)?;
169+
let resolved = resolve_store(&file_path, &properties)?;
146170
let (tx, rx) = mpsc::channel(WRITE_CHANNEL_CAPACITY);
147171
let stream = ArrayStreamAdapter::new(write_schema.clone(), rx);
148172

149173
let handle = session.handle().spawn(async move {
150-
let mut write = ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?;
151-
let summary = session.write_options().write(&mut write, stream).await?;
152-
write.shutdown().await?;
153-
Ok(summary)
174+
match resolved {
175+
ResolvedStore::Path(path) => {
176+
if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
177+
async_fs::create_dir_all(parent).await?;
178+
}
179+
let mut file = File::create(path).await?;
180+
let summary = session.write_options().write(&mut file, stream).await?;
181+
file.shutdown().await?;
182+
Ok(summary)
183+
}
184+
ResolvedStore::ObjectStore(store, path) => {
185+
let mut write =
186+
ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?;
187+
let summary = session.write_options().write(&mut write, stream).await?;
188+
write.shutdown().await?;
189+
Ok(summary)
190+
}
191+
}
154192
});
155193

156194
Ok(Box::new(NativeWriter::new(write_schema, handle, tx)).into_raw())

vortex-python/src/object_store/resolve.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use object_store::path::Path;
1010
use object_store::registry::ObjectStoreRegistry;
1111
use url::Url;
1212
use vortex::error::VortexResult;
13+
use vortex::error::vortex_err;
1314

1415
use crate::object_store::registry::Registry;
1516

@@ -32,6 +33,12 @@ pub(crate) fn resolve_store(
3233
None => {
3334
// If the URL does not parse
3435
match Url::parse(url_or_path) {
36+
Ok(url) if url.scheme() == "file" => {
37+
let path = url
38+
.to_file_path()
39+
.map_err(|_| vortex_err!("invalid file URL: {url_or_path}"))?;
40+
Ok(ResolvedStore::Path(path))
41+
}
3542
Ok(url) => {
3643
let (store, path) = REGISTRY.resolve(&url)?;
3744
Ok(ResolvedStore::ObjectStore(store, path))
@@ -92,6 +99,13 @@ mod test {
9299
PathBuf::from("/my/absolute/path")
93100
);
94101

102+
assert_eq!(
103+
resolve_store("file:///my/absolute/path", None)
104+
.unwrap()
105+
.unwrap_path(),
106+
PathBuf::from("/my/absolute/path")
107+
);
108+
95109
let (_store, path) = resolve_store("s3://my-bucket/first/second/third/", None)
96110
.unwrap()
97111
.unwrap_store();

0 commit comments

Comments
 (0)