Skip to content

Commit 83aa316

Browse files
committed
record metrics in control thread only
1 parent 6914f52 commit 83aa316

2 files changed

Lines changed: 0 additions & 30 deletions

File tree

pulsebeam/src/rtp/sync.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ pub struct Synchronizer {
3535
/// The server Instant that corresponds to a specific NTP time, representing
3636
/// the minimum observed propagation delay.
3737
ntp_anchor: Option<ClockReference>,
38-
ppm_hist: metrics::Histogram,
3938
}
4039

4140
impl Synchronizer {
4241
pub fn new(clock_rate: Frequency) -> Self {
43-
let ppm_hist = metrics::histogram!("rtp_sync_clock_drift_ppm");
4442
Self {
4543
clock_rate,
4644
first_sr: None,
@@ -50,7 +48,6 @@ impl Synchronizer {
5048
base_server_time: None,
5149
estimated_clock_drift_ppm: 0.0,
5250
ntp_anchor: None,
53-
ppm_hist,
5451
}
5552
}
5653

@@ -175,7 +172,6 @@ impl Synchronizer {
175172

176173
if let (Some(first), Some(latest)) = (self.first_sr, self.latest_sr) {
177174
self.estimated_clock_drift_ppm = Self::compute_clock_drift(&first, &latest);
178-
self.ppm_hist.record(self.estimated_clock_drift_ppm);
179175
}
180176

181177
// Update the NTP anchor with a minimum envelope filter

pulsebeam/src/shard/worker.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,6 @@ pub struct ShardWorker {
152152
cross_shard_event_rx: mailbox::Receiver<CrossShardEvent>,
153153
router: ShardRouter,
154154
metrics: Arc<ShardMetrics>,
155-
156-
delay_ingress: metrics::Histogram,
157-
delay_compute: metrics::Histogram,
158-
delay_egress: metrics::Histogram,
159-
delay_tick: metrics::Histogram,
160155
}
161156

162157
impl ShardWorker {
@@ -177,15 +172,6 @@ impl ShardWorker {
177172
cross_shard_event_txs,
178173
};
179174

180-
metrics::describe_histogram!(
181-
"shard_tick_delay_us",
182-
Unit::Microseconds,
183-
"shard tick delay distribution"
184-
);
185-
let delay_tick = metrics::histogram!("shard_tick_delay_us",);
186-
let delay_ingress = metrics::histogram!("shard_ingress_delay_us",);
187-
let delay_compute = metrics::histogram!("shard_compute_delay_us",);
188-
let delay_egress = metrics::histogram!("shard_egress_delay_us",);
189175
Self {
190176
core,
191177
recv_batch: Vec::with_capacity(net::BATCH_SIZE),
@@ -196,10 +182,6 @@ impl ShardWorker {
196182
cross_shard_event_rx,
197183
router,
198184
metrics,
199-
delay_tick,
200-
delay_ingress,
201-
delay_compute,
202-
delay_egress,
203185
}
204186
}
205187

@@ -225,7 +207,6 @@ impl ShardWorker {
225207
loop_start = busy_end;
226208
let busy_duration = busy_end.duration_since(busy_start);
227209
self.metrics.record_busy(busy_duration);
228-
self.delay_tick.record(busy_duration.as_micros() as f64);
229210
}
230211
}
231212

@@ -279,26 +260,19 @@ impl ShardWorker {
279260
for batch in self.recv_batch.drain(..) {
280261
self.core.on_udp_batch(batch, &self.router);
281262
}
282-
let ingress_delay = now.elapsed();
283-
self.delay_ingress.record(ingress_delay.as_micros() as f64);
284263

285264
// phase 2: compute
286265
let now = Instant::now();
287266
self.core.poll_input(now);
288267
self.core.flush_rtp_events(&self.router);
289268
self.core.poll_fanout(now);
290269
self.core.flush_participant_events(&self.router);
291-
let compute_delay = now.elapsed();
292-
self.delay_compute.record(compute_delay.as_micros() as f64);
293270

294271
// phase 3: output
295-
let now = Instant::now();
296272
self.core
297273
.flush_egress(&self.udp_socket, &mut self.tcp_socket);
298-
let egress_delay = now.elapsed();
299274
self.core
300275
.flush_close_peers(&mut self.udp_socket, &mut self.tcp_socket);
301-
self.delay_egress.record(egress_delay.as_micros() as f64);
302276
}
303277

304278
async fn flush_shard_events(&mut self) -> Result<(), ShardError> {

0 commit comments

Comments
 (0)