@@ -70,14 +70,12 @@ pub struct EventStream {
7070 // waker when a Python coroutine polls this stream
7171 // (upstream dora-rs/dora#1603).
7272 receiver : tokio:: sync:: mpsc:: Receiver < EventItem > ,
73- /// Dropping this sender disconnects the shutdown channel so the
74- /// `select!` inside each zenoh subscriber thread wakes immediately,
75- /// instead of blocking on `subscriber.recv_async()` until the zenoh
76- /// session tears down. Kept alongside `_zenoh_thread_handles` so
77- /// subscribers notice drop before we join the threads.
78- _zenoh_shutdown_tx : flume:: Sender < ( ) > ,
7973 _thread_handle : EventStreamThreadHandle ,
80- _zenoh_thread_handles : Vec < std:: thread:: JoinHandle < ( ) > > ,
74+ /// Callback subscribers — kept alive for the lifetime of the
75+ /// EventStream. Dropping a subscriber stops further callbacks; the
76+ /// callback itself will see `blocking_send` return Err once the
77+ /// `receiver` (declared above) is dropped.
78+ _zenoh_subscribers : Vec < zenoh:: pubsub:: Subscriber < ( ) > > ,
8179 close_channel : DaemonChannel ,
8280 clock : Arc < uhlc:: HLC > ,
8381 scheduler : Scheduler ,
@@ -304,18 +302,17 @@ impl EventStream {
304302 _ => true ,
305303 } ;
306304
307- // Spawn zenoh subscribers for each input that has a source node.
308- // These feed events directly into the same tokio mpsc channel as
309- // daemon events. Subscriber threads are tracked for cleanup in
310- // EventStream::drop; dropping `zenoh_shutdown_tx` disconnects the
311- // shutdown channel so each thread's `select!` wakes up and exits.
312- let mut zenoh_thread_handles = Vec :: new ( ) ;
313- let ( zenoh_shutdown_tx , zenoh_shutdown_rx ) = flume :: bounded :: < ( ) > ( 0 ) ;
305+ // Declare zenoh subscribers for each input that has a source node.
306+ // We use callback subscribers so the zenoh IO thread delivers the
307+ // sample directly into the event channel without an intermediate
308+ // dora-side thread + recv_async wakeup. The `Subscriber<()>` handle
309+ // must be kept alive for the lifetime of the EventStream — we store
310+ // them in `_zenoh_subscribers` and drop them after `receiver`.
311+ let mut zenoh_subscribers = Vec :: new ( ) ;
314312 if let Some ( session) = zenoh_session {
315313 use zenoh:: Wait ;
316314 for ( input_id, input) in input_config {
317315 let mapping = & input. mapping ;
318- // Only user inputs from other nodes need zenoh subscribers
319316 if let dora_message:: config:: InputMapping :: User ( user_mapping) = mapping {
320317 let source_node = & user_mapping. source ;
321318 let source_output = & user_mapping. output ;
@@ -331,60 +328,82 @@ impl EventStream {
331328 continue ;
332329 }
333330 } ;
334- let subscriber = match session. declare_subscriber ( key_expr) . wait ( ) {
335- Ok ( s) => s,
336- Err ( e) => {
337- // Graceful degradation: this input falls back to daemon delivery
338- tracing:: warn!(
339- input = %input_id,
340- "failed to declare zenoh subscriber ({e}), using daemon path"
341- ) ;
342- continue ;
343- }
344- } ;
345-
346- tracing:: debug!(
347- input = %input_id,
348- %topic,
349- "zenoh subscriber declared for input"
350- ) ;
351-
352- let tx_clone = tx. clone ( ) ;
353- let input_id = input_id. clone ( ) ;
354- let shutdown_rx = zenoh_shutdown_rx. clone ( ) ;
355- let handle = std:: thread:: Builder :: new ( )
356- . name ( format ! ( "zenoh-sub-{input_id}" ) )
357- . spawn ( move || {
358- let input_id_for_panic = input_id. clone ( ) ;
359- let tx_for_panic = tx_clone. clone ( ) ;
360- // catch_unwind: a panic anywhere in the subscriber
361- // loop would otherwise silently kill delivery for
362- // this input. Surface it as a FatalError so the
363- // node sees the failure.
331+ let tx_cb = tx. clone ( ) ;
332+ let input_id_cb = input_id. clone ( ) ;
333+ let subscriber = session
334+ . declare_subscriber ( key_expr)
335+ . callback ( move |sample| {
336+ // catch_unwind: a panic inside the callback would
337+ // otherwise unwind through zenoh's IO worker, which
338+ // is unsafe. Surface as FatalError so the node sees
339+ // it and exits cleanly.
364340 let result =
365341 std:: panic:: catch_unwind ( std:: panic:: AssertUnwindSafe ( || {
366- zenoh_subscriber_loop (
367- subscriber,
368- input_id,
369- tx_clone,
370- shutdown_rx,
371- ) ;
342+ let metadata = match sample. attachment ( ) {
343+ Some ( att) => match bincode:: deserialize :: <
344+ dora_message:: metadata:: Metadata ,
345+ > (
346+ & att. to_bytes ( )
347+ ) {
348+ Ok ( m) => m,
349+ Err ( e) => {
350+ tracing:: warn!(
351+ "zenoh metadata deserialization failed: {e}"
352+ ) ;
353+ return ;
354+ }
355+ } ,
356+ None => {
357+ tracing:: warn!(
358+ "zenoh sample missing metadata attachment"
359+ ) ;
360+ return ;
361+ }
362+ } ;
363+ let payload = sample. payload ( ) . clone ( ) ;
364+ // Callback runs on zenoh's tokio IO worker —
365+ // `blocking_send` panics from a tokio context, so
366+ // use `try_send`. If the channel is full the event
367+ // is dropped (logged); receiver-dropped also
368+ // surfaces here, in which case there's nothing to do.
369+ if let Err ( e) = tx_cb. try_send ( EventItem :: ZenohInput {
370+ id : input_id_cb. clone ( ) ,
371+ metadata : std:: sync:: Arc :: new ( metadata) ,
372+ payload,
373+ } ) {
374+ use tokio:: sync:: mpsc:: error:: TrySendError ;
375+ match e {
376+ TrySendError :: Full ( _) => {
377+ tracing:: warn!(
378+ "event channel full; dropping zenoh input"
379+ ) ;
380+ }
381+ TrySendError :: Closed ( _) => {
382+ // normal shutdown
383+ }
384+ }
385+ }
372386 } ) ) ;
373387 if result. is_err ( ) {
374388 tracing:: error!(
375- "zenoh subscriber thread for input {input_id_for_panic} panicked"
389+ input = %input_id_cb,
390+ "zenoh subscriber callback panicked"
376391 ) ;
377- let _ = tx_for_panic . blocking_send ( EventItem :: FatalError ( eyre ! (
378- "zenoh subscriber thread for input {input_id_for_panic} panicked"
392+ let _ = tx_cb . try_send ( EventItem :: FatalError ( eyre ! (
393+ "zenoh subscriber callback for input `{input_id_cb}` panicked"
379394 ) ) ) ;
380395 }
381- tracing:: trace!( "zenoh subscriber thread exiting" ) ;
382- } ) ;
383- match handle {
384- Ok ( h) => zenoh_thread_handles. push ( h) ,
396+ } )
397+ . wait ( ) ;
398+ match subscriber {
399+ Ok ( s) => {
400+ tracing:: debug!( input = %input_id, %topic, "zenoh subscriber declared (callback)" ) ;
401+ zenoh_subscribers. push ( s) ;
402+ }
385403 Err ( e) => {
386404 tracing:: warn!(
387- "failed to spawn zenoh subscriber thread ({e}), input will use daemon path"
405+ input = %input_id,
406+ "failed to declare zenoh subscriber ({e}), using daemon path"
388407 ) ;
389408 }
390409 }
@@ -397,9 +416,8 @@ impl EventStream {
397416 Ok ( EventStream {
398417 node_id : node_id. clone ( ) ,
399418 receiver : rx,
400- _zenoh_shutdown_tx : zenoh_shutdown_tx,
401419 _thread_handle : thread_handle,
402- _zenoh_thread_handles : zenoh_thread_handles ,
420+ _zenoh_subscribers : zenoh_subscribers ,
403421 close_channel,
404422 start_timestamp : clock. new_timestamp ( ) ,
405423 clock,
@@ -1006,72 +1024,6 @@ pub enum TryRecvError {
10061024/// the original `ZBytes` allocation via `Buffer::from_custom_allocation`,
10071025/// achieving true zero-copy. For `Cow::Owned` (normal network path),
10081026/// the owned `Vec` is wrapped via the same mechanism at zero cost.
1009- /// Body of a zenoh subscriber thread.
1010- ///
1011- /// Receives samples from a declared subscriber, extracts the metadata
1012- /// attachment, and forwards each sample as an [`EventItem::ZenohInput`]
1013- /// to the event channel. Exits cleanly when the shutdown channel is
1014- /// disconnected (via [`EventStream`]'s `_zenoh_shutdown_tx` drop) or
1015- /// when the event channel receiver is dropped.
1016- fn zenoh_subscriber_loop (
1017- subscriber : zenoh:: pubsub:: Subscriber <
1018- zenoh:: handlers:: FifoChannelHandler < zenoh:: sample:: Sample > ,
1019- > ,
1020- input_id : DataId ,
1021- tx : tokio:: sync:: mpsc:: Sender < EventItem > ,
1022- shutdown_rx : flume:: Receiver < ( ) > ,
1023- ) {
1024- // block_on + select: race the subscriber against the shutdown
1025- // channel so dropping the EventStream wakes us immediately,
1026- // instead of blocking on a sync `subscriber.recv()` that only
1027- // unblocks when the underlying zenoh session tears down.
1028- futures:: executor:: block_on ( async {
1029- loop {
1030- let recv_fut = subscriber. recv_async ( ) ;
1031- let shutdown_fut = shutdown_rx. recv_async ( ) ;
1032- match select ( std:: pin:: pin!( recv_fut) , std:: pin:: pin!( shutdown_fut) ) . await {
1033- Either :: Left ( ( Ok ( sample) , _) ) => {
1034- let metadata = match sample. attachment ( ) {
1035- Some ( att) => {
1036- match bincode:: deserialize :: < dora_message:: metadata:: Metadata > (
1037- & att. to_bytes ( ) ,
1038- ) {
1039- Ok ( m) => m,
1040- Err ( e) => {
1041- tracing:: warn!( "zenoh metadata deserialization failed: {e}" ) ;
1042- continue ;
1043- }
1044- }
1045- }
1046- None => {
1047- tracing:: warn!( "zenoh sample missing metadata attachment" ) ;
1048- continue ;
1049- }
1050- } ;
1051-
1052- // Forward the raw ZBytes payload; the event loop wraps
1053- // it in an Arrow Buffer backed by the original zenoh
1054- // buffer for zero-copy conversion (dora-rs/adora#132).
1055- let payload = sample. payload ( ) . clone ( ) ;
1056- if tx
1057- . send ( EventItem :: ZenohInput {
1058- id : input_id. clone ( ) ,
1059- metadata : std:: sync:: Arc :: new ( metadata) ,
1060- payload,
1061- } )
1062- . await
1063- . is_err ( )
1064- {
1065- break ; // receiver dropped
1066- }
1067- }
1068- // Subscriber side error or shutdown signal — both mean stop.
1069- _ => break ,
1070- }
1071- }
1072- } ) ;
1073- }
1074-
10751027/// Convert a zenoh payload to an Arrow array (dora-rs/adora#132).
10761028fn zenoh_payload_to_arrow_array (
10771029 payload : zenoh:: bytes:: ZBytes ,
0 commit comments