Skip to content

Commit 729e17c

Browse files
authored
Add arrow import/export for vortex-json (#8339)
## Summary Adds import/export from `vortex-json` to Arrow's JSON canonical extension type. --------- Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent eced293 commit 729e17c

4 files changed

Lines changed: 253 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-json/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,8 @@ version = { workspace = true }
1717
workspace = true
1818

1919
[dependencies]
20+
arrow-array = { workspace = true }
21+
arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
2022
vortex-array = { workspace = true, default-features = false }
2123
vortex-error = { workspace = true, default-features = false }
24+
vortex-session = { workspace = true }

vortex-json/src/arrow.rs

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Arrow import and export support for the JSON extension dtype.
5+
6+
use arrow_array::ArrayRef as ArrowArrayRef;
7+
use arrow_schema::Field;
8+
use arrow_schema::extension::ExtensionType;
9+
use arrow_schema::extension::Json as ArrowJson;
10+
use vortex_array::ArrayRef;
11+
use vortex_array::EmptyMetadata;
12+
use vortex_array::ExecutionCtx;
13+
use vortex_array::IntoArray;
14+
use vortex_array::arrays::ExtensionArray;
15+
use vortex_array::arrays::extension::ExtensionArrayExt;
16+
use vortex_array::arrow::ArrowExport;
17+
use vortex_array::arrow::ArrowExportVTable;
18+
use vortex_array::arrow::ArrowImport;
19+
use vortex_array::arrow::ArrowImportVTable;
20+
use vortex_array::arrow::ArrowSession;
21+
use vortex_array::arrow::ArrowSessionExt;
22+
use vortex_array::arrow::FromArrowArray;
23+
use vortex_array::dtype::DType;
24+
use vortex_array::dtype::extension::ExtDType;
25+
use vortex_array::dtype::extension::ExtVTable;
26+
use vortex_error::VortexExpect;
27+
use vortex_error::VortexResult;
28+
use vortex_session::registry::CachedId;
29+
use vortex_session::registry::Id;
30+
31+
use crate::Json;
32+
33+
/// Arrow's canonical JSON extension name cached as a registry id.
34+
static ARROW_JSON: CachedId = CachedId::new(ArrowJson::NAME);
35+
36+
/// Returns whether an Arrow field contains valid canonical JSON extension metadata.
37+
fn has_valid_json_extension(field: &Field) -> bool {
38+
field.extension_type_name() == Some(ArrowJson::NAME)
39+
&& ArrowJson::try_new_from_field_metadata(field.data_type(), field.metadata()).is_ok()
40+
}
41+
42+
impl ArrowExportVTable for Json {
43+
fn arrow_ext_id(&self) -> Id {
44+
*ARROW_JSON
45+
}
46+
47+
fn vortex_id(&self) -> Id {
48+
Json.id()
49+
}
50+
51+
fn to_arrow_field(
52+
&self,
53+
name: &str,
54+
dtype: &DType,
55+
session: &ArrowSession,
56+
) -> VortexResult<Option<Field>> {
57+
let DType::Extension(ext_dtype) = dtype else {
58+
return Ok(None);
59+
};
60+
if !ext_dtype.is::<Json>() {
61+
return Ok(None);
62+
}
63+
64+
let mut field = session.to_arrow_field(name, ext_dtype.storage_dtype())?;
65+
field
66+
.try_with_extension_type(ArrowJson::default())
67+
.vortex_expect("Utf8 is a valid storage type for Arrow JSON");
68+
Ok(Some(field))
69+
}
70+
71+
fn execute_arrow(
72+
&self,
73+
array: ArrayRef,
74+
target: &Field,
75+
ctx: &mut ExecutionCtx,
76+
) -> VortexResult<ArrowExport> {
77+
let is_json = array
78+
.dtype()
79+
.as_extension_opt()
80+
.map(|ext_dtype| ext_dtype.is::<Json>())
81+
.unwrap_or(false);
82+
if !is_json {
83+
return Ok(ArrowExport::Unsupported(array));
84+
}
85+
86+
ArrowJson::try_new_from_field_metadata(target.data_type(), target.metadata())?;
87+
88+
let executed = array.execute::<ExtensionArray>(ctx)?;
89+
let storage = executed.storage_array().clone();
90+
let storage_field = Field::new(
91+
String::new(),
92+
target.data_type().clone(),
93+
target.is_nullable(),
94+
);
95+
let session = ctx.session().clone();
96+
97+
let storage = session
98+
.arrow()
99+
.execute_arrow(storage, Some(&storage_field), ctx)?;
100+
101+
Ok(ArrowExport::Exported(storage))
102+
}
103+
}
104+
105+
impl ArrowImportVTable for Json {
106+
fn arrow_ext_id(&self) -> Id {
107+
*ARROW_JSON
108+
}
109+
110+
fn from_arrow_field(&self, field: &Field) -> VortexResult<Option<DType>> {
111+
if !has_valid_json_extension(field) {
112+
return Ok(None);
113+
}
114+
115+
Ok(Some(DType::Extension(
116+
ExtDType::<Json>::try_new(EmptyMetadata, DType::Utf8(field.is_nullable().into()))?
117+
.erased(),
118+
)))
119+
}
120+
121+
fn from_arrow_array(
122+
&self,
123+
array: ArrowArrayRef,
124+
field: &Field,
125+
dtype: &DType,
126+
) -> VortexResult<ArrowImport> {
127+
let DType::Extension(ext_dtype) = dtype else {
128+
return Ok(ArrowImport::Unsupported(array));
129+
};
130+
if !ext_dtype.is::<Json>() || !has_valid_json_extension(field) {
131+
return Ok(ArrowImport::Unsupported(array));
132+
}
133+
134+
let storage = ArrayRef::from_arrow(array.as_ref(), field.is_nullable())?;
135+
Ok(ArrowImport::Imported(
136+
ExtensionArray::new(ext_dtype.clone(), storage).into_array(),
137+
))
138+
}
139+
}
140+
141+
#[cfg(test)]
142+
mod tests {
143+
144+
use std::sync::Arc;
145+
146+
use arrow_array::Array;
147+
use arrow_array::ArrayRef as ArrowArrayRef;
148+
use arrow_array::StringArray;
149+
use arrow_array::cast::AsArray;
150+
use arrow_schema::DataType;
151+
use arrow_schema::Field;
152+
use arrow_schema::extension::ExtensionType;
153+
use arrow_schema::extension::Json as ArrowJson;
154+
use vortex_array::EmptyMetadata;
155+
use vortex_array::IntoArray;
156+
use vortex_array::VortexSessionExecute;
157+
use vortex_array::arrays::ExtensionArray;
158+
use vortex_array::arrays::VarBinArray;
159+
use vortex_array::arrow::ArrowSessionExt;
160+
use vortex_array::dtype::Nullability;
161+
use vortex_array::dtype::extension::ExtDType;
162+
use vortex_error::VortexExpect;
163+
use vortex_error::VortexResult;
164+
use vortex_session::VortexSession;
165+
166+
use crate::Json;
167+
use crate::initialize;
168+
169+
/// Export a JSON extension array to Arrow's canonical JSON extension.
170+
#[test]
171+
fn exports_json_extension_array_as_arrow_json() -> VortexResult<()> {
172+
let session = VortexSession::empty();
173+
initialize(&session);
174+
175+
let storage = VarBinArray::from_iter(
176+
[Some("{\"id\":1}"), Some("{\"id\":2}")],
177+
vortex_array::dtype::DType::Utf8(Nullability::NonNullable),
178+
)
179+
.into_array();
180+
let ext_dtype = ExtDType::<Json>::try_new(EmptyMetadata, storage.dtype().clone())?.erased();
181+
182+
dbg!(&ext_dtype);
183+
let array = ExtensionArray::new(ext_dtype, storage).into_array();
184+
185+
let field = session.arrow().to_arrow_field("data", array.dtype())?;
186+
assert_eq!(field.extension_type_name(), Some(ArrowJson::NAME));
187+
ArrowJson::try_new_from_field_metadata(field.data_type(), field.metadata())?;
188+
189+
dbg!(&field);
190+
191+
let exported = session.arrow().execute_arrow(
192+
array,
193+
Some(&field),
194+
&mut session.create_execution_ctx(),
195+
)?;
196+
197+
assert!(exported.data_type().is_string());
198+
199+
dbg!(exported.data_type());
200+
201+
let strings = exported.as_string_view();
202+
assert_eq!(strings.value(0), "{\"id\":1}");
203+
assert_eq!(strings.value(1), "{\"id\":2}");
204+
Ok(())
205+
}
206+
207+
/// Import Arrow's canonical JSON extension as a Vortex JSON extension array.
208+
#[test]
209+
fn imports_arrow_json_extension_array_as_vortex_json() -> VortexResult<()> {
210+
let session = VortexSession::empty();
211+
initialize(&session);
212+
213+
let mut field = Field::new("data", DataType::Utf8, false);
214+
field.try_with_extension_type(ArrowJson::default())?;
215+
let array = Arc::new(StringArray::from(vec!["{\"id\":1}", "{\"id\":2}"])) as ArrowArrayRef;
216+
217+
let imported = session.arrow().from_arrow_array(array, &field)?;
218+
let ext_dtype = imported
219+
.dtype()
220+
.as_extension_opt()
221+
.vortex_expect("expected JSON extension dtype");
222+
assert!(ext_dtype.is::<Json>());
223+
224+
let exported = session.arrow().execute_arrow(
225+
imported,
226+
Some(&field),
227+
&mut session.create_execution_ctx(),
228+
)?;
229+
let strings = exported.as_string::<i32>();
230+
assert_eq!(strings.value(0), "{\"id\":1}");
231+
assert_eq!(strings.value(1), "{\"id\":2}");
232+
Ok(())
233+
}
234+
}

vortex-json/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,19 @@
99

1010
//! Extension type and related functionality for a JSON extension type for Vortex.
1111
12+
mod arrow;
1213
mod dtype;
1314

15+
use std::sync::Arc;
16+
1417
pub use dtype::Json;
18+
use vortex_array::arrow::ArrowSessionExt;
19+
use vortex_array::dtype::session::DTypeSessionExt;
20+
use vortex_session::VortexSession;
21+
22+
/// Register JSON extension support with a session.
23+
pub fn initialize(session: &VortexSession) {
24+
session.dtypes().register(Json);
25+
session.arrow().register_exporter(Arc::new(Json));
26+
session.arrow().register_importer(Arc::new(Json));
27+
}

0 commit comments

Comments
 (0)