Skip to content

Commit a72e75d

Browse files
committed
more
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent b57bfe3 commit a72e75d

2 files changed

Lines changed: 30 additions & 24 deletions

File tree

vortex-jni/src/writer.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,38 +55,30 @@ use crate::session::session_ref;
5555
/// the writer is felt on the Java thread producing batches.
5656
const WRITE_CHANNEL_CAPACITY: usize = 4;
5757

58-
enum WriteTarget {
59-
Local(PathBuf),
60-
ObjectStore {
61-
store: Arc<dyn ObjectStore>,
62-
path: ObjectStorePath,
63-
},
58+
enum ResolvedStore {
59+
ObjectStore(Arc<dyn ObjectStore>, ObjectStorePath),
60+
Path(PathBuf),
6461
}
6562

66-
fn resolve_write_target(
67-
uri: &str,
63+
fn resolve_store(
64+
url_or_path: &str,
6865
properties: &HashMap<String, String>,
69-
) -> VortexResult<WriteTarget> {
70-
match Url::parse(uri) {
66+
) -> VortexResult<ResolvedStore> {
67+
match Url::parse(url_or_path) {
7168
Ok(url) if url.scheme() == "file" => {
7269
let path = url
7370
.to_file_path()
74-
.map_err(|_| vortex_err!("invalid file URL: {uri}"))?;
75-
return Ok(WriteTarget::Local(path));
71+
.map_err(|_| vortex_err!("invalid file URL: {url_or_path}"))?;
72+
Ok(ResolvedStore::Path(path))
7673
}
77-
Ok(url) if uri.contains("://") => {
74+
Ok(url) => {
7875
let path = ObjectStorePath::from_url_path(url.path())
7976
.map_err(|_| vortex_err!("invalid object_store path: {}", url.path()))?;
8077
let store = make_object_store(&url, properties)?;
81-
return Ok(WriteTarget::ObjectStore { store, path });
78+
Ok(ResolvedStore::ObjectStore(store, path))
8279
}
83-
Err(err) if uri.contains("://") => {
84-
return Err(vortex_err!("invalid URL {uri}: {err}"));
85-
}
86-
_ => {}
80+
Err(_) => Ok(ResolvedStore::Path(PathBuf::from(url_or_path))),
8781
}
88-
89-
Ok(WriteTarget::Local(PathBuf::from(uri)))
9082
}
9183

9284
/// Native writer holding a write-task handle and a sender that Java pushes batches into.
@@ -174,13 +166,13 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriter_create(
174166

175167
let file_path: String = uri.try_to_string(env)?;
176168
let properties: HashMap<String, String> = extract_properties(env, &options)?;
177-
let target = resolve_write_target(&file_path, &properties)?;
169+
let resolved = resolve_store(&file_path, &properties)?;
178170
let (tx, rx) = mpsc::channel(WRITE_CHANNEL_CAPACITY);
179171
let stream = ArrayStreamAdapter::new(write_schema.clone(), rx);
180172

181173
let handle = session.handle().spawn(async move {
182-
match target {
183-
WriteTarget::Local(path) => {
174+
match resolved {
175+
ResolvedStore::Path(path) => {
184176
if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
185177
async_fs::create_dir_all(parent).await?;
186178
}
@@ -189,7 +181,7 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriter_create(
189181
file.shutdown().await?;
190182
Ok(summary)
191183
}
192-
WriteTarget::ObjectStore { store, path } => {
184+
ResolvedStore::ObjectStore(store, path) => {
193185
let mut write =
194186
ObjectStoreWrite::new(Arc::new(Compat::new(store)), &path).await?;
195187
let summary = session.write_options().write(&mut write, stream).await?;

vortex-python/src/object_store/resolve.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use object_store::path::Path;
1010
use object_store::registry::ObjectStoreRegistry;
1111
use url::Url;
1212
use vortex::error::VortexResult;
13+
use vortex::error::vortex_err;
1314

1415
use crate::object_store::registry::Registry;
1516

@@ -32,6 +33,12 @@ pub(crate) fn resolve_store(
3233
None => {
3334
// If the URL does not parse
3435
match Url::parse(url_or_path) {
36+
Ok(url) if url.scheme() == "file" => {
37+
let path = url
38+
.to_file_path()
39+
.map_err(|_| vortex_err!("invalid file URL: {url_or_path}"))?;
40+
Ok(ResolvedStore::Path(path))
41+
}
3542
Ok(url) => {
3643
let (store, path) = REGISTRY.resolve(&url)?;
3744
Ok(ResolvedStore::ObjectStore(store, path))
@@ -92,6 +99,13 @@ mod test {
9299
PathBuf::from("/my/absolute/path")
93100
);
94101

102+
assert_eq!(
103+
resolve_store("file:///my/absolute/path", None)
104+
.unwrap()
105+
.unwrap_path(),
106+
PathBuf::from("/my/absolute/path")
107+
);
108+
95109
let (_store, path) = resolve_store("s3://my-bucket/first/second/third/", None)
96110
.unwrap()
97111
.unwrap_store();

0 commit comments

Comments
 (0)