node: route all data via zenoh, switch to callback subscribers#1787
Open
phil-opp wants to merge 2 commits into
❌ This pull request failed tests. It has been removed from the merge queue. PR #1788 was used for testing. See more details here.
After your PR is submitted to the merge queue, this comment will be automatically updated with its status. If the PR fails, failure details will also be posted here.
Data-plane messages now always go through zenoh when a session is open, regardless of payload size; the daemon control channel is reserved for control. Previously messages below 4 KiB fell back to the daemon-relay path.

Also collapses two thresholds into one: the SHM-vs-heap decision inside `zenoh_publish` now uses `self.zenoh_zero_copy_threshold` instead of the separate `ZENOH_SHM_MIN_PAYLOAD` constant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the per-input dedicated thread + FifoChannelHandler + `block_on(select(recv_async, shutdown_rx))` loop with a callback subscriber. Zenoh's IO worker now writes the EventItem directly into the tokio mpsc, eliminating one wakeup hop on every receive.

In the benchmark, this brings zenoh-routed small-message p50 down by 10-26% versus the FIFO path and back to parity with (or below) the previous daemon-relay baseline.

Side effects:
- `try_send` instead of `blocking_send`/`send().await`: the callback runs on a zenoh tokio worker, so `blocking_send` would panic. Full channel drops with a warning, matching `DropOldest`.
- Panics inside the callback are caught and surfaced as `EventItem::FatalError` so a misbehaving handler doesn't unwind through zenoh's IO worker.
- Drops the dedicated subscriber thread, the shutdown channel, the JoinHandles, and `zenoh_subscriber_loop` (~70 LOC net).
- Subscribers are kept alive in `_zenoh_subscribers: Vec<Subscriber<()>>` on EventStream; dropping them stops further callbacks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@phil-opp the Trunk merge queue failed for this PR. See the Trunk merge-status comment for details. Posted as a new comment so GitHub sends an email; Trunk's sticky comment is edited in place and won't trigger a notification.
Force-pushed: f824cab to 015d2b0
Two related changes to the data path. Both kept inside `apis/rust/node`.
1. Route all data-plane messages through zenoh (4614c62)
When a zenoh session is open, every `send_output` call publishes via zenoh regardless of payload size. The control channel is now reserved for control; data only falls back to it if the zenoh session could not be opened (interactive/testing modes). Also drops the separate `ZENOH_SHM_MIN_PAYLOAD` constant — the SHM-vs-heap decision inside `zenoh_publish` now uses the existing `zenoh_zero_copy_threshold` field, so there's one threshold instead of two.
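The routing rule described in item 1 can be sketched roughly as follows. This is a simplified model, not the code in `apis/rust/node`: the `Route` enum, the `Node` struct, the `route_for` method, the 4096-byte threshold value, and the `>=` comparison are all assumptions made for illustration. Only the field name `zenoh_zero_copy_threshold` comes from the PR.

```rust
/// Hypothetical transport choices; not dora's actual types.
#[derive(Debug, PartialEq)]
enum Route {
    /// Publish via zenoh using a shared-memory buffer.
    ZenohShm,
    /// Publish via zenoh using a heap-allocated payload.
    ZenohHeap,
    /// No zenoh session: fall back to the daemon channel.
    DaemonFallback,
}

/// Hypothetical stand-in for the node state relevant to routing.
struct Node {
    zenoh_session_open: bool,
    /// Single threshold that decides SHM vs. heap (field name from the PR).
    zenoh_zero_copy_threshold: usize,
}

impl Node {
    /// Picks a route for one output message of `payload_len` bytes.
    fn route_for(&self, payload_len: usize) -> Route {
        // Data only uses the daemon channel when zenoh is unavailable.
        if !self.zenoh_session_open {
            return Route::DaemonFallback;
        }
        // With a session open, payload size no longer affects *whether*
        // zenoh is used, only which buffer type backs the publish.
        // Assumption: the comparison is inclusive.
        if payload_len >= self.zenoh_zero_copy_threshold {
            Route::ZenohShm
        } else {
            Route::ZenohHeap
        }
    }
}

fn main() {
    // Assumed threshold value, chosen only for the example.
    let node = Node { zenoh_session_open: true, zenoh_zero_copy_threshold: 4096 };
    assert_eq!(node.route_for(8), Route::ZenohHeap);
    assert_eq!(node.route_for(64 * 1024), Route::ZenohShm);

    let offline = Node { zenoh_session_open: false, zenoh_zero_copy_threshold: 4096 };
    assert_eq!(offline.route_for(64 * 1024), Route::DaemonFallback);
}
```

The point of the sketch is that the session check comes first and the size check only selects the buffer type, so small payloads no longer take a different transport.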
2. Switch zenoh subscribers to callback handler (f824cab)
The previous `FifoChannelHandler` + dedicated subscriber thread + `block_on(select!)` loop is replaced with a callback subscriber. Zenoh's IO worker writes the `EventItem` directly into the tokio mpsc, eliminating one wakeup hop per receive. Notes:
Net change across both commits: -64 LOC.
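For readers unfamiliar with the callback pattern in item 2, here is a minimal, standard-library-only sketch of the two behaviors the commit message calls out: a non-blocking `try_send` that drops on a full channel, and panics converted into a fatal-error event. It does not use zenoh or tokio; `std::sync::mpsc::sync_channel` stands in for the tokio mpsc, and `make_callback`, the `decode` parameter, and this simplified `EventItem` are hypothetical names, not the PR's code.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Simplified stand-in for the node's event type.
#[derive(Debug, PartialEq)]
enum EventItem {
    Input(Vec<u8>),
    FatalError(String),
}

/// Builds the closure a callback subscriber would run for every sample.
/// `decode` is a hypothetical per-sample conversion that may panic.
fn make_callback<F>(tx: SyncSender<EventItem>, decode: F) -> impl Fn(&[u8])
where
    F: Fn(&[u8]) -> EventItem,
{
    move |sample: &[u8]| {
        // Catch panics so they never unwind into the caller's worker thread.
        let item = match panic::catch_unwind(AssertUnwindSafe(|| decode(sample))) {
            Ok(item) => item,
            Err(_) => EventItem::FatalError("panic in subscriber callback".into()),
        };
        // Never block inside the callback: drop the item if the queue is full.
        match tx.try_send(item) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => eprintln!("event channel full, dropping sample"),
            Err(TrySendError::Disconnected(_)) => {} // receiver is gone; nothing to do
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel(1); // capacity 1 to make the full case easy to hit
    let callback = make_callback(tx, |bytes: &[u8]| {
        if bytes.is_empty() {
            panic!("empty sample");
        }
        EventItem::Input(bytes.to_vec())
    });

    callback(&[1, 2, 3]); // queued
    callback(&[4]); // channel full: dropped with a warning
    assert_eq!(rx.recv().unwrap(), EventItem::Input(vec![1, 2, 3]));

    callback(&[]); // decode panics; surfaced as FatalError instead of unwinding
    assert_eq!(
        rx.recv().unwrap(),
        EventItem::FatalError("panic in subscriber callback".into())
    );
    assert!(rx.try_recv().is_err()); // the dropped sample never arrived
}
```

In this sketch the sample that arrives while the queue is full is the one discarded; how that maps onto dora's queue policies is not modeled here.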
Local benchmark (`examples/benchmark`, dev container, `dora run`, payloads 8B–2KB, 5 runs each, p50 of medians):
Results are within ±20µs of the previous baseline despite the routing change; the callback-subscriber win recovers most of the overhead that the FIFO subscriber added. Throughput improves by 12% to 60% in the same configuration.
Caveat: in environments without multicast (e.g. some CI / dev containers), zenoh peer discovery requires `ZENOH_CONFIG` with explicit endpoints. This isn't new behavior, but is now reachable for all message sizes instead of just ≥4KiB.