Skip to content
4 changes: 2 additions & 2 deletions vortex-array/src/arrays/chunked/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl ChunkedArray {
#[inline]
pub fn chunk(&self, idx: usize) -> &ArrayRef {
assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");

&self.chunks[idx]
// SAFETY: bounds checked by the assert above.
unsafe { self.chunks.get_unchecked(idx) }
}

pub fn nchunks(&self) -> usize {
Expand Down
37 changes: 4 additions & 33 deletions vortex-array/src/arrays/chunked/compute/zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,39 +29,10 @@ impl ZipKernel for Chunked {
.union_nullability(if_false.dtype().nullability());
let mut out_chunks = Vec::with_capacity(if_true.nchunks() + if_false.nchunks());

let mut lhs_idx = 0;
let mut rhs_idx = 0;
let mut lhs_offset = 0;
let mut rhs_offset = 0;
let mut pos = 0;
let total_len = if_true.len();

while pos < total_len {
let lhs_chunk = if_true.chunk(lhs_idx);
let rhs_chunk = if_false.chunk(rhs_idx);

let lhs_rem = lhs_chunk.len() - lhs_offset;
let rhs_rem = rhs_chunk.len() - rhs_offset;
let take_until = lhs_rem.min(rhs_rem);

let mask_slice = mask.slice(pos..pos + take_until)?;
let lhs_slice = lhs_chunk.slice(lhs_offset..lhs_offset + take_until)?;
let rhs_slice = rhs_chunk.slice(rhs_offset..rhs_offset + take_until)?;

out_chunks.push(mask_slice.zip(lhs_slice, rhs_slice)?);

pos += take_until;
lhs_offset += take_until;
rhs_offset += take_until;

if lhs_offset == lhs_chunk.len() {
lhs_idx += 1;
lhs_offset = 0;
}
if rhs_offset == rhs_chunk.len() {
rhs_idx += 1;
rhs_offset = 0;
}
for pair in if_true.paired_chunks(if_false) {
let pair = pair?;
let mask_slice = mask.slice(pair.pos)?;
out_chunks.push(mask_slice.zip(pair.left, pair.right)?);
}

// SAFETY: chunks originate from zipping slices of inputs that share dtype/nullability.
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod array;
pub use array::ChunkedArray;

pub(crate) mod compute;
mod paired_chunks;

mod vtable;
pub use vtable::Chunked;
Expand Down
265 changes: 265 additions & 0 deletions vortex-array/src/arrays/chunked/paired_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;

use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::arrays::ChunkedArray;

pub(crate) struct AlignedPair {
pub left: ArrayRef,
pub right: ArrayRef,
pub pos: Range<usize>,
}

/// Cursor over a chunk slice that maintains the invariant: `idx` always
/// points at a non-empty chunk or is past the end.
struct ChunkCursor<'a> {
chunks: &'a [ArrayRef],
idx: usize,
offset: usize,
}

impl<'a> ChunkCursor<'a> {
fn new(chunks: &'a [ArrayRef]) -> Self {
let mut cursor = Self {
chunks,
idx: 0,
offset: 0,
};
cursor.skip_empty();
cursor
}

fn skip_empty(&mut self) {
while self.idx < self.chunks.len()
&& unsafe { self.chunks.get_unchecked(self.idx) }.is_empty()
{
self.idx += 1;
}
}

fn current_chunk(&self) -> Option<&'a ArrayRef> {
(self.idx < self.chunks.len()).then(|| unsafe { self.chunks.get_unchecked(self.idx) })
}

fn remaining(&self, chunk: &ArrayRef) -> usize {
chunk.len() - self.offset
}

fn take(&mut self, chunk: &ArrayRef, n: usize) -> VortexResult<ArrayRef> {
let slice = chunk.slice(self.offset..self.offset + n)?;
self.offset += n;
if self.offset == chunk.len() {
self.idx += 1;
self.offset = 0;
self.skip_empty();
}
Ok(slice)
}
}

pub(crate) struct PairedChunks<'a> {
left: ChunkCursor<'a>,
right: ChunkCursor<'a>,
pos: usize,
total_len: usize,
}

impl ChunkedArray {
pub(crate) fn paired_chunks<'a>(&'a self, other: &'a ChunkedArray) -> PairedChunks<'a> {
assert_eq!(
self.len(),
other.len(),
"paired_chunks requires arrays of equal length"
);
PairedChunks {
left: ChunkCursor::new(&self.chunks),
right: ChunkCursor::new(&other.chunks),
pos: 0,
total_len: self.len(),
}
}
}

impl Iterator for PairedChunks<'_> {
type Item = VortexResult<AlignedPair>;

fn next(&mut self) -> Option<Self::Item> {
if self.pos >= self.total_len {
return None;
}

let lhs_chunk = self.left.current_chunk()?;
let rhs_chunk = self.right.current_chunk()?;

let take = self
.left
.remaining(lhs_chunk)
.min(self.right.remaining(rhs_chunk));

let (lhs_slice, rhs_slice) = match self
.left
.take(lhs_chunk, take)
.and_then(|l| self.right.take(rhs_chunk, take).map(|r| (l, r)))
{
Ok(pair) => pair,
Err(e) => return Some(Err(e)),
};

let start = self.pos;
self.pos += take;

Some(Ok(AlignedPair {
left: lhs_slice,
right: rhs_slice,
pos: start..self.pos,
}))
}
}

#[cfg(test)]
mod tests {
use vortex_buffer::buffer;
use vortex_error::VortexResult;

use crate::IntoArray;
use crate::arrays::ChunkedArray;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;

fn i32_dtype() -> DType {
DType::Primitive(PType::I32, Nullability::NonNullable)
}

#[allow(clippy::type_complexity)]
fn collect_pairs(
left: &ChunkedArray,
right: &ChunkedArray,
) -> VortexResult<Vec<(Vec<i32>, Vec<i32>, std::ops::Range<usize>)>> {
use crate::ToCanonical;
let mut result = Vec::new();
for pair in left.paired_chunks(right) {
let pair = pair?;
let l: Vec<i32> = pair.left.to_primitive().as_slice::<i32>().to_vec();
let r: Vec<i32> = pair.right.to_primitive().as_slice::<i32>().to_vec();
result.push((l, r, pair.pos));
}
Ok(result)
}

#[test]
fn test_aligned_chunks() -> VortexResult<()> {
let left = ChunkedArray::try_new(
vec![buffer![1i32, 2].into_array(), buffer![3i32, 4].into_array()],
i32_dtype(),
)?;
let right = ChunkedArray::try_new(
vec![
buffer![10i32, 20].into_array(),
buffer![30i32, 40].into_array(),
],
i32_dtype(),
)?;

let pairs = collect_pairs(&left, &right)?;
assert_eq!(pairs.len(), 2);
assert_eq!(pairs[0], (vec![1, 2], vec![10, 20], 0..2));
assert_eq!(pairs[1], (vec![3, 4], vec![30, 40], 2..4));
Ok(())
}

#[test]
fn test_misaligned_chunks() -> VortexResult<()> {
let left = ChunkedArray::try_new(
vec![
buffer![1i32, 2].into_array(),
buffer![3i32].into_array(),
buffer![4i32, 5].into_array(),
],
i32_dtype(),
)?;
let right = ChunkedArray::try_new(
vec![
buffer![10i32].into_array(),
buffer![20i32, 30].into_array(),
buffer![40i32, 50].into_array(),
],
i32_dtype(),
)?;

let pairs = collect_pairs(&left, &right)?;
assert_eq!(pairs.len(), 4);
assert_eq!(pairs[0], (vec![1], vec![10], 0..1));
assert_eq!(pairs[1], (vec![2], vec![20], 1..2));
assert_eq!(pairs[2], (vec![3], vec![30], 2..3));
assert_eq!(pairs[3], (vec![4, 5], vec![40, 50], 3..5));
Ok(())
}

#[test]
fn test_empty_chunks() -> VortexResult<()> {
let left = ChunkedArray::try_new(
vec![
buffer![0i32; 0].into_array(),
buffer![1i32, 2, 3].into_array(),
],
i32_dtype(),
)?;
let right = ChunkedArray::try_new(
vec![
buffer![10i32, 20, 30].into_array(),
buffer![0i32; 0].into_array(),
],
i32_dtype(),
)?;

let pairs = collect_pairs(&left, &right)?;
assert_eq!(pairs.len(), 1);
assert_eq!(pairs[0], (vec![1, 2, 3], vec![10, 20, 30], 0..3));
Ok(())
}

#[test]
fn test_single_element_chunks() -> VortexResult<()> {
let left = ChunkedArray::try_new(
vec![
buffer![1i32].into_array(),
buffer![2i32].into_array(),
buffer![3i32].into_array(),
],
i32_dtype(),
)?;
let right = ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype())?;

let pairs = collect_pairs(&left, &right)?;
assert_eq!(pairs.len(), 3);
assert_eq!(pairs[0], (vec![1], vec![10], 0..1));
assert_eq!(pairs[1], (vec![2], vec![20], 1..2));
assert_eq!(pairs[2], (vec![3], vec![30], 2..3));
Ok(())
}

#[test]
fn test_both_empty() -> VortexResult<()> {
let left = ChunkedArray::try_new(vec![], i32_dtype())?;
let right = ChunkedArray::try_new(vec![], i32_dtype())?;

let pairs = collect_pairs(&left, &right)?;
assert!(pairs.is_empty());
Ok(())
}

#[test]
#[should_panic(expected = "paired_chunks requires arrays of equal length")]
fn test_length_mismatch_panics() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you choose to retain Option<Result<Pair>>, add a test where slicing one of chunks fails

let left = ChunkedArray::try_new(vec![buffer![1i32, 2].into_array()], i32_dtype()).unwrap();
let right =
ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype()).unwrap();

drop(left.paired_chunks(&right).collect::<Vec<_>>());
}
}
Loading
Loading