Skip to content

Commit fe388a0

Browse files
authored
More robust types in the compressor (#7415)
## Summary Tracking issue: #7216 Makes the compressor types more robust (removes the possibility for invalid state), which additionally makes adding tracing easier (draft at #7385) ## API Changes Changes some types: ```rust /// Closure type for [`DeferredEstimate::Callback`]. /// /// The compressor calls this with the same arguments it would pass to sampling. The closure must /// resolve directly to a terminal [`EstimateVerdict`]. #[rustfmt::skip] pub type EstimateFn = dyn FnOnce( &CascadingCompressor, &mut ArrayAndStats, CompressorContext, ) -> VortexResult<EstimateVerdict> + Send + Sync; /// The result of a [`Scheme`]'s compression ratio estimation. /// /// This type is returned by [`Scheme::expected_compression_ratio`] to tell the compressor how /// promising this scheme is for a given array without performing any expensive work. /// /// [`CompressionEstimate::Verdict`] means the scheme already knows the terminal answer. /// [`CompressionEstimate::Deferred`] means the compressor must do extra work before the scheme can /// produce a terminal answer. #[derive(Debug)] pub enum CompressionEstimate { /// The scheme already knows the terminal estimation verdict. Verdict(EstimateVerdict), /// The compressor must perform deferred work to resolve the terminal estimation verdict. Deferred(DeferredEstimate), } /// The terminal answer to a compression estimate request. #[derive(Debug)] pub enum EstimateVerdict { /// Do not use this scheme for this array. Skip, /// Always use this scheme, as it is definitively the best choice. /// /// Some examples include constant detection, decimal byte parts, and temporal decomposition. /// /// The compressor will select this scheme immediately without evaluating further candidates. /// Schemes that return `AlwaysUse` must be mutually exclusive per canonical type (enforced by /// [`Scheme::matches`]), otherwise the winner depends silently on registration order. 
/// /// [`Scheme::matches`]: crate::scheme::Scheme::matches AlwaysUse, /// The estimated compression ratio. This must be greater than `1.0` to be considered by the /// compressor, otherwise it is worse than the canonical encoding. Ratio(f64), } /// Deferred work that can resolve to a terminal [`EstimateVerdict`]. pub enum DeferredEstimate { /// The scheme cannot cheaply estimate its ratio, so the compressor should compress a small /// sample to determine effectiveness. Sample, /// A fallible estimation requiring a custom expensive computation. /// /// Use this only when the scheme needs to perform trial encoding or other costly checks to /// determine its compression ratio. The callback returns an [`EstimateVerdict`] directly, so /// it cannot request more sampling or another deferred callback. Callback(Box<EstimateFn>), } ``` This will make some changes that we want to make in the future easier as well (tracing, better decision making for what things to try, etc). ## Testing Some new tests Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 71089dd commit fe388a0

17 files changed

Lines changed: 494 additions & 175 deletions

File tree

vortex-btrblocks/src/schemes/decimal.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use vortex_array::arrays::PrimitiveArray;
1111
use vortex_array::arrays::decimal::narrowed_decimal;
1212
use vortex_array::dtype::DecimalType;
1313
use vortex_compressor::estimate::CompressionEstimate;
14+
use vortex_compressor::estimate::EstimateVerdict;
1415
use vortex_decimal_byte_parts::DecimalByteParts;
1516
use vortex_error::VortexResult;
1617

@@ -47,7 +48,7 @@ impl Scheme for DecimalScheme {
4748
_ctx: CompressorContext,
4849
) -> CompressionEstimate {
4950
// Decimal compression is almost always beneficial (narrowing + primitive compression).
50-
CompressionEstimate::AlwaysUse
51+
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
5152
}
5253

5354
fn compress(

vortex-btrblocks/src/schemes/float.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
2121
use vortex_array::arrays::primitive::PrimitiveArrayExt;
2222
use vortex_array::dtype::PType;
2323
use vortex_compressor::estimate::CompressionEstimate;
24+
use vortex_compressor::estimate::DeferredEstimate;
25+
use vortex_compressor::estimate::EstimateVerdict;
2426
use vortex_compressor::scheme::ChildSelection;
2527
use vortex_compressor::scheme::DescendantExclusion;
2628
use vortex_error::VortexResult;
@@ -88,15 +90,15 @@ impl Scheme for ALPScheme {
8890
// ALP encodes floats as integers. Without integer compression afterward, the encoded ints
8991
// are the same size.
9092
if ctx.finished_cascading() {
91-
return CompressionEstimate::Skip;
93+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
9294
}
9395

9496
// We don't support ALP for f16.
9597
if data.array_as_primitive().ptype() == PType::F16 {
96-
return CompressionEstimate::Skip;
98+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
9799
}
98100

99-
CompressionEstimate::Sample
101+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
100102
}
101103

102104
fn compress(
@@ -154,10 +156,10 @@ impl Scheme for ALPRDScheme {
154156
) -> CompressionEstimate {
155157
// We don't support ALPRD for f16.
156158
if data.array_as_primitive().ptype() == PType::F16 {
157-
return CompressionEstimate::Skip;
159+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
158160
}
159161

160-
CompressionEstimate::Sample
162+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
161163
}
162164

163165
fn compress(
@@ -225,16 +227,16 @@ impl Scheme for NullDominatedSparseScheme {
225227

226228
// All-null arrays should be compressed as constant instead anyways.
227229
if value_count == 0 {
228-
return CompressionEstimate::Skip;
230+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
229231
}
230232

231233
// If the majority (90%) of values is null, this will compress well.
232234
if stats.null_count() as f64 / len > 0.9 {
233-
return CompressionEstimate::Ratio(len / value_count as f64);
235+
return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
234236
}
235237

236238
// Otherwise we don't go this route.
237-
CompressionEstimate::Skip
239+
CompressionEstimate::Verdict(EstimateVerdict::Skip)
238240
}
239241

240242
fn compress(
@@ -279,7 +281,7 @@ impl Scheme for PcoScheme {
279281
_data: &mut ArrayAndStats,
280282
_ctx: CompressorContext,
281283
) -> CompressionEstimate {
282-
CompressionEstimate::Sample
284+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
283285
}
284286

285287
fn compress(
@@ -326,14 +328,14 @@ impl Scheme for FloatRLEScheme {
326328
) -> CompressionEstimate {
327329
// RLE is only useful when we cascade it with another encoding.
328330
if ctx.finished_cascading() {
329-
return CompressionEstimate::Skip;
331+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
330332
}
331333

332334
if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
333-
return CompressionEstimate::Skip;
335+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
334336
}
335337

336-
CompressionEstimate::Sample
338+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
337339
}
338340

339341
fn compress(

vortex-btrblocks/src/schemes/integer.rs

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use vortex_array::scalar::Scalar;
1717
use vortex_compressor::builtins::FloatDictScheme;
1818
use vortex_compressor::builtins::StringDictScheme;
1919
use vortex_compressor::estimate::CompressionEstimate;
20+
use vortex_compressor::estimate::DeferredEstimate;
21+
use vortex_compressor::estimate::EstimateVerdict;
2022
use vortex_compressor::scheme::AncestorExclusion;
2123
use vortex_compressor::scheme::ChildSelection;
2224
use vortex_compressor::scheme::DescendantExclusion;
@@ -134,21 +136,21 @@ impl Scheme for FoRScheme {
134136
// FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
135137
// the same size.
136138
if ctx.finished_cascading() {
137-
return CompressionEstimate::Skip;
139+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
138140
}
139141

140142
let stats = data.integer_stats();
141143

142144
// Only apply when the min is not already zero.
143145
if stats.erased().min_is_zero() {
144-
return CompressionEstimate::Skip;
146+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
145147
}
146148

147149
// Difference between max and min.
148150
let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
149151
Some(l) => l + 1,
150152
// If max-min == 0, then we should be compressing this as a constant array.
151-
None => return CompressionEstimate::Skip,
153+
None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
152154
};
153155

154156
// If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width
@@ -162,7 +164,7 @@ impl Scheme for FoRScheme {
162164
{
163165
let bitpack_bitwidth = max_log + 1;
164166
if for_bitwidth >= bitpack_bitwidth {
165-
return CompressionEstimate::Skip;
167+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
166168
}
167169
}
168170

@@ -173,7 +175,9 @@ impl Scheme for FoRScheme {
173175
.try_into()
174176
.vortex_expect("bit width must fit in u32");
175177

176-
CompressionEstimate::Ratio(full_width as f64 / for_bitwidth as f64)
178+
CompressionEstimate::Verdict(EstimateVerdict::Ratio(
179+
full_width as f64 / for_bitwidth as f64,
180+
))
177181
}
178182

179183
fn compress(
@@ -265,17 +269,17 @@ impl Scheme for ZigZagScheme {
265269
// ZigZag only transforms negative values to positive. Without further compression,
266270
// the output is the same size.
267271
if ctx.finished_cascading() {
268-
return CompressionEstimate::Skip;
272+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
269273
}
270274

271275
let stats = data.integer_stats();
272276

273277
// ZigZag is only useful when there are negative values.
274278
if !stats.erased().min_is_negative() {
275-
return CompressionEstimate::Skip;
279+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
276280
}
277281

278-
CompressionEstimate::Sample
282+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
279283
}
280284

281285
fn compress(
@@ -314,10 +318,10 @@ impl Scheme for BitPackingScheme {
314318

315319
// BitPacking only works for non-negative values.
316320
if stats.erased().min_is_negative() {
317-
return CompressionEstimate::Skip;
321+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
318322
}
319323

320-
CompressionEstimate::Sample
324+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
321325
}
322326

323327
fn compress(
@@ -443,12 +447,12 @@ impl Scheme for SparseScheme {
443447

444448
// All-null arrays should be compressed as constant instead anyways.
445449
if value_count == 0 {
446-
return CompressionEstimate::Skip;
450+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
447451
}
448452

449453
// If the majority (90%) of values is null, this will compress well.
450454
if stats.null_count() as f64 / len > 0.9 {
451-
return CompressionEstimate::Ratio(len / value_count as f64);
455+
return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
452456
}
453457

454458
let (_, most_frequent_count) = stats
@@ -460,18 +464,20 @@ impl Scheme for SparseScheme {
460464

461465
// If the most frequent value is the only value, we should compress as constant instead.
462466
if most_frequent_count == value_count {
463-
return CompressionEstimate::Skip;
467+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
464468
}
465469
debug_assert!(value_count > most_frequent_count);
466470

467471
// See if the most frequent value accounts for >= 90% of the set values.
468472
let freq = most_frequent_count as f64 / value_count as f64;
469473
if freq < 0.9 {
470-
return CompressionEstimate::Skip;
474+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
471475
}
472476

473477
// We only store the positions of the non-top values.
474-
CompressionEstimate::Ratio(value_count as f64 / (value_count - most_frequent_count) as f64)
478+
CompressionEstimate::Verdict(EstimateVerdict::Ratio(
479+
value_count as f64 / (value_count - most_frequent_count) as f64,
480+
))
475481
}
476482

477483
fn compress(
@@ -603,10 +609,10 @@ impl Scheme for RunEndScheme {
603609
) -> CompressionEstimate {
604610
// If the run length is below the threshold, drop it.
605611
if data.integer_stats().average_run_length() < RUN_END_THRESHOLD {
606-
return CompressionEstimate::Skip;
612+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
607613
}
608614

609-
CompressionEstimate::Sample
615+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
610616
}
611617

612618
fn compress(
@@ -668,14 +674,14 @@ impl Scheme for SequenceScheme {
668674
// It is pointless checking if a sample is a sequence since it will not correspond to the
669675
// entire array.
670676
if ctx.is_sample() {
671-
return CompressionEstimate::Skip;
677+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
672678
}
673679

674680
let stats = data.integer_stats();
675681

676682
// `SequenceArray` does not support nulls.
677683
if stats.null_count() > 0 {
678-
return CompressionEstimate::Skip;
684+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
679685
}
680686

681687
// If the distinct_values_count was computed, and not all values are unique, then this
@@ -684,23 +690,25 @@ impl Scheme for SequenceScheme {
684690
.distinct_count()
685691
.is_some_and(|count| count as usize != data.array_len())
686692
{
687-
return CompressionEstimate::Skip;
693+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
688694
}
689695

690696
// TODO(connor): Why do we sequence encode the whole thing and then throw it away? And then
691697
// why do we divide the ratio by 2???
692698

693-
CompressionEstimate::Estimate(Box::new(|_compressor, data, _ctx| {
694-
let Some(encoded) = sequence_encode(data.array_as_primitive())? else {
695-
// If we are unable to sequence encode this array, make sure we skip.
696-
return Ok(CompressionEstimate::Skip);
697-
};
698-
699-
// TODO(connor): This doesn't really make sense?
700-
// Since two values are required to store base and multiplier the compression ratio is
701-
// divided by 2.
702-
Ok(CompressionEstimate::Ratio(encoded.len() as f64 / 2.0))
703-
}))
699+
CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
700+
|_compressor, data, _ctx| {
701+
let Some(encoded) = sequence_encode(data.array_as_primitive())? else {
702+
// If we are unable to sequence encode this array, make sure we skip.
703+
return Ok(EstimateVerdict::Skip);
704+
};
705+
706+
// TODO(connor): This doesn't really make sense?
707+
// Since two values are required to store base and multiplier the compression ratio is
708+
// divided by 2.
709+
Ok(EstimateVerdict::Ratio(encoded.len() as f64 / 2.0))
710+
},
711+
)))
704712
}
705713

706714
fn compress(
@@ -738,10 +746,10 @@ impl Scheme for PcoScheme {
738746

739747
// Pco does not support I8 or U8.
740748
if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) {
741-
return CompressionEstimate::Skip;
749+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
742750
}
743751

744-
CompressionEstimate::Sample
752+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
745753
}
746754

747755
fn compress(
@@ -865,14 +873,14 @@ impl Scheme for IntRLEScheme {
865873
) -> CompressionEstimate {
866874
// RLE is only useful when we cascade it with another encoding.
867875
if ctx.finished_cascading() {
868-
return CompressionEstimate::Skip;
876+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
869877
}
870878

871879
if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD {
872-
return CompressionEstimate::Skip;
880+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
873881
}
874882

875-
CompressionEstimate::Sample
883+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
876884
}
877885

878886
fn compress(

vortex-btrblocks/src/schemes/string.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use vortex_array::arrays::VarBinArray;
1111
use vortex_array::arrays::primitive::PrimitiveArrayExt;
1212
use vortex_array::arrays::varbin::VarBinArrayExt;
1313
use vortex_compressor::estimate::CompressionEstimate;
14+
use vortex_compressor::estimate::DeferredEstimate;
15+
use vortex_compressor::estimate::EstimateVerdict;
1416
use vortex_compressor::scheme::ChildSelection;
1517
use vortex_compressor::scheme::DescendantExclusion;
1618
use vortex_error::VortexResult;
@@ -73,7 +75,7 @@ impl Scheme for FSSTScheme {
7375
_data: &mut ArrayAndStats,
7476
_ctx: CompressorContext,
7577
) -> CompressionEstimate {
76-
CompressionEstimate::Sample
78+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
7779
}
7880

7981
fn compress(
@@ -161,16 +163,16 @@ impl Scheme for NullDominatedSparseScheme {
161163

162164
// All-null arrays should be compressed as constant instead anyways.
163165
if value_count == 0 {
164-
return CompressionEstimate::Skip;
166+
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
165167
}
166168

167169
// If the majority (90%) of values is null, this will compress well.
168170
if stats.null_count() as f64 / len > 0.9 {
169-
return CompressionEstimate::Ratio(len / value_count as f64);
171+
return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
170172
}
171173

172174
// Otherwise we don't go this route.
173-
CompressionEstimate::Skip
175+
CompressionEstimate::Verdict(EstimateVerdict::Skip)
174176
}
175177

176178
fn compress(
@@ -216,7 +218,7 @@ impl Scheme for ZstdScheme {
216218
_data: &mut ArrayAndStats,
217219
_ctx: CompressorContext,
218220
) -> CompressionEstimate {
219-
CompressionEstimate::Sample
221+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
220222
}
221223

222224
fn compress(
@@ -245,7 +247,7 @@ impl Scheme for ZstdBuffersScheme {
245247
_data: &mut ArrayAndStats,
246248
_ctx: CompressorContext,
247249
) -> CompressionEstimate {
248-
CompressionEstimate::Sample
250+
CompressionEstimate::Deferred(DeferredEstimate::Sample)
249251
}
250252

251253
fn compress(

0 commit comments

Comments
 (0)