Skip to content

Commit 38a5f1b

Browse files
committed
add spans for perfetto
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 75808d5 commit 38a5f1b

4 files changed

Lines changed: 149 additions & 125 deletions

File tree

vortex-compressor/src/compressor.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -314,26 +314,23 @@ impl CascadingCompressor {
314314
// Run the winning scheme's `compress`. On failure, emit an ERROR event carrying the
315315
// scheme name and cascade history before propagating.
316316
let error_ctx = trace::enabled_error_context(&compress_ctx);
317-
let compressed = match winner.compress(self, &data, compress_ctx, exec_ctx) {
318-
Ok(compressed) => compressed,
319-
Err(err) => {
317+
let _winner_span = trace::winner_compress_span(winner.id(), before_nbytes).entered();
318+
let compressed = winner
319+
.compress(self, &data, compress_ctx, exec_ctx)
320+
.inspect_err(|err| {
320321
// NB: this is the only way we can tell which scheme panicked / bailed on their
321322
// data, especially for third-party schemes where the error site may not carry any
322323
// compressor context.
323-
trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), &err);
324-
return Err(err);
325-
}
326-
};
324+
trace::scheme_compress_failed(winner.id(), before_nbytes, error_ctx.as_ref(), err);
325+
})?;
327326

328327
let after_nbytes = compressed.nbytes();
329328
let actual_ratio = (after_nbytes != 0).then(|| before_nbytes as f64 / after_nbytes as f64);
330329

331330
// TODO(connor): HACK TO SUPPORT L2 DENORMALIZATION!!!
332331
let accepted = after_nbytes < before_nbytes || compressed.is::<AnyScalarFn>();
333332

334-
trace::scheme_compress_result(
335-
winner.id(),
336-
before_nbytes,
333+
trace::record_winner_compress_result(
337334
after_nbytes,
338335
winner_estimate.trace_ratio(),
339336
actual_ratio,
@@ -373,28 +370,32 @@ impl CascadingCompressor {
373370
let mut deferred: Vec<(&'static dyn Scheme, DeferredEstimate)> = Vec::new();
374371

375372
// Pass 1: evaluate every immediate verdict. Stash deferred work for pass 2.
376-
for &scheme in schemes {
377-
match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
378-
CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
379-
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
380-
return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
381-
}
382-
CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
383-
let score = EstimateScore::FiniteCompression(ratio);
373+
{
374+
let _verdict_pass = trace::verdict_pass_span().entered();
375+
for &scheme in schemes {
376+
match scheme.expected_compression_ratio(data, compress_ctx.clone(), exec_ctx) {
377+
CompressionEstimate::Verdict(EstimateVerdict::Skip) => {}
378+
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => {
379+
return Ok(Some((scheme, WinnerEstimate::AlwaysUse)));
380+
}
381+
CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) => {
382+
let score = EstimateScore::FiniteCompression(ratio);
384383

385-
if is_better_score(score, &best) {
386-
best = Some((scheme, score));
384+
if is_better_score(score, &best) {
385+
best = Some((scheme, score));
386+
}
387+
}
388+
CompressionEstimate::Deferred(deferred_estimate) => {
389+
deferred.push((scheme, deferred_estimate));
387390
}
388-
}
389-
CompressionEstimate::Deferred(deferred_estimate) => {
390-
deferred.push((scheme, deferred_estimate));
391391
}
392392
}
393393
}
394394

395395
// Pass 2: run deferred work. Callbacks receive the current best as a threshold so they can
396396
// short-circuit with `Skip` when they cannot beat it.
397397
for (scheme, deferred_estimate) in deferred {
398+
let _span = trace::scheme_eval_span(scheme.id()).entered();
398399
let threshold: Option<EstimateScore> = best.map(|(_, score)| score);
399400
match deferred_estimate {
400401
DeferredEstimate::Sample => {

vortex-compressor/src/estimate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,9 @@ pub(super) fn estimate_compression_ratio_with_sampling<S: Scheme + ?Sized>(
228228

229229
let score = EstimateScore::from_sample_sizes(before, after);
230230

231-
// Single DEBUG event per sampled scheme. Downstream tooling can join this with the eventual
232-
// `scheme.compress_result` on the same scheme to compute sample-vs-full divergence.
233-
trace::sample_result(scheme.id(), before, after, score.finite_ratio());
231+
// Record the sample outcome as fields on the enclosing `scheme_eval` span. Joins with the
232+
// `winner_compress` span recorded for the same scheme to compute sample-vs-full divergence.
233+
trace::record_sample_result(before, after, score.finite_ratio());
234234

235235
Ok(score)
236236
}

vortex-compressor/src/stats/cache.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use super::FloatStats;
2121
use super::GenerateStatsOptions;
2222
use super::IntegerStats;
2323
use super::StringStats;
24+
use crate::trace;
2425

2526
/// A single cache entry: a concrete [`TypeId`] paired with a type-erased value.
2627
type StatsEntry = (TypeId, Arc<dyn Any + Send + Sync>);
@@ -58,7 +59,10 @@ impl StatsCache {
5859
.ok()
5960
.vortex_expect("we just checked the TypeID")
6061
} else {
61-
let new_arc: Arc<T> = Arc::new(f());
62+
let new_arc: Arc<T> = {
63+
let _span = trace::generate_stats_span(std::any::type_name::<T>()).entered();
64+
Arc::new(f())
65+
};
6266
guard.push((type_id, Arc::clone(&new_arc) as Arc<dyn Any + Send + Sync>));
6367
new_arc
6468
}

vortex-compressor/src/trace.rs

Lines changed: 117 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use crate::scheme::SchemeId;
1212
pub(super) const TARGET_TRACE: &str = "vortex_compressor::encode";
1313

1414
/// Builds the top-level compression span.
15+
///
16+
/// `input_nbytes` is known up front; `compressed_nbytes` / `compression_ratio` are filled in by
17+
/// [`record_compress_outcome`] once the cascade returns.
1518
#[inline]
1619
pub(super) fn compress_span(
1720
len: usize,
@@ -21,20 +24,126 @@ pub(super) fn compress_span(
2124
tracing::debug_span!(
2225
target: TARGET_TRACE,
2326
"compress",
24-
len,
27+
array_len = len,
2528
dtype = %dtype,
26-
before_nbytes,
27-
after_nbytes = tracing::field::Empty,
28-
ratio = tracing::field::Empty,
29+
input_nbytes = before_nbytes,
30+
compressed_nbytes = tracing::field::Empty,
31+
compression_ratio = tracing::field::Empty,
2932
)
3033
}
3134

35+
/// Builds a span covering on-demand materialization of a cached stats type.
36+
///
37+
/// Child of whatever span is active when a stats accessor first fires. Typically that's
38+
/// [`verdict_pass_span`]; entering this span disambiguates stats cost from the rest of Pass 1.
39+
/// `kind` is usually `std::any::type_name::<T>()` so the args identify which group was generated
40+
/// (e.g. a type path ending in `IntegerStats` or `FloatStats`).
41+
#[inline]
42+
pub(super) fn generate_stats_span(kind: &'static str) -> tracing::Span {
43+
tracing::debug_span!(
44+
target: TARGET_TRACE,
45+
"generate_stats",
46+
stats_kind = kind,
47+
)
48+
}
49+
50+
/// Builds a span covering Pass 1 of scheme selection (the cheap-verdict pass).
51+
///
52+
/// Stats batches merged across eligible schemes are materialized lazily by the first
53+
/// `expected_compression_ratio` call that touches them. Grouping those calls under one span makes
54+
/// the stats cost (and unexpectedly slow verdicts) visible independently of per-candidate sampling.
55+
#[inline]
56+
pub(super) fn verdict_pass_span() -> tracing::Span {
57+
tracing::debug_span!(
58+
target: TARGET_TRACE,
59+
"verdict_pass",
60+
)
61+
}
62+
63+
/// Builds a span covering one deferred per-scheme evaluation (sample or callback).
64+
///
65+
/// `scheme_candidate` is the scheme being evaluated (not necessarily chosen). The
66+
/// `sample_*_nbytes` / `sample_compression_ratio` fields are filled in by
67+
/// [`record_sample_result`] at the end of a sample compression; callback-based evaluations leave
68+
/// them unset.
69+
#[inline]
70+
pub(super) fn scheme_eval_span(scheme: SchemeId) -> tracing::Span {
71+
tracing::debug_span!(
72+
target: TARGET_TRACE,
73+
"scheme_eval",
74+
scheme_candidate = %scheme,
75+
sample_input_nbytes = tracing::field::Empty,
76+
sample_compressed_nbytes = tracing::field::Empty,
77+
sample_compression_ratio = tracing::field::Empty,
78+
)
79+
}
80+
81+
/// Records the outcome of a sample compression on the current `scheme_eval` span.
82+
#[inline]
83+
pub(super) fn record_sample_result(
84+
sample_input_nbytes: u64,
85+
sample_compressed_nbytes: u64,
86+
sample_compression_ratio: Option<f64>,
87+
) {
88+
let span = tracing::Span::current();
89+
span.record("sample_input_nbytes", sample_input_nbytes);
90+
span.record("sample_compressed_nbytes", sample_compressed_nbytes);
91+
if let Some(ratio) = sample_compression_ratio {
92+
span.record("sample_compression_ratio", ratio);
93+
}
94+
}
95+
96+
/// Builds a span covering the winning scheme's full-array compression.
97+
///
98+
/// `scheme_chosen` and `input_nbytes` are known up front. `compressed_nbytes`,
99+
/// `estimated_ratio`, `achieved_ratio`, and `accepted` are filled in by
100+
/// [`record_winner_compress_result`] once the encode completes.
101+
#[inline]
102+
pub(super) fn winner_compress_span(scheme: SchemeId, before_nbytes: u64) -> tracing::Span {
103+
tracing::debug_span!(
104+
target: TARGET_TRACE,
105+
"winner_compress",
106+
scheme_chosen = %scheme,
107+
input_nbytes = before_nbytes,
108+
compressed_nbytes = tracing::field::Empty,
109+
estimated_ratio = tracing::field::Empty,
110+
achieved_ratio = tracing::field::Empty,
111+
accepted = tracing::field::Empty,
112+
)
113+
}
114+
115+
/// Records the outcome of a winning-scheme compression on the current `winner_compress` span.
116+
#[inline]
117+
pub(super) fn record_winner_compress_result(
118+
compressed_nbytes: u64,
119+
estimated_ratio: Option<f64>,
120+
achieved_ratio: Option<f64>,
121+
accepted: bool,
122+
) {
123+
let span = tracing::Span::current();
124+
span.record("compressed_nbytes", compressed_nbytes);
125+
if let Some(r) = estimated_ratio {
126+
span.record("estimated_ratio", r);
127+
}
128+
if let Some(r) = achieved_ratio {
129+
span.record("achieved_ratio", r);
130+
}
131+
span.record("accepted", accepted);
132+
}
133+
32134
/// Records the final output size and, when finite, the top-level compression ratio.
33135
#[inline]
34-
pub(super) fn record_compress_outcome(span: &tracing::Span, before_nbytes: u64, after_nbytes: u64) {
35-
span.record("after_nbytes", after_nbytes);
36-
if after_nbytes != 0 {
37-
span.record("ratio", before_nbytes as f64 / after_nbytes as f64);
136+
pub(super) fn record_compress_outcome(
137+
span: &tracing::Span,
138+
input_nbytes: u64,
139+
compressed_nbytes: u64,
140+
) {
141+
span.record("compressed_nbytes", compressed_nbytes);
142+
if compressed_nbytes != 0 {
143+
span.record(
144+
"compression_ratio",
145+
input_nbytes as f64 / compressed_nbytes as f64,
146+
);
38147
}
39148
}
40149

@@ -76,68 +185,6 @@ pub(super) fn scheme_compress_failed(
76185
}
77186
}
78187

79-
/// Emits the leaf compression result event.
80-
#[inline]
81-
#[allow(
82-
clippy::cognitive_complexity,
83-
reason = "tracing sometimes triggers this"
84-
)]
85-
pub(super) fn scheme_compress_result(
86-
scheme: SchemeId,
87-
before_nbytes: u64,
88-
after_nbytes: u64,
89-
estimated_ratio: Option<f64>,
90-
actual_ratio: Option<f64>,
91-
accepted: bool,
92-
) {
93-
match (estimated_ratio, actual_ratio) {
94-
(Some(estimated_ratio), Some(actual_ratio)) => {
95-
tracing::debug!(
96-
target: TARGET_TRACE,
97-
scheme = %scheme,
98-
before_nbytes,
99-
after_nbytes,
100-
estimated_ratio,
101-
actual_ratio,
102-
accepted,
103-
"scheme.compress_result",
104-
);
105-
}
106-
(Some(estimated_ratio), None) => {
107-
tracing::debug!(
108-
target: TARGET_TRACE,
109-
scheme = %scheme,
110-
before_nbytes,
111-
after_nbytes,
112-
estimated_ratio,
113-
accepted,
114-
"scheme.compress_result",
115-
);
116-
}
117-
(None, Some(actual_ratio)) => {
118-
tracing::debug!(
119-
target: TARGET_TRACE,
120-
scheme = %scheme,
121-
before_nbytes,
122-
after_nbytes,
123-
actual_ratio,
124-
accepted,
125-
"scheme.compress_result",
126-
);
127-
}
128-
(None, None) => {
129-
tracing::debug!(
130-
target: TARGET_TRACE,
131-
scheme = %scheme,
132-
before_nbytes,
133-
after_nbytes,
134-
accepted,
135-
"scheme.compress_result",
136-
);
137-
}
138-
}
139-
}
140-
141188
/// Emits a sampling-failure event.
142189
#[inline]
143190
pub(super) fn sample_compress_failed(
@@ -156,31 +203,3 @@ pub(super) fn sample_compress_failed(
156203
);
157204
}
158205
}
159-
160-
/// Emits the sampling result event.
161-
#[inline]
162-
pub(super) fn sample_result(
163-
scheme: SchemeId,
164-
sampled_before: u64,
165-
sampled_after: u64,
166-
sampled_ratio: Option<f64>,
167-
) {
168-
if let Some(sampled_ratio) = sampled_ratio {
169-
tracing::debug!(
170-
target: TARGET_TRACE,
171-
scheme = %scheme,
172-
sampled_before,
173-
sampled_after,
174-
sampled_ratio,
175-
"sample.result",
176-
);
177-
} else {
178-
tracing::debug!(
179-
target: TARGET_TRACE,
180-
scheme = %scheme,
181-
sampled_before,
182-
sampled_after,
183-
"sample.result",
184-
);
185-
}
186-
}

0 commit comments

Comments
 (0)