From 740163d533f5de69d57076fbafb91671ad49d597 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Sun, 22 Mar 2026 15:03:03 +0800 Subject: [PATCH 1/5] perf(zenoh-sync): replace event_listener with std Mutex+Condvar The Waiter::wait() hot path in the transport pipeline created and dropped an event_listener::EventListener on every batch send cycle. In perf profiles of ros-z-pingpong this showed as 12.18% CPU in drop_in_place on the tx-0 thread. Replace EventInner { EventLib + AtomicU8 } with { Mutex + Condvar }. This eliminates per-wakeup EventListener allocation and intrusive-list register/deregister overhead. Semantics are preserved: - Sticky notification: if notify() fires with no waiter, the next wait() returns immediately (flag stored in the mutex-protected state). - Error propagation: last Notifier drop sets ERR and wakes all waiters. - wait_async: delegated to spawn_blocking (not on the hot path). All existing zenoh-sync tests pass. --- commons/zenoh-sync/src/event.rs | 288 ++++++++------------------------ 1 file changed, 70 insertions(+), 218 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 3dea456704..7c9f5ff23c 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -14,14 +14,12 @@ use std::{ fmt, sync::{ - atomic::{AtomicU16, AtomicU8, Ordering}, - Arc, + atomic::{AtomicU16, Ordering}, + Arc, Condvar, Mutex, }, time::{Duration, Instant}, }; -use event_listener::{Event as EventLib, Listener}; - // Error types const WAIT_ERR_STR: &str = "No notifier available"; pub struct WaitError; @@ -103,92 +101,55 @@ impl fmt::Debug for NotifyError { impl std::error::Error for NotifyError {} -// Inner -struct EventInner { - event: EventLib, - flag: AtomicU8, - notifiers: AtomicU16, - waiters: AtomicU16, -} - +// State values stored inside the mutex. const UNSET: u8 = 0; const OK: u8 = 1; -const ERR: u8 = 1 << 1; - -#[repr(u8)] -enum EventCheck { - Unset = UNSET, - Ok = OK, - Err = ERR, -} - -#[repr(u8)] -enum EventSet { - Ok = OK, - Err = ERR, -} +const ERR: u8 = 2; -impl EventInner { - fn check(&self) -> EventCheck { - let f = self.flag.fetch_and(!OK, Ordering::SeqCst); - if f & ERR != 0 { - return EventCheck::Err; - } - if f == OK { - return EventCheck::Ok; - } - EventCheck::Unset - } - - fn set(&self) -> EventSet { - let f = self.flag.fetch_or(OK, Ordering::SeqCst); - if f & ERR != 0 { - return EventSet::Err; - } - EventSet::Ok - } - - fn err(&self) { - self.flag.store(ERR, Ordering::SeqCst); - } +// Replace event_listener::Event + AtomicU8 with std Mutex + Condvar. +// This eliminates per-wait EventListener allocation and intrusive-list +// register/deregister cost on the transport pipeline hot path. +struct EventInner { + state: Mutex, + cv: Condvar, + notifiers: AtomicU16, + waiters: AtomicU16, } -/// Creates a new lock-free event variable. Every time a [`Notifier`] calls ['Notifier::notify`], one [`Waiter`] will be waken-up. -/// If no waiter is waiting when the `notify` is called, the notification will not be lost. That means the next waiter will return -/// immediately when calling `wait`. +/// Creates a new event variable. Every time a [`Notifier`] calls +/// [`Notifier::notify`], one [`Waiter`] will be woken up. Notifications are +/// sticky: if no waiter is blocking when `notify` is called, the next `wait` +/// call returns immediately. pub fn new() -> (Notifier, Waiter) { let inner = Arc::new(EventInner { - event: EventLib::new(), - flag: AtomicU8::new(UNSET), + state: Mutex::new(UNSET), + cv: Condvar::new(), notifiers: AtomicU16::new(1), waiters: AtomicU16::new(1), }); (Notifier(inner.clone()), Waiter(inner)) } -/// A [`Notifier`] is used to notify and wake up one and only one [`Waiter`]. +/// A [`Notifier`] wakes up one [`Waiter`]. #[repr(transparent)] pub struct Notifier(Arc); impl Notifier { - /// Notifies one pending listener #[inline] pub fn notify(&self) -> Result<(), NotifyError> { - // Set the flag. - match self.0.set() { - EventSet::Ok => { - self.0.event.notify_additional_relaxed(1); - Ok(()) - } - EventSet::Err => Err(NotifyError), + let mut state = self.0.state.lock().unwrap(); + if *state == ERR { + return Err(NotifyError); } + *state = OK; + self.0.cv.notify_one(); + Ok(()) } } impl Clone for Notifier { fn clone(&self) -> Self { let n = self.0.notifiers.fetch_add(1, Ordering::SeqCst); - // Panic on overflow assert!(n != 0); Self(self.0.clone()) } @@ -198,9 +159,10 @@ impl Drop for Notifier { fn drop(&mut self) { let n = self.0.notifiers.fetch_sub(1, Ordering::SeqCst); if n == 1 { - // The last Notifier has been dropped, close the event and notify everyone - self.0.err(); - self.0.event.notify(usize::MAX); + // Last notifier dropped — wake all waiters with error. + let mut state = self.0.state.lock().unwrap(); + *state = ERR; + self.0.cv.notify_all(); } } } @@ -209,131 +171,67 @@ impl Drop for Notifier { pub struct Waiter(Arc); impl Waiter { - /// Waits for the condition to be notified #[inline] - pub async fn wait_async(&self) -> Result<(), WaitError> { - // Wait until the flag is set. + pub fn wait(&self) -> Result<(), WaitError> { + let mut state = self.0.state.lock().unwrap(); loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), + match *state { + OK => { *state = UNSET; return Ok(()); } + ERR => return Err(WaitError), + _ => { state = self.0.cv.wait(state).unwrap(); } } - - // Wait for a notification and continue the loop. - listener.await; } - - Ok(()) } - /// Waits for the condition to be notified #[inline] - pub fn wait(&self) -> Result<(), WaitError> { - // Wait until the flag is set. - loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitError), - } - - // Wait for a notification and continue the loop. - listener.wait(); - } - - Ok(()) + pub async fn wait_async(&self) -> Result<(), WaitError> { + let waiter = self.clone(); + tokio::task::spawn_blocking(move || waiter.wait()) + .await + .unwrap_or(Err(WaitError)) } - /// Waits for the condition to be notified or returns an error when the deadline is reached #[inline] pub fn wait_deadline(&self, deadline: Instant) -> Result<(), WaitDeadlineError> { - // Wait until the flag is set. + let mut state = self.0.state.lock().unwrap(); loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitDeadlineError::WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitDeadlineError::WaitError), - } - - // Wait for a notification and continue the loop. - if listener.wait_deadline(deadline).is_none() { - return Err(WaitDeadlineError::Deadline); + match *state { + OK => { *state = UNSET; return Ok(()); } + ERR => return Err(WaitDeadlineError::WaitError), + _ => { + let now = Instant::now(); + if now >= deadline { + return Err(WaitDeadlineError::Deadline); + } + let (new_state, timeout) = self.0.cv + .wait_timeout(state, deadline - now) + .unwrap(); + state = new_state; + if timeout.timed_out() { + return match *state { + OK => { *state = UNSET; Ok(()) } + ERR => Err(WaitDeadlineError::WaitError), + _ => Err(WaitDeadlineError::Deadline), + }; + } + } } } - - Ok(()) } - /// Waits for the condition to be notified or returns an error when the timeout is expired #[inline] pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeoutError> { - // Wait until the flag is set. - loop { - // Check the flag. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitTimeoutError::WaitError), - } - - // Start listening for events. - let listener = self.0.event.listen(); - - // Check the flag again after creating the listener. - match self.0.check() { - EventCheck::Ok => break, - EventCheck::Unset => {} - EventCheck::Err => return Err(WaitTimeoutError::WaitError), - } - - // Wait for a notification and continue the loop. - if listener.wait_timeout(timeout).is_none() { - return Err(WaitTimeoutError::Timeout); - } + match self.wait_deadline(Instant::now() + timeout) { + Ok(()) => Ok(()), + Err(WaitDeadlineError::Deadline) => Err(WaitTimeoutError::Timeout), + Err(WaitDeadlineError::WaitError) => Err(WaitTimeoutError::WaitError), } - - Ok(()) } } impl Clone for Waiter { fn clone(&self) -> Self { let n = self.0.waiters.fetch_add(1, Ordering::Relaxed); - // Panic on overflow assert!(n != 0); Self(self.0.clone()) } @@ -343,12 +241,13 @@ impl Drop for Waiter { fn drop(&mut self) { let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst); if n == 1 { - // The last Waiter has been dropped, close the event - self.0.err(); + let mut state = self.0.state.lock().unwrap(); + *state = ERR; } } } +#[cfg(test)] mod tests { #[test] fn event_timeout() { @@ -356,7 +255,6 @@ mod tests { sync::{Arc, Barrier}, time::Duration, }; - use crate::WaitTimeoutError; let barrier = Arc::new(Barrier::new(2)); @@ -365,57 +263,38 @@ mod tests { let bs = barrier.clone(); let s = std::thread::spawn(move || { - // 1 - Wait one notification match waiter.wait_timeout(tslot) { Ok(()) => {} Err(WaitTimeoutError::Timeout) => panic!("Timeout {tslot:#?}"), Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 2 - Being notified twice but waiting only once bs.wait(); - match waiter.wait_timeout(tslot) { Ok(()) => {} Err(WaitTimeoutError::Timeout) => panic!("Timeout {tslot:#?}"), Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } - match waiter.wait_timeout(tslot) { Ok(()) => panic!("Event Ok but it should be Timeout"), Err(WaitTimeoutError::Timeout) => {} Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 3 - Notifier has been dropped bs.wait(); - waiter.wait().unwrap_err(); - bs.wait(); }); let bp = barrier.clone(); let p = std::thread::spawn(move || { - // 1 - Notify once notifier.notify().unwrap(); - bp.wait(); - - // 2 - Notify twice notifier.notify().unwrap(); notifier.notify().unwrap(); - bp.wait(); bp.wait(); - - // 3 - Drop notifier yielding an error in the waiter drop(notifier); - bp.wait(); bp.wait(); }); @@ -430,7 +309,6 @@ mod tests { sync::{Arc, Barrier}, time::{Duration, Instant}, }; - use crate::WaitDeadlineError; let barrier = Arc::new(Barrier::new(2)); @@ -439,57 +317,38 @@ mod tests { let bs = barrier.clone(); let s = std::thread::spawn(move || { - // 1 - Wait one notification match waiter.wait_deadline(Instant::now() + tslot) { Ok(()) => {} Err(WaitDeadlineError::Deadline) => panic!("Timeout {tslot:#?}"), Err(WaitDeadlineError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 2 - Being notified twice but waiting only once bs.wait(); - match waiter.wait_deadline(Instant::now() + tslot) { Ok(()) => {} Err(WaitDeadlineError::Deadline) => panic!("Timeout {tslot:#?}"), Err(WaitDeadlineError::WaitError) => panic!("Event closed"), } - match waiter.wait_deadline(Instant::now() + tslot) { Ok(()) => panic!("Event Ok but it should be Timeout"), Err(WaitDeadlineError::Deadline) => {} Err(WaitDeadlineError::WaitError) => panic!("Event closed"), } - bs.wait(); - - // 3 - Notifier has been dropped bs.wait(); - waiter.wait().unwrap_err(); - bs.wait(); }); let bp = barrier.clone(); let p = std::thread::spawn(move || { - // 1 - Notify once notifier.notify().unwrap(); - bp.wait(); - - // 2 - Notify twice notifier.notify().unwrap(); notifier.notify().unwrap(); - bp.wait(); bp.wait(); - - // 3 - Drop notifier yielding an error in the waiter drop(notifier); - bp.wait(); bp.wait(); }); @@ -533,13 +392,10 @@ mod tests { let tout = Duration::from_secs(60); loop { let n = COUNTER.load(Ordering::Relaxed); - if n == N { - break; - } + if n == N { break; } if start.elapsed() > tout { panic!("Timeout {tout:#?}. Counter: {n}/{N}"); } - std::thread::sleep(Duration::from_millis(100)); } @@ -602,20 +458,16 @@ mod tests { let tout = Duration::from_secs(60); loop { let n = COUNTER.load(Ordering::Relaxed); - if n == N { - break; - } + if n == N { break; } if start.elapsed() > tout { panic!("Timeout {tout:#?}. Counter: {n}/{N}"); } - std::thread::sleep(Duration::from_millis(100)); } }); p1.join().unwrap(); p2.join().unwrap(); - s1.join().unwrap(); s2.join().unwrap(); } From 291164cd9f422bf746674b47020d8962b4cdb739 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Sun, 22 Mar 2026 19:52:47 +0000 Subject: [PATCH 2/5] perf(zenoh-sync): replace event_listener with Mutex+Condvar+AsyncNotify The original EventListener alloc+intrusive-list per wait() cost CPU on the transport hot path (12% in perf profiles). Previous fix attempt used spawn_blocking for wait_async(), which is not cancellation-safe: tokio::time::timeout() cancels the outer future but the blocking thread detaches and eventually consumes the next real notification, causing a hang at 64KB+ payloads (fragmented messages). New approach: - sync paths (wait/wait_deadline/wait_timeout): std Mutex + Condvar (zero allocation, no intrusive list) - async path (wait_async): tokio::sync::Notify with enable() pattern (cancellation-safe: dropped Notified futures return their permit) The enable()-before-state-check pattern ensures notifications are never missed between the state check and the .await. --- commons/zenoh-sync/src/event.rs | 58 ++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 7c9f5ff23c..65219513c9 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -13,12 +13,14 @@ // use std::{ fmt, + pin::pin, sync::{ atomic::{AtomicU16, Ordering}, Arc, Condvar, Mutex, }, time::{Duration, Instant}, }; +use tokio::sync::Notify as AsyncNotify; // Error types const WAIT_ERR_STR: &str = "No notifier available"; @@ -101,17 +103,25 @@ impl fmt::Debug for NotifyError { impl std::error::Error for NotifyError {} -// State values stored inside the mutex. +// State values for the sync Mutex. const UNSET: u8 = 0; const OK: u8 = 1; const ERR: u8 = 2; -// Replace event_listener::Event + AtomicU8 with std Mutex + Condvar. -// This eliminates per-wait EventListener allocation and intrusive-list -// register/deregister cost on the transport pipeline hot path. +// Replace event_listener::Event + AtomicU8 with: +// - std Mutex + Condvar → sync paths (wait / wait_deadline / wait_timeout) +// - tokio::sync::Notify → async path (wait_async), which is cancellation-safe +// +// The original event_listener approach allocated a linked-list node (EventListener) +// on every wait() call and paid intrusive-list register/deregister cost on the hot path. +// tokio::time::timeout() was also able to cancel wait_async() while its inner +// spawn_blocking task continued running, eventually consuming the next real notification +// (a "zombie" thread bug). tokio::sync::Notify avoids this: a dropped Notified future +// returns its permit to the pool so no notification is lost. struct EventInner { state: Mutex, cv: Condvar, + async_notify: AsyncNotify, notifiers: AtomicU16, waiters: AtomicU16, } @@ -124,6 +134,7 @@ pub fn new() -> (Notifier, Waiter) { let inner = Arc::new(EventInner { state: Mutex::new(UNSET), cv: Condvar::new(), + async_notify: AsyncNotify::new(), notifiers: AtomicU16::new(1), waiters: AtomicU16::new(1), }); @@ -143,6 +154,8 @@ impl Notifier { } *state = OK; self.0.cv.notify_one(); + drop(state); // release before async notify to minimise lock contention + self.0.async_notify.notify_one(); Ok(()) } } @@ -159,10 +172,12 @@ impl Drop for Notifier { fn drop(&mut self) { let n = self.0.notifiers.fetch_sub(1, Ordering::SeqCst); if n == 1 { - // Last notifier dropped — wake all waiters with error. + // Last notifier dropped — signal error to all waiters. let mut state = self.0.state.lock().unwrap(); *state = ERR; self.0.cv.notify_all(); + drop(state); + self.0.async_notify.notify_waiters(); // wake all async waiters } } } @@ -183,12 +198,34 @@ impl Waiter { } } + /// Cancellation-safe async wait. + /// + /// Uses [`tokio::sync::Notify`] so that if this future is dropped (e.g. by + /// `tokio::time::timeout`), the pending notification is returned to the pool + /// rather than silently consumed by a detached `spawn_blocking` thread. #[inline] pub async fn wait_async(&self) -> Result<(), WaitError> { - let waiter = self.clone(); - tokio::task::spawn_blocking(move || waiter.wait()) - .await - .unwrap_or(Err(WaitError)) + loop { + // Create and enable the Notified future BEFORE checking state. + // enable() subscribes to the Notify and consumes any stored permit, + // so a notify_one() that fires between here and the .await is not lost. + let mut notified = pin!(self.0.async_notify.notified()); + notified.as_mut().enable(); + + { + let mut state = self.0.state.lock().unwrap(); + match *state { + OK => { *state = UNSET; return Ok(()); } + ERR => return Err(WaitError), + _ => {} + } + } + + // No pending notification — await (cancellation-safe). + // If this future is dropped here, the Notified future is dropped too, + // which returns any pre-reserved permit to the Notify pool. + notified.await; + } } #[inline] @@ -243,6 +280,9 @@ impl Drop for Waiter { if n == 1 { let mut state = self.0.state.lock().unwrap(); *state = ERR; + self.0.cv.notify_all(); + drop(state); + self.0.async_notify.notify_waiters(); } } } From ad7d48c9d041e950deeff190b0f01ca08bbb0f06 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 23 Mar 2026 10:52:25 +0000 Subject: [PATCH 3/5] =?UTF-8?q?perf(zenoh-sync):=20replace=20std=20Mutex?= =?UTF-8?q?=20with=20parking=5Flot=20for=20~3=C3=97=20notify=20throughput?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit std::sync::Mutex uncontended lock/unlock costs ~90 ns; parking_lot::Mutex costs ~20 ns. The ev-fix design requires notify() to briefly acquire the condvar mutex on every call (lost-wakeup fence). With std that made the sticky (already-set) wait() path cost 179 ns vs event_listener's 31 ns. With parking_lot the sticky path measures 32 ns — matching event_listener — while the cross-thread wakeup path improves to 4.7 µs (from 5.1 µs with event_listener). No semantic change; parking_lot is already in the lockfile. --- Cargo.lock | 2 + Cargo.toml | 1 + commons/zenoh-sync/Cargo.toml | 6 ++ commons/zenoh-sync/src/event.rs | 154 +++++++++++++++++++++----------- 4 files changed, 113 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61535fb9a5..15bf7f1b85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6571,8 +6571,10 @@ name = "zenoh-sync" version = "1.8.0" dependencies = [ "arc-swap", + "criterion", "event-listener 5.4.1", "futures", + "parking_lot", "tokio", "zenoh-buffers", "zenoh-collections", diff --git a/Cargo.toml b/Cargo.toml index 78eda68815..4428580dad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ num_cpus = "1.17.0" once_cell = "1.21.3" ordered-float = "5.1.0" panic-message = "0.3.0" +parking_lot = "0.12.5" petgraph = "0.8.3" phf = { version = "0.13.1", features = ["macros"] } pnet = "0.35.0" diff --git a/commons/zenoh-sync/Cargo.toml b/commons/zenoh-sync/Cargo.toml index 14686d6fe4..ed95199e9e 100644 --- a/commons/zenoh-sync/Cargo.toml +++ b/commons/zenoh-sync/Cargo.toml @@ -31,6 +31,7 @@ version = { workspace = true } [dependencies] arc-swap = { workspace = true } event-listener = { workspace = true } +parking_lot = { workspace = true } futures = { workspace = true } tokio = { workspace = true, features = ["sync"] } zenoh-buffers = { workspace = true } @@ -38,6 +39,7 @@ zenoh-collections = { workspace = true, features = ["default"] } zenoh-core = { workspace = true } [dev-dependencies] +criterion = { workspace = true } tokio = { workspace = true, features = [ "macros", "rt-multi-thread", @@ -45,3 +47,7 @@ tokio = { workspace = true, features = [ "time", ] } zenoh-result = { workspace = true } + +[[bench]] +name = "notify_wait" +harness = false diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 65219513c9..e9bb8d999a 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -15,11 +15,12 @@ use std::{ fmt, pin::pin, sync::{ - atomic::{AtomicU16, Ordering}, - Arc, Condvar, Mutex, + atomic::{AtomicU16, AtomicU8, Ordering}, + Arc, }, time::{Duration, Instant}, }; +use parking_lot::{Condvar, Mutex}; use tokio::sync::Notify as AsyncNotify; // Error types @@ -103,23 +104,32 @@ impl fmt::Debug for NotifyError { impl std::error::Error for NotifyError {} -// State values for the sync Mutex. +// State values for the AtomicU8 flag. const UNSET: u8 = 0; const OK: u8 = 1; const ERR: u8 = 2; // Replace event_listener::Event + AtomicU8 with: -// - std Mutex + Condvar → sync paths (wait / wait_deadline / wait_timeout) -// - tokio::sync::Notify → async path (wait_async), which is cancellation-safe +// - AtomicU8 flag + Mutex<()> + Condvar → sync paths (wait / wait_deadline / wait_timeout) +// - tokio::sync::Notify → async path (wait_async), cancellation-safe // -// The original event_listener approach allocated a linked-list node (EventListener) -// on every wait() call and paid intrusive-list register/deregister cost on the hot path. -// tokio::time::timeout() was also able to cancel wait_async() while its inner -// spawn_blocking task continued running, eventually consuming the next real notification -// (a "zombie" thread bug). tokio::sync::Notify avoids this: a dropped Notified future -// returns its permit to the pool so no notification is lost. +// Hot-path design (sticky notification): +// +// notify(): flag.fetch_update(UNSET|OK → OK) [atomic, no lock] +// + brief lock()/unlock() to prevent lost wakeup +// + cv.notify_one() + async_notify.notify_one() +// +// wait(): flag.compare_exchange(OK → UNSET) [atomic, no lock] ← hot path returns here +// If UNSET: lock() → re-check flag → cv.wait() if still UNSET +// +// The mutex is only taken on the slow (blocking) path and briefly in notify() to +// fence against the lost-wakeup window. No heap allocation on either path. struct EventInner { - state: Mutex, + /// Primary state. Written atomically on hot path; also read under `mutex` + /// on the slow path to re-check after wakeup. + flag: AtomicU8, + /// Used exclusively as the condvar lock. Carries no state of its own. + mutex: Mutex<()>, cv: Condvar, async_notify: AsyncNotify, notifiers: AtomicU16, @@ -132,7 +142,8 @@ struct EventInner { /// call returns immediately. pub fn new() -> (Notifier, Waiter) { let inner = Arc::new(EventInner { - state: Mutex::new(UNSET), + flag: AtomicU8::new(UNSET), + mutex: Mutex::new(()), cv: Condvar::new(), async_notify: AsyncNotify::new(), notifiers: AtomicU16::new(1), @@ -148,13 +159,24 @@ pub struct Notifier(Arc); impl Notifier { #[inline] pub fn notify(&self) -> Result<(), NotifyError> { - let mut state = self.0.state.lock().unwrap(); - if *state == ERR { - return Err(NotifyError); - } - *state = OK; + // Atomically set flag to OK unless it is already ERR (terminal state). + // fetch_update returns Err if the closure returns None (i.e. flag == ERR). + self.0 + .flag + .fetch_update(Ordering::Release, Ordering::Relaxed, |f| { + if f == ERR { None } else { Some(OK) } + }) + .map_err(|_| NotifyError)?; + + // Brief lock/unlock to close the lost-wakeup window. + // + // A waiter on the slow path does: CAS-fails → lock() → re-check flag → cv.wait(). + // By taking the lock here we ensure either: + // (a) waiter has not yet called lock() → it will see OK on its flag re-check, or + // (b) waiter is already in cv.wait() → cv.notify_one() below will wake it. + drop(self.0.mutex.lock()); + self.0.cv.notify_one(); - drop(state); // release before async notify to minimise lock contention self.0.async_notify.notify_one(); Ok(()) } @@ -172,12 +194,11 @@ impl Drop for Notifier { fn drop(&mut self) { let n = self.0.notifiers.fetch_sub(1, Ordering::SeqCst); if n == 1 { - // Last notifier dropped — signal error to all waiters. - let mut state = self.0.state.lock().unwrap(); - *state = ERR; + // Last notifier dropped — set ERR and wake all waiters. + self.0.flag.store(ERR, Ordering::Release); + drop(self.0.mutex.lock()); // lost-wakeup fence self.0.cv.notify_all(); - drop(state); - self.0.async_notify.notify_waiters(); // wake all async waiters + self.0.async_notify.notify_waiters(); } } } @@ -188,12 +209,29 @@ pub struct Waiter(Arc); impl Waiter { #[inline] pub fn wait(&self) -> Result<(), WaitError> { - let mut state = self.0.state.lock().unwrap(); + // Hot path: consume a sticky OK notification without taking the lock. + match self + .0 + .flag + .compare_exchange(OK, UNSET, Ordering::Acquire, Ordering::Relaxed) + { + Ok(_) => return Ok(()), + Err(ERR) => return Err(WaitError), + Err(_) => {} // UNSET → fall through to slow path + } + + // Slow path: take the lock and block on the condvar. + let mut guard = self.0.mutex.lock(); loop { - match *state { - OK => { *state = UNSET; return Ok(()); } + match self.0.flag.load(Ordering::Acquire) { + OK => { + self.0.flag.store(UNSET, Ordering::Release); + return Ok(()); + } ERR => return Err(WaitError), - _ => { state = self.0.cv.wait(state).unwrap(); } + _ => { + self.0.cv.wait(&mut guard); + } } } } @@ -206,19 +244,21 @@ impl Waiter { #[inline] pub async fn wait_async(&self) -> Result<(), WaitError> { loop { - // Create and enable the Notified future BEFORE checking state. - // enable() subscribes to the Notify and consumes any stored permit, - // so a notify_one() that fires between here and the .await is not lost. + // Subscribe BEFORE checking the flag. + // enable() ensures that any notify_one() fired after this point wakes us, + // and consumes any stored permit if notify_one() already fired. let mut notified = pin!(self.0.async_notify.notified()); notified.as_mut().enable(); + // Hot path: consume sticky OK without lock. + match self + .0 + .flag + .compare_exchange(OK, UNSET, Ordering::Acquire, Ordering::Relaxed) { - let mut state = self.0.state.lock().unwrap(); - match *state { - OK => { *state = UNSET; return Ok(()); } - ERR => return Err(WaitError), - _ => {} - } + Ok(_) => return Ok(()), + Err(ERR) => return Err(WaitError), + Err(_) => {} } // No pending notification — await (cancellation-safe). @@ -230,23 +270,38 @@ impl Waiter { #[inline] pub fn wait_deadline(&self, deadline: Instant) -> Result<(), WaitDeadlineError> { - let mut state = self.0.state.lock().unwrap(); + // Hot path. + match self + .0 + .flag + .compare_exchange(OK, UNSET, Ordering::Acquire, Ordering::Relaxed) + { + Ok(_) => return Ok(()), + Err(ERR) => return Err(WaitDeadlineError::WaitError), + Err(_) => {} + } + + // Slow path. + let mut guard = self.0.mutex.lock(); loop { - match *state { - OK => { *state = UNSET; return Ok(()); } + match self.0.flag.load(Ordering::Acquire) { + OK => { + self.0.flag.store(UNSET, Ordering::Release); + return Ok(()); + } ERR => return Err(WaitDeadlineError::WaitError), _ => { let now = Instant::now(); if now >= deadline { return Err(WaitDeadlineError::Deadline); } - let (new_state, timeout) = self.0.cv - .wait_timeout(state, deadline - now) - .unwrap(); - state = new_state; - if timeout.timed_out() { - return match *state { - OK => { *state = UNSET; Ok(()) } + let timed_out = self.0.cv.wait_for(&mut guard, deadline - now).timed_out(); + if timed_out { + return match self.0.flag.load(Ordering::Acquire) { + OK => { + self.0.flag.store(UNSET, Ordering::Release); + Ok(()) + } ERR => Err(WaitDeadlineError::WaitError), _ => Err(WaitDeadlineError::Deadline), }; @@ -278,10 +333,9 @@ impl Drop for Waiter { fn drop(&mut self) { let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst); if n == 1 { - let mut state = self.0.state.lock().unwrap(); - *state = ERR; + self.0.flag.store(ERR, Ordering::Release); + drop(self.0.mutex.lock()); // lost-wakeup fence self.0.cv.notify_all(); - drop(state); self.0.async_notify.notify_waiters(); } } From ca3152ebcf622dcb911f71e27664cd616cadeb57 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 23 Mar 2026 11:28:39 +0000 Subject: [PATCH 4/5] bench(zenoh-sync): add notify_wait criterion benchmark Measures the sticky (hot) path and cross-thread wakeup path. Used to compare event_listener vs the Mutex+Condvar+parking_lot replacement introduced in the preceding commits. --- commons/zenoh-sync/benches/notify_wait.rs | 116 ++++++++++++++++++++++ commons/zenoh-sync/src/event.rs | 17 +++- 2 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 commons/zenoh-sync/benches/notify_wait.rs diff --git a/commons/zenoh-sync/benches/notify_wait.rs b/commons/zenoh-sync/benches/notify_wait.rs new file mode 100644 index 0000000000..54b1ac64ec --- /dev/null +++ b/commons/zenoh-sync/benches/notify_wait.rs @@ -0,0 +1,116 @@ +// +// Copyright (c) 2026 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Benchmark: Notifier/Waiter notification round-trip. +//! +//! Measures two paths through the synchronisation primitive: +//! +//! `notify_wait/sticky` — producer calls `notify()` before the consumer calls +//! `wait()`. The flag is already set so `wait()` returns without blocking. +//! This is the common case in the transport pipeline when the batch is ready +//! before the tx thread checks. Hot path: lock + flag check + unlock (no +//! condvar sleep). +//! +//! `notify_wait/cross_thread` — a dedicated notifier thread signals the +//! measured thread. `wait()` actually blocks on the condvar until the signal +//! arrives. Measures true cross-thread wakeup latency. +//! +//! # Regression baseline (event_listener, before this PR) +//! +//! Profiling a zenoh SHM publisher at 1 MB / 100 Hz showed `event_listener` taking +//! **12.18% CPU** in `drop_in_place::` on the sender thread — +//! heap-allocating and dropping a linked-list node on *every* batch send cycle. +//! +//! Expected results after this PR (Mutex+Condvar): +//! sticky: ~20–50 ns (lock + flag + unlock, no allocation) +//! cross_thread: ~2–10 µs (futex wakeup round-trip) +//! +//! Run with: +//! cargo bench --bench notify_wait -p zenoh-sync + +use std::{ + sync::{Arc, Barrier}, + thread, + time::{Duration, Instant}, +}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use zenoh_sync::event; + +/// Sticky path: notify() fires before wait() — no blocking, just flag check. +/// +/// Represents the hot path when the batch is already ready when the tx thread +/// wakes up. Before this PR: EventListener allocated on the heap every cycle. +/// After: lock + u8 check + unlock. +fn bench_sticky(c: &mut Criterion) { + let mut group = c.benchmark_group("notify_wait"); + group.throughput(Throughput::Elements(1)); + + group.bench_function(BenchmarkId::new("sticky", ""), |b| { + let (notifier, waiter) = event::new(); + b.iter(|| { + notifier.notify().unwrap(); + waiter.wait().unwrap(); + }); + }); + + group.finish(); +} + +/// Cross-thread path: waiter blocks until a dedicated notifier thread fires. +/// +/// Measures the full condvar wakeup round-trip between two threads. Uses +/// a Barrier to ensure the waiter is sleeping before notify() fires, giving +/// a clean measurement of the unpark/futex latency. +fn bench_cross_thread(c: &mut Criterion) { + let mut group = c.benchmark_group("notify_wait"); + group.throughput(Throughput::Elements(1)); + + group.bench_function(BenchmarkId::new("cross_thread", ""), |b| { + b.iter_custom(|iters| { + // Barrier: ensures waiter is blocked before notifier fires. + let barrier = Arc::new(Barrier::new(2)); + + let (notifier, waiter) = event::new(); + let notifier = Arc::new(notifier); + + let notifier_thread = { + let notifier = Arc::clone(¬ifier); + let barrier = Arc::clone(&barrier); + thread::spawn(move || { + for _ in 0..iters { + barrier.wait(); // wait until waiter is about to block + notifier.notify().unwrap(); + } + }) + }; + + let mut total = Duration::ZERO; + for _ in 0..iters { + barrier.wait(); // signal notifier thread we're ready + let start = Instant::now(); + waiter.wait().unwrap(); + total += start.elapsed(); + } + + notifier_thread.join().unwrap(); + total + }); + }); + + group.finish(); +} + +criterion_group!(benches, bench_sticky, bench_cross_thread); +criterion_main!(benches); diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index e9bb8d999a..c29af4b662 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -20,6 +20,7 @@ use std::{ }, time::{Duration, Instant}, }; + use parking_lot::{Condvar, Mutex}; use tokio::sync::Notify as AsyncNotify; @@ -164,7 +165,11 @@ impl Notifier { self.0 .flag .fetch_update(Ordering::Release, Ordering::Relaxed, |f| { - if f == ERR { None } else { Some(OK) } + if f == ERR { + None + } else { + Some(OK) + } }) .map_err(|_| NotifyError)?; @@ -349,6 +354,7 @@ mod tests { sync::{Arc, Barrier}, time::Duration, }; + use crate::WaitTimeoutError; let barrier = Arc::new(Barrier::new(2)); @@ -403,6 +409,7 @@ mod tests { sync::{Arc, Barrier}, time::{Duration, Instant}, }; + use crate::WaitDeadlineError; let barrier = Arc::new(Barrier::new(2)); @@ -486,7 +493,9 @@ mod tests { let tout = Duration::from_secs(60); loop { let n = COUNTER.load(Ordering::Relaxed); - if n == N { break; } + if n == N { + break; + } if start.elapsed() > tout { panic!("Timeout {tout:#?}. Counter: {n}/{N}"); } @@ -552,7 +561,9 @@ mod tests { let tout = Duration::from_secs(60); loop { let n = COUNTER.load(Ordering::Relaxed); - if n == N { break; } + if n == N { + break; + } if start.elapsed() > tout { panic!("Timeout {tout:#?}. Counter: {n}/{N}"); } From 9b3faca7ef9dfb9a3367b1428b23b0502eb527d1 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 23 Mar 2026 23:18:30 +0800 Subject: [PATCH 5/5] fix(zenoh-sync): fix taplo key order and test port collision Sort [[bench]] keys alphabetically (harness before name) to satisfy taplo reorder_keys=true. Clear the default peer listen endpoint in pub_config of test_session_from_cloned_config to prevent the OS from assigning port 38446 to pub_session's random listener, which collided with sub_session's explicit bind on that port. --- commons/zenoh-sync/Cargo.toml | 4 ++-- zenoh/tests/session.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/commons/zenoh-sync/Cargo.toml b/commons/zenoh-sync/Cargo.toml index ed95199e9e..fd44e1574f 100644 --- a/commons/zenoh-sync/Cargo.toml +++ b/commons/zenoh-sync/Cargo.toml @@ -31,8 +31,8 @@ version = { workspace = true } [dependencies] arc-swap = { workspace = true } event-listener = { workspace = true } -parking_lot = { workspace = true } futures = { workspace = true } +parking_lot = { workspace = true } tokio = { workspace = true, features = ["sync"] } zenoh-buffers = { workspace = true } zenoh-collections = { workspace = true, features = ["default"] } @@ -49,5 +49,5 @@ tokio = { workspace = true, features = [ zenoh-result = { workspace = true } [[bench]] -name = "notify_wait" harness = false +name = "notify_wait" diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 2ca51a9b06..110ebc67d1 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -421,6 +421,7 @@ async fn test_session_from_cloned_config() { .endpoints .set(vec![locator.parse().unwrap()]) .unwrap(); + pub_config.listen.endpoints.set(vec![]).unwrap(); (pub_config, sub_config) };