Skip to content

Commit 70947a8

Browse files
committed
Wire OnPair as a btrblocks string scheme
* Extract a small `parts_to_children` helper in `vortex-onpair`'s `compress.rs` so the lift-out-of-C++ step reads top-to-bottom rather than via a block-and-drop dance. * Add `OnPairScheme` to `vortex-btrblocks::schemes::string`. The scheme matches utf8 strings, declares its four primitive children (dict_offsets / codes / codes_offsets / uncompressed_lengths) so the cascading compressor can re-encode them downstream (FastLanes-bit-pack on `codes`, etc.), defers the compression-ratio estimate to the sample-based path (same as FSST / Zstd), and reassembles the result via `OnPair::try_new`. * Feature-gate it via a new `onpair` Cargo feature, enabled by default, so out-of-the-box `BtrBlocksCompressorBuilder::default()` includes it in `ALL_SCHEMES` and consumers without a C++ toolchain can opt out with `default-features = false`. * Update the FSST scheme-selection test to accept either FSST or OnPair as the winning encoding — both target the same workload (short strings with high lexical overlap) and the sample-based selector now picks the one with the better ratio on the test corpus. Test results vortex-onpair 7 unit + 1 100k smoke all green vortex-btrblocks 36 unit + 3 doctests all green (incl. new `test_onpair_in_default_scheme_list`) Signed-off-by: Claude <noreply@anthropic.com>
1 parent 87f217f commit 70947a8

5 files changed

Lines changed: 180 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/onpair/src/compress.rs

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -80,35 +80,7 @@ where
8080

8181
let column = Column::compress(&flat, &offsets, config)
8282
.map_err(|e| vortex_err!("OnPair compress failed: {e}"))?;
83-
84-
let bits;
85-
let dict_bytes;
86-
let dict_offsets;
87-
let codes;
88-
let codes_offsets;
89-
{
90-
let parts = column
91-
.parts()
92-
.map_err(|e| vortex_err!("OnPair parts failed: {e}"))?;
93-
bits = parts.bits;
94-
95-
// Last dict_offset = total token bytes; unpack into a single
96-
// contiguous ByteBuffer for the Vortex `dict_bytes` blob.
97-
dict_bytes = BufferHandle::new_host(ByteBuffer::from(parts.dict_bytes.to_vec()));
98-
dict_offsets = Buffer::<u32>::copy_from(parts.dict_offsets).into_array();
99-
100-
let total_tokens = *parts
101-
.codes_boundaries
102-
.last()
103-
.ok_or_else(|| vortex_err!("OnPair: missing boundaries"))?
104-
as usize;
105-
let codes_vec = unpack_codes_to_u16(parts.codes_packed, total_tokens, bits);
106-
codes = Buffer::<u16>::copy_from(codes_vec).into_array();
107-
108-
// Token-index boundaries are exactly the offsets into our flat u16
109-
// `codes` array, so we can use them as-is.
110-
codes_offsets = Buffer::<u32>::copy_from(parts.codes_boundaries).into_array();
111-
}
83+
let (bits, dict_bytes, dict_offsets, codes, codes_offsets) = parts_to_children(&column)?;
11284
drop(column);
11385

11486
let uncompressed_lengths = uncompressed_lengths.into_array();
@@ -129,6 +101,30 @@ where
129101
)
130102
}
131103

104+
/// Borrow the raw C++ parts and lift them into owned Vortex children.
105+
/// Returns `(bits, dict_bytes, dict_offsets, codes, codes_offsets)`.
106+
fn parts_to_children(
107+
column: &Column,
108+
) -> VortexResult<(u32, BufferHandle, ArrayRef, ArrayRef, ArrayRef)> {
109+
let parts = column
110+
.parts()
111+
.map_err(|e| vortex_err!("OnPair parts failed: {e}"))?;
112+
let bits = parts.bits;
113+
let dict_bytes = BufferHandle::new_host(ByteBuffer::from(parts.dict_bytes.to_vec()));
114+
let dict_offsets = Buffer::<u32>::copy_from(parts.dict_offsets).into_array();
115+
let total_tokens = usize::try_from(
116+
*parts
117+
.codes_boundaries
118+
.last()
119+
.ok_or_else(|| vortex_err!("OnPair: missing codes_boundaries"))?,
120+
)
121+
.map_err(|_| vortex_err!("OnPair: total_tokens does not fit in usize"))?;
122+
let codes_vec = unpack_codes_to_u16(parts.codes_packed, total_tokens, bits);
123+
let codes = Buffer::<u16>::copy_from(codes_vec).into_array();
124+
let codes_offsets = Buffer::<u32>::copy_from(parts.codes_boundaries).into_array();
125+
Ok((bits, dict_bytes, dict_offsets, codes, codes_offsets))
126+
}
127+
132128
/// Compress a byte-string accessor (typically a `VarBinArray` or
133129
/// `VarBinViewArray`).
134130
pub fn onpair_compress<A: ArrayAccessor<[u8]>>(

vortex-btrblocks/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ vortex-error = { workspace = true }
3030
vortex-fastlanes = { workspace = true }
3131
vortex-fsst = { workspace = true }
3232
vortex-mask = { workspace = true }
33+
vortex-onpair = { workspace = true, optional = true }
3334
vortex-pco = { workspace = true, optional = true }
3435
vortex-runend = { workspace = true }
3536
vortex-sequence = { workspace = true }
@@ -47,8 +48,10 @@ vortex-array = { workspace = true, features = ["_test-harness"] }
4748
vortex-session = { workspace = true }
4849

4950
[features]
51+
default = ["onpair"]
5052
# This feature enabled unstable encodings for which we don't guarantee stability.
5153
unstable_encodings = ["dep:vortex-tensor", "vortex-zstd?/unstable_encodings"]
54+
onpair = ["dep:vortex-onpair"]
5255
pco = ["dep:pco", "dep:vortex-pco"]
5356
zstd = ["dep:vortex-zstd"]
5457

vortex-btrblocks/src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
5454
////////////////////////////////////////////////////////////////////////////////////////////////
5555
&string::StringDictScheme,
5656
&string::FSSTScheme,
57+
#[cfg(feature = "onpair")]
58+
&string::OnPairScheme,
5759
&string::StringConstantScheme,
5860
&string::NullDominatedSparseScheme,
5961
// Decimal schemes.

vortex-btrblocks/src/schemes/string.rs

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ use vortex_fsst::FSST;
2121
use vortex_fsst::FSSTArrayExt;
2222
use vortex_fsst::fsst_compress;
2323
use vortex_fsst::fsst_train_compressor;
24+
#[cfg(feature = "onpair")]
25+
use vortex_onpair::DEFAULT_DICT12_CONFIG;
26+
#[cfg(feature = "onpair")]
27+
use vortex_onpair::OnPair;
28+
#[cfg(feature = "onpair")]
29+
use vortex_onpair::OnPairArrayExt;
30+
#[cfg(feature = "onpair")]
31+
use vortex_onpair::onpair_compress;
2432
use vortex_sparse::Sparse;
2533
use vortex_sparse::SparseExt as _;
2634

@@ -36,6 +44,18 @@ use crate::SchemeExt;
3644
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3745
pub struct FSSTScheme;
3846

47+
/// OnPair short-string compression (dict-12, FSST-shape children).
48+
///
49+
/// Targets the same workload as FSST — large columns of short-to-medium
50+
/// strings with high lexical overlap — but uses a learned dictionary of
51+
/// frequent adjacent substrings and 12-bit codes. The codes / offsets /
52+
/// uncompressed-lengths children all flow through the cascading compressor
53+
/// the same way FSST's do, so any downstream bit-packing / FoR / etc. still
54+
/// applies.
55+
#[cfg(feature = "onpair")]
56+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
57+
pub struct OnPairScheme;
58+
3959
/// Sparse encoding for null-dominated arrays.
4060
///
4161
/// This is the same as the integer `SparseScheme`, but we only use this for null-dominated arrays.
@@ -138,6 +158,108 @@ impl Scheme for FSSTScheme {
138158
}
139159
}
140160

161+
#[cfg(feature = "onpair")]
162+
impl Scheme for OnPairScheme {
163+
fn scheme_name(&self) -> &'static str {
164+
"vortex.string.onpair"
165+
}
166+
167+
fn matches(&self, canonical: &Canonical) -> bool {
168+
is_utf8_string(canonical)
169+
}
170+
171+
/// Children, in slot order:
172+
/// 0 = dict_offsets, 1 = codes, 2 = codes_offsets, 3 = uncompressed_lengths.
173+
/// Validity is handled separately by the outer array.
174+
fn num_children(&self) -> usize {
175+
4
176+
}
177+
178+
fn expected_compression_ratio(
179+
&self,
180+
_data: &ArrayAndStats,
181+
_compress_ctx: CompressorContext,
182+
_exec_ctx: &mut ExecutionCtx,
183+
) -> CompressionEstimate {
184+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
185+
}
186+
187+
fn compress(
188+
&self,
189+
compressor: &CascadingCompressor,
190+
data: &ArrayAndStats,
191+
compress_ctx: CompressorContext,
192+
exec_ctx: &mut ExecutionCtx,
193+
) -> VortexResult<ArrayRef> {
194+
let utf8 = data.array_as_utf8().into_owned();
195+
let onpair_array = onpair_compress(&utf8, utf8.len(), utf8.dtype(), DEFAULT_DICT12_CONFIG)?;
196+
197+
let dict_offsets = compress_primitive_child(
198+
compressor,
199+
onpair_array.dict_offsets(),
200+
&compress_ctx,
201+
self.id(),
202+
0,
203+
exec_ctx,
204+
)?;
205+
let codes = compress_primitive_child(
206+
compressor,
207+
onpair_array.codes(),
208+
&compress_ctx,
209+
self.id(),
210+
1,
211+
exec_ctx,
212+
)?;
213+
let codes_offsets = compress_primitive_child(
214+
compressor,
215+
onpair_array.codes_offsets(),
216+
&compress_ctx,
217+
self.id(),
218+
2,
219+
exec_ctx,
220+
)?;
221+
let uncompressed_lengths = compress_primitive_child(
222+
compressor,
223+
onpair_array.uncompressed_lengths(),
224+
&compress_ctx,
225+
self.id(),
226+
3,
227+
exec_ctx,
228+
)?;
229+
230+
Ok(OnPair::try_new(
231+
onpair_array.dtype().clone(),
232+
onpair_array.dict_bytes_handle().clone(),
233+
dict_offsets,
234+
codes,
235+
codes_offsets,
236+
uncompressed_lengths,
237+
onpair_array.array_validity(),
238+
onpair_array.bits(),
239+
)?
240+
.into_array())
241+
}
242+
}
243+
244+
/// Helper: narrow a primitive child to its tightest int type, then hand it
245+
/// off to the cascading compressor.
246+
#[cfg(feature = "onpair")]
247+
fn compress_primitive_child(
248+
compressor: &CascadingCompressor,
249+
child: &ArrayRef,
250+
compress_ctx: &CompressorContext,
251+
scheme_id: vortex_compressor::scheme::SchemeId,
252+
child_idx: usize,
253+
exec_ctx: &mut ExecutionCtx,
254+
) -> VortexResult<ArrayRef> {
255+
let narrowed = child
256+
.clone()
257+
.execute::<PrimitiveArray>(exec_ctx)?
258+
.narrow()?
259+
.into_array();
260+
compressor.compress_child(&narrowed, compress_ctx, scheme_id, child_idx, exec_ctx)
261+
}
262+
141263
impl Scheme for NullDominatedSparseScheme {
142264
fn scheme_name(&self) -> &'static str {
143265
"vortex.string.sparse"
@@ -411,8 +533,24 @@ mod scheme_selection_tests {
411533
Ok(())
412534
}
413535

536+
#[cfg(feature = "onpair")]
537+
#[test]
538+
fn test_onpair_in_default_scheme_list() {
539+
use crate::SchemeExt;
540+
use crate::schemes::string::OnPairScheme;
541+
542+
let ids: Vec<_> = crate::ALL_SCHEMES.iter().map(|s| s.id()).collect();
543+
assert!(
544+
ids.contains(&OnPairScheme.id()),
545+
"OnPairScheme not registered in ALL_SCHEMES"
546+
);
547+
}
548+
414549
#[test]
415-
fn test_fsst_compressed() -> VortexResult<()> {
550+
fn test_dictionary_string_scheme_compressed() -> VortexResult<()> {
551+
// Dictionary-style string corpus: high lexical overlap, short rows.
552+
// FSST and OnPair both target this shape; the cascading compressor
553+
// picks whichever samples better, so accept either.
416554
let mut strings = Vec::with_capacity(1000);
417555
for i in 0..1000 {
418556
strings.push(Some(format!(
@@ -423,7 +561,16 @@ mod scheme_selection_tests {
423561
let array_ref = array.into_array();
424562
let compressed = BtrBlocksCompressor::default()
425563
.compress(&array_ref, &mut SESSION.create_execution_ctx())?;
426-
assert!(compressed.is::<FSST>());
564+
let is_fsst = compressed.is::<FSST>();
565+
#[cfg(feature = "onpair")]
566+
let is_onpair = compressed.is::<vortex_onpair::OnPair>();
567+
#[cfg(not(feature = "onpair"))]
568+
let is_onpair = false;
569+
assert!(
570+
is_fsst || is_onpair,
571+
"expected FSST or OnPair, got {}",
572+
compressed.encoding_id()
573+
);
427574
Ok(())
428575
}
429576
}

0 commit comments

Comments
 (0)