Skip to content

Commit 6fd0803

Browse files
committed
perf[arrow-select]: add specialized REE interleave
The specialized interleave preserves existing runs as much as possible: it coalesces adjacent output rows whose logical indices resolve to the same physical run of the same source array, and then calls interleave on the sources' values arrays to build the output values. Future work could additionally coalesce equal values across sources, but this requires a value equality check.

Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent b114241 commit 6fd0803

1 file changed

Lines changed: 71 additions & 0 deletions

File tree

arrow-select/src/interleave.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ pub fn interleave(
108108
DataType::Struct(fields) => interleave_struct(fields, values, indices),
109109
DataType::List(field) => interleave_list::<i32>(values, indices, field),
110110
DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
111+
DataType::RunEndEncoded(r, _) => match r.data_type() {
112+
DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
113+
DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
114+
DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
115+
t => unreachable!("illegal run-end type {t}"),
116+
},
111117
_ => interleave_fallback(values, indices)
112118
}
113119
}
@@ -411,6 +417,71 @@ fn interleave_list<O: OffsetSizeTrait>(
411417
Ok(Arc::new(list_array))
412418
}
413419

420+
/// Specialized [`interleave`] for [`RunArray`].
421+
fn interleave_run_end<R: RunEndIndexType>(
422+
values: &[&dyn Array],
423+
indices: &[(usize, usize)],
424+
) -> Result<ArrayRef, ArrowError> {
425+
if indices.is_empty() {
426+
return Ok(new_empty_array(values[0].data_type()));
427+
}
428+
429+
let n = indices.len();
430+
R::Native::from_usize(n).ok_or_else(|| {
431+
ArrowError::ComputeError(format!(
432+
"interleave_run_end: output length {n} does not fit run-end type"
433+
))
434+
})?;
435+
436+
let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
437+
let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();
438+
439+
// Resolve each (array, logical_row) to (array, physical_row), so we can
440+
// lookup physical indices by batch.
441+
let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
442+
let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
443+
(0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
444+
for (out_pos, &(arr, row)) in indices.iter().enumerate() {
445+
let row = R::Native::from_usize(row).ok_or_else(|| {
446+
ArrowError::InvalidArgumentError(format!(
447+
"interleave_run_end: row index {row} not representable as run-end type {}",
448+
R::DATA_TYPE
449+
))
450+
})?;
451+
grouped[arr].0.push(row);
452+
grouped[arr].1.push(out_pos);
453+
}
454+
for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
455+
let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
456+
for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
457+
phys_pairs[*out_pos] = (arr_idx, *p);
458+
}
459+
}
460+
461+
// Coalesce by physical-pair equality only: emit a new run when the
462+
// (array_idx, physical_idx) pair changes between adjacent output rows.
463+
// TODO: We could perform an equality check across sources to extend the
464+
// output run, but we can't call make_comparator from this crate.
465+
let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
466+
let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
467+
dedup_pairs.push(phys_pairs[0]);
468+
for i in 1..n {
469+
if phys_pairs[i] != phys_pairs[i - 1] {
470+
run_ends_buf.push(R::Native::from_usize(i).unwrap());
471+
dedup_pairs.push(phys_pairs[i]);
472+
}
473+
}
474+
run_ends_buf.push(R::Native::from_usize(n).unwrap());
475+
476+
let taken_values = interleave(&value_arrays, &dedup_pairs)?;
477+
let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);
478+
479+
Ok(Arc::new(RunArray::<R>::try_new(
480+
&run_ends,
481+
taken_values.as_ref(),
482+
)?))
483+
}
484+
414485
/// Fallback implementation of interleave using [`MutableArrayData`]
415486
fn interleave_fallback(
416487
values: &[&dyn Array],

0 commit comments

Comments
 (0)