Skip to content

Commit 1b8a25a

Browse files
OlivierHecartwyfo
authored andcommitted
Add ClosableCallback type
1 parent 131db7d commit 1b8a25a

2 files changed

Lines changed: 36 additions & 18 deletions

File tree

io/zenoh-transport/src/unicast/universal/rx.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
1313
//
14-
use std::sync::{atomic::Ordering, MutexGuard};
14+
use std::sync::MutexGuard;
1515

1616
use zenoh_buffers::ZSlice;
1717
use zenoh_codec::transport::frame::FrameReader;
@@ -108,11 +108,7 @@ impl TransportUnicastUniversal {
108108
// Drop invalid message and continue
109109
return Ok(());
110110
}
111-
if let Some(callback) = self
112-
.callback
113-
.get()
114-
.filter(|_| !self.closed.load(Ordering::Relaxed))
115-
{
111+
if let Some(callback) = self.callback.get() {
116112
for mut msg in frame {
117113
self.trigger_callback(
118114
callback.as_ref(),
@@ -193,11 +189,7 @@ impl TransportUnicastUniversal {
193189
if !more {
194190
// When shared-memory feature is disabled, msg does not need to be mutable
195191
if let Some(mut msg) = guard.defrag.defragment() {
196-
if let Some(callback) = self
197-
.callback
198-
.get()
199-
.filter(|_| !self.closed.load(Ordering::Relaxed))
200-
{
192+
if let Some(callback) = self.callback.get() {
201193
return self.trigger_callback(
202194
callback.as_ref(),
203195
msg.as_mut(),

io/zenoh-transport/src/unicast/universal/transport.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,35 @@ use crate::{
4646
TransportManager, TransportPeerEventHandler,
4747
};
4848

49+
pub(crate) struct ClosableCallback {
50+
callback: OnceLock<Arc<dyn TransportPeerEventHandler>>,
51+
closed: AtomicBool,
52+
}
53+
54+
impl ClosableCallback {
55+
pub(crate) fn new() -> Self {
56+
ClosableCallback {
57+
callback: OnceLock::new(),
58+
closed: AtomicBool::new(false),
59+
}
60+
}
61+
62+
pub(crate) fn set(&self, cb: Arc<dyn TransportPeerEventHandler>) {
63+
let _ = self.callback.set(cb);
64+
}
65+
66+
pub(crate) fn get(&self) -> Option<&Arc<dyn TransportPeerEventHandler>> {
67+
self.callback
68+
.get()
69+
.filter(|_| !self.closed.load(Ordering::Relaxed))
70+
}
71+
72+
pub(crate) fn close(&self) -> Option<&Arc<dyn TransportPeerEventHandler>> {
73+
self.closed.store(true, Ordering::Relaxed);
74+
self.callback.get()
75+
}
76+
}
77+
4978
/*************************************/
5079
/* UNIVERSAL TRANSPORT */
5180
/*************************************/
@@ -64,8 +93,7 @@ pub(crate) struct TransportUnicastUniversal {
6493
// The links associated to the channel
6594
pub(super) links: Arc<RwLock<TransportLinks>>,
6695
// The callback
67-
pub(super) callback: Arc<OnceLock<Arc<dyn TransportPeerEventHandler>>>,
68-
pub(super) closed: Arc<AtomicBool>,
96+
pub(super) callback: Arc<ClosableCallback>,
6997
// Lock used to ensure no race in add_link method
7098
add_link_lock: Arc<AsyncMutex<()>>,
7199
// Mutex for notification
@@ -112,8 +140,7 @@ impl TransportUnicastUniversal {
112140
priority_rx: priority_rx.into_boxed_slice().into(),
113141
links: Arc::new(RwLock::new(TransportLinks::default())),
114142
add_link_lock: Arc::new(AsyncMutex::new(())),
115-
callback: Arc::new(OnceLock::new()),
116-
closed: Arc::new(AtomicBool::new(false)),
143+
callback: Arc::new(ClosableCallback::new()),
117144
status: Arc::new(AsyncMutex::new(TransportStatus::Uninitialized)),
118145
#[cfg(feature = "stats")]
119146
stats,
@@ -138,8 +165,7 @@ impl TransportUnicastUniversal {
138165
// to avoid concurrent new_transport and closing/closed notifications
139166
let mut status_guard = self.get_status().await;
140167
*status_guard = TransportStatus::Closed;
141-
self.closed.store(true, Ordering::Relaxed);
142-
let callback = self.callback.get().cloned();
168+
let callback = self.callback.close();
143169

144170
// Close all the links
145171
let mut links = zwrite!(self.links).take();
@@ -323,7 +349,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
323349
/* ACCESSORS */
324350
/*************************************/
325351
fn set_callback(&self, callback: Arc<dyn TransportPeerEventHandler>) {
326-
let _ = self.callback.set(callback);
352+
self.callback.set(callback)
327353
}
328354

329355
async fn get_status(&self) -> AsyncMutexGuard<'_, TransportStatus> {

0 commit comments

Comments
 (0)