Skip to content

node: route all data via zenoh, switch to callback subscribers#1787

Open
phil-opp wants to merge 2 commits into
mainfrom
everything-through-zenoh
Open

node: route all data via zenoh, switch to callback subscribers#1787
phil-opp wants to merge 2 commits into
mainfrom
everything-through-zenoh

Conversation

@phil-opp
Copy link
Copy Markdown
Collaborator

Two related changes to the data path. Both kept inside `apis/rust/node`.

1. Route all data-plane messages through zenoh (4614c62)

When a zenoh session is open, every `send_output` call publishes via zenoh regardless of payload size. The control channel is now reserved for control; data only falls back to it if the zenoh session could not be opened (interactive/testing modes). Also drops the separate `ZENOH_SHM_MIN_PAYLOAD` constant — the SHM-vs-heap decision inside `zenoh_publish` now uses the existing `zenoh_zero_copy_threshold` field, so there's one threshold instead of two.

2. Switch zenoh subscribers to callback handler (f824cab)

The previous `FifoChannelHandler` + dedicated subscriber thread + `block_on(select!)` loop is replaced with a callback subscriber. Zenoh's IO worker writes the `EventItem` directly into the tokio mpsc, eliminating one wakeup hop per receive. Notes:

  • `try_send` instead of `blocking_send`/`send().await` (callback runs on a tokio worker, where `blocking_send` would panic). A full channel drops with a warning, matching `DropOldest`.
  • Panics in the callback are caught with `catch_unwind` and surfaced as `EventItem::FatalError`.
  • Drops the dedicated subscriber thread, the shutdown channel, the `JoinHandle`s, and the `zenoh_subscriber_loop` function.

Net change across both commits: -64 LOC.

Local benchmark (`examples/benchmark`, dev container, `dora run`, payloads 8B–2KB, 5 runs each, p50 of medians):

size daemon path (before) this PR
8B 78µs 97µs
64B 87µs 82µs
512B 75µs 87µs
2KB 84µs 90µs

Within ±20µs of the previous baseline despite the routing change, with the callback-subscriber win making up most of the FIFO-subscriber overhead. Throughput improves +12% to +60% in the same configuration.

Caveat: in environments without multicast (e.g. some CI / dev containers), zenoh peer discovery requires `ZENOH_CONFIG` with explicit endpoints. This isn't new behavior, but is now reachable for all message sizes instead of just ≥4KiB.

@trunk-io
Copy link
Copy Markdown
Contributor

trunk-io Bot commented Apr 29, 2026

❌ This pull request failed tests. It has been removed from the merge queue. PR #1788 was used for testing. See more details here.

Failed Required Status Conclusion
E2E Tests Failure
  • To merge this pull request, check the box to the left or comment /trunk merge below.

After your PR is submitted to the merge queue, this comment will be automatically updated with its status. If the PR fails, failure details will also be posted here

phil-opp and others added 2 commits April 29, 2026 19:12
Data-plane messages now always go through zenoh when a session is open,
regardless of payload size; the daemon control channel is reserved for
control. Previously messages below 4 KiB fell back to the daemon-relay
path.

Also collapses two thresholds into one: the SHM-vs-heap decision inside
`zenoh_publish` now uses `self.zenoh_zero_copy_threshold` instead of the
separate `ZENOH_SHM_MIN_PAYLOAD` constant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the per-input dedicated thread + FifoChannelHandler +
`block_on(select(recv_async, shutdown_rx))` loop with a callback
subscriber. Zenoh's IO worker now writes the EventItem directly into
the tokio mpsc, eliminating one wakeup hop on every receive. In the
benchmark, this brings zenoh-routed small-message p50 down by 10-26%
versus the FIFO path and back to parity with (or below) the previous
daemon-relay baseline.

Side effects:
- `try_send` instead of `blocking_send`/`send().await`: the callback
  runs on a zenoh tokio worker, so `blocking_send` would panic. Full
  channel drops with a warning, matching `DropOldest`.
- Panics inside the callback are caught and surfaced as
  `EventItem::FatalError` so a misbehaving handler doesn't unwind
  through zenoh's IO worker.
- Drops the dedicated subscriber thread, the shutdown channel, the
  JoinHandles, and `zenoh_subscriber_loop` (~70 LOC net).
- Subscribers are kept alive in `_zenoh_subscribers: Vec<Subscriber<()>>`
  on EventStream; dropping them stops further callbacks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

@phil-opp the Trunk merge queue failed for this PR.

See the Trunk merge-status comment for details.

Posted as a new comment so GitHub sends an email — Trunk's sticky comment is edited in place and won't trigger a notification.

@phil-opp phil-opp force-pushed the everything-through-zenoh branch from f824cab to 015d2b0 Compare May 13, 2026 16:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant