Skip to content

Commit 9657308

Browse files
committed
add execution context to compressor
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent ff9e7bc commit 9657308

6 files changed

Lines changed: 34 additions & 15 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-array/src/executor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ impl dyn DynArray + '_ {
172172
///
173173
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
174174
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
175+
#[derive(Debug, Clone)]
175176
pub struct ExecutionCtx {
176177
id: usize,
177178
session: VortexSession,

vortex-btrblocks/src/compressor/string.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
use vortex_array::ArrayRef;
55
use vortex_array::Canonical;
66
use vortex_array::IntoArray;
7-
use vortex_array::LEGACY_SESSION;
87
use vortex_array::ToCanonical;
9-
use vortex_array::VortexSessionExecute;
108
use vortex_array::aggregate_fn::fns::is_constant::is_constant;
119
use vortex_array::arrays::ConstantArray;
1210
use vortex_array::arrays::DictArray;
@@ -253,7 +251,7 @@ impl Scheme for ConstantScheme {
253251

254252
fn expected_compression_ratio(
255253
&self,
256-
_compressor: &CascadingCompressor,
254+
compressor: &CascadingCompressor,
257255
data: &mut ArrayAndStats,
258256
ctx: CompressorContext,
259257
_excludes: &[SchemeId],
@@ -264,10 +262,11 @@ impl Scheme for ConstantScheme {
264262

265263
let stats = data.string_stats();
266264

267-
// TODO(connor): Put the execution context somewhere!
268-
let mut ctx = LEGACY_SESSION.create_execution_ctx();
269265
if stats.estimated_distinct_count().is_none_or(|c| c > 1)
270-
|| !is_constant(&stats.source().clone().into_array(), &mut ctx)?
266+
|| !is_constant(
267+
&stats.source().clone().into_array(),
268+
&mut compressor.execution_ctx(),
269+
)?
271270
{
272271
return Ok(0.0);
273272
}

vortex-btrblocks/src/compressor/temporal.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
use vortex_array::ArrayRef;
77
use vortex_array::Canonical;
88
use vortex_array::IntoArray;
9-
use vortex_array::LEGACY_SESSION;
109
use vortex_array::ToCanonical;
11-
use vortex_array::VortexSessionExecute;
1210
use vortex_array::aggregate_fn::fns::is_constant::is_constant;
1311
use vortex_array::arrays::ConstantArray;
1412
use vortex_array::arrays::TemporalArray;
@@ -78,11 +76,11 @@ impl Scheme for TemporalScheme {
7876
let ext_array = array.to_extension();
7977
let temporal_array = TemporalArray::try_from(ext_array.clone().into_array())?;
8078

81-
// TODO(connor): Put the execution context somewhere!
82-
let mut ctx = LEGACY_SESSION.create_execution_ctx();
83-
8479
// Check for constant array and return early if so.
85-
let is_constant = is_constant(&ext_array.clone().into_array(), &mut ctx)?;
80+
let is_constant = is_constant(
81+
&ext_array.clone().into_array(),
82+
&mut compressor.execution_ctx(),
83+
)?;
8684

8785
if is_constant {
8886
return Ok(

vortex-compressor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ version = { workspace = true }
1616
[dependencies]
1717
itertools = { workspace = true }
1818
num-traits = { workspace = true }
19+
parking_lot = { workspace = true }
1920
rand = { workspace = true }
2021
rustc-hash = { workspace = true }
2122
tracing = { workspace = true }

vortex-compressor/src/compressor.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@
33

44
//! Cascading array compression implementation.
55
6+
use std::sync::Arc;
7+
8+
use parking_lot::Mutex;
9+
use parking_lot::MutexGuard;
610
use vortex_array::ArrayRef;
711
use vortex_array::Canonical;
812
use vortex_array::CanonicalValidity;
913
use vortex_array::DynArray;
14+
use vortex_array::ExecutionCtx;
1015
use vortex_array::IntoArray;
1116
use vortex_array::LEGACY_SESSION;
1217
use vortex_array::ToCanonical;
@@ -42,16 +47,30 @@ use crate::stats::GenerateStatsOptions;
4247
/// 2. Pre-filtering schemes by [`Scheme::matches`] and excludes.
4348
/// 3. Evaluating each matching scheme's compression ratio on a sample.
4449
/// 4. Compressing with the best scheme and verifying the result is smaller.
45-
#[derive(Clone)]
50+
#[derive(Debug, Clone)]
4651
pub struct CascadingCompressor {
4752
/// The enabled compression schemes.
4853
pub schemes: Vec<&'static dyn Scheme>,
54+
55+
/// Shared execution context for array operations during compression.
56+
///
57+
/// This should have low contention as we only execute arrays one at a time during compression.
58+
ctx: Arc<Mutex<ExecutionCtx>>,
4959
}
5060

5161
impl CascadingCompressor {
5262
/// Creates a new compressor with the given schemes.
5363
pub fn new(schemes: Vec<&'static dyn Scheme>) -> Self {
54-
Self { schemes }
64+
Self {
65+
schemes,
66+
// TODO(connor): The caller should probably pass this in.
67+
ctx: Arc::new(Mutex::new(LEGACY_SESSION.create_execution_ctx())),
68+
}
69+
}
70+
71+
/// Returns a mutable borrow of the execution context.
72+
pub fn execution_ctx(&self) -> MutexGuard<'_, ExecutionCtx> {
73+
self.ctx.lock()
5574
}
5675

5776
/// Compresses an array using cascading adaptive compression.
@@ -64,7 +83,7 @@ impl CascadingCompressor {
6483
pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
6584
let canonical = array
6685
.clone()
67-
.execute::<CanonicalValidity>(&mut LEGACY_SESSION.create_execution_ctx())?
86+
.execute::<CanonicalValidity>(&mut self.execution_ctx())?
6887
.0;
6988

7089
// Compact it, removing any wasted space before we attempt to compress it.

0 commit comments

Comments
 (0)