Skip to content

Commit 349155f

Browse files
feat: aligned chunked pair iter (#6777)
Define pairwise chunked array iterator and use it in zip --------- Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 18bef2b commit 349155f

5 files changed

Lines changed: 286 additions & 41 deletions

File tree

vortex-array/src/arrays/chunked/array.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ impl ChunkedArray {
107107
#[inline]
108108
pub fn chunk(&self, idx: usize) -> &ArrayRef {
109109
assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
110-
111-
&self.chunks[idx]
110+
// SAFETY: bounds checked by the assert above.
111+
unsafe { self.chunks.get_unchecked(idx) }
112112
}
113113

114114
pub fn nchunks(&self) -> usize {

vortex-array/src/arrays/chunked/compute/zip.rs

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,39 +29,10 @@ impl ZipKernel for Chunked {
2929
.union_nullability(if_false.dtype().nullability());
3030
let mut out_chunks = Vec::with_capacity(if_true.nchunks() + if_false.nchunks());
3131

32-
let mut lhs_idx = 0;
33-
let mut rhs_idx = 0;
34-
let mut lhs_offset = 0;
35-
let mut rhs_offset = 0;
36-
let mut pos = 0;
37-
let total_len = if_true.len();
38-
39-
while pos < total_len {
40-
let lhs_chunk = if_true.chunk(lhs_idx);
41-
let rhs_chunk = if_false.chunk(rhs_idx);
42-
43-
let lhs_rem = lhs_chunk.len() - lhs_offset;
44-
let rhs_rem = rhs_chunk.len() - rhs_offset;
45-
let take_until = lhs_rem.min(rhs_rem);
46-
47-
let mask_slice = mask.slice(pos..pos + take_until)?;
48-
let lhs_slice = lhs_chunk.slice(lhs_offset..lhs_offset + take_until)?;
49-
let rhs_slice = rhs_chunk.slice(rhs_offset..rhs_offset + take_until)?;
50-
51-
out_chunks.push(mask_slice.zip(lhs_slice, rhs_slice)?);
52-
53-
pos += take_until;
54-
lhs_offset += take_until;
55-
rhs_offset += take_until;
56-
57-
if lhs_offset == lhs_chunk.len() {
58-
lhs_idx += 1;
59-
lhs_offset = 0;
60-
}
61-
if rhs_offset == rhs_chunk.len() {
62-
rhs_idx += 1;
63-
rhs_offset = 0;
64-
}
32+
for pair in if_true.paired_chunks(if_false) {
33+
let pair = pair?;
34+
let mask_slice = mask.slice(pair.pos)?;
35+
out_chunks.push(mask_slice.zip(pair.left, pair.right)?);
6536
}
6637

6738
// SAFETY: chunks originate from zipping slices of inputs that share dtype/nullability.

vortex-array/src/arrays/chunked/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod array;
55
pub use array::ChunkedArray;
66

77
pub(crate) mod compute;
8+
mod paired_chunks;
89

910
mod vtable;
1011
pub use vtable::Chunked;
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::ops::Range;
5+
6+
use vortex_error::VortexResult;
7+
8+
use crate::ArrayRef;
9+
use crate::arrays::ChunkedArray;
10+
11+
pub(crate) struct AlignedPair {
12+
pub left: ArrayRef,
13+
pub right: ArrayRef,
14+
pub pos: Range<usize>,
15+
}
16+
17+
/// Cursor over a chunk slice that maintains the invariant: `idx` always
18+
/// points at a non-empty chunk or is past the end.
19+
struct ChunkCursor<'a> {
20+
chunks: &'a [ArrayRef],
21+
idx: usize,
22+
offset: usize,
23+
}
24+
25+
impl<'a> ChunkCursor<'a> {
26+
fn new(chunks: &'a [ArrayRef]) -> Self {
27+
let mut cursor = Self {
28+
chunks,
29+
idx: 0,
30+
offset: 0,
31+
};
32+
cursor.skip_empty();
33+
cursor
34+
}
35+
36+
fn skip_empty(&mut self) {
37+
while self.idx < self.chunks.len()
38+
&& unsafe { self.chunks.get_unchecked(self.idx) }.is_empty()
39+
{
40+
self.idx += 1;
41+
}
42+
}
43+
44+
fn current_chunk(&self) -> Option<&'a ArrayRef> {
45+
(self.idx < self.chunks.len()).then(|| unsafe { self.chunks.get_unchecked(self.idx) })
46+
}
47+
48+
fn remaining(&self, chunk: &ArrayRef) -> usize {
49+
chunk.len() - self.offset
50+
}
51+
52+
fn take(&mut self, chunk: &ArrayRef, n: usize) -> VortexResult<ArrayRef> {
53+
let slice = chunk.slice(self.offset..self.offset + n)?;
54+
self.offset += n;
55+
if self.offset == chunk.len() {
56+
self.idx += 1;
57+
self.offset = 0;
58+
self.skip_empty();
59+
}
60+
Ok(slice)
61+
}
62+
}
63+
64+
pub(crate) struct PairedChunks<'a> {
65+
left: ChunkCursor<'a>,
66+
right: ChunkCursor<'a>,
67+
pos: usize,
68+
total_len: usize,
69+
}
70+
71+
impl ChunkedArray {
72+
pub(crate) fn paired_chunks<'a>(&'a self, other: &'a ChunkedArray) -> PairedChunks<'a> {
73+
assert_eq!(
74+
self.len(),
75+
other.len(),
76+
"paired_chunks requires arrays of equal length"
77+
);
78+
PairedChunks {
79+
left: ChunkCursor::new(&self.chunks),
80+
right: ChunkCursor::new(&other.chunks),
81+
pos: 0,
82+
total_len: self.len(),
83+
}
84+
}
85+
}
86+
87+
impl Iterator for PairedChunks<'_> {
88+
type Item = VortexResult<AlignedPair>;
89+
90+
fn next(&mut self) -> Option<Self::Item> {
91+
if self.pos >= self.total_len {
92+
return None;
93+
}
94+
95+
let lhs_chunk = self.left.current_chunk()?;
96+
let rhs_chunk = self.right.current_chunk()?;
97+
98+
let take = self
99+
.left
100+
.remaining(lhs_chunk)
101+
.min(self.right.remaining(rhs_chunk));
102+
103+
let (lhs_slice, rhs_slice) = match self
104+
.left
105+
.take(lhs_chunk, take)
106+
.and_then(|l| self.right.take(rhs_chunk, take).map(|r| (l, r)))
107+
{
108+
Ok(pair) => pair,
109+
Err(e) => return Some(Err(e)),
110+
};
111+
112+
let start = self.pos;
113+
self.pos += take;
114+
115+
Some(Ok(AlignedPair {
116+
left: lhs_slice,
117+
right: rhs_slice,
118+
pos: start..self.pos,
119+
}))
120+
}
121+
}
122+
123+
#[cfg(test)]
124+
mod tests {
125+
use vortex_buffer::buffer;
126+
use vortex_error::VortexResult;
127+
128+
use crate::IntoArray;
129+
use crate::arrays::ChunkedArray;
130+
use crate::dtype::DType;
131+
use crate::dtype::Nullability;
132+
use crate::dtype::PType;
133+
134+
fn i32_dtype() -> DType {
135+
DType::Primitive(PType::I32, Nullability::NonNullable)
136+
}
137+
138+
#[allow(clippy::type_complexity)]
139+
fn collect_pairs(
140+
left: &ChunkedArray,
141+
right: &ChunkedArray,
142+
) -> VortexResult<Vec<(Vec<i32>, Vec<i32>, std::ops::Range<usize>)>> {
143+
use crate::ToCanonical;
144+
let mut result = Vec::new();
145+
for pair in left.paired_chunks(right) {
146+
let pair = pair?;
147+
let l: Vec<i32> = pair.left.to_primitive().as_slice::<i32>().to_vec();
148+
let r: Vec<i32> = pair.right.to_primitive().as_slice::<i32>().to_vec();
149+
result.push((l, r, pair.pos));
150+
}
151+
Ok(result)
152+
}
153+
154+
#[test]
155+
fn test_aligned_chunks() -> VortexResult<()> {
156+
let left = ChunkedArray::try_new(
157+
vec![buffer![1i32, 2].into_array(), buffer![3i32, 4].into_array()],
158+
i32_dtype(),
159+
)?;
160+
let right = ChunkedArray::try_new(
161+
vec![
162+
buffer![10i32, 20].into_array(),
163+
buffer![30i32, 40].into_array(),
164+
],
165+
i32_dtype(),
166+
)?;
167+
168+
let pairs = collect_pairs(&left, &right)?;
169+
assert_eq!(pairs.len(), 2);
170+
assert_eq!(pairs[0], (vec![1, 2], vec![10, 20], 0..2));
171+
assert_eq!(pairs[1], (vec![3, 4], vec![30, 40], 2..4));
172+
Ok(())
173+
}
174+
175+
#[test]
176+
fn test_misaligned_chunks() -> VortexResult<()> {
177+
let left = ChunkedArray::try_new(
178+
vec![
179+
buffer![1i32, 2].into_array(),
180+
buffer![3i32].into_array(),
181+
buffer![4i32, 5].into_array(),
182+
],
183+
i32_dtype(),
184+
)?;
185+
let right = ChunkedArray::try_new(
186+
vec![
187+
buffer![10i32].into_array(),
188+
buffer![20i32, 30].into_array(),
189+
buffer![40i32, 50].into_array(),
190+
],
191+
i32_dtype(),
192+
)?;
193+
194+
let pairs = collect_pairs(&left, &right)?;
195+
assert_eq!(pairs.len(), 4);
196+
assert_eq!(pairs[0], (vec![1], vec![10], 0..1));
197+
assert_eq!(pairs[1], (vec![2], vec![20], 1..2));
198+
assert_eq!(pairs[2], (vec![3], vec![30], 2..3));
199+
assert_eq!(pairs[3], (vec![4, 5], vec![40, 50], 3..5));
200+
Ok(())
201+
}
202+
203+
#[test]
204+
fn test_empty_chunks() -> VortexResult<()> {
205+
let left = ChunkedArray::try_new(
206+
vec![
207+
buffer![0i32; 0].into_array(),
208+
buffer![1i32, 2, 3].into_array(),
209+
],
210+
i32_dtype(),
211+
)?;
212+
let right = ChunkedArray::try_new(
213+
vec![
214+
buffer![10i32, 20, 30].into_array(),
215+
buffer![0i32; 0].into_array(),
216+
],
217+
i32_dtype(),
218+
)?;
219+
220+
let pairs = collect_pairs(&left, &right)?;
221+
assert_eq!(pairs.len(), 1);
222+
assert_eq!(pairs[0], (vec![1, 2, 3], vec![10, 20, 30], 0..3));
223+
Ok(())
224+
}
225+
226+
#[test]
227+
fn test_single_element_chunks() -> VortexResult<()> {
228+
let left = ChunkedArray::try_new(
229+
vec![
230+
buffer![1i32].into_array(),
231+
buffer![2i32].into_array(),
232+
buffer![3i32].into_array(),
233+
],
234+
i32_dtype(),
235+
)?;
236+
let right = ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype())?;
237+
238+
let pairs = collect_pairs(&left, &right)?;
239+
assert_eq!(pairs.len(), 3);
240+
assert_eq!(pairs[0], (vec![1], vec![10], 0..1));
241+
assert_eq!(pairs[1], (vec![2], vec![20], 1..2));
242+
assert_eq!(pairs[2], (vec![3], vec![30], 2..3));
243+
Ok(())
244+
}
245+
246+
#[test]
247+
fn test_both_empty() -> VortexResult<()> {
248+
let left = ChunkedArray::try_new(vec![], i32_dtype())?;
249+
let right = ChunkedArray::try_new(vec![], i32_dtype())?;
250+
251+
let pairs = collect_pairs(&left, &right)?;
252+
assert!(pairs.is_empty());
253+
Ok(())
254+
}
255+
256+
#[test]
257+
#[should_panic(expected = "paired_chunks requires arrays of equal length")]
258+
fn test_length_mismatch_panics() {
259+
let left = ChunkedArray::try_new(vec![buffer![1i32, 2].into_array()], i32_dtype()).unwrap();
260+
let right =
261+
ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype()).unwrap();
262+
263+
drop(left.paired_chunks(&right).collect::<Vec<_>>());
264+
}
265+
}

0 commit comments

Comments
 (0)