Skip to content

Commit b80f796

Browse files
committed
wip: chunked iter exe
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 13937c0 commit b80f796

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

vortex-array/src/arrays/chunked/vtable/canonical.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ use crate::Canonical;
1010
use crate::DynArray;
1111
use crate::ExecutionCtx;
1212
use crate::IntoArray;
13+
use crate::arrays::ListView;
1314
use crate::arrays::ListViewArray;
1415
use crate::arrays::PrimitiveArray;
16+
use crate::arrays::Struct;
1517
use crate::arrays::StructArray;
1618
use crate::arrays::chunked::vtable::ChunkedArray;
1719
use crate::arrays::listview::ListViewRebuildMode;
@@ -41,7 +43,6 @@ pub(super) fn _canonicalize(
4143
&owned_chunks,
4244
Validity::copy_from_array(&array.clone().into_array())?,
4345
struct_dtype,
44-
ctx,
4546
)?;
4647
Canonical::Struct(struct_array)
4748
}
@@ -62,24 +63,22 @@ pub(super) fn _canonicalize(
6263
/// Packs many [`StructArray`]s to instead be a single [`StructArray`], where the [`DynArray`] for each
6364
/// field is a [`ChunkedArray`].
6465
///
65-
/// The caller guarantees there are at least 2 chunks.
66+
/// The caller guarantees there are at least 2 chunks, and that all chunks are already
67+
/// canonicalized to [`StructArray`] by iterative execution.
6668
fn pack_struct_chunks(
6769
chunks: &[ArrayRef],
6870
validity: Validity,
6971
struct_dtype: &StructFields,
70-
ctx: &mut ExecutionCtx,
7172
) -> VortexResult<StructArray> {
7273
let len = chunks.iter().map(|chunk| chunk.len()).sum();
7374
let mut field_arrays = Vec::new();
7475

75-
let executed_chunks: Vec<StructArray> = chunks
76-
.iter()
77-
.map(|c| c.clone().execute::<StructArray>(ctx))
78-
.collect::<VortexResult<_>>()?;
79-
8076
for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
8177
let mut field_chunks = Vec::with_capacity(chunks.len());
82-
for struct_array in &executed_chunks {
78+
for chunk in chunks {
79+
let struct_array = chunk
80+
.as_opt::<Struct>()
81+
.vortex_expect("struct chunk pre-canonicalized by iterative execution");
8382
let field = struct_array.unmasked_field(field_idx).to_array();
8483
field_chunks.push(field);
8584
}
@@ -99,7 +98,8 @@ fn pack_struct_chunks(
9998
///
10099
/// We use the existing arrays (chunks) to form a chunked array of `elements` (the child array).
101100
///
102-
/// The caller guarantees there are at least 2 chunks.
101+
/// The caller guarantees there are at least 2 chunks, and that all chunks are already
102+
/// canonicalized to [`ListViewArray`] by iterative execution.
103103
fn swizzle_list_chunks(
104104
chunks: &[ArrayRef],
105105
validity: Validity,
@@ -131,7 +131,9 @@ fn swizzle_list_chunks(
131131
let mut sizes = BufferMut::<u64>::with_capacity(len);
132132

133133
for chunk in chunks {
134-
let chunk_array = chunk.clone().execute::<ListViewArray>(ctx)?;
134+
let chunk_array = chunk
135+
.as_opt::<ListView>()
136+
.vortex_expect("list chunk pre-canonicalized by iterative execution");
135137
// By rebuilding as zero-copy to `List` and trimming all elements (to prevent gaps), we make
136138
// the final output `ListView` also zero-copyable to `List`.
137139
let chunk_array = chunk_array.rebuild(ListViewRebuildMode::MakeExact)?;

vortex-array/src/arrays/chunked/vtable/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use vortex_error::vortex_err;
1212
use vortex_error::vortex_panic;
1313
use vortex_session::VortexSession;
1414

15+
use crate::AnyCanonical;
1516
use crate::ArrayRef;
1617
use crate::Canonical;
1718
use crate::EmptyMetadata;
@@ -202,6 +203,16 @@ impl VTable for Chunked {
202203
}
203204

204205
fn execute(array: Arc<Array<Self>>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
206+
// Iteratively request execution of each chunk until it reaches canonical form.
207+
// This gives the scheduler visibility into child execution and enables
208+
// cross-step optimizations between chunk decoding steps.
209+
for i in 0..array.nchunks() {
210+
if !array.chunk(i).is::<AnyCanonical>() {
211+
return Ok(ExecutionResult::execute_slot::<AnyCanonical>(array, i + 1));
212+
}
213+
}
214+
215+
// All chunks are now canonical — combine them.
205216
Ok(ExecutionResult::done(
206217
_canonicalize(&array, ctx)?.into_array(),
207218
))

0 commit comments

Comments
 (0)