Skip to content

Commit 0ddc198

Browse files
committed
instrument tick loop
1 parent 4d53c2d commit 0ddc198

2 files changed

Lines changed: 23 additions & 6 deletions

File tree

pulsebeam/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ mod internal {
566566
pub fn init(listener: TcpListener) -> anyhow::Result<Self> {
567567
let prometheus = PrometheusBuilder::new()
568568
.set_buckets_for_metric(
569-
Matcher::Full("shard_tick_delay_us".to_string()),
569+
Matcher::Suffix("_delay_us".to_string()),
570570
&create_exponential_buckets(1.0, 4.0, 6), // 1us -> 4ms,
571571
)
572572
.expect("invalid bucket config")

pulsebeam/src/shard/worker.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,11 @@ pub struct ShardWorker {
152152
cross_shard_event_rx: mailbox::Receiver<CrossShardEvent>,
153153
router: ShardRouter,
154154
metrics: Arc<ShardMetrics>,
155-
latency: metrics::Histogram,
155+
156+
delay_ingress: metrics::Histogram,
157+
delay_compute: metrics::Histogram,
158+
delay_egress: metrics::Histogram,
159+
delay_tick: metrics::Histogram,
156160
}
157161

158162
impl ShardWorker {
@@ -178,7 +182,10 @@ impl ShardWorker {
178182
Unit::Microseconds,
179183
"shard tick delay distribution"
180184
);
181-
let latency = metrics::histogram!("shard_tick_delay_us",);
185+
let delay_tick = metrics::histogram!("shard_tick_delay_us",);
186+
let delay_ingress = metrics::histogram!("shard_ingress_delay_us",);
187+
let delay_compute = metrics::histogram!("shard_compute_delay_us",);
188+
let delay_egress = metrics::histogram!("shard_egress_delay_us",);
182189
Self {
183190
core,
184191
recv_batch: Vec::with_capacity(net::BATCH_SIZE),
@@ -189,7 +196,10 @@ impl ShardWorker {
189196
cross_shard_event_rx,
190197
router,
191198
metrics,
192-
latency,
199+
delay_tick,
200+
delay_ingress,
201+
delay_compute,
202+
delay_egress,
193203
}
194204
}
195205

@@ -215,7 +225,7 @@ impl ShardWorker {
215225
loop_start = busy_end;
216226
let busy_duration = busy_end.duration_since(busy_start);
217227
self.metrics.record_busy(busy_duration);
218-
self.latency.record(busy_duration.as_micros() as f64);
228+
self.delay_tick.record(busy_duration.as_micros() as f64);
219229
}
220230
}
221231

@@ -262,26 +272,33 @@ impl ShardWorker {
262272
while let Ok(ev) = self.cross_shard_event_rx.try_recv() {
263273
self.core.on_cross_shard_event(ev, now, &self.router);
264274
}
265-
266275
self.core.fire_timers(now);
267276

268277
let _ = self.udp_socket.try_recv_batch(&mut self.recv_batch);
269278
let _ = self.tcp_socket.try_recv_batch(&mut self.recv_batch);
270279
for batch in self.recv_batch.drain(..) {
271280
self.core.on_udp_batch(batch, &self.router);
272281
}
282+
let ingress_delay = now.elapsed();
283+
self.delay_ingress.record(ingress_delay.as_micros() as f64);
273284

274285
// phase 2: compute
286+
let now = Instant::now();
275287
self.core.poll_input(now);
276288
self.core.flush_rtp_events(&self.router);
277289
self.core.poll_fanout(now);
278290
self.core.flush_participant_events(&self.router);
291+
let compute_delay = now.elapsed();
292+
self.delay_compute.record(compute_delay.as_micros() as f64);
279293

280294
// phase 3: output
295+
let now = Instant::now();
281296
self.core
282297
.flush_egress(&self.udp_socket, &mut self.tcp_socket);
298+
let egress_delay = now.elapsed();
283299
self.core
284300
.flush_close_peers(&mut self.udp_socket, &mut self.tcp_socket);
301+
self.delay_egress.record(egress_delay.as_micros() as f64);
285302
}
286303

287304
async fn flush_shard_events(&mut self) -> Result<(), ShardError> {

0 commit comments

Comments
 (0)