Skip to content

Commit 88cc0e1

Browse files
committed
[Experiment] Self-defined arrow-export
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 43ae2dc commit 88cc0e1

28 files changed

Lines changed: 1099 additions & 545 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/runend/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ version = { workspace = true }
1515

1616
[dependencies]
1717
arbitrary = { workspace = true, optional = true }
18-
arrow-array = { workspace = true, optional = true }
18+
arrow-array = { workspace = true }
19+
arrow-buffer = { workspace = true }
20+
arrow-schema = { workspace = true }
1921
itertools = { workspace = true }
2022
num-traits = { workspace = true }
2123
prost = { workspace = true }
@@ -39,7 +41,6 @@ vortex-array = { workspace = true, features = ["_test-harness"] }
3941

4042
[features]
4143
arbitrary = ["dep:arbitrary", "vortex-array/arbitrary"]
42-
arrow = ["dep:arrow-array"]
4344

4445
[[bench]]
4546
name = "run_end_null_count"

encodings/runend/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub fn vortex_runend::RunEnd::serialize(metadata: Self::Metadata) -> vortex_erro
9494

9595
pub fn vortex_runend::RunEnd::stats(array: &vortex_runend::RunEndArray) -> vortex_array::stats::array::StatsSetRef<'_>
9696

97+
pub fn vortex_runend::RunEnd::to_arrow_array(array: &vortex_runend::RunEndArray, data_type: &arrow_schema::datatype::DataType, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<arrow_array::array::ArrayRef>>
98+
9799
pub fn vortex_runend::RunEnd::with_children(array: &mut Self::Array, children: alloc::vec::Vec<vortex_array::array::ArrayRef>) -> vortex_error::VortexResult<()>
98100

99101
impl vortex_array::vtable::operations::OperationsVTable<vortex_runend::RunEnd> for vortex_runend::RunEnd

encodings/runend/src/array.rs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33

44
use std::fmt::Debug;
55
use std::hash::Hash;
6-
6+
use std::sync::Arc;
7+
8+
use arrow_array::ArrayRef as ArrowArrayRef;
9+
use arrow_array::RunArray;
10+
use arrow_array::cast::AsArray;
11+
use arrow_array::types::*;
12+
use arrow_buffer::ArrowNativeType;
13+
use arrow_schema::DataType;
714
use vortex_array::ArrayEq;
815
use vortex_array::ArrayHash;
916
use vortex_array::ArrayRef;
@@ -17,6 +24,7 @@ use vortex_array::ProstMetadata;
1724
use vortex_array::SerializeMetadata;
1825
use vortex_array::arrays::Primitive;
1926
use vortex_array::arrays::VarBinViewArray;
27+
use vortex_array::arrow::ArrowArrayExecutor;
2028
use vortex_array::buffer::BufferHandle;
2129
use vortex_array::dtype::DType;
2230
use vortex_array::dtype::Nullability;
@@ -203,6 +211,31 @@ impl VTable for RunEnd {
203211
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
204212
}
205213

214+
fn to_arrow_array(
215+
array: &RunEndArray,
216+
data_type: &DataType,
217+
ctx: &mut ExecutionCtx,
218+
) -> VortexResult<Option<ArrowArrayRef>> {
219+
let DataType::RunEndEncoded(ends_field, values_field) = data_type else {
220+
return Ok(None);
221+
};
222+
let arrow_ends = array
223+
.ends()
224+
.clone()
225+
.execute_arrow(Some(ends_field.data_type()), ctx)?;
226+
let arrow_values = array
227+
.values()
228+
.clone()
229+
.execute_arrow(Some(values_field.data_type()), ctx)?;
230+
Ok(Some(build_run_array(
231+
&arrow_ends,
232+
&arrow_values,
233+
ends_field.data_type(),
234+
array.offset(),
235+
array.len(),
236+
)?))
237+
}
238+
206239
fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
207240
run_end_canonicalize(array, ctx).map(ExecutionStep::Done)
208241
}
@@ -503,6 +536,59 @@ pub(super) fn run_end_canonicalize(
503536
})
504537
}
505538

539+
fn build_run_array(
540+
ends: &ArrowArrayRef,
541+
values: &ArrowArrayRef,
542+
ends_type: &DataType,
543+
offset: usize,
544+
length: usize,
545+
) -> VortexResult<ArrowArrayRef> {
546+
match ends_type {
547+
DataType::Int16 => build_run_array_typed::<Int16Type>(ends, values, offset, length),
548+
DataType::Int32 => build_run_array_typed::<Int32Type>(ends, values, offset, length),
549+
DataType::Int64 => build_run_array_typed::<Int64Type>(ends, values, offset, length),
550+
_ => vortex_bail!("Unsupported run-end index type: {:?}", ends_type),
551+
}
552+
}
553+
554+
fn build_run_array_typed<R: RunEndIndexType>(
555+
ends: &ArrowArrayRef,
556+
values: &ArrowArrayRef,
557+
offset: usize,
558+
length: usize,
559+
) -> VortexResult<ArrowArrayRef>
560+
where
561+
R::Native: std::ops::Sub<Output = R::Native> + Ord,
562+
{
563+
let offset_native = R::Native::from_usize(offset).ok_or_else(|| {
564+
vortex_error::vortex_err!("Offset {offset} exceeds run-end index capacity")
565+
})?;
566+
let length_native = R::Native::from_usize(length).ok_or_else(|| {
567+
vortex_error::vortex_err!("Length {length} exceeds run-end index capacity")
568+
})?;
569+
570+
let ends_prim = ends.as_primitive::<R>();
571+
if offset == 0 && ends_prim.values().last() == Some(&length_native) {
572+
return Ok(Arc::new(RunArray::<R>::try_new(ends_prim, values)?) as ArrowArrayRef);
573+
}
574+
575+
// Trim to only include runs covering the [offset, offset+length) range.
576+
let num_runs = (ends_prim
577+
.values()
578+
.partition_point(|&e| e - offset_native < length_native)
579+
+ 1)
580+
.min(ends_prim.len());
581+
582+
let trimmed_ends = ends.slice(0, num_runs);
583+
let trimmed_values = values.slice(0, num_runs);
584+
585+
let adjusted = trimmed_ends
586+
.as_primitive::<R>()
587+
.unary(|end| (end - offset_native).min(length_native));
588+
589+
Ok(Arc::new(RunArray::<R>::try_new(&adjusted, &trimmed_values)?) as ArrowArrayRef)
590+
}
591+
506592
#[cfg(test)]
507593
mod tests {
508594
use vortex_array::IntoArray;

encodings/runend/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ pub use array::*;
99
pub use iter::trimmed_ends_iter;
1010

1111
mod array;
12-
#[cfg(feature = "arrow")]
1312
mod arrow;
1413
pub mod compress;
1514
mod compute;

0 commit comments

Comments
 (0)