Skip to content

Commit d222c87

Browse files
authored
chore[vortex-layout]: allow specifying a set of stats on CompressingStrategy (#7422)
<!-- Thank you for submitting a pull request! We appreciate your time and effort. Please make sure to provide enough information so that we can review your pull request. The Summary and Testing sections below contain guidance on what to include. --> ## Summary This allows users to e.g. disable UncompressedSizeInBytes computation, which may be expensive for deeply nested types. <!-- If this PR is related to a tracked effort, please link to the relevant issue here (e.g., `Closes: #123`). Otherwise, feel free to ignore / delete this. In this section, please: 1. Explain the rationale for this change. 2. Summarize the changes included in this PR. A general rule of thumb is that larger PRs should have larger summaries. If there are a lot of changes, please help us review the code by explaining what was changed and why. If there is an issue or discussion attached, there is no need to duplicate all the details, but clarity is always preferred over brevity. --> <!-- ## API Changes Uncomment this section if there are any user-facing changes. Consider whether the change affects users in one of the following ways: 1. Breaks public APIs in some way. 2. Changes the underlying behavior of one of the engine integrations. 3. Should some documentation be updated to reflect this change? If a public API is changed in a breaking manner, make sure to add the appropriate label. You can run `./scripts/public-api.sh` locally to see if there are any public API changes (and this also runs in our CI). --> ## Testing <!-- Please describe how this change was tested. Here are some common categories for testing in Vortex: 1. Verifying existing behavior is maintained. 2. Verifying new behavior and functionality works correctly. 3. Serialization compatibility (backwards and forwards) should be maintained or explicitly broken. --> Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent 0bcd8fa commit d222c87

2 files changed

Lines changed: 14 additions & 3 deletions

File tree

vortex-layout/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ pub fn vortex_layout::layouts::compressed::CompressingStrategy::new<S: vortex_la
172172

173173
pub fn vortex_layout::layouts::compressed::CompressingStrategy::with_concurrency(self, concurrency: usize) -> Self
174174

175+
pub fn vortex_layout::layouts::compressed::CompressingStrategy::with_stats(self, stats: &[vortex_array::expr::stats::Stat]) -> Self
176+
175177
impl core::clone::Clone for vortex_layout::layouts::compressed::CompressingStrategy
176178

177179
pub fn vortex_layout::layouts::compressed::CompressingStrategy::clone(&self) -> vortex_layout::layouts::compressed::CompressingStrategy

vortex-layout/src/layouts/compressed.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl CompressorPlugin for BtrBlocksCompressor {
5454
pub struct CompressingStrategy {
5555
child: Arc<dyn LayoutStrategy>,
5656
compressor: Arc<dyn CompressorPlugin>,
57+
stats: Arc<[Stat]>,
5758
concurrency: usize,
5859
}
5960

@@ -63,6 +64,7 @@ impl CompressingStrategy {
6364
Self {
6465
child: Arc::new(child),
6566
compressor: Arc::new(compressor),
67+
stats: Stat::all().collect(),
6668
concurrency: std::thread::available_parallelism()
6769
.map(|v| v.get())
6870
.unwrap_or(1),
@@ -73,6 +75,13 @@ impl CompressingStrategy {
7375
self.concurrency = concurrency;
7476
self
7577
}
78+
79+
/// Override the set of statistics computed on each chunk before compression.
80+
/// Defaults to `Stat::all()`.
81+
pub fn with_stats(mut self, stats: &[Stat]) -> Self {
82+
self.stats = stats.into();
83+
self
84+
}
7685
}
7786

7887
#[async_trait]
@@ -87,17 +96,17 @@ impl LayoutStrategy for CompressingStrategy {
8796
) -> VortexResult<LayoutRef> {
8897
let dtype = stream.dtype().clone();
8998
let compressor = Arc::clone(&self.compressor);
99+
let stats = Arc::clone(&self.stats);
90100

91101
let handle = session.handle();
92102
let stream = stream
93103
.map(move |chunk| {
94104
let compressor = Arc::clone(&compressor);
105+
let stats = Arc::clone(&stats);
95106
handle.spawn_cpu(move || {
96107
let (sequence_id, chunk) = chunk?;
97108
// Compute the stats for the chunk prior to compression
98-
chunk
99-
.statistics()
100-
.compute_all(&Stat::all().collect::<Vec<_>>())?;
109+
chunk.statistics().compute_all(&stats)?;
101110
Ok((sequence_id, compressor.compress_chunk(&chunk)?))
102111
})
103112
})

0 commit comments

Comments
 (0)