Skip to content

Commit 9e17811

Browse files
committed
clean up pluggable compressing some more
Signed-off-by: Will Manning <will@willmanning.io>
1 parent 93e5dc7 commit 9e17811

7 files changed

Lines changed: 50 additions & 63 deletions

File tree

vortex-btrblocks/public-api.lock

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,12 @@ impl core::clone::Clone for vortex_btrblocks::BtrBlocksCompressorBuilder
616616

617617
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::clone(&self) -> vortex_btrblocks::BtrBlocksCompressorBuilder
618618

619+
impl core::cmp::Eq for vortex_btrblocks::BtrBlocksCompressorBuilder
620+
621+
impl core::cmp::PartialEq for vortex_btrblocks::BtrBlocksCompressorBuilder
622+
623+
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::eq(&self, other: &vortex_btrblocks::BtrBlocksCompressorBuilder) -> bool
624+
619625
impl core::default::Default for vortex_btrblocks::BtrBlocksCompressorBuilder
620626

621627
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::default() -> Self
@@ -624,6 +630,8 @@ impl core::fmt::Debug for vortex_btrblocks::BtrBlocksCompressorBuilder
624630

625631
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
626632

633+
impl core::marker::StructuralPartialEq for vortex_btrblocks::BtrBlocksCompressorBuilder
634+
627635
pub const vortex_btrblocks::ALL_SCHEMES: &[&dyn vortex_compressor::scheme::Scheme]
628636

629637
pub fn vortex_btrblocks::compress_patches(patches: &vortex_array::patches::Patches) -> vortex_error::VortexResult<vortex_array::patches::Patches>

vortex-btrblocks/src/builder.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ pub fn default_excluded() -> HashSet<SchemeId> {
7979
excluded.insert(string::ZstdScheme.id());
8080
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
8181
excluded.insert(string::ZstdBuffersScheme.id());
82-
#[cfg(feature = "unstable_encodings")]
83-
excluded.insert(turboquant::scheme::TURBOQUANT_SCHEME.id());
8482
excluded
8583
}
8684

@@ -109,7 +107,7 @@ pub fn default_excluded() -> HashSet<SchemeId> {
109107
/// .include([IntDictScheme.id()])
110108
/// .build();
111109
/// ```
112-
#[derive(Debug, Clone, PartialEq)]
110+
#[derive(Debug, Clone, PartialEq, Eq)]
113111
pub struct BtrBlocksCompressorBuilder {
114112
schemes: HashSet<&'static dyn Scheme>,
115113
}

vortex-file/src/strategy.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use vortex_pco::Pco;
5555
use vortex_runend::RunEnd;
5656
use vortex_sequence::Sequence;
5757
use vortex_sparse::Sparse;
58+
#[cfg(feature = "unstable_encodings")]
5859
use vortex_tensor::encodings::turboquant::TurboQuant;
5960
use vortex_utils::aliases::hash_map::HashMap;
6061
use vortex_zigzag::ZigZag;
@@ -113,6 +114,7 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
113114
session.register(RunEnd);
114115
session.register(Sequence);
115116
session.register(Sparse);
117+
#[cfg(feature = "unstable_encodings")]
116118
session.register(TurboQuant);
117119
session.register(ZigZag);
118120

@@ -135,7 +137,7 @@ pub struct WriteStrategyBuilder {
135137
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
136138
allow_encodings: Option<ArrayRegistry>,
137139
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
138-
builder: Option<BtrBlocksCompressorBuilder>,
140+
builder: BtrBlocksCompressorBuilder,
139141
}
140142

141143
impl Default for WriteStrategyBuilder {
@@ -148,7 +150,7 @@ impl Default for WriteStrategyBuilder {
148150
field_writers: HashMap::new(),
149151
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
150152
flat_strategy: None,
151-
builder: None,
153+
builder: BtrBlocksCompressorBuilder::default(),
152154
}
153155
}
154156
}
@@ -203,8 +205,7 @@ impl WriteStrategyBuilder {
203205
/// GPU decompression. Without it, strings use interleaved Zstd compression.
204206
#[cfg(feature = "zstd")]
205207
pub fn with_cuda_compatible_encodings(mut self) -> Self {
206-
let mut builder = self.builder.unwrap_or_default();
207-
builder = builder.exclude([
208+
self.builder = self.builder.exclude([
208209
integer::SparseScheme.id(),
209210
integer::RLE_INTEGER_SCHEME.id(),
210211
float::RLE_FLOAT_SCHEME.id(),
@@ -215,14 +216,13 @@ impl WriteStrategyBuilder {
215216

216217
#[cfg(feature = "unstable_encodings")]
217218
{
218-
builder = builder.include([string::ZstdBuffersScheme.id()]);
219+
self.builder = self.builder.include([string::ZstdBuffersScheme.id()]);
219220
}
220221
#[cfg(not(feature = "unstable_encodings"))]
221222
{
222-
builder = builder.include([string::ZstdScheme.id()]);
223+
self.builder = self.builder.include([string::ZstdScheme.id()]);
223224
}
224225

225-
self.builder = Some(builder);
226226
self
227227
}
228228

@@ -233,13 +233,11 @@ impl WriteStrategyBuilder {
233233
/// especially for floating-point heavy datasets.
234234
#[cfg(feature = "zstd")]
235235
pub fn with_compact_encodings(mut self) -> Self {
236-
let mut builder = self.builder.unwrap_or_default();
237-
builder = builder.include([
236+
self.builder = self.builder.include([
238237
string::ZstdScheme.id(),
239238
integer::PcoScheme.id(),
240239
float::PcoScheme.id(),
241240
]);
242-
self.builder = Some(builder);
243241
self
244242
}
245243

@@ -254,15 +252,17 @@ impl WriteStrategyBuilder {
254252
/// compressor is used with TurboQuant added.
255253
#[cfg(feature = "unstable_encodings")]
256254
pub fn with_vector_quantization(mut self) -> Self {
257-
let mut builder = self.builder.unwrap_or_default();
258-
builder = builder.include([turboquant::scheme::TURBOQUANT_SCHEME.id()]);
259-
self.builder = Some(builder);
255+
use vortex_tensor::encodings::turboquant::scheme::TURBOQUANT_SCHEME;
256+
self.builder = self.builder.with_scheme(&TURBOQUANT_SCHEME);
260257
self
261258
}
262259

263260
/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
264261
/// applied.
265262
pub fn build(self) -> Arc<dyn LayoutStrategy> {
263+
use vortex_btrblocks::SchemeExt as _;
264+
use vortex_btrblocks::schemes::integer::IntDictScheme;
265+
266266
let flat: Arc<dyn LayoutStrategy> = if let Some(flat) = self.flat_strategy {
267267
flat
268268
} else if let Some(allow_encodings) = self.allow_encodings {
@@ -275,19 +275,28 @@ impl WriteStrategyBuilder {
275275
let chunked = ChunkedLayoutStrategy::new(flat.clone());
276276
// 6. buffer chunks so they end up with closer segment ids physically
277277
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
278-
// 5. compress each chunk
279-
if self.builder.is_some() && self.compressor.is_some() {
280-
vortex_panic!("Cannot configure both a custom compressor and custom builder schemes");
281-
}
282278

283-
let compressing = if let Some(ref compressor) = self.compressor {
284-
CompressingStrategy::new_opaque(buffered, compressor.clone())
285-
} else if let Some(ref builder) = self.builder {
286-
CompressingStrategy::new_opaque(buffered, builder.build())
279+
// 5. compress each chunk
280+
// Build separate compressors for data (excludes IntDict to avoid recursive dict encoding)
281+
// and stats/dict values (includes IntDict).
282+
let (data_compressor, stats_compressor): (
283+
Arc<dyn CompressorPlugin>,
284+
Arc<dyn CompressorPlugin>,
285+
) = if let Some(compressor) = self.compressor {
286+
if self.builder != BtrBlocksCompressorBuilder::default() {
287+
vortex_panic!(
288+
"Cannot configure both a custom compressor and custom builder schemes"
289+
);
290+
}
291+
(compressor.clone(), compressor)
287292
} else {
288-
CompressingStrategy::new_btrblocks(buffered, true)
293+
let stats = Arc::new(self.builder.clone().build());
294+
let data = Arc::new(self.builder.exclude([IntDictScheme.id()]).build());
295+
(data, stats)
289296
};
290297

298+
let compressing = CompressingStrategy::new(buffered, data_compressor);
299+
291300
// 4. prior to compression, coalesce up to a minimum size
292301
let coalescing = RepartitionStrategy::new(
293302
compressing,
@@ -306,11 +315,7 @@ impl WriteStrategyBuilder {
306315
);
307316

308317
// 2.1. | 3.1. compress stats tables and dict values.
309-
let compress_then_flat = if let Some(ref compressor) = compressor {
310-
CompressingStrategy::new_opaque(flat, compressor.clone())
311-
} else {
312-
CompressingStrategy::new_btrblocks(flat, false)
313-
};
318+
let compress_then_flat = CompressingStrategy::new(flat, stats_compressor);
314319

315320
// 3. apply dict encoding or fallback
316321
let dict = DictStrategy::new(

vortex-file/tests/test_write_table.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use vortex_array::field_path;
2020
use vortex_array::scalar_fn::session::ScalarFnSession;
2121
use vortex_array::session::ArraySession;
2222
use vortex_array::validity::Validity;
23+
use vortex_btrblocks::BtrBlocksCompressor;
2324
use vortex_buffer::ByteBuffer;
2425
use vortex_file::OpenOptionsSessionExt;
2526
use vortex_file::WriteOptionsSessionExt;
@@ -67,9 +68,9 @@ async fn test_file_roundtrip() {
6768

6869
// Create a writer which by default uses the BtrBlocks compressor for a.compressed, but leaves
6970
// the b and the a.raw columns uncompressed.
70-
let default_strategy = Arc::new(CompressingStrategy::new_btrblocks(
71+
let default_strategy = Arc::new(CompressingStrategy::new(
7172
FlatLayoutStrategy::default(),
72-
false,
73+
BtrBlocksCompressor::default(),
7374
));
7475

7576
let writer = Arc::new(

vortex-layout/public-api.lock

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,7 @@ pub struct vortex_layout::layouts::compressed::CompressingStrategy
168168

169169
impl vortex_layout::layouts::compressed::CompressingStrategy
170170

171-
pub fn vortex_layout::layouts::compressed::CompressingStrategy::new_btrblocks<S: vortex_layout::LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self
172-
173-
pub fn vortex_layout::layouts::compressed::CompressingStrategy::new_opaque<S: vortex_layout::LayoutStrategy, C: vortex_layout::layouts::compressed::CompressorPlugin>(child: S, compressor: C) -> Self
171+
pub fn vortex_layout::layouts::compressed::CompressingStrategy::new<S: vortex_layout::LayoutStrategy, C: vortex_layout::layouts::compressed::CompressorPlugin>(child: S, compressor: C) -> Self
174172

175173
pub fn vortex_layout::layouts::compressed::CompressingStrategy::with_concurrency(self, concurrency: usize) -> Self
176174

vortex-layout/src/layouts/compressed.rs

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ use vortex_array::ArrayRef;
1010
use vortex_array::DynArray;
1111
use vortex_array::expr::stats::Stat;
1212
use vortex_btrblocks::BtrBlocksCompressor;
13-
use vortex_btrblocks::BtrBlocksCompressorBuilder;
14-
use vortex_btrblocks::SchemeExt;
15-
use vortex_btrblocks::schemes::integer::IntDictScheme;
1613
use vortex_error::VortexResult;
1714
use vortex_io::runtime::Handle;
1815

@@ -61,32 +58,11 @@ pub struct CompressingStrategy {
6158
}
6259

6360
impl CompressingStrategy {
64-
/// Create a new writer that uses the BtrBlocks-style cascading compressor to compress chunks.
65-
///
66-
/// This provides a good balance between decoding speed and small file size.
67-
///
68-
/// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays,
69-
/// which is useful when compressing dictionary codes to avoid recursive dictionary encoding.
70-
pub fn new_btrblocks<S: LayoutStrategy>(child: S, exclude_int_dict_encoding: bool) -> Self {
71-
let compressor = if exclude_int_dict_encoding {
72-
BtrBlocksCompressorBuilder::default()
73-
.exclude([IntDictScheme.id()])
74-
.build()
75-
} else {
76-
BtrBlocksCompressor::default()
77-
};
78-
Self::new(child, Arc::new(compressor))
79-
}
80-
81-
/// Create a new compressor from a plugin interface.
82-
pub fn new_opaque<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
83-
Self::new(child, Arc::new(compressor))
84-
}
85-
86-
fn new<S: LayoutStrategy>(child: S, compressor: Arc<dyn CompressorPlugin>) -> Self {
61+
/// Create a new compressing strategy that wraps a child strategy with a compressor plugin.
62+
pub fn new<S: LayoutStrategy, C: CompressorPlugin>(child: S, compressor: C) -> Self {
8763
Self {
8864
child: Arc::new(child),
89-
compressor,
65+
compressor: Arc::new(compressor),
9066
concurrency: std::thread::available_parallelism()
9167
.map(|v| v.get())
9268
.unwrap_or(1),

vortex-layout/src/layouts/table.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,13 @@ impl TableStrategy {
8585
/// ```ignore
8686
/// # use std::sync::Arc;
8787
/// # use vortex_array::dtype::{field_path, Field, FieldPath};
88+
/// # use vortex_btrblocks::BtrBlocksCompressor;
8889
/// # use vortex_layout::layouts::compressed::CompressingStrategy;
8990
/// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
9091
/// # use vortex_layout::layouts::table::TableStrategy;
9192
///
9293
/// // A strategy for compressing data using the balanced BtrBlocks compressor.
93-
/// let compress = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true);
94+
/// let compress = CompressingStrategy::new(FlatLayoutStrategy::default(), BtrBlocksCompressor::default());
9495
///
9596
/// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression
9697
/// // for most columns, and stores a nested binary column uncompressed (flat) because it

0 commit comments

Comments
 (0)