Skip to content

Commit 87011ec

Browse files
committed
OnPair: fast LIKE on compressed codes (PrefixAutomaton + bloom + filter ptype fix)
LIKE pushdown rewritten using OnPair's own ideas (see onpair_cpp/include/onpair/search/automata/prefix_automaton.h and …/aho_corasick_automaton.h):

* `prefix%` → PrefixAutomaton — LPM-tokenise the prefix, precompute `prefix_range` intervals for each query position via binary search over the lex-sorted dict. Per-row scan is `≤ q + 1` u16 comparisons + one interval check, no decode at all. ~7 ns/row on UrlLog 1M.
* `%sub%` → ContainsBloom — per-dict-entry bits for "this token contains the substring" and "some suffix of this token could start a cross-token match". Most rows resolve from the bloom alone; the rest fall through to per-row decode + memmem.
* `'lit'` → Token-equality (already pushed via Compare).

Re-registers Like in PARENT_KERNELS.

Also fixes a panic in the share-dict filter: "Attempted to get slice of type u32 from array of type u16" — codes_offsets can be narrowed by the cascading compressor. Read it through `match_each_integer_ptype!` instead of hard-coding `u32`.

Local bench (UrlLog, 1M rows):

* like_prefix: 7.2 ms (~7 ns/row)
* like_contains: 24.1 ms (~24 ns/row, decode only when bloom uncertain)
* eq_constant: 6.5 ms
* filter: 5.2 ms

Signed-off-by: claude <claude@anthropic.com>
1 parent 18f0cf2 commit 87011ec

5 files changed

Lines changed: 464 additions & 91 deletions

File tree

encodings/onpair/src/compute/filter.rs

Lines changed: 36 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -46,41 +46,49 @@ impl FilterKernel for OnPair {
4646
.clone()
4747
.execute::<PrimitiveArray>(ctx)?;
4848
let codes_arr = array.codes().clone().execute::<PrimitiveArray>(ctx)?;
49-
let codes_offsets = codes_offsets_arr.as_slice::<u32>();
50-
51-
// First pass: sum the surviving token count so we reserve once.
52-
let mut new_codes_len: usize = 0;
53-
for r in 0..n_in {
54-
if mask.value(r) {
55-
new_codes_len += (codes_offsets[r + 1] - codes_offsets[r]) as usize;
56-
}
57-
}
5849

5950
let mut new_codes_offsets = BufferMut::<u32>::with_capacity(n_out + 1);
60-
// SAFETY: capacity reserved.
61-
unsafe { new_codes_offsets.push_unchecked(0u32) };
6251

63-
let new_codes: ArrayRef = match_each_integer_ptype!(codes_arr.ptype(), |P| {
64-
let codes = codes_arr.as_slice::<P>();
65-
let mut out = BufferMut::<P>::with_capacity(new_codes_len);
66-
let mut cursor: u32 = 0;
52+
// The cascading compressor may have narrowed `codes_offsets`
53+
// (e.g. u32 → u16 if every row's token count is small). Read
54+
// through whatever ptype it lives at — the values still fit in
55+
// `usize` when widened. Likewise for `codes`.
56+
let new_codes: ArrayRef = match_each_integer_ptype!(codes_offsets_arr.ptype(), |OP| {
57+
let codes_offsets = codes_offsets_arr.as_slice::<OP>();
58+
59+
// First pass: sum the surviving token count so we reserve once.
60+
let mut new_codes_len: usize = 0;
6761
for r in 0..n_in {
6862
if mask.value(r) {
69-
let lo = codes_offsets[r] as usize;
70-
let hi = codes_offsets[r + 1] as usize;
71-
// SAFETY: codes_offsets validated at construction.
72-
let segment = unsafe { codes.get_unchecked(lo..hi) };
73-
out.extend_from_slice(segment);
74-
let segment_len = u32::try_from(hi - lo)
75-
.map_err(|_| vortex_err!("token segment overflows u32"))?;
76-
cursor = cursor
77-
.checked_add(segment_len)
78-
.ok_or_else(|| vortex_err!("codes_offsets overflow u32"))?;
79-
// SAFETY: capacity reserved (n_out + 1 entries).
80-
unsafe { new_codes_offsets.push_unchecked(cursor) };
63+
new_codes_len += (codes_offsets[r + 1] as usize) - (codes_offsets[r] as usize);
8164
}
8265
}
83-
out.freeze().into_array()
66+
67+
// SAFETY: capacity reserved.
68+
unsafe { new_codes_offsets.push_unchecked(0u32) };
69+
70+
match_each_integer_ptype!(codes_arr.ptype(), |P| {
71+
let codes = codes_arr.as_slice::<P>();
72+
let mut out = BufferMut::<P>::with_capacity(new_codes_len);
73+
let mut cursor: u32 = 0;
74+
for r in 0..n_in {
75+
if mask.value(r) {
76+
let lo = codes_offsets[r] as usize;
77+
let hi = codes_offsets[r + 1] as usize;
78+
// SAFETY: codes_offsets validated at construction.
79+
let segment = unsafe { codes.get_unchecked(lo..hi) };
80+
out.extend_from_slice(segment);
81+
let segment_len = u32::try_from(hi - lo)
82+
.map_err(|_| vortex_err!("token segment overflows u32"))?;
83+
cursor = cursor
84+
.checked_add(segment_len)
85+
.ok_or_else(|| vortex_err!("codes_offsets overflow u32"))?;
86+
// SAFETY: capacity reserved (n_out + 1 entries).
87+
unsafe { new_codes_offsets.push_unchecked(cursor) };
88+
}
89+
}
90+
out.freeze().into_array()
91+
})
8492
});
8593

8694
// uncompressed_lengths + validity flow through the standard

encodings/onpair/src/compute/like.rs

Lines changed: 44 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,20 @@
55
//! everything else returns `None` so the caller decompresses + runs the
66
//! scalar `LIKE` on the canonical bytes.
77
//!
8-
//! * `'literal'` — token-aware equality. LPM-tokenise the literal once
8+
//! * `'literal'` — token-aware equality (LPM-tokenise the literal once
99
//! and compare the row's `codes[lo..hi]` against the tokenised needle
10-
//! as `&[u16]`. Full byte equality is exactly equivalent to full LPM
11-
//! token-sequence equality, so this is sound and skips row decode
12-
//! entirely.
13-
//! * `'prefix%'` — byte-streaming via `DecodeView::for_each_dict_slice`
14-
//! with a single length check up front. The naive "tokenise the
15-
//! prefix and compare token prefix" trick is **wrong** because the
16-
//! LPM of the row's leading bytes may extend its last token past the
17-
//! literal prefix's tokenisation boundary. Streaming dict slices and
18-
//! comparing prefix-wise is the correct minimum-work option.
19-
//! * `'%substring%'` — decode each row into a small reusable scratch
20-
//! buffer and run `memchr::memmem::Finder::find`, which is SIMD-
21-
//! accelerated (SSE2/AVX2 on x86_64, NEON on aarch64) and Two-Way
22-
//! underneath. The `Finder` is built once per kernel call and reused
23-
//! across every row.
10+
//! as `&[u16]`). No row decode.
11+
//! * `'prefix%'` — OnPair-style [`PrefixAutomaton`][crate::dfa::PrefixAutomaton]:
12+
//! tokenise the prefix and precompute valid-divergence intervals for
13+
//! each query position. Per-row scan is `≤ q + 1` `u16` comparisons
14+
//! plus one interval check; no decode at all in the hot path.
15+
//! * `'%substring%'` — dict-bloom skip + `memchr::memmem` over the
16+
//! decoded row only when needed.
17+
//! [`ContainsBloom`][crate::dfa::ContainsBloom] precomputes "this
18+
//! dict entry contains the substring" and "some suffix of this entry
19+
//! could start a cross-token match". Most rows resolve via the bloom
20+
//! without touching `dict_bytes`; the rest fall through to a
21+
//! scratch-buffer decode + memmem.
2422
//!
2523
//! Escapes (`\\`), single-character wildcards (`_`), mid-pattern
2624
//! wildcards, and `case_insensitive: true` all bail out with `None`.
@@ -40,6 +38,8 @@ use vortex_error::VortexResult;
4038
use crate::OnPair;
4139
use crate::decode::DecodeView;
4240
use crate::decode::OwnedDecodeInputs;
41+
use crate::dfa::ContainsBloom;
42+
use crate::dfa::PrefixAutomaton;
4343
use crate::lpm::DictIndex;
4444
use crate::lpm::tokenize_needle;
4545

@@ -110,28 +110,36 @@ impl LikeKernel for OnPair {
110110
if let Some(needle_toks) = tokenize_needle(&dv, &index, needle) {
111111
let codes = dv.codes;
112112
let codes_offsets = dv.codes_offsets;
113+
let needle_slice = needle_toks.as_slice();
113114
for r in 0..n {
114115
let lo = codes_offsets[r] as usize;
115116
let hi = codes_offsets[r + 1] as usize;
116117
// SAFETY: codes_offsets validated at construction.
117118
let row_toks = unsafe { codes.get_unchecked(lo..hi) };
118-
if row_toks == needle_toks.as_slice() {
119+
if row_toks == needle_slice {
119120
bytes[r / 8] |= 1u8 << (r % 8);
120121
}
121122
}
122123
}
123-
// Else: needle has a byte not in the dict, no row matches.
124+
// Else: needle has a byte not in the dict ⇒ no row matches.
124125
}
125126
PatternShape::StartsWith(prefix) => {
126127
if prefix.is_empty() {
127128
fill_all(&mut bytes, n);
128-
} else {
129+
} else if let Some(automaton) = PrefixAutomaton::build(&dv, prefix) {
130+
let codes = dv.codes;
131+
let codes_offsets = dv.codes_offsets;
129132
for r in 0..n {
130-
if row_starts_with(&dv, r, prefix) {
133+
let lo = codes_offsets[r] as usize;
134+
let hi = codes_offsets[r + 1] as usize;
135+
// SAFETY: codes_offsets validated at construction.
136+
let row_toks = unsafe { codes.get_unchecked(lo..hi) };
137+
if automaton.matches(row_toks) {
131138
bytes[r / 8] |= 1u8 << (r % 8);
132139
}
133140
}
134141
}
142+
// Else: prefix has a byte not in the dict ⇒ no row matches.
135143
}
136144
PatternShape::Contains(sub) => {
137145
if sub.is_empty() {
@@ -154,48 +162,27 @@ impl LikeKernel for OnPair {
154162
}
155163
}
156164

157-
/// `LIKE 'prefix%'` — byte-stream the row's dict slices, comparing
158-
/// against `prefix` and short-circuiting on the first mismatch or once
159-
/// the prefix is satisfied.
160-
fn row_starts_with(dv: &DecodeView<'_>, r: usize, prefix: &[u8]) -> bool {
161-
let mut pos = 0usize;
162-
let mut matched = false;
163-
let plen = prefix.len();
164-
let prefix_ptr = prefix.as_ptr();
165-
dv.for_each_dict_slice(r, |slice| {
166-
let remaining = plen - pos;
167-
let take = slice.len().min(remaining);
168-
// SAFETY: `pos + take <= plen` because `take <= remaining`,
169-
// and `take <= slice.len()` by construction.
170-
let eq = unsafe {
171-
let lhs = std::slice::from_raw_parts(prefix_ptr.add(pos), take);
172-
let rhs = slice.get_unchecked(..take);
173-
lhs == rhs
174-
};
175-
if !eq {
176-
return false;
177-
}
178-
pos += take;
179-
if pos == plen {
180-
matched = true;
181-
return false; // short-circuit, prefix satisfied
182-
}
183-
true
184-
});
185-
matched
186-
}
187-
188-
/// `%substring%` pushdown via SIMD-accelerated `memmem`. The `Finder`
189-
/// is built once and reused across every row's decoded bytes; the
190-
/// scratch buffer is reused too so each row decode reuses the same
191-
/// allocation.
165+
/// `%substring%` pushdown: dict-bloom skip + per-row decode + memmem.
192166
fn contains_into_bitmap(dv: &DecodeView<'_>, sub: &[u8], n: usize, out: &mut [u8]) {
167+
let bloom = ContainsBloom::build(dv, sub);
193168
let finder = memmem::Finder::new(sub);
194169
let mut scratch: Vec<u8> = Vec::with_capacity(64);
170+
let codes = dv.codes;
171+
let codes_offsets = dv.codes_offsets;
195172
for r in 0..n {
196-
scratch.clear();
197-
dv.decode_row_into(r, &mut scratch);
198-
if finder.find(&scratch).is_some() {
173+
let lo = codes_offsets[r] as usize;
174+
let hi = codes_offsets[r + 1] as usize;
175+
// SAFETY: codes_offsets validated at construction.
176+
let row_toks = unsafe { codes.get_unchecked(lo..hi) };
177+
let hit = match bloom.classify(row_toks) {
178+
Some(b) => b,
179+
None => {
180+
scratch.clear();
181+
dv.decode_row_into(r, &mut scratch);
182+
finder.find(&scratch).is_some()
183+
}
184+
};
185+
if hit {
199186
out[r / 8] |= 1u8 << (r % 8);
200187
}
201188
}

0 commit comments

Comments
 (0)