Skip to content

Commit 25a2f01

Browse files
committed
wip: chunked iter exe
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 951bdf9 commit 25a2f01

2 files changed

Lines changed: 32 additions & 12 deletions

File tree

vortex-array/src/arrays/chunked/vtable/canonical.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ use crate::IntoArray;
1212
use crate::array::ArrayView;
1313
use crate::arrays::Chunked;
1414
use crate::arrays::ChunkedArray;
15+
use crate::arrays::ListView;
1516
use crate::arrays::ListViewArray;
1617
use crate::arrays::PrimitiveArray;
18+
use crate::arrays::Struct;
1719
use crate::arrays::StructArray;
1820
use crate::arrays::chunked::ChunkedArrayExt;
1921
use crate::arrays::listview::ListViewArrayExt;
@@ -45,7 +47,6 @@ pub(super) fn _canonicalize(
4547
&owned_chunks,
4648
Validity::copy_from_array(array.array())?,
4749
struct_dtype,
48-
ctx,
4950
)?;
5051
Canonical::Struct(struct_array)
5152
}
@@ -66,24 +67,22 @@ pub(super) fn _canonicalize(
6667
/// Packs many [`StructArray`]s into a single [`StructArray`], where the [`DynArray`] for each
6768
/// field is a [`ChunkedArray`].
6869
///
69-
/// The caller guarantees there are at least 2 chunks.
70+
/// The caller guarantees there are at least 2 chunks, and that all chunks are already
71+
/// canonicalized to [`StructArray`] by iterative execution.
7072
fn pack_struct_chunks(
7173
chunks: &[ArrayRef],
7274
validity: Validity,
7375
struct_dtype: &StructFields,
74-
ctx: &mut ExecutionCtx,
7576
) -> VortexResult<StructArray> {
7677
let len = chunks.iter().map(|chunk| chunk.len()).sum();
7778
let mut field_arrays = Vec::new();
7879

79-
let executed_chunks: Vec<StructArray> = chunks
80-
.iter()
81-
.map(|c| c.clone().execute::<StructArray>(ctx))
82-
.collect::<VortexResult<_>>()?;
83-
8480
for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
8581
let mut field_chunks = Vec::with_capacity(chunks.len());
86-
for struct_array in &executed_chunks {
82+
for chunk in chunks {
83+
let struct_array = chunk
84+
.as_opt::<Struct>()
85+
.vortex_expect("struct chunk pre-canonicalized by iterative execution");
8786
let field = struct_array.unmasked_field(field_idx).clone();
8887
field_chunks.push(field);
8988
}
@@ -103,7 +102,8 @@ fn pack_struct_chunks(
103102
///
104103
/// We use the existing arrays (chunks) to form a chunked array of `elements` (the child array).
105104
///
106-
/// The caller guarantees there are at least 2 chunks.
105+
/// The caller guarantees there are at least 2 chunks, and that all chunks are already
106+
/// canonicalized to [`ListViewArray`] by iterative execution.
107107
fn swizzle_list_chunks(
108108
chunks: &[ArrayRef],
109109
validity: Validity,
@@ -135,7 +135,11 @@ fn swizzle_list_chunks(
135135
let mut sizes = BufferMut::<u64>::with_capacity(len);
136136

137137
for chunk in chunks {
138-
let chunk_array = chunk.clone().execute::<ListViewArray>(ctx)?;
138+
let chunk_array = chunk
139+
.clone()
140+
.try_downcast::<ListView>()
141+
.ok()
142+
.vortex_expect("list chunk pre-canonicalized by iterative execution");
139143
// By rebuilding as zero-copy to `List` and trimming all elements (to prevent gaps), we make
140144
// the final output `ListView` also zero-copyable to `List`.
141145
let chunk_array = chunk_array.rebuild(ListViewRebuildMode::MakeExact)?;

vortex-array/src/arrays/chunked/vtable/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use vortex_error::vortex_err;
1212
use vortex_error::vortex_panic;
1313
use vortex_session::VortexSession;
1414

15+
use crate::AnyCanonical;
1516
use crate::ArrayEq;
1617
use crate::ArrayHash;
1718
use crate::ArrayRef;
@@ -237,7 +238,22 @@ impl VTable for Chunked {
237238
}
238239

239240
fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
240-
Ok(ExecutionResult::done(_canonicalize(array.as_view(), ctx)?))
241+
// Iteratively request execution of each chunk until it reaches canonical form.
242+
// This gives the scheduler visibility into child execution and enables
243+
// cross-step optimizations between chunk decoding steps.
244+
for i in 0..array.nchunks() {
245+
if !array.chunk(i).is::<AnyCanonical>() {
246+
return Ok(ExecutionResult::execute_slot::<AnyCanonical>(
247+
array,
248+
i + CHUNKS_OFFSET,
249+
));
250+
}
251+
}
252+
253+
// All chunks are now canonical — combine them.
254+
Ok(ExecutionResult::done(
255+
_canonicalize(array.as_view(), ctx)?,
256+
))
241257
}
242258

243259
fn reduce(array: ArrayView<'_, Self>) -> VortexResult<Option<ArrayRef>> {

0 commit comments

Comments
 (0)