Skip to content

Commit f2c3432

Browse files
committed
.
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent fb587d7 commit f2c3432

2 files changed

Lines changed: 93 additions & 20 deletions

File tree

encodings/runend/benches/run_end_filter.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ const TRUE_COUNT: usize = 32_768;
2929
const LONG_SLICE_COUNT: usize = 8;
3030
const SHORT_SLICE_LEN: usize = 8;
3131
const CLUSTER_COUNT: usize = 8;
32+
// Upper bound on the number of true slices `long_run_heavy_mask` emits; the
// actual count is further capped by the number of runs.
const LONG_RUN_HEAVY_SLICE_COUNT: usize = 4;
3233

3334
/// Selects which mask fixture `mask_fixture` builds for a benchmark case.
///
/// Each variant is dispatched to its own generator function; `LongRunHeavy`
/// maps to `long_run_heavy_mask`.
#[derive(Clone, Copy, Debug)]
3435
enum MaskShape {
3536
Random,
3637
FewLongSlices,
3738
ManyShortSlices,
3839
ClusteredFewRuns,
40+
LongRunHeavy,
3941
}
4042

4143
impl fmt::Display for MaskShape {
@@ -45,6 +47,7 @@ impl fmt::Display for MaskShape {
4547
Self::FewLongSlices => write!(f, "few_long_slices"),
4648
Self::ManyShortSlices => write!(f, "many_short_slices"),
4749
Self::ClusteredFewRuns => write!(f, "clustered_few_runs"),
50+
Self::LongRunHeavy => write!(f, "long_run_heavy"),
4851
}
4952
}
5053
}
@@ -110,6 +113,18 @@ const BENCH_ARGS: &[BenchArgs] = &[
110113
run_length: 4096,
111114
mask_shape: MaskShape::ClusteredFewRuns,
112115
},
116+
BenchArgs {
117+
run_length: 4096,
118+
mask_shape: MaskShape::LongRunHeavy,
119+
},
120+
BenchArgs {
121+
run_length: 16_384,
122+
mask_shape: MaskShape::LongRunHeavy,
123+
},
124+
BenchArgs {
125+
run_length: 65_536,
126+
mask_shape: MaskShape::LongRunHeavy,
127+
},
113128
];
114129

115130
#[divan::bench(args = BENCH_ARGS)]
@@ -168,6 +183,7 @@ fn mask_fixture(mask_shape: MaskShape, run_length: usize) -> Mask {
168183
MaskShape::FewLongSlices => few_long_slices_mask(run_length),
169184
MaskShape::ManyShortSlices => many_short_slices_mask(run_length),
170185
MaskShape::ClusteredFewRuns => clustered_few_runs_mask(run_length),
186+
MaskShape::LongRunHeavy => long_run_heavy_mask(run_length),
171187
}
172188
}
173189

@@ -233,3 +249,22 @@ fn clustered_few_runs_mask(run_length: usize) -> Mask {
233249

234250
Mask::from_slices(LEN, slices)
235251
}
252+
253+
fn long_run_heavy_mask(run_length: usize) -> Mask {
254+
let run_count = LEN.div_ceil(run_length);
255+
let slice_count = LONG_RUN_HEAVY_SLICE_COUNT.min(run_count);
256+
let slice_len = (run_length * 3) / 4;
257+
let misalignment = (run_length - slice_len).min(13);
258+
let spacing = run_count / slice_count;
259+
260+
let slices = (0..slice_count)
261+
.map(|slice_idx| {
262+
let start_run = slice_idx * spacing;
263+
let start = start_run * run_length + misalignment;
264+
let end = (start + slice_len).min(LEN);
265+
(start, end)
266+
})
267+
.collect();
268+
269+
Mask::from_slices(LEN, slices)
270+
}

encodings/runend/src/compute/filter.rs

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::cmp::min;
5-
use std::ops::AddAssign;
65
use std::sync::atomic::AtomicU8;
76
use std::sync::atomic::Ordering;
87

98
use num_traits::AsPrimitive;
9+
use num_traits::NumCast;
1010
use parking_lot::Mutex;
1111
use parking_lot::MutexGuard;
1212
use vortex_array::ArrayRef;
@@ -71,8 +71,8 @@ impl FilterKernel for RunEnd {
7171
match_each_unsigned_integer_ptype!(primitive_run_ends.ptype(), |P| {
7272
filter_run_end_primitive(
7373
primitive_run_ends.as_slice::<P>(),
74-
array.offset() as u64,
75-
array.len() as u64,
74+
array.offset(),
75+
array.len(),
7676
mask_values.bit_buffer(),
7777
)?
7878
});
@@ -166,33 +166,29 @@ fn auto_filter_path(array: ArrayView<'_, RunEnd>, mask_values: &MaskValues) -> F
166166
}
167167
}
168168

169-
// Code adapted from apache arrow-rs https://github.com/apache/arrow-rs/blob/b1f5c250ebb6c1252b4e7c51d15b8e77f4c361fa/arrow-select/src/filter.rs#L425
170-
fn filter_run_end_primitive<R: NativePType + AddAssign + From<bool> + AsPrimitive<u64>>(
169+
fn filter_run_end_primitive<R: NativePType + AsPrimitive<usize>>(
171170
run_ends: &[R],
172-
offset: u64,
173-
length: u64,
171+
offset: usize,
172+
length: usize,
174173
mask: &BitBuffer,
175174
) -> VortexResult<(PrimitiveArray, Mask)> {
176175
let mut new_run_ends = buffer_mut![R::zero(); run_ends.len()];
177176

178-
let mut start = 0u64;
177+
let mut start = 0usize;
179178
let mut j = 0;
180-
let mut count = R::zero();
179+
let mut count = 0u64;
181180

182181
let new_mask: Mask = BitBuffer::collect_bool(run_ends.len(), |i| {
183-
let mut keep = false;
184182
let end = min(run_ends[i].as_() - offset, length);
185-
186-
// Safety: predicate must be the same length as the array the ends have been taken from
187-
for pred in (start..end).map(|i| unsafe {
188-
mask.value_unchecked(i.try_into().vortex_expect("index must fit in usize"))
189-
}) {
190-
count += <R as From<bool>>::from(pred);
191-
keep |= pred
183+
let run_true_count = mask.slice(start..end).true_count() as u64;
184+
let keep = run_true_count != 0;
185+
186+
count += run_true_count;
187+
if keep {
188+
new_run_ends[j] =
189+
NumCast::from(count).vortex_expect("filtered run end count must fit in run-end type");
190+
j += 1;
192191
}
193-
// this is to avoid branching
194-
new_run_ends[j] = count;
195-
j += keep as usize;
196192

197193
start = end;
198194
keep
@@ -212,14 +208,18 @@ mod tests {
212208

213209
use vortex_array::ArrayRef;
214210
use vortex_array::IntoArray;
211+
use vortex_array::LEGACY_SESSION;
212+
use vortex_array::VortexSessionExecute;
215213
use vortex_array::arrays::PrimitiveArray;
216214
use vortex_array::assert_arrays_eq;
217215
use vortex_buffer::Buffer;
218216
use vortex_error::VortexResult;
219217
use vortex_mask::Mask;
220218

221219
use super::FilterPath;
220+
use super::override_run_end_filter_mode;
222221
use super::select_filter_path;
222+
use crate::_benchmarking::RunEndFilterMode;
223223
use crate::RunEnd;
224224
use crate::RunEndArray;
225225

@@ -282,6 +282,20 @@ mod tests {
282282
)
283283
}
284284

285+
/// Filters `array` by `mask` with the run-end filter mode overridden to `mode`,
/// then executes the result into an `ArrayRef`.
///
/// The value returned by `override_run_end_filter_mode` stays bound until this
/// function returns, i.e. for the whole filter and execution.
fn filter_with_mode(array: &ArrayRef, mask: Mask, mode: RunEndFilterMode) -> VortexResult<ArrayRef> {
    let _mode_override = override_run_end_filter_mode(mode);
    let mut execution_ctx = LEGACY_SESSION.create_execution_ctx();

    let filtered = array.filter(mask)?;
    filtered.execute::<ArrayRef>(&mut execution_ctx)
}
290+
291+
/// Filters `array` by `mask` once in `Take` mode and once in `Encoded` mode and
/// asserts that both results are equal arrays.
fn assert_encoded_filter_matches_take(array: &ArrayRef, mask: Mask) -> VortexResult<()> {
    // The take result is computed first and serves as the reference.
    let expected = filter_with_mode(array, mask.clone(), RunEndFilterMode::Take)?;
    let actual = filter_with_mode(array, mask, RunEndFilterMode::Encoded)?;

    assert_arrays_eq!(actual, expected);
    Ok(())
}
298+
285299
#[test]
286300
fn filter_sliced_run_end() -> VortexResult<()> {
287301
let arr = ree_array().slice(2..7).unwrap();
@@ -332,4 +346,28 @@ mod tests {
332346
assert_eq!(filter_path(&array, &mask), FilterPath::Encoded);
333347
Ok(())
334348
}
349+
350+
/// Encoded and take filtering must agree for a four-slice mask over the
/// `run_end_fixture(65, 260)` array.
#[test]
fn encoded_filter_matches_take_on_partial_word_boundaries() -> VortexResult<()> {
    let array = run_end_fixture(65, 260);
    let kept_ranges = vec![(3, 64), (67, 129), (133, 194), (197, 259)];
    let mask = Mask::from_slices(array.len(), kept_ranges);

    assert_encoded_filter_matches_take(&array, mask)
}
357+
358+
/// Encoded and take filtering must agree for a three-slice mask over the
/// `run_end_fixture(1_024, 16_384)` array.
#[test]
fn encoded_filter_matches_take_on_clustered_masks() -> VortexResult<()> {
    let array = run_end_fixture(1_024, 16_384);
    let kept_ranges = vec![(13, 513), (4_109, 4_733), (9_001, 10_129)];
    let mask = Mask::from_slices(array.len(), kept_ranges);

    assert_encoded_filter_matches_take(&array, mask)
}
365+
366+
/// Encoded and take filtering must agree for a five-slice mask over the
/// `run_end_fixture(1, 64)` array.
#[test]
fn encoded_filter_matches_take_on_very_short_runs() -> VortexResult<()> {
    let array = run_end_fixture(1, 64);
    let kept_ranges = vec![(1, 3), (5, 8), (13, 14), (21, 25), (34, 35)];
    let mask = Mask::from_slices(array.len(), kept_ranges);

    assert_encoded_filter_matches_take(&array, mask)
}
335373
}

0 commit comments

Comments
 (0)