Skip to content

Commit fb587d7

Browse files
committed
.
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 993bcd6 commit fb587d7

3 files changed

Lines changed: 125 additions & 11 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/runend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ arbitrary = { workspace = true, optional = true }
1818
arrow-array = { workspace = true, optional = true }
1919
itertools = { workspace = true }
2020
num-traits = { workspace = true }
21+
parking_lot = { workspace = true }
2122
prost = { workspace = true }
2223
vortex-array = { workspace = true }
2324
vortex-buffer = { workspace = true }

encodings/runend/src/compute/filter.rs

Lines changed: 123 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
use std::cmp::min;
55
use std::ops::AddAssign;
6-
use std::sync::Mutex;
7-
use std::sync::MutexGuard;
86
use std::sync::atomic::AtomicU8;
97
use std::sync::atomic::Ordering;
108

119
use num_traits::AsPrimitive;
10+
use parking_lot::Mutex;
11+
use parking_lot::MutexGuard;
1212
use vortex_array::ArrayRef;
1313
use vortex_array::ArrayView;
1414
use vortex_array::ExecutionCtx;
@@ -28,15 +28,18 @@ use vortex_mask::MaskValues;
2828
use crate::_benchmarking::RunEndFilterMode;
2929
use crate::RunEnd;
3030
use crate::compute::take::take_indices_unchecked;
31-
const FILTER_TAKE_THRESHOLD: f64 = 0.1;
31+
32+
const FILTER_TAKE_MIN_TRUE_COUNT: usize = 25;
33+
const FILTER_ENCODED_DENSITY_SHIFT: usize = 3;
34+
const FILTER_ENCODED_MIN_TRUES_PER_RUN: usize = 32;
35+
const FILTER_ENCODED_MAX_SLICE_COUNT: usize = 32;
36+
const FILTER_ENCODED_MIN_AVG_SLICE_LEN: usize = 256;
37+
3238
static FILTER_MODE_OVERRIDE: AtomicU8 = AtomicU8::new(RunEndFilterMode::Auto.as_u8());
3339
static FILTER_MODE_OVERRIDE_LOCK: Mutex<()> = Mutex::new(());
3440

3541
pub(crate) fn override_run_end_filter_mode(mode: RunEndFilterMode) -> impl Drop {
36-
let lock = match FILTER_MODE_OVERRIDE_LOCK.lock() {
37-
Ok(lock) => lock,
38-
Err(err) => err.into_inner(),
39-
};
42+
let lock = FILTER_MODE_OVERRIDE_LOCK.lock();
4043
let previous_mode = current_filter_mode();
4144
FILTER_MODE_OVERRIDE.store(mode.as_u8(), Ordering::SeqCst);
4245

@@ -140,13 +143,26 @@ fn select_filter_path(array: ArrayView<'_, RunEnd>, mask_values: &MaskValues) ->
140143
}
141144

142145
fn auto_filter_path(array: ArrayView<'_, RunEnd>, mask_values: &MaskValues) -> FilterPath {
146+
let len = array.len();
147+
let run_count = array.ends().len();
143148
let true_count = mask_values.true_count();
144-
let runs_ratio = true_count as f64 / array.ends().len() as f64;
149+
let slice_count = mask_values.slices().len();
150+
let average_slice_len = true_count.div_ceil(slice_count);
145151

146-
if runs_ratio < FILTER_TAKE_THRESHOLD || true_count < 25 {
147-
FilterPath::Take
148-
} else {
152+
if true_count < FILTER_TAKE_MIN_TRUE_COUNT {
153+
return FilterPath::Take;
154+
}
155+
156+
let dense_selection = true_count.saturating_mul(1 << FILTER_ENCODED_DENSITY_SHIFT) >= len;
157+
let localized_selection = true_count
158+
>= run_count.saturating_mul(FILTER_ENCODED_MIN_TRUES_PER_RUN)
159+
&& slice_count <= FILTER_ENCODED_MAX_SLICE_COUNT
160+
&& average_slice_len >= FILTER_ENCODED_MIN_AVG_SLICE_LEN;
161+
162+
if dense_selection || localized_selection {
149163
FilterPath::Encoded
164+
} else {
165+
FilterPath::Take
150166
}
151167
}
152168

@@ -192,12 +208,18 @@ fn filter_run_end_primitive<R: NativePType + AddAssign + From<bool> + AsPrimitiv
192208

193209
#[cfg(test)]
194210
mod tests {
211+
#![allow(clippy::cast_possible_truncation)]
212+
213+
use vortex_array::ArrayRef;
195214
use vortex_array::IntoArray;
196215
use vortex_array::arrays::PrimitiveArray;
197216
use vortex_array::assert_arrays_eq;
217+
use vortex_buffer::Buffer;
198218
use vortex_error::VortexResult;
199219
use vortex_mask::Mask;
200220

221+
use super::FilterPath;
222+
use super::select_filter_path;
201223
use crate::RunEnd;
202224
use crate::RunEndArray;
203225

@@ -206,6 +228,60 @@ mod tests {
206228
.unwrap()
207229
}
208230

231+
fn run_end_fixture(run_length: usize, len: usize) -> ArrayRef {
232+
let run_count = len.div_ceil(run_length);
233+
let ends = (0..run_count)
234+
.map(|run_idx| ((run_idx + 1) * run_length).min(len) as u32)
235+
.collect::<Buffer<_>>()
236+
.into_array();
237+
let values =
238+
PrimitiveArray::from_iter((0..run_count).map(|run_idx| run_idx as i32)).into_array();
239+
240+
RunEnd::new(ends, values).into_array()
241+
}
242+
243+
fn run_end_offset_fixture(
244+
run_length: usize,
245+
total_len: usize,
246+
offset: usize,
247+
len: usize,
248+
) -> VortexResult<ArrayRef> {
249+
let run_count = total_len.div_ceil(run_length);
250+
let ends = (0..run_count)
251+
.map(|run_idx| ((run_idx + 1) * run_length).min(total_len) as u32)
252+
.collect::<Buffer<_>>()
253+
.into_array();
254+
let values =
255+
PrimitiveArray::from_iter((0..run_count).map(|run_idx| run_idx as i32)).into_array();
256+
257+
Ok(RunEnd::try_new_offset_length(ends, values, offset, len)?.into_array())
258+
}
259+
260+
fn sparse_random_mask(len: usize, true_count: usize) -> Mask {
261+
let mut indices = (0..true_count)
262+
.map(|idx| (idx * 7_919) % len)
263+
.collect::<Vec<_>>();
264+
indices.sort_unstable();
265+
266+
Mask::from_indices(len, indices)
267+
}
268+
269+
fn sparse_clustered_mask(len: usize) -> Mask {
270+
Mask::from_slices(len, vec![(1_024, 1_536), (8_192, 8_704)])
271+
}
272+
273+
fn sparse_clustered_mask_for_slice(len: usize) -> Mask {
274+
Mask::from_slices(len, vec![(1_024, 1_536), (6_144, 6_656)])
275+
}
276+
277+
fn filter_path(array: &ArrayRef, mask: &Mask) -> FilterPath {
278+
select_filter_path(
279+
array.as_::<RunEnd>(),
280+
mask.values()
281+
.expect("heuristic tests require a partial filter mask"),
282+
)
283+
}
284+
209285
#[test]
210286
fn filter_sliced_run_end() -> VortexResult<()> {
211287
let arr = ree_array().slice(2..7).unwrap();
@@ -220,4 +296,40 @@ mod tests {
220296
);
221297
Ok(())
222298
}
299+
300+
#[test]
301+
fn heuristic_prefers_take_for_sparse_random_mask() -> VortexResult<()> {
302+
let array = run_end_fixture(1_024, 16_384);
303+
let mask = sparse_random_mask(array.len(), 1_024);
304+
305+
assert_eq!(filter_path(&array, &mask), FilterPath::Take);
306+
Ok(())
307+
}
308+
309+
#[test]
310+
fn heuristic_prefers_encoded_for_sparse_clustered_mask() -> VortexResult<()> {
311+
let array = run_end_fixture(1_024, 16_384);
312+
let mask = sparse_clustered_mask(array.len());
313+
314+
assert_eq!(filter_path(&array, &mask), FilterPath::Encoded);
315+
Ok(())
316+
}
317+
318+
#[test]
319+
fn heuristic_prefers_take_for_sparse_random_mask_on_slice() -> VortexResult<()> {
320+
let array = run_end_offset_fixture(1_024, 16_384, 1_024, 14_336)?;
321+
let mask = sparse_random_mask(array.len(), 1_024);
322+
323+
assert_eq!(filter_path(&array, &mask), FilterPath::Take);
324+
Ok(())
325+
}
326+
327+
#[test]
328+
fn heuristic_prefers_encoded_for_sparse_clustered_mask_on_slice() -> VortexResult<()> {
329+
let array = run_end_offset_fixture(1_024, 16_384, 1_024, 14_336)?;
330+
let mask = sparse_clustered_mask_for_slice(array.len());
331+
332+
assert_eq!(filter_path(&array, &mask), FilterPath::Encoded);
333+
Ok(())
334+
}
223335
}

0 commit comments

Comments
 (0)