diff --git a/Cargo.lock b/Cargo.lock index 61535fb9a5..15bf7f1b85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6571,8 +6571,10 @@ name = "zenoh-sync" version = "1.8.0" dependencies = [ "arc-swap", + "criterion", "event-listener 5.4.1", "futures", + "parking_lot", "tokio", "zenoh-buffers", "zenoh-collections", diff --git a/Cargo.toml b/Cargo.toml index 78eda68815..4428580dad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ num_cpus = "1.17.0" once_cell = "1.21.3" ordered-float = "5.1.0" panic-message = "0.3.0" +parking_lot = "0.12.5" petgraph = "0.8.3" phf = { version = "0.13.1", features = ["macros"] } pnet = "0.35.0" diff --git a/commons/zenoh-sync/Cargo.toml b/commons/zenoh-sync/Cargo.toml index 14686d6fe4..fd44e1574f 100644 --- a/commons/zenoh-sync/Cargo.toml +++ b/commons/zenoh-sync/Cargo.toml @@ -32,12 +32,14 @@ version = { workspace = true } arc-swap = { workspace = true } event-listener = { workspace = true } futures = { workspace = true } +parking_lot = { workspace = true } tokio = { workspace = true, features = ["sync"] } zenoh-buffers = { workspace = true } zenoh-collections = { workspace = true, features = ["default"] } zenoh-core = { workspace = true } [dev-dependencies] +criterion = { workspace = true } tokio = { workspace = true, features = [ "macros", "rt-multi-thread", @@ -45,3 +47,7 @@ tokio = { workspace = true, features = [ "time", ] } zenoh-result = { workspace = true } + +[[bench]] +harness = false +name = "notify_wait" diff --git a/commons/zenoh-sync/benches/notify_wait.rs b/commons/zenoh-sync/benches/notify_wait.rs new file mode 100644 index 0000000000..54b1ac64ec --- /dev/null +++ b/commons/zenoh-sync/benches/notify_wait.rs @@ -0,0 +1,116 @@ +// +// Copyright (c) 2026 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Benchmark: Notifier/Waiter notification round-trip. +//! +//! Measures two paths through the synchronisation primitive: +//! +//! `notify_wait/sticky` — producer calls `notify()` before the consumer calls +//! `wait()`. The flag is already set so `wait()` returns without blocking. +//! This is the common case in the transport pipeline when the batch is ready +//! before the tx thread checks. Hot path: lock + flag check + unlock (no +//! condvar sleep). +//! +//! `notify_wait/cross_thread` — a dedicated notifier thread signals the +//! measured thread. `wait()` actually blocks on the condvar until the signal +//! arrives. Measures true cross-thread wakeup latency. +//! +//! # Regression baseline (event_listener, before this PR) +//! +//! Profiling a zenoh SHM publisher at 1 MB / 100 Hz showed `event_listener` taking +//! **12.18% CPU** in `drop_in_place::` on the sender thread — +//! heap-allocating and dropping a linked-list node on *every* batch send cycle. +//! +//! Expected results after this PR (Mutex+Condvar): +//! sticky: ~20–50 ns (lock + flag + unlock, no allocation) +//! cross_thread: ~2–10 µs (futex wakeup round-trip) +//! +//! Run with: +//! cargo bench --bench notify_wait -p zenoh-sync + +use std::{ + sync::{Arc, Barrier}, + thread, + time::{Duration, Instant}, +}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use zenoh_sync::event; + +/// Sticky path: notify() fires before wait() — no blocking, just flag check. +/// +/// Represents the hot path when the batch is already ready when the tx thread +/// wakes up. Before this PR: EventListener allocated on the heap every cycle. +/// After: lock + u8 check + unlock. +fn bench_sticky(c: &mut Criterion) { + let mut group = c.benchmark_group("notify_wait"); + group.throughput(Throughput::Elements(1)); + + group.bench_function(BenchmarkId::new("sticky", ""), |b| { + let (notifier, waiter) = event::new(); + b.iter(|| { + notifier.notify().unwrap(); + waiter.wait().unwrap(); + }); + }); + + group.finish(); +} + +/// Cross-thread path: waiter blocks until a dedicated notifier thread fires. +/// +/// Measures the full condvar wakeup round-trip between two threads. Uses +/// a Barrier to ensure the waiter is sleeping before notify() fires, giving +/// a clean measurement of the unpark/futex latency. +fn bench_cross_thread(c: &mut Criterion) { + let mut group = c.benchmark_group("notify_wait"); + group.throughput(Throughput::Elements(1)); + + group.bench_function(BenchmarkId::new("cross_thread", ""), |b| { + b.iter_custom(|iters| { + // Barrier: ensures waiter is blocked before notifier fires. + let barrier = Arc::new(Barrier::new(2)); + + let (notifier, waiter) = event::new(); + let notifier = Arc::new(notifier); + + let notifier_thread = { + let notifier = Arc::clone(¬ifier); + let barrier = Arc::clone(&barrier); + thread::spawn(move || { + for _ in 0..iters { + barrier.wait(); // wait until waiter is about to block + notifier.notify().unwrap(); + } + }) + }; + + let mut total = Duration::ZERO; + for _ in 0..iters { + barrier.wait(); // signal notifier thread we're ready + let start = Instant::now(); + waiter.wait().unwrap(); + total += start.elapsed(); + } + + notifier_thread.join().unwrap(); + total + }); + }); + + group.finish(); +} + +criterion_group!(benches, bench_sticky, bench_cross_thread); +criterion_main!(benches); diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 3dea456704..c29af4b662 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -13,6 +13,7 @@ // use std::{ fmt, + pin::pin, sync::{ atomic::{AtomicU16, AtomicU8, Ordering}, Arc, @@ -20,7 +21,8 @@ use std::{ time::{Duration, Instant}, }; -use event_listener::{Event as EventLib, Listener}; +use parking_lot::{Condvar, Mutex}; +use tokio::sync::Notify as AsyncNotify; // Error types const WAIT_ERR_STR: &str = "No notifier available"; @@ -103,92 +105,91 @@ impl fmt::Debug for NotifyError { impl std::error::Error for NotifyError {} -// Inner +// State values for the AtomicU8 flag. +const UNSET: u8 = 0; +const OK: u8 = 1; +const ERR: u8 = 2; + +// Replace event_listener::Event + AtomicU8 with: +// - AtomicU8 flag + Mutex<()> + Condvar → sync paths (wait / wait_deadline / wait_timeout) +// - tokio::sync::Notify → async path (wait_async), cancellation-safe +// +// Hot-path design (sticky notification): +// +// notify(): flag.fetch_update(UNSET|OK → OK) [atomic, no lock] +// + brief lock()/unlock() to prevent lost wakeup +// + cv.notify_one() + async_notify.notify_one() +// +// wait(): flag.compare_exchange(OK → UNSET) [atomic, no lock] ← hot path returns here +// If UNSET: lock() → re-check flag → cv.wait() if still UNSET +// +// The mutex is only taken on the slow (blocking) path and briefly in notify() to +// fence against the lost-wakeup window. No heap allocation on either path. struct EventInner { - event: EventLib, + /// Primary state. Written atomically on hot path; also read under `mutex` + /// on the slow path to re-check after wakeup. flag: AtomicU8, + /// Used exclusively as the condvar lock. Carries no state of its own. + mutex: Mutex<()>, + cv: Condvar, + async_notify: AsyncNotify, notifiers: AtomicU16, waiters: AtomicU16, } -const UNSET: u8 = 0; -const OK: u8 = 1; -const ERR: u8 = 1 << 1; - -#[repr(u8)] -enum EventCheck { - Unset = UNSET, - Ok = OK, - Err = ERR, -} - -#[repr(u8)] -enum EventSet { - Ok = OK, - Err = ERR, -} - -impl EventInner { - fn check(&self) -> EventCheck { - let f = self.flag.fetch_and(!OK, Ordering::SeqCst); - if f & ERR != 0 { - return EventCheck::Err; - } - if f == OK { - return EventCheck::Ok; - } - EventCheck::Unset - } - - fn set(&self) -> EventSet { - let f = self.flag.fetch_or(OK, Ordering::SeqCst); - if f & ERR != 0 { - return EventSet::Err; - } - EventSet::Ok - } - - fn err(&self) { - self.flag.store(ERR, Ordering::SeqCst); - } -} - -/// Creates a new lock-free event variable. Every time a [`Notifier`] calls ['Notifier::notify`], one [`Waiter`] will be waken-up. -/// If no waiter is waiting when the `notify` is called, the notification will not be lost. That means the next waiter will return -/// immediately when calling `wait`. +/// Creates a new event variable. Every time a [`Notifier`] calls +/// [`Notifier::notify`], one [`Waiter`] will be woken up. Notifications are +/// sticky: if no waiter is blocking when `notify` is called, the next `wait` +/// call returns immediately. pub fn new() -> (Notifier, Waiter) { let inner = Arc::new(EventInner { - event: EventLib::new(), flag: AtomicU8::new(UNSET), + mutex: Mutex::new(()), + cv: Condvar::new(), + async_notify: AsyncNotify::new(), notifiers: AtomicU16::new(1), waiters: AtomicU16::new(1), }); (Notifier(inner.clone()), Waiter(inner)) } -/// A [`Notifier`] is used to notify and wake up one and only one [`Waiter`]. +/// A [`Notifier`] wakes up one [`Waiter`]. #[repr(transparent)] pub struct Notifier(Arc); impl Notifier { - /// Notifies one pending listener #[inline] pub fn notify(&self) -> Result<(), NotifyError> { - // Set the flag. - match self.0.set() { - EventSet::Ok => { - self.0.event.notify_additional_relaxed(1); - Ok(()) - } - EventSet::Err => Err(NotifyError), - } + // Atomically set flag to OK unless it is already ERR (terminal state). + // fetch_update returns Err if the closure returns None (i.e. flag == ERR). + self.0 + .flag + .fetch_update(Ordering::Release, Ordering::Relaxed, |f| { + if f == ERR { + None + } else { + Some(OK) + } + }) + .map_err(|_| NotifyError)?; + + // Brief lock/unlock to close the lost-wakeup window. + // + // A waiter on the slow path does: CAS-fails → lock() → re-check flag → cv.wait(). + // By taking the lock here we ensure either: + // (a) waiter has not yet called lock() → it will see OK on its flag re-check, or + // (b) waiter is already in cv.wait() → cv.notify_one() below will wake it. + drop(self.0.mutex.lock()); + + self.0.cv.notify_one(); + self.0.async_notify.notify_one(); + Ok(()) } } impl Clone for Notifier { fn clone(&self) -> Self { let n = self.0.notifiers.fetch_add(1, Ordering::SeqCst); - // Panic on overflow assert!(n != 0); Self(self.0.clone()) } @@ -198,9 +199,11 @@ impl Drop for Notifier { fn drop(&mut self) { let n = self.0.notifiers.fetch_sub(1, Ordering::SeqCst); if n == 1 { - // The last Notifier has been dropped, close the event and notify everyone - self.0.err(); - self.0.event.notify(usize::MAX); + // Last notifier dropped — set ERR and wake all waiters. + self.0.flag.store(ERR, Ordering::Release); + drop(self.0.mutex.lock()); // lost-wakeup fence + self.0.cv.notify_all(); + self.0.async_notify.notify_waiters(); } } } @@ -209,131 +212,123 @@ impl Drop for Notifier { pub struct Waiter(Arc); impl Waiter { - /// Waits for the condition to be notified #[inline] - pub async fn wait_async(&self) -> Result<(), WaitError> { - // Wait until the flag is set. - loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); + pub fn wait(&self) -> Result<(), WaitError> { + // Hot path: consume a sticky OK notification without taking the lock. + match self + .0 + .flag + .compare_exchange(OK, UNSET, Ordering::Acquire, Ordering::Relaxed) + { + Ok(_) => return Ok(()), + Err(ERR) => return Err(WaitError), + Err(_) => {} // UNSET → fall through to slow path + } - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), + // Slow path: take the lock and block on the condvar. + let mut guard = self.0.mutex.lock(); + loop { + match self.0.flag.load(Ordering::Acquire) { + OK => { + self.0.flag.store(UNSET, Ordering::Release); + return Ok(()); + } + ERR => return Err(WaitError), + _ => { + self.0.cv.wait(&mut guard); + } } - - // Wait for a notification and continue the loop. - listener.await; } - - Ok(()) } - /// Waits for the condition to be notified + /// Cancellation-safe async wait. + /// + /// Uses [`tokio::sync::Notify`] so that if this future is dropped (e.g. by + /// `tokio::time::timeout`), the pending notification is returned to the pool + /// rather than silently consumed by a detached `spawn_blocking` thread. #[inline] - pub fn wait(&self) -> Result<(), WaitError> { - // Wait until the flag is set. + pub async fn wait_async(&self) -> Result<(), WaitError> { loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), + // Subscribe BEFORE checking the flag. + // enable() ensures that any notify_one() fired after this point wakes us, + // and consumes any stored permit if notify_one() already fired. + let mut notified = pin!(self.0.async_notify.notified()); + notified.as_mut().enable(); + + // Hot path: consume sticky OK without lock. + match self + .0 + .flag + .compare_exchange(OK, UNSET, Ordering::Acquire, Ordering::Relaxed) + { + Ok(_) => return Ok(()), + Err(ERR) => return Err(WaitError), + Err(_) => {} } - // Wait for a notification and continue the loop. - listener.wait(); + // No pending notification — await (cancellation-safe). + // If this future is dropped here, the Notified future is dropped too, + // which returns any pre-reserved permit to the Notify pool. + notified.await; } - - Ok(()) } - /// Waits for the condition to be notified or returns an error when the deadline is reached #[inline] pub fn wait_deadline(&self, deadline: Instant) -> Result<(), WaitDeadlineError> { - // Wait until the flag is set. - loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitDeadlineError::WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitDeadlineError::WaitError), - } + // Hot path. + match self + .0 + .flag + .compare_exchange(OK, UNSET, Ordering::Acquire, Ordering::Relaxed) + { + Ok(_) => return Ok(()), + Err(ERR) => return Err(WaitDeadlineError::WaitError), + Err(_) => {} + } - // Wait for a notification and continue the loop. - if listener.wait_deadline(deadline).is_none() { - return Err(WaitDeadlineError::Deadline); + // Slow path. + let mut guard = self.0.mutex.lock(); + loop { + match self.0.flag.load(Ordering::Acquire) { + OK => { + self.0.flag.store(UNSET, Ordering::Release); + return Ok(()); + } + ERR => return Err(WaitDeadlineError::WaitError), + _ => { + let now = Instant::now(); + if now >= deadline { + return Err(WaitDeadlineError::Deadline); + } + let timed_out = self.0.cv.wait_for(&mut guard, deadline - now).timed_out(); + if timed_out { + return match self.0.flag.load(Ordering::Acquire) { + OK => { + self.0.flag.store(UNSET, Ordering::Release); + Ok(()) + } + ERR => Err(WaitDeadlineError::WaitError), + _ => Err(WaitDeadlineError::Deadline), + }; + } + } } } - - Ok(()) } - /// Waits for the condition to be notified or returns an error when the timeout is expired #[inline] pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeoutError> { - // Wait until the flag is set. - loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitTimeoutError::WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitTimeoutError::WaitError), - } - - // Wait for a notification and continue the loop. - if listener.wait_timeout(timeout).is_none() { - return Err(WaitTimeoutError::Timeout); - } + match self.wait_deadline(Instant::now() + timeout) { + Ok(()) => Ok(()), + Err(WaitDeadlineError::Deadline) => Err(WaitTimeoutError::Timeout), + Err(WaitDeadlineError::WaitError) => Err(WaitTimeoutError::WaitError), } - - Ok(()) } } impl Clone for Waiter { fn clone(&self) -> Self { let n = self.0.waiters.fetch_add(1, Ordering::Relaxed); - // Panic on overflow assert!(n != 0); Self(self.0.clone()) } @@ -343,12 +338,15 @@ impl Drop for Waiter { fn drop(&mut self) { let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst); if n == 1 { - // The last Waiter has been dropped, close the event - self.0.err(); + self.0.flag.store(ERR, Ordering::Release); + drop(self.0.mutex.lock()); // lost-wakeup fence + self.0.cv.notify_all(); + self.0.async_notify.notify_waiters(); } } } +#[cfg(test)] mod tests { #[test] fn event_timeout() { @@ -365,57 +363,38 @@ mod tests { let bs = barrier.clone(); let s = std::thread::spawn(move || { - // 1 - Wait one notification match waiter.wait_timeout(tslot) { Ok(()) => {} Err(WaitTimeoutError::Timeout) => panic!("Timeout {tslot:#?}"), Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 2 - Being notified twice but waiting only once bs.wait(); - match waiter.wait_timeout(tslot) { Ok(()) => {} Err(WaitTimeoutError::Timeout) => panic!("Timeout {tslot:#?}"), Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } - match waiter.wait_timeout(tslot) { Ok(()) => panic!("Event Ok but it should be Timeout"), Err(WaitTimeoutError::Timeout) => {} Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 3 - Notifier has been dropped bs.wait(); - waiter.wait().unwrap_err(); - bs.wait(); }); let bp = barrier.clone(); let p = std::thread::spawn(move || { - // 1 - Notify once notifier.notify().unwrap(); - bp.wait(); - - // 2 - Notify twice notifier.notify().unwrap(); notifier.notify().unwrap(); - bp.wait(); bp.wait(); - - // 3 - Drop notifier yielding an error in the waiter drop(notifier); - bp.wait(); bp.wait(); }); @@ -439,57 +418,38 @@ mod tests { let bs = barrier.clone(); let s = std::thread::spawn(move || { - // 1 - Wait one notification match waiter.wait_deadline(Instant::now() + tslot) { Ok(()) => {} Err(WaitDeadlineError::Deadline) => panic!("Timeout {tslot:#?}"), Err(WaitDeadlineError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 2 - Being notified twice but waiting only once bs.wait(); - match waiter.wait_deadline(Instant::now() + tslot) { Ok(()) => {} Err(WaitDeadlineError::Deadline) => panic!("Timeout {tslot:#?}"), Err(WaitDeadlineError::WaitError) => panic!("Event closed"), } - match waiter.wait_deadline(Instant::now() + tslot) { Ok(()) => panic!("Event Ok but it should be Timeout"), Err(WaitDeadlineError::Deadline) => {} Err(WaitDeadlineError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 3 - Notifier has been dropped bs.wait(); - waiter.wait().unwrap_err(); - bs.wait(); }); let bp = barrier.clone(); let p = std::thread::spawn(move || { - // 1 - Notify once notifier.notify().unwrap(); - bp.wait(); - - // 2 - Notify twice notifier.notify().unwrap(); notifier.notify().unwrap(); - bp.wait(); bp.wait(); - - // 3 - Drop notifier yielding an error in the waiter drop(notifier); - bp.wait(); bp.wait(); }); @@ -539,7 +499,6 @@ mod tests { if start.elapsed() > tout { panic!("Timeout {tout:#?}. Counter: {n}/{N}"); } - std::thread::sleep(Duration::from_millis(100)); } @@ -608,14 +567,12 @@ mod tests { if start.elapsed() > tout { panic!("Timeout {tout:#?}. Counter: {n}/{N}"); } - std::thread::sleep(Duration::from_millis(100)); } }); p1.join().unwrap(); p2.join().unwrap(); - s1.join().unwrap(); s2.join().unwrap(); } diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 2ca51a9b06..110ebc67d1 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -421,6 +421,7 @@ async fn test_session_from_cloned_config() { .endpoints .set(vec![locator.parse().unwrap()]) .unwrap(); + pub_config.listen.endpoints.set(vec![]).unwrap(); (pub_config, sub_config) };