From b8165b1de86962e3b9fcd1477af8f29eaed82017 Mon Sep 17 00:00:00 2001
From: Alfonso Subiotto Marques
Date: Thu, 30 Apr 2026 14:13:11 +0200
Subject: [PATCH] perf[arrow-select]: add specialized REE interleave

The specialized interleave works by preserving run ends as much as possible by
coalescing groups of adjacent logical indices pointing to the same source and
calling interleave on the run end values. Future work could additionally
coalesce values across sources, but this requires a value equality check.

Signed-off-by: Alfonso Subiotto Marques
---
 arrow-select/src/interleave.rs | 89 ++++++++++++++++++++++++++++++++++
 1 file changed, 89 insertions(+)

diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs
index f5904bc171ee..1bd34a7f3a58 100644
--- a/arrow-select/src/interleave.rs
+++ b/arrow-select/src/interleave.rs
@@ -108,6 +108,12 @@ pub fn interleave(
         DataType::Struct(fields) => interleave_struct(fields, values, indices),
         DataType::List(field) => interleave_list::<i32>(values, indices, field),
         DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
+        DataType::RunEndEncoded(r, _) => match r.data_type() {
+            DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
+            DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
+            DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
+            t => unreachable!("illegal run-end type {t}"),
+        },
         _ => interleave_fallback(values, indices)
     }
 }
@@ -411,6 +417,89 @@ fn interleave_list(
     Ok(Arc::new(list_array))
 }
 
+/// Specialized [`interleave`] for [`RunArray`].
+///
+/// Each `(array, logical_row)` entry of `indices` is resolved to a physical
+/// index into that array's values child. Adjacent output rows that resolve to
+/// the same `(array, physical_index)` pair are merged into one output run, and
+/// the run values are gathered by calling [`interleave`] on the values children.
+///
+/// # Errors
+///
+/// Returns an error if `indices.len()` or any logical row is not representable
+/// in the run-end type `R`, or if resolving physical indices, gathering the
+/// values or building the output [`RunArray`] fails.
+///
+/// # Panics
+///
+/// Panics if `values` is empty, or if an array position in `indices` is out of
+/// bounds for `values`.
+fn interleave_run_end<R: RunEndIndexType>(
+    values: &[&dyn Array],
+    indices: &[(usize, usize)],
+) -> Result<ArrayRef, ArrowError> {
+    if indices.is_empty() {
+        return Ok(new_empty_array(values[0].data_type()));
+    }
+
+    let n = indices.len();
+    // Every run end emitted below is <= n, so validating n once up front makes
+    // the `unwrap`s further down infallible.
+    R::Native::from_usize(n).ok_or_else(|| {
+        ArrowError::ComputeError(format!(
+            "interleave_run_end: output length {n} does not fit run-end type"
+        ))
+    })?;
+
+    let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
+    let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();
+
+    // Resolve each (array, logical_row) to (array, physical_row), so we can
+    // lookup physical indices by batch.
+    let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
+    let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
+        (0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
+    for (out_pos, &(arr, row)) in indices.iter().enumerate() {
+        let row = R::Native::from_usize(row).ok_or_else(|| {
+            ArrowError::InvalidArgumentError(format!(
+                "interleave_run_end: row index {row} not representable as run-end type {}",
+                R::DATA_TYPE
+            ))
+        })?;
+        grouped[arr].0.push(row);
+        grouped[arr].1.push(out_pos);
+    }
+    for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
+        let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
+        for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
+            phys_pairs[*out_pos] = (arr_idx, *p);
+        }
+    }
+
+    // Coalesce by physical-pair equality only: emit a new run when the
+    // (array_idx, physical_idx) pair changes between adjacent output rows.
+    // TODO: We could perform an equality check across sources to extend the
+    // output run, but we can't call make_comparator from this crate.
+    let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
+    let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
+    dedup_pairs.push(phys_pairs[0]);
+    for i in 1..n {
+        if phys_pairs[i] != phys_pairs[i - 1] {
+            run_ends_buf.push(R::Native::from_usize(i).unwrap());
+            dedup_pairs.push(phys_pairs[i]);
+        }
+    }
+    run_ends_buf.push(R::Native::from_usize(n).unwrap());
+
+    let taken_values = interleave(&value_arrays, &dedup_pairs)?;
+    let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);
+
+    Ok(Arc::new(RunArray::<R>::try_new(
+        &run_ends,
+        taken_values.as_ref(),
+    )?))
+}
+
 /// Fallback implementation of interleave using [`MutableArrayData`]
 fn interleave_fallback(
     values: &[&dyn Array],