Skip to content

Commit 02b0949

Browse files
authored
Cleaner WriteStrategyBuilder interface (#7275)
## Summary Continuation of #7274. Cleans up the `WriteStrategyBuilder` interface to allow EITHER an opaque compressor OR a `BtrBlocksCompressorBuilder` that we can modify at any point before building the write strategy. ## API Changes `WriteStrategyBuilder` now allows you to add a compressor plugin that will only override a default `BtrBlocksCompressorBuilder`. This means that whenever we add specific encodings, we are still able to exclude things like `IntDictScheme` when building the data compressor (which we did not do before). ## Testing The APIs have changed but the logic hasn't really changed, so existing tests should suffice. Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent d879349 commit 02b0949

File tree

15 files changed

+103
-149
lines changed

15 files changed

+103
-149
lines changed

fuzz/fuzz_targets/file_io.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use vortex_array::dtype::StructFields;
1717
use vortex_array::expr::lit;
1818
use vortex_array::expr::root;
1919
use vortex_array::scalar_fn::fns::operators::Operator;
20+
use vortex_btrblocks::BtrBlocksCompressorBuilder;
2021
use vortex_buffer::ByteBufferMut;
2122
use vortex_error::VortexExpect;
2223
use vortex_error::vortex_panic;
@@ -59,12 +60,11 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus {
5960

6061
let write_options = match compressor_strategy {
6162
CompressorStrategy::Default => SESSION.write_options(),
62-
CompressorStrategy::Compact => {
63-
let strategy = WriteStrategyBuilder::default()
64-
.with_compact_encodings()
65-
.build();
66-
SESSION.write_options().with_strategy(strategy)
67-
}
63+
CompressorStrategy::Compact => SESSION.write_options().with_strategy(
64+
WriteStrategyBuilder::default()
65+
.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact())
66+
.build(),
67+
),
6868
};
6969

7070
let mut full_buff = ByteBufferMut::empty();

vortex-bench/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use tpcds::TpcDsBenchmark;
2626
use tpch::benchmark::TpcHBenchmark;
2727
pub use utils::file::*;
2828
pub use utils::logging::*;
29+
use vortex::compressor::BtrBlocksCompressorBuilder;
2930
use vortex::error::VortexExpect;
3031
use vortex::error::vortex_err;
3132
use vortex::file::VortexWriteOptions;
@@ -231,7 +232,7 @@ impl CompactionStrategy {
231232
match self {
232233
CompactionStrategy::Compact => options.with_strategy(
233234
WriteStrategyBuilder::default()
234-
.with_compact_encodings()
235+
.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact())
235236
.build(),
236237
),
237238
CompactionStrategy::Default => options,

vortex-btrblocks/public-api.lock

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,9 @@ impl vortex_btrblocks::BtrBlocksCompressorBuilder
614614

615615
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::build(self) -> vortex_btrblocks::BtrBlocksCompressor
616616

617-
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude(self, ids: impl core::iter::traits::collect::IntoIterator<Item = vortex_compressor::scheme::SchemeId>) -> Self
617+
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude_schemes(self, ids: impl core::iter::traits::collect::IntoIterator<Item = vortex_compressor::scheme::SchemeId>) -> Self
618+
619+
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::only_cuda_compatible(self) -> Self
618620

619621
pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::with_compact(self) -> Self
620622

vortex-btrblocks/src/builder.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
6767
///
6868
/// By default, all schemes in [`ALL_SCHEMES`] are enabled. Feature-gated schemes (Pco, Zstd)
6969
/// are not in `ALL_SCHEMES` and must be added explicitly via
70-
/// [`with_new_scheme`](BtrBlocksCompressorBuilder::with_new_scheme) or
70+
/// [`with_new_scheme`](BtrBlocksCompressorBuilder::with_new_scheme) or
7171
/// [`with_compact`](BtrBlocksCompressorBuilder::with_compact).
7272
///
7373
/// # Examples
@@ -79,9 +79,9 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
7979
/// // Default compressor with all schemes in ALL_SCHEMES.
8080
/// let compressor = BtrBlocksCompressorBuilder::default().build();
8181
///
82-
/// // Exclude specific schemes.
82+
/// // Remove specific schemes.
8383
/// let compressor = BtrBlocksCompressorBuilder::default()
84-
/// .exclude([IntDictScheme.id()])
84+
/// .exclude_schemes([IntDictScheme.id()])
8585
/// .build();
8686
/// ```
8787
#[derive(Debug, Clone)]
@@ -100,8 +100,8 @@ impl Default for BtrBlocksCompressorBuilder {
100100
impl BtrBlocksCompressorBuilder {
101101
/// Adds an external compression scheme not in [`ALL_SCHEMES`].
102102
///
103-
/// This allows encoding crates outside of `vortex-btrblocks` to register their own schemes with
104-
/// the compressor.
103+
/// This allows encoding crates outside of `vortex-btrblocks` to register their own schemes
104+
/// with the compressor.
105105
///
106106
/// # Panics
107107
///
@@ -128,7 +128,6 @@ impl BtrBlocksCompressorBuilder {
128128
/// Panics if any of the compact schemes are already present.
129129
#[cfg(feature = "zstd")]
130130
pub fn with_compact(self) -> Self {
131-
// This should be fast since we don't have that many schemes.
132131
let builder = self.with_new_scheme(&string::ZstdScheme);
133132

134133
#[cfg(feature = "pco")]
@@ -139,8 +138,32 @@ impl BtrBlocksCompressorBuilder {
139138
builder
140139
}
141140

142-
/// Excludes the specified compression schemes by their [`SchemeId`].
143-
pub fn exclude(mut self, ids: impl IntoIterator<Item = SchemeId>) -> Self {
141+
/// Excludes schemes without CUDA kernel support and adds Zstd for string compression.
142+
///
143+
/// With the `unstable_encodings` feature, buffer-level Zstd compression is used which
144+
/// preserves the array buffer layout for zero-conversion GPU decompression. Without it,
145+
/// interleaved Zstd compression is used.
146+
#[cfg(feature = "zstd")]
147+
pub fn only_cuda_compatible(self) -> Self {
148+
let builder = self.exclude_schemes([
149+
integer::SparseScheme.id(),
150+
rle::RLE_INTEGER_SCHEME.id(),
151+
rle::RLE_FLOAT_SCHEME.id(),
152+
float::NullDominatedSparseScheme.id(),
153+
string::StringDictScheme.id(),
154+
string::FSSTScheme.id(),
155+
]);
156+
157+
#[cfg(feature = "unstable_encodings")]
158+
let builder = builder.with_new_scheme(&string::ZstdBuffersScheme);
159+
#[cfg(not(feature = "unstable_encodings"))]
160+
let builder = builder.with_new_scheme(&string::ZstdScheme);
161+
162+
builder
163+
}
164+
165+
/// Removes the specified compression schemes by their [`SchemeId`].
166+
pub fn exclude_schemes(mut self, ids: impl IntoIterator<Item = SchemeId>) -> Self {
144167
let ids: HashSet<_> = ids.into_iter().collect();
145168
self.schemes.retain(|s| !ids.contains(&s.id()));
146169
self

vortex-btrblocks/src/canonical_compressor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ use crate::CascadingCompressor;
2525
/// // Default compressor - all schemes allowed.
2626
/// let compressor = BtrBlocksCompressor::default();
2727
///
28-
/// // Exclude specific schemes using the builder.
28+
/// // Remove specific schemes using the builder.
2929
/// let compressor = BtrBlocksCompressorBuilder::default()
30-
/// .exclude([IntDictScheme.id()])
30+
/// .exclude_schemes([IntDictScheme.id()])
3131
/// .build();
3232
/// ```
3333
#[derive(Clone)]

vortex-btrblocks/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@
4646
//! // Default compressor with all schemes enabled.
4747
//! let compressor = BtrBlocksCompressor::default();
4848
//!
49-
//! // Configure with builder to exclude specific schemes.
49+
//! // Remove specific schemes using the builder.
5050
//! let compressor = BtrBlocksCompressorBuilder::default()
51-
//! .exclude([IntDictScheme.id()])
51+
//! .exclude_schemes([IntDictScheme.id()])
5252
//! .build();
5353
//! ```
5454
//!

vortex-cuda/gpu-scan-cli/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use vortex::VortexSessionDefault;
2121
use vortex::array::ToCanonical;
2222
use vortex::array::arrays::Dict;
2323
use vortex::buffer::ByteBufferMut;
24+
use vortex::compressor::BtrBlocksCompressorBuilder;
2425
use vortex::error::VortexResult;
2526
use vortex::file::OpenOptionsSessionExt;
2627
use vortex::file::WriteOptionsSessionExt;
@@ -92,7 +93,7 @@ async fn main() -> VortexResult<()> {
9293
#[cuda_available]
9394
fn cuda_write_strategy() -> Arc<dyn vortex::layout::LayoutStrategy> {
9495
WriteStrategyBuilder::default()
95-
.with_cuda_compatible_encodings()
96+
.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().only_cuda_compatible())
9697
.with_flat_strategy(Arc::new(CudaFlatLayoutStrategy::default()))
9798
.build()
9899
}

vortex-file/public-api.lock

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,12 +346,10 @@ pub fn vortex_file::WriteStrategyBuilder::build(self) -> alloc::sync::Arc<dyn vo
346346

347347
pub fn vortex_file::WriteStrategyBuilder::with_allow_encodings(self, allow_encodings: vortex_array::session::ArrayRegistry) -> Self
348348

349-
pub fn vortex_file::WriteStrategyBuilder::with_compact_encodings(self) -> Self
349+
pub fn vortex_file::WriteStrategyBuilder::with_btrblocks_builder(self, builder: vortex_btrblocks::builder::BtrBlocksCompressorBuilder) -> Self
350350

351351
pub fn vortex_file::WriteStrategyBuilder::with_compressor<C: vortex_layout::layouts::compressed::CompressorPlugin>(self, compressor: C) -> Self
352352

353-
pub fn vortex_file::WriteStrategyBuilder::with_cuda_compatible_encodings(self) -> Self
354-
355353
pub fn vortex_file::WriteStrategyBuilder::with_field_writer(self, field: impl core::convert::Into<vortex_array::dtype::field::FieldPath>, writer: alloc::sync::Arc<dyn vortex_layout::strategy::LayoutStrategy>) -> Self
356354

357355
pub fn vortex_file::WriteStrategyBuilder::with_flat_strategy(self, flat: alloc::sync::Arc<dyn vortex_layout::strategy::LayoutStrategy>) -> Self

vortex-file/src/strategy.rs

Lines changed: 37 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use vortex_array::arrays::VarBinView;
2828
use vortex_array::dtype::FieldPath;
2929
use vortex_array::session::ArrayRegistry;
3030
use vortex_array::session::ArraySession;
31-
use vortex_btrblocks::BtrBlocksCompressor;
3231
use vortex_btrblocks::BtrBlocksCompressorBuilder;
3332
use vortex_btrblocks::SchemeExt;
3433
use vortex_btrblocks::schemes::integer::IntDictScheme;
@@ -59,14 +58,6 @@ use vortex_sequence::Sequence;
5958
use vortex_sparse::Sparse;
6059
use vortex_utils::aliases::hash_map::HashMap;
6160
use vortex_zigzag::ZigZag;
62-
63-
#[rustfmt::skip]
64-
#[cfg(feature = "zstd")]
65-
use vortex_btrblocks::{
66-
schemes::float,
67-
schemes::integer,
68-
schemes::string,
69-
};
7061
#[cfg(feature = "zstd")]
7162
use vortex_zstd::Zstd;
7263
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
@@ -123,13 +114,24 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
123114
session.registry().clone()
124115
});
125116

126-
/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
117+
/// How the compressor was configured on [`WriteStrategyBuilder`].
118+
enum CompressorConfig {
119+
/// A [`BtrBlocksCompressorBuilder`] that [`WriteStrategyBuilder::build`] will finalize.
120+
/// `IntDictScheme` is automatically excluded from the data compressor to prevent recursive
121+
/// dictionary encoding.
122+
BtrBlocks(BtrBlocksCompressorBuilder),
123+
/// An opaque compressor used as-is for both data and stats compression.
124+
Opaque(Arc<dyn CompressorPlugin>),
125+
}
126+
127+
/// Build a new [writer strategy](LayoutStrategy) to compress and reorganize chunks of a Vortex
128+
/// file.
127129
///
128130
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
129131
/// repartitioning and compressing them to strike a balance between size on-disk,
130132
/// bulk decoding performance, and IOPS required to perform an indexed read.
131133
pub struct WriteStrategyBuilder {
132-
compressor_override: Option<Arc<dyn CompressorPlugin>>,
134+
compressor: CompressorConfig,
133135
row_block_size: usize,
134136
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
135137
allow_encodings: Option<ArrayRegistry>,
@@ -141,7 +143,7 @@ impl Default for WriteStrategyBuilder {
141143
/// and then finally built yielding the [`LayoutStrategy`].
142144
fn default() -> Self {
143145
Self {
144-
compressor_override: None,
146+
compressor: CompressorConfig::BtrBlocks(BtrBlocksCompressorBuilder::default()),
145147
row_block_size: 8192,
146148
field_writers: HashMap::new(),
147149
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
@@ -183,97 +185,20 @@ impl WriteStrategyBuilder {
183185
self
184186
}
185187

186-
/// Override the [compressor](CompressorPlugin) used for compressing chunks in the file.
187-
///
188-
/// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
189-
/// total size with decoding performance.
190-
///
191-
/// # Panics
192-
///
193-
/// Panics if a compressor has already been set via
194-
/// [`with_compressor`](Self::with_compressor),
195-
/// [`with_cuda_compatible_encodings`](Self::with_cuda_compatible_encodings), or
196-
/// [`with_compact_encodings`](Self::with_compact_encodings).
197-
///
198-
/// These methods are mutually exclusive.
199-
pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
200-
assert!(
201-
self.compressor_override.is_none(),
202-
"A compressor has already been configured. `with_compressor`, \
203-
`with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive."
204-
);
205-
self.compressor_override = Some(Arc::new(compressor));
206-
self
207-
}
208-
209-
/// Configure a write strategy that emits only CUDA-compatible encodings.
210-
///
211-
/// This method simply exists as a wrapper around [`with_compressor`].
212-
///
213-
/// This configures BtrBlocks to exclude schemes without CUDA kernel support.
214-
/// With the `unstable_encodings` feature, strings use buffer-level Zstd compression
215-
/// (`ZstdBuffersArray`) which preserves the array buffer layout for zero-conversion
216-
/// GPU decompression. Without it, strings use interleaved Zstd compression.
217-
///
218-
/// # Panics
219-
///
220-
/// Panics if a compressor has already been set. See [`with_compressor`]
188+
/// Override the default [`BtrBlocksCompressorBuilder`] used for compression.
221189
///
222-
/// [`with_compressor`]: Self::with_compressor.
223-
#[cfg(feature = "zstd")]
224-
pub fn with_cuda_compatible_encodings(mut self) -> Self {
225-
assert!(
226-
self.compressor_override.is_none(),
227-
"A compressor has already been configured. `with_compressor`, \
228-
`with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive."
229-
);
230-
231-
let mut builder = BtrBlocksCompressorBuilder::default().exclude([
232-
integer::SparseScheme.id(),
233-
integer::RLE_INTEGER_SCHEME.id(),
234-
float::RLE_FLOAT_SCHEME.id(),
235-
float::NullDominatedSparseScheme.id(),
236-
string::StringDictScheme.id(),
237-
string::FSSTScheme.id(),
238-
]);
239-
240-
#[cfg(feature = "unstable_encodings")]
241-
{
242-
builder = builder.with_new_scheme(&string::ZstdBuffersScheme);
243-
}
244-
#[cfg(not(feature = "unstable_encodings"))]
245-
{
246-
builder = builder.with_new_scheme(&string::ZstdScheme);
247-
}
248-
249-
self.compressor_override = Some(Arc::new(builder.build()));
190+
/// The builder is finalized during [`build`](Self::build), producing two compressors: one for
191+
/// data (with `IntDictScheme` excluded) and one for stats.
192+
pub fn with_btrblocks_builder(mut self, builder: BtrBlocksCompressorBuilder) -> Self {
193+
self.compressor = CompressorConfig::BtrBlocks(builder);
250194
self
251195
}
252196

253-
/// Configure a write strategy that uses compact encodings (Pco for numerics, Zstd for
254-
/// strings/binary).
255-
///
256-
/// This method simply exists as a wrapper around [`with_compressor`].
257-
///
258-
/// This provides better compression ratios than the default BtrBlocks strategy,
259-
/// especially for floating-point heavy datasets.
197+
/// Set the compressor to an opaque [`CompressorPlugin`].
260198
///
261-
/// # Panics
262-
///
263-
/// Panics if a compressor has already been set. See [`with_compressor`]
264-
///
265-
/// [`with_compressor`]: Self::with_compressor.
266-
#[cfg(feature = "zstd")]
267-
pub fn with_compact_encodings(mut self) -> Self {
268-
assert!(
269-
self.compressor_override.is_none(),
270-
"A compressor has already been configured. `with_compressor`, \
271-
`with_cuda_compatible_encodings`, and `with_compact_encodings` are mutually exclusive."
272-
);
273-
274-
self.compressor_override = Some(Arc::new(
275-
BtrBlocksCompressorBuilder::default().with_compact().build(),
276-
));
199+
/// The compressor is used as-is for both data and stats compression.
200+
pub fn with_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
201+
self.compressor = CompressorConfig::Opaque(Arc::new(compressor));
277202
self
278203
}
279204

@@ -294,19 +219,18 @@ impl WriteStrategyBuilder {
294219
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
295220

296221
// 5. compress each chunk.
297-
// Exclude IntDictScheme from the default compressor because DictStrategy (step 3) already
222+
// Exclude IntDictScheme from the data compressor because DictStrategy (step 3) already
298223
// dictionary-encodes columns. Allowing IntDictScheme here would redundantly
299224
// dictionary-encode the integer codes produced by that earlier step.
300-
let data_compressor: Arc<dyn CompressorPlugin> =
301-
if let Some(ref compressor) = self.compressor_override {
302-
compressor.clone()
303-
} else {
304-
Arc::new(
305-
BtrBlocksCompressorBuilder::default()
306-
.exclude([IntDictScheme.id()])
307-
.build(),
308-
)
309-
};
225+
let data_compressor: Arc<dyn CompressorPlugin> = match &self.compressor {
226+
CompressorConfig::BtrBlocks(builder) => Arc::new(
227+
builder
228+
.clone()
229+
.exclude_schemes([IntDictScheme.id()])
230+
.build(),
231+
),
232+
CompressorConfig::Opaque(compressor) => compressor.clone(),
233+
};
310234
let compressing = CompressingStrategy::new(buffered, data_compressor);
311235

312236
// 4. prior to compression, coalesce up to a minimum size
@@ -327,12 +251,10 @@ impl WriteStrategyBuilder {
327251
);
328252

329253
// 2.1. | 3.1. compress stats tables and dict values.
330-
let stats_compressor: Arc<dyn CompressorPlugin> =
331-
if let Some(ref compressor) = self.compressor_override {
332-
compressor.clone()
333-
} else {
334-
Arc::new(BtrBlocksCompressor::default())
335-
};
254+
let stats_compressor: Arc<dyn CompressorPlugin> = match self.compressor {
255+
CompressorConfig::BtrBlocks(builder) => Arc::new(builder.build()),
256+
CompressorConfig::Opaque(compressor) => compressor,
257+
};
336258
let compress_then_flat = CompressingStrategy::new(flat, stats_compressor);
337259

338260
// 3. apply dict encoding or fallback

0 commit comments

Comments
 (0)