Skip to content

Commit 2c5017a

Browse files
committed
Reapply "wip on pluggable compressor cleanup"
This reverts commit 54b158c. Signed-off-by: Will Manning <will@willmanning.io>
1 parent a831042 commit 2c5017a

2 files changed

Lines changed: 24 additions & 27 deletions

File tree

vortex-btrblocks/src/builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ pub fn default_excluded() -> HashSet<SchemeId> {
7979
excluded.insert(string::ZstdScheme.id());
8080
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
8181
excluded.insert(string::ZstdBuffersScheme.id());
82+
#[cfg(feature = "unstable_encodings")]
83+
excluded.insert(turboquant::scheme::TURBOQUANT_SCHEME.id());
8284
excluded
8385
}
8486

@@ -107,7 +109,7 @@ pub fn default_excluded() -> HashSet<SchemeId> {
107109
/// .include([IntDictScheme.id()])
108110
/// .build();
109111
/// ```
110-
#[derive(Debug, Clone)]
112+
#[derive(Debug, Clone, PartialEq)]
111113
pub struct BtrBlocksCompressorBuilder {
112114
schemes: HashSet<&'static dyn Scheme>,
113115
}

vortex-file/src/strategy.rs

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use vortex_btrblocks::BtrBlocksCompressorBuilder;
3232
use vortex_bytebool::ByteBool;
3333
use vortex_datetime_parts::DateTimeParts;
3434
use vortex_decimal_byte_parts::DecimalByteParts;
35+
use vortex_error::vortex_panic;
3536
use vortex_fastlanes::BitPacked;
3637
use vortex_fastlanes::Delta;
3738
use vortex_fastlanes::FoR;
@@ -54,6 +55,7 @@ use vortex_pco::Pco;
5455
use vortex_runend::RunEnd;
5556
use vortex_sequence::Sequence;
5657
use vortex_sparse::Sparse;
58+
use vortex_tensor::encodings::turboquant::TurboQuant;
5759
use vortex_utils::aliases::hash_map::HashMap;
5860
use vortex_zigzag::ZigZag;
5961

@@ -111,6 +113,7 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
111113
session.register(RunEnd);
112114
session.register(Sequence);
113115
session.register(Sparse);
116+
session.register(TurboQuant);
114117
session.register(ZigZag);
115118

116119
#[cfg(feature = "zstd")]
@@ -132,8 +135,7 @@ pub struct WriteStrategyBuilder {
132135
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
133136
allow_encodings: Option<ArrayRegistry>,
134137
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
135-
#[cfg(feature = "unstable_encodings")]
136-
vector_quantization: bool,
138+
builder: Option<BtrBlocksCompressorBuilder>,
137139
}
138140

139141
impl Default for WriteStrategyBuilder {
@@ -146,8 +148,7 @@ impl Default for WriteStrategyBuilder {
146148
field_writers: HashMap::new(),
147149
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
148150
flat_strategy: None,
149-
#[cfg(feature = "unstable_encodings")]
150-
vector_quantization: false,
151+
builder: None,
151152
}
152153
}
153154
}
@@ -202,7 +203,8 @@ impl WriteStrategyBuilder {
202203
/// GPU decompression. Without it, strings use interleaved Zstd compression.
203204
#[cfg(feature = "zstd")]
204205
pub fn with_cuda_compatible_encodings(mut self) -> Self {
205-
let mut builder = BtrBlocksCompressorBuilder::default().exclude([
206+
let mut builder = self.builder.unwrap_or_default();
207+
builder = builder.exclude([
206208
integer::SparseScheme.id(),
207209
integer::RLE_INTEGER_SCHEME.id(),
208210
float::RLE_FLOAT_SCHEME.id(),
@@ -220,7 +222,7 @@ impl WriteStrategyBuilder {
220222
builder = builder.include([string::ZstdScheme.id()]);
221223
}
222224

223-
self.compressor = Some(Arc::new(builder.build()));
225+
self.builder = Some(builder);
224226
self
225227
}
226228

@@ -231,15 +233,13 @@ impl WriteStrategyBuilder {
231233
/// especially for floating-point heavy datasets.
232234
#[cfg(feature = "zstd")]
233235
pub fn with_compact_encodings(mut self) -> Self {
234-
let btrblocks = BtrBlocksCompressorBuilder::default()
235-
.include([
236+
let mut builder = self.builder.unwrap_or_default();
237+
builder = builder.include([
236238
string::ZstdScheme.id(),
237239
integer::PcoScheme.id(),
238240
float::PcoScheme.id(),
239-
])
240-
.build();
241-
242-
self.compressor = Some(Arc::new(btrblocks));
241+
]);
242+
self.builder = Some(builder);
243243
self
244244
}
245245

@@ -254,7 +254,9 @@ impl WriteStrategyBuilder {
254254
/// compressor is used with TurboQuant added.
255255
#[cfg(feature = "unstable_encodings")]
256256
pub fn with_vector_quantization(mut self) -> Self {
257-
self.vector_quantization = true;
257+
let mut builder = self.builder.unwrap_or_default();
258+
builder = builder.include([turboquant::scheme::TURBOQUANT_SCHEME.id()]);
259+
self.builder = Some(builder);
258260
self
259261
}
260262

@@ -274,21 +276,14 @@ impl WriteStrategyBuilder {
274276
// 6. buffer chunks so they end up with closer segment ids physically
275277
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
276278
// 5. compress each chunk
277-
#[cfg(feature = "unstable_encodings")]
278-
let compressor = if self.vector_quantization {
279-
use vortex_tensor::encodings::turboquant::scheme::TURBOQUANT_SCHEME;
280-
281-
// Build a BtrBlocks compressor with TurboQuant added.
282-
let builder = BtrBlocksCompressorBuilder::default().with_scheme(&TURBOQUANT_SCHEME);
283-
Some(Arc::new(builder.build()) as Arc<dyn CompressorPlugin>)
284-
} else {
285-
self.compressor.clone()
286-
};
287-
#[cfg(not(feature = "unstable_encodings"))]
288-
let compressor = self.compressor.clone();
279+
if self.builder.is_some() && self.compressor.is_some() {
280+
vortex_panic!("Cannot configure both a custom compressor and custom builder schemes");
281+
}
289282

290-
let compressing = if let Some(ref compressor) = compressor {
283+
let compressing = if let Some(ref compressor) = self.compressor {
291284
CompressingStrategy::new_opaque(buffered, compressor.clone())
285+
} else if let Some(ref builder) = self.builder {
286+
CompressingStrategy::new_opaque(buffered, builder.build())
292287
} else {
293288
CompressingStrategy::new_btrblocks(buffered, true)
294289
};

0 commit comments

Comments
 (0)