Skip to content

Commit ca3152e

Browse files
committed
bench(zenoh-sync): add notify_wait criterion benchmark
Measures the sticky (hot) path and cross-thread wakeup path. Used to compare event_listener vs the Mutex+Condvar+parking_lot replacement introduced in the preceding commits.
1 parent ad7d48c commit ca3152e

2 files changed

Lines changed: 130 additions & 3 deletions

File tree

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
//
2+
// Copyright (c) 2026 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13+
//
14+
15+
//! Benchmark: Notifier/Waiter notification round-trip.
16+
//!
17+
//! Measures two paths through the synchronisation primitive:
18+
//!
19+
//! `notify_wait/sticky` — producer calls `notify()` before the consumer calls
20+
//! `wait()`. The flag is already set so `wait()` returns without blocking.
21+
//! This is the common case in the transport pipeline when the batch is ready
22+
//! before the tx thread checks. Hot path: lock + flag check + unlock (no
23+
//! condvar sleep).
24+
//!
25+
//! `notify_wait/cross_thread` — a dedicated notifier thread signals the
26+
//! measured thread. `wait()` actually blocks on the condvar until the signal
27+
//! arrives. Measures true cross-thread wakeup latency.
28+
//!
29+
//! # Regression baseline (event_listener, before this PR)
30+
//!
31+
//! Profiling a zenoh SHM publisher at 1 MB / 100 Hz showed `event_listener` taking
32+
//! **12.18% CPU** in `drop_in_place::<EventListener>` on the sender thread —
33+
//! heap-allocating and dropping a linked-list node on *every* batch send cycle.
34+
//!
35+
//! Expected results after this PR (Mutex+Condvar):
36+
//! sticky: ~20–50 ns (lock + flag + unlock, no allocation)
37+
//! cross_thread: ~2–10 µs (futex wakeup round-trip)
38+
//!
39+
//! Run with:
40+
//! cargo bench --bench notify_wait -p zenoh-sync
41+
42+
use std::{
43+
sync::{Arc, Barrier},
44+
thread,
45+
time::{Duration, Instant},
46+
};
47+
48+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
49+
use zenoh_sync::event;
50+
51+
/// Sticky path: notify() fires before wait() — no blocking, just flag check.
52+
///
53+
/// Represents the hot path when the batch is already ready when the tx thread
54+
/// wakes up. Before this PR: EventListener allocated on the heap every cycle.
55+
/// After: lock + u8 check + unlock.
56+
fn bench_sticky(c: &mut Criterion) {
57+
let mut group = c.benchmark_group("notify_wait");
58+
group.throughput(Throughput::Elements(1));
59+
60+
group.bench_function(BenchmarkId::new("sticky", ""), |b| {
61+
let (notifier, waiter) = event::new();
62+
b.iter(|| {
63+
notifier.notify().unwrap();
64+
waiter.wait().unwrap();
65+
});
66+
});
67+
68+
group.finish();
69+
}
70+
71+
/// Cross-thread path: waiter blocks until a dedicated notifier thread fires.
72+
///
73+
/// Measures the full condvar wakeup round-trip between two threads. Uses
74+
/// a Barrier to ensure the waiter is sleeping before notify() fires, giving
75+
/// a clean measurement of the unpark/futex latency.
76+
fn bench_cross_thread(c: &mut Criterion) {
77+
let mut group = c.benchmark_group("notify_wait");
78+
group.throughput(Throughput::Elements(1));
79+
80+
group.bench_function(BenchmarkId::new("cross_thread", ""), |b| {
81+
b.iter_custom(|iters| {
82+
// Barrier: ensures waiter is blocked before notifier fires.
83+
let barrier = Arc::new(Barrier::new(2));
84+
85+
let (notifier, waiter) = event::new();
86+
let notifier = Arc::new(notifier);
87+
88+
let notifier_thread = {
89+
let notifier = Arc::clone(&notifier);
90+
let barrier = Arc::clone(&barrier);
91+
thread::spawn(move || {
92+
for _ in 0..iters {
93+
barrier.wait(); // wait until waiter is about to block
94+
notifier.notify().unwrap();
95+
}
96+
})
97+
};
98+
99+
let mut total = Duration::ZERO;
100+
for _ in 0..iters {
101+
barrier.wait(); // signal notifier thread we're ready
102+
let start = Instant::now();
103+
waiter.wait().unwrap();
104+
total += start.elapsed();
105+
}
106+
107+
notifier_thread.join().unwrap();
108+
total
109+
});
110+
});
111+
112+
group.finish();
113+
}
114+
115+
criterion_group!(benches, bench_sticky, bench_cross_thread);
116+
criterion_main!(benches);

commons/zenoh-sync/src/event.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::{
2020
},
2121
time::{Duration, Instant},
2222
};
23+
2324
use parking_lot::{Condvar, Mutex};
2425
use tokio::sync::Notify as AsyncNotify;
2526

@@ -164,7 +165,11 @@ impl Notifier {
164165
self.0
165166
.flag
166167
.fetch_update(Ordering::Release, Ordering::Relaxed, |f| {
167-
if f == ERR { None } else { Some(OK) }
168+
if f == ERR {
169+
None
170+
} else {
171+
Some(OK)
172+
}
168173
})
169174
.map_err(|_| NotifyError)?;
170175

@@ -349,6 +354,7 @@ mod tests {
349354
sync::{Arc, Barrier},
350355
time::Duration,
351356
};
357+
352358
use crate::WaitTimeoutError;
353359

354360
let barrier = Arc::new(Barrier::new(2));
@@ -403,6 +409,7 @@ mod tests {
403409
sync::{Arc, Barrier},
404410
time::{Duration, Instant},
405411
};
412+
406413
use crate::WaitDeadlineError;
407414

408415
let barrier = Arc::new(Barrier::new(2));
@@ -486,7 +493,9 @@ mod tests {
486493
let tout = Duration::from_secs(60);
487494
loop {
488495
let n = COUNTER.load(Ordering::Relaxed);
489-
if n == N { break; }
496+
if n == N {
497+
break;
498+
}
490499
if start.elapsed() > tout {
491500
panic!("Timeout {tout:#?}. Counter: {n}/{N}");
492501
}
@@ -552,7 +561,9 @@ mod tests {
552561
let tout = Duration::from_secs(60);
553562
loop {
554563
let n = COUNTER.load(Ordering::Relaxed);
555-
if n == N { break; }
564+
if n == N {
565+
break;
566+
}
556567
if start.elapsed() > tout {
557568
panic!("Timeout {tout:#?}. Counter: {n}/{N}");
558569
}

0 commit comments

Comments
 (0)