Skip to content

Commit f152281

Browse files
authored
Implement smarter sampler (#7585)
## Summary Tracking issue: #7216 Right now, we have a very naive sampling strategy in the compressor. We will run the estimation function in the order of the `Scheme` declaration, regardless of how expensive it is to estimate things. This changes the sampling algorithm to first do all of the cheap estimations, and only when all of them fail do we do the more expensive sampling / callbacks. Edit: so benchmarks show no real change, but theoretically if someone comes along and has a really slow compress scheme, this should lessen the impact of that. ## Testing Will run benchmarks. Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 7324308 commit f152281

7 files changed

Lines changed: 350 additions & 54 deletions

File tree

vortex-btrblocks/src/schemes/integer.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use vortex_compressor::builtins::FloatDictScheme;
1717
use vortex_compressor::builtins::StringDictScheme;
1818
use vortex_compressor::estimate::CompressionEstimate;
1919
use vortex_compressor::estimate::DeferredEstimate;
20+
use vortex_compressor::estimate::EstimateScore;
2021
use vortex_compressor::estimate::EstimateVerdict;
2122
use vortex_compressor::scheme::AncestorExclusion;
2223
use vortex_compressor::scheme::ChildSelection;
@@ -737,20 +738,29 @@ impl Scheme for SequenceScheme {
737738
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
738739
}
739740

740-
// TODO(connor): Why do we sequence encode the whole thing and then throw it away? And then
741-
// why do we divide the ratio by 2???
742-
741+
// TODO(connor): `sequence_encode` allocates the encoded array just to confirm feasibility.
742+
// A cheaper `is_sequence` probe would let us skip the allocation entirely.
743743
CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
744-
|_compressor, data, _ctx, exec_ctx| {
745-
let Some(encoded) = sequence_encode(data.array_as_primitive(), exec_ctx)? else {
746-
// If we are unable to sequence encode this array, make sure we skip.
744+
|_compressor, data, best_so_far, _ctx, exec_ctx| {
745+
// `SequenceArray` stores exactly two scalars (base and multiplier), so the best
746+
// achievable compression ratio is `array_len / 2`.
747+
let compressed_size = 2usize;
748+
let max_ratio = data.array_len() as f64 / compressed_size as f64;
749+
750+
// If we cannot beat the best so far, then we do not want to even try sequence
751+
// encoding the data.
752+
let threshold = best_so_far.and_then(EstimateScore::finite_ratio);
753+
if threshold.is_some_and(|t| max_ratio <= t) {
747754
return Ok(EstimateVerdict::Skip);
748-
};
755+
}
749756

750-
// TODO(connor): This doesn't really make sense?
751-
// Since two values are required to store base and multiplier the compression ratio is
752-
// divided by 2.
753-
Ok(EstimateVerdict::Ratio(encoded.len() as f64 / 2.0))
757+
// TODO(connor): We should pass this array back to the compressor in the case that
758+
// we do want to sequence encode this so that we do not need to recompress.
759+
if sequence_encode(data.array_as_primitive(), exec_ctx)?.is_none() {
760+
return Ok(EstimateVerdict::Skip);
761+
}
762+
// TODO(connor): Should we get the actual ratio here?
763+
Ok(EstimateVerdict::Ratio(max_ratio))
754764
},
755765
)))
756766
}

vortex-compressor/public-api.lock

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,32 @@ impl core::fmt::Debug for vortex_compressor::estimate::DeferredEstimate
326326

327327
pub fn vortex_compressor::estimate::DeferredEstimate::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
328328

329+
pub enum vortex_compressor::estimate::EstimateScore
330+
331+
pub vortex_compressor::estimate::EstimateScore::FiniteCompression(f64)
332+
333+
pub vortex_compressor::estimate::EstimateScore::ZeroBytes
334+
335+
impl vortex_compressor::estimate::EstimateScore
336+
337+
pub fn vortex_compressor::estimate::EstimateScore::finite_ratio(self) -> core::option::Option<f64>
338+
339+
impl core::clone::Clone for vortex_compressor::estimate::EstimateScore
340+
341+
pub fn vortex_compressor::estimate::EstimateScore::clone(&self) -> vortex_compressor::estimate::EstimateScore
342+
343+
impl core::cmp::PartialEq for vortex_compressor::estimate::EstimateScore
344+
345+
pub fn vortex_compressor::estimate::EstimateScore::eq(&self, other: &vortex_compressor::estimate::EstimateScore) -> bool
346+
347+
impl core::fmt::Debug for vortex_compressor::estimate::EstimateScore
348+
349+
pub fn vortex_compressor::estimate::EstimateScore::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
350+
351+
impl core::marker::Copy for vortex_compressor::estimate::EstimateScore
352+
353+
impl core::marker::StructuralPartialEq for vortex_compressor::estimate::EstimateScore
354+
329355
pub enum vortex_compressor::estimate::EstimateVerdict
330356

331357
pub vortex_compressor::estimate::EstimateVerdict::AlwaysUse
@@ -338,7 +364,7 @@ impl core::fmt::Debug for vortex_compressor::estimate::EstimateVerdict
338364

339365
pub fn vortex_compressor::estimate::EstimateVerdict::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
340366

341-
pub type vortex_compressor::estimate::EstimateFn = (dyn core::ops::function::FnOnce(&vortex_compressor::CascadingCompressor, &vortex_compressor::stats::ArrayAndStats, vortex_compressor::ctx::CompressorContext, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_compressor::estimate::EstimateVerdict> + core::marker::Send + core::marker::Sync)
367+
pub type vortex_compressor::estimate::EstimateFn = (dyn core::ops::function::FnOnce(&vortex_compressor::CascadingCompressor, &vortex_compressor::stats::ArrayAndStats, core::option::Option<vortex_compressor::estimate::EstimateScore>, vortex_compressor::ctx::CompressorContext, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_compressor::estimate::EstimateVerdict> + core::marker::Send + core::marker::Sync)
342368

343369
pub mod vortex_compressor::scheme
344370

vortex-compressor/src/builtins/constant/float.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl Scheme for FloatConstantScheme {
6666
// This is an expensive check, but in practice the distinct count is known because we often
6767
// include dictionary encoding in our set of schemes, so we rarely call this.
6868
CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
69-
|_compressor, data, _ctx, exec_ctx| {
69+
|_compressor, data, _best_so_far, _ctx, exec_ctx| {
7070
if is_constant(data.array(), exec_ctx)? {
7171
Ok(EstimateVerdict::AlwaysUse)
7272
} else {

vortex-compressor/src/builtins/constant/string.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Scheme for StringConstantScheme {
6060
// This is an expensive check, but the alternative of not compressing a constant array is
6161
// far less preferable.
6262
CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
63-
|_compressor, data, _ctx, exec_ctx| {
63+
|_compressor, data, _best_so_far, _ctx, exec_ctx| {
6464
if is_constant(data.array(), exec_ctx)? {
6565
Ok(EstimateVerdict::AlwaysUse)
6666
} else {

0 commit comments

Comments
 (0)