
feat: direct node-to-node TCP for small messages #1621

Open
haixuanTao wants to merge 2 commits into main from feat/direct-node-to-node-small-messages

@haixuanTao
Collaborator

@haixuanTao haixuanTao commented Apr 14, 2026

Summary

  • Bypass the daemon for small messages (<4KB) by connecting sender nodes directly to receiver nodes via loopback TCP
  • Eliminates one TCP hop from the data path, cutting latency significantly
  • Significantly reduces the load on the daemon for high-frequency nodes, which is especially beneficial for resource-constrained devices like Raspberry Pi
  • Particularly useful for passing small metadata like CUDA IPC memory handles between nodes with minimal overhead
  • Large messages (>=4KB) continue through the daemon for shared memory drop token tracking

Benchmark results (64-byte payload, 100 samples)

| Platform | Before (daemon) | After (direct) | Improvement |
|---|---|---|---|
| Linux x86_64 | 412us | 47us p50 | ~9x |
| macOS ARM | 211us | 105us p50 | ~2x |

How it works

  1. Receiver nodes bind a direct TCP listener on localhost during init
  2. Receiver registers the listener address with the daemon
  3. After all nodes are ready, sender queries the daemon for direct routes
  4. Sender establishes TCP connections directly to receivers
  5. Small Vec messages are sent directly; large shared-memory messages still go through daemon
  6. Daemon skips forwarding small messages to receivers that have direct connections
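
The listener/connect part of the steps above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: the length-prefixed framing and the names `write_frame`, `read_frame`, and `loopback_roundtrip` are assumptions, and the address is handed over in-process here, whereas the PR exchanges it through the daemon.

```rust
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Hypothetical framing: 4-byte little-endian length prefix, then the payload.
fn write_frame(stream: &mut TcpStream, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    stream.write_all(&(payload.len() as u32).to_le_bytes())?;
    stream.write_all(payload)
}

/// Reads one frame written by `write_frame`.
fn read_frame(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf)?;
    let mut payload = vec![0u8; u32::from_le_bytes(len_buf) as usize];
    stream.read_exact(&mut payload)?;
    Ok(payload)
}

/// Steps 1, 2 and 4 in miniature: bind on loopback, pass the address to the
/// sender, connect, and send a single small frame.
fn loopback_roundtrip(payload: &[u8]) -> io::Result<Vec<u8>> {
    // Step 1: the receiver binds an ephemeral port on localhost.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Step 2: this is the address that would be registered with the daemon.
    let addr: SocketAddr = listener.local_addr()?;

    let receiver = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut stream, _peer) = listener.accept()?;
        read_frame(&mut stream)
    });

    // Steps 3-4: the sender learns `addr` and connects directly.
    let mut stream = TcpStream::connect(addr)?;
    stream.set_nodelay(true)?; // avoid Nagle delays for small messages
    write_frame(&mut stream, payload)?;

    receiver.join().expect("receiver thread panicked")
}
```

Calling `loopback_roundtrip(b"hello")` returns the same bytes on the receiver side; binding to port 0 lets the OS pick a free port, which is why the address has to be published before any sender can connect.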

Key changes

  • New node_to_node message module (DirectMessage, DirectRouteInfo)
  • DaemonRequest::RegisterDirectListener and QueryDirectRoutes for control plane
  • Direct listener thread on receiver side, injecting events into the same EventStream
  • Scheduler fix: prioritize non-input events (Stop) to prevent starvation from continuous direct data
  • Only loopback connections - remote nodes use the existing daemon path

Test plan

  • Run examples/benchmark/dataflow.yml and verify latency improvement
  • Run existing integration tests for regressions
  • Verify large messages (>=4KB) still go through daemon path
  • Verify remote/multi-daemon setups are unaffected

@phil-opp
Collaborator

This has some similarities with #1378, which also bypasses the daemon for data messages. It also does this for the shmem path.

It would be interesting to see how #1378 compares to this PR performance-wise.

@github-actions github-actions Bot added waiting-for-author The pull request requires adjustments by the PR author. and removed waiting-for-author The pull request requires adjustments by the PR author. labels Apr 14, 2026
haixuanTao pushed a commit to dora-rs/dora-benchmark that referenced this pull request Apr 14, 2026
…ization

Python: 770us -> 440us (median, 4KB)
Rust:   650us -> 380us (median, 4KB)
C++:    ~unchanged (still routes through arrow FFI + daemon)

Tested with dora-rs/dora#1621 (feat/direct-node-to-node-small-messages).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@haixuanTao
Collaborator Author

haixuanTao commented Apr 14, 2026

@phil-opp I think it's slightly different from #1378, because this is really just about non-shared-memory messages (below the threshold) going directly over TCP to the receiver without any middleware. It doesn't touch the shared memory layer. The Zenoh data does look good though.

We can even keep everything on the stack for those messages.

I ran tests on iox2, and iox2, which is used by Zenoh (as I heard), could be even faster on performance-only metrics.

@phil-opp
Collaborator

What I meant is that #1378 routes all data messages independently of the daemon. So it also bypasses the daemon for small messages, just like this PR. Shared memory is only used for bigger messages.

@github-actions github-actions Bot added the waiting-for-author The pull request requires adjustments by the PR author. label Apr 14, 2026
@haixuanTao
Collaborator Author

@phil-opp Ah ok, I see. I'm not against keeping the Copilot version with Zenoh then.

It's a bit confusing that the message definition still lives in daemon_to_node.rs. Also, I can't find the part where the daemon exchanges socketaddr between nodes, but maybe it's somewhere within it?

@github-actions github-actions Bot removed the waiting-for-author The pull request requires adjustments by the PR author. label Apr 14, 2026
@phil-opp
Collaborator

phil-opp commented Apr 28, 2026

Code Review (correctness focus)

Posted by Claude Code on behalf of the reviewer.

Overview

Adds a fast path for sender→receiver loopback TCP, bypassing the daemon for small Vec messages. Receivers bind a listener at startup, register the address with the daemon, senders query routes and open connections. The daemon tracks direct_pairs to skip forwarding for those pairs. The architectural shape is reasonable, but there are several correctness issues I'd block on.


Correctness issues (blocking)

1. Mixed direct + non-direct receivers are starved — data loss

In apis/rust/node/src/node/mod.rs around the new send path:

let sent_directly = if let Some(DataMessage::Vec(ref vec_data)) = data {
    if let Some(connections) = self.direct_connections.get_mut(&output_id) {
        // ... iterate over direct connections only
        all_ok
    } else { false }
} else { false };

if !sent_directly && !zenoh_published {
    self.control_channel.send_message(...)
}

sent_directly = true means some direct connections succeeded, not that all receivers were notified. direct_connections only contains receivers that registered a direct listener — that excludes remote daemons, nodes whose channel was Shmem rather than Tcp (only the DaemonChannel::Tcp branch in event_stream/mod.rs registers a listener), and any node that started its query before the receiver registered.

When at least one direct connection exists but other receivers don't, the sender skips control_channel.send_message, the daemon never sees the output, and the non-direct receivers silently miss the message. The daemon-side filter in send_output_to_local_receivers only helps when send_message is actually called.

Fix: fall through to the daemon whenever any receiver of that output_id is not in direct_connections, or have the daemon forward only to non-direct receivers.
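
The second variant of the fix (daemon forwards only to non-direct receivers) comes down to a set difference. The sketch below is hypothetical: `Route`, `plan_route`, and the use of plain `String` node ids are illustrative assumptions and do not mirror the types in the PR.

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq, Eq)]
enum Route {
    /// Every receiver has a direct connection; the daemon can be skipped.
    DirectOnly,
    /// Some receivers lack a direct connection; the daemon must still see the
    /// output and forward it to exactly these receivers.
    DirectPlusDaemon(BTreeSet<String>),
    /// No receiver has a direct connection (or there are no receivers).
    DaemonOnly,
}

/// Decides how one output must be delivered so that no receiver is skipped.
fn plan_route(all_receivers: &BTreeSet<String>, direct: &BTreeSet<String>) -> Route {
    // Receivers that would be starved if the sender only used direct sockets.
    let missing: BTreeSet<String> = all_receivers.difference(direct).cloned().collect();
    if missing.is_empty() && !all_receivers.is_empty() {
        Route::DirectOnly
    } else if missing.len() == all_receivers.len() {
        Route::DaemonOnly
    } else {
        Route::DirectPlusDaemon(missing)
    }
}
```

The important property is that the daemon is skipped only in the `DirectOnly` case; the sender needs the full receiver list for the output, not just the list of successful direct connections, to make that call.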

2. Per-stream reordering between small and large messages

A node that emits both DataMessage::Vec (direct TCP) and DataMessage::SharedMemory (daemon) on the same output_id hands them to different transports. There is no ordering guarantee between the two, so a receiver can observe vec_msg_2 before shm_msg_1. This breaks Dora's existing per-input ordering contract that consumers rely on.

At minimum this needs to be documented as a constraint; ideally the direct path should be disabled (or messages sequenced) when an output ever produces SHM data.
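
One way to "sequence" messages across the two transports is a per-output sequence number stamped by the sender and a small reorder buffer on the receiver. The sketch below is an assumption for illustration only; `Resequencer` is a hypothetical name and nothing like it exists in the PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical receiver-side reorder buffer keyed by a sender-assigned
/// sequence number that starts at 0 for each output.
struct Resequencer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Resequencer<T> {
    fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Accepts a message from either transport and returns every message that
    /// has become deliverable, in sequence order.
    fn push(&mut self, seq: u64, msg: T) -> Vec<T> {
        self.pending.insert(seq, msg);
        let mut ready = Vec::new();
        // Release the contiguous run that starts at the next expected number.
        while let Some(msg) = self.pending.remove(&self.next_seq) {
            ready.push(msg);
            self.next_seq += 1;
        }
        ready
    }
}
```

If `vec_msg_2` (seq 1) arrives before `shm_msg_1` (seq 0), the first `push` returns nothing and the second returns both in order. The cost is that a message lost on one transport stalls the stream, so a real design would also need a timeout or gap-skip policy.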

3. No size threshold despite PR description

The PR title/description say "<4KB" / "small messages", but the code never checks size:

let sent_directly = if let Some(DataMessage::Vec(ref vec_data)) = data {
    if let Some(connections) = self.direct_connections.get_mut(&output_id) {

A 50 MB Vec will be bincode::serialized into a fresh allocation, written to TCP synchronously, and clone-multiplied across receivers — defeating the zero-copy goal and producing latency cliffs the benchmarks won't show. Gate on vec_data.len() < ZERO_COPY_THRESHOLD or rename the feature.
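
A sketch of the suggested gate follows. The `DataMessage` enum here is a simplified stand-in for the real type, and the value 4096 for `ZERO_COPY_THRESHOLD` is assumed from the "<4KB" figure in the PR description; `eligible_for_direct` is a hypothetical helper name.

```rust
/// Assumed value, taken from the "<4KB" figure in the PR description.
const ZERO_COPY_THRESHOLD: usize = 4096;

/// Simplified stand-in for the payload enum named in the review; the real
/// shared-memory variant carries fields that are omitted here.
enum DataMessage {
    Vec(Vec<u8>),
    SharedMemory,
}

/// Returns true only for inline payloads strictly below the threshold.
fn eligible_for_direct(data: &DataMessage) -> bool {
    match data {
        DataMessage::Vec(bytes) => bytes.len() < ZERO_COPY_THRESHOLD,
        DataMessage::SharedMemory => false,
    }
}
```

With this check in front of the `direct_connections` lookup, a large `Vec` would fall back to the existing path instead of being serialized and cloned per receiver.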

4. Startup race silently degrades to daemon path

The sender queries QueryDirectRoutes immediately after ControlChannel::init, with no barrier ensuring all receivers have already registered their listeners. A sender that initializes first sees an empty (or partial) route list, caches direct_connections, and never retries — that pair uses the daemon path forever.

Not data-loss (assuming issue 1 above is fixed), but the optimization becomes non-deterministic. The PR description's step 3 ("After all nodes are ready, sender queries the daemon") is not actually enforced in code.
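
Short of a real barrier in the daemon, one mitigation would be for the sender to re-query until the route list is complete. The helper below is a hypothetical sketch: `query_routes_with_retry` and its parameters are assumptions, and the closure stands in for whatever call issues `QueryDirectRoutes`.

```rust
use std::thread;
use std::time::Duration;

/// Polls `query` until it reports at least `expected` routes or the attempts
/// run out. Returns the last route list seen, which may still be partial.
fn query_routes_with_retry<F>(
    mut query: F,
    expected: usize,
    max_attempts: u32,
    delay: Duration,
) -> Vec<String>
where
    F: FnMut() -> Vec<String>,
{
    let mut routes = Vec::new();
    for attempt in 0..max_attempts {
        routes = query();
        if routes.len() >= expected {
            break;
        }
        // Sleep between attempts, but not after the final one.
        if attempt + 1 < max_attempts {
            thread::sleep(delay);
        }
    }
    routes
}
```

This only narrows the race window; a partial result still has to degrade safely to the daemon path, which is why issue 1 needs fixing regardless.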


Other issues

5. Hacky 1 ms sleep on Drop

for conn in conns { let _ = conn.stream.shutdown(std::net::Shutdown::Write); }
std::thread::sleep(std::time::Duration::from_millis(1));
self.direct_connections.clear();

shutdown(Write) already queues a FIN after pending data; the receiver reads everything buffered then sees EOF cleanly. The 1 ms sleep buys nothing in the happy case and isn't enough on a loaded system. Drop the sleep, and ideally have the listener thread emit a synthetic end-marker so receivers don't depend on TCP EOF for lifecycle.
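
The claim that the half-close alone is sufficient can be checked with a small standard-library experiment. This is an illustrative sketch, not code from the PR; `send_then_half_close` is a hypothetical name.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Sends `payload`, half-closes the write side without sleeping, and returns
/// what the receiver read before it observed EOF.
fn send_then_half_close(payload: Vec<u8>) -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let receiver = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut stream, _peer) = listener.accept()?;
        let mut received = Vec::new();
        // read_to_end returns only once the peer's FIN (EOF) has arrived.
        stream.read_to_end(&mut received)?;
        Ok(received)
    });

    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(&payload)?;
    // Half-close: bytes already written are delivered ahead of the FIN.
    stream.shutdown(Shutdown::Write)?;
    drop(stream);

    receiver.join().expect("receiver thread panicked")
}
```

The receiver sees the full payload followed by EOF with no delay on the sender side, which matches the reviewer's point that the 1 ms sleep adds nothing in the happy path.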

6. Doc-comment copy-paste bug

In apis/rust/node/src/node/mod.rs:

/// Allows sending outputs and retrieving node information.
///
/// The main purpose of this struct is to send outputs via Dora. There are also functions available
/// for retrieving the node configuration.
/// A direct TCP connection to a receiver node for small message delivery.
struct DirectConnection {

The first three lines belong to DoraNode below and got duplicated. Delete them.

7. Unbounded thread spawning per accepted connection

direct_listener_loop spawns a new OS thread per accepted connection and never joins them. No upper bound, no cancellation on shutdown. A fan-in topology with many senders can create surprising thread counts. A small pool — or polling all connections from the listener thread with mio/tokio — would be safer.

8. Scheduler "fix" semantics changed silently

The unbounded try_recv drain in event_stream/mod.rs is replaced with a bounded 100-message drain followed by an unconditional break. The PR description frames this as a fairness fix for direct traffic, but the change applies to all event sources, including the existing daemon path. Worth a comment explaining the previous behavior was "drain everything until empty".
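
The behavioral difference is easy to see in isolation. The sketch below uses `std::sync::mpsc` as a stand-in for the channel type used in event_stream/mod.rs (an assumption), with `drain_bounded` and `MAX_DRAIN` as hypothetical names; the limit of 100 comes from the commit message.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};

/// Limit described in the commit message ("100 events per recv_async call").
const MAX_DRAIN: usize = 100;

/// Drains at most `limit` queued events without blocking. The previous
/// behavior corresponds to looping until `try_recv` reports `Empty`.
fn drain_bounded<T>(rx: &Receiver<T>, limit: usize) -> Vec<T> {
    let mut batch = Vec::new();
    while batch.len() < limit {
        match rx.try_recv() {
            Ok(event) => batch.push(event),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    batch
}
```

With 250 queued events, three successive calls return 100, 100, and 50 events. The caller gets control back between batches, which is what lets a scheduler hand out a pending Stop event even under a continuous input stream.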

9. RegisterDirectListener errors are swallowed

If registration fails, the code logs a warning and continues with direct_listener = Some(listener). The listener will accept connections that no sender will ever know about. Either fail init, or drop the listener when registration fails.

10. Test coverage

The only test changes are stub responses in interactive.rs and node_integration_testing.rs. No test for: mixed direct/non-direct receivers (issue 1), startup-race ordering, large Vec going direct, or graceful shutdown. The smoke-test infrastructure (tests/example-smoke.rs) supports this — at minimum a multi-receiver dataflow exercising this code path would catch issue 1.


Minor

  • DirectMessage is re-exported at the crate root in libraries/message/src/lib.rs but elsewhere it's referenced via dora_message::DirectMessage / dora_message::node_to_node::DirectRouteInfo. Pick one.
  • bincode::serialize allocates per-send; consider serialize_into(&mut stream) to write directly to the TCP stream and avoid the intermediate Vec.
  • The new direct listener accepts any local connection without auth. Consistent with the existing daemon TCP socket on localhost, but worth a note in the architecture doc — anything that injects events into EventStream is a privilege boundary.

Summary

The biggest concern is issue 1 (silent data loss for non-direct receivers) combined with issue 2 (cross-message reordering) and issue 3 (no size threshold). Together they mean a node graph that works correctly today could, after this PR, drop or reorder messages depending on which receivers happen to register direct listeners. I'd hold the PR until those are addressed and there's at least one integration test covering a mixed-receiver topology.

@phil-opp
Collaborator

phil-opp commented Apr 28, 2026

Claude Code in a Dev Container:

Ran the benchmark on this branch vs main (two iterations each, on a Linux x86_64 dev container).

Latency p50 (lower is better)

| Size | main (run 1 / run 2) | PR (run 1 / run 2) | Δ |
|---|---|---|---|
| 0B | 64.7µs / 71.7µs | 51.2µs / 58.8µs | ~-19% |
| 8B | 78.6µs / 67.3µs | 40.9µs / 73.0µs | ~-22% |
| 64B | 80.2µs / 81.0µs | 52.6µs / 57.6µs | ~-31% |
| 512B | 77.8µs / 89.6µs | 43.2µs / 56.6µs | ~-40% |
| 2KB | 96.4µs / 85.9µs | 69.1µs / 51.3µs | ~-34% |

Direction matches the PR's hypothesis — removing the daemon hop saves ~25–35µs on the small-message path. The 412µs → 47µs number from the PR description doesn't reproduce here; my main baseline is already ~80µs, so the absolute saving is smaller. The 412µs figure likely comes from a slower / more contended machine.
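
The Δ figures in the latency table are consistent with comparing the mean of the two runs on each side; that reading is an inference from the numbers, not something the comment states. A small sketch of the arithmetic, with `mean` and `percent_change` as hypothetical helper names:

```rust
/// Arithmetic mean of a non-empty slice of measurements.
fn mean(runs: &[f64]) -> f64 {
    runs.iter().sum::<f64>() / runs.len() as f64
}

/// Relative change of the candidate mean against the baseline mean, in percent.
/// Negative values mean the candidate is lower (faster for latency).
fn percent_change(baseline: &[f64], candidate: &[f64]) -> f64 {
    let base = mean(baseline);
    (mean(candidate) - base) / base * 100.0
}
```

For the 0B row, `percent_change(&[64.7, 71.7], &[51.2, 58.8])` compares 68.2µs with 55.0µs and gives roughly -19.4%; the 512B row compares 83.7µs with 49.9µs and gives roughly -40.4%.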

Throughput msg/s (higher is better)

| Size | main (run 1 / run 2) | PR (run 1 / run 2) | note |
|---|---|---|---|
| 0B | 138k / 121k | 103k / 98k | ~25% regression |
| 8B | 109k / 157k | 98k / 99k | small regression / noise |
| 64B | 110k / 107k | 109k / 102k | flat |
| 512B | 106k / 107k | 95k / 89k | ~10% regression |
| 2KB | 70k / 78k | 88k / 58k | noisy / flat |

Throughput on the smallest messages looks slightly worse on the PR — possibly because each send is now its own TCP write rather than batched through the daemon, or because of the new "prioritize non-input events" scheduling change. Might be worth a closer look or a confirmation that the trade-off is intended.

Side observation

`main` runs only reported 5 of 10 size brackets (latency drops at 2KB); the PR runs reported all 10. The sink prints a bracket when it sees the next bracket's data, so this suggests input-queue drops on `main` due to daemon load — and less daemon load on the PR path lets the sink keep up. Anecdotal but a nice secondary signal.

Two iterations isn't statistically tight (8B p50 varied 41µs vs 73µs across PR runs) — a tighter comparison would want 5+ runs on a quieter host.

haixuanTao and others added 2 commits May 25, 2026 10:39
Bypass the daemon for small messages (<4KB) by having sender nodes
connect directly to receiver nodes via TCP. This eliminates one TCP
hop from the data path, reducing latency from ~412µs to ~47µs (p50)
on Linux for 64-byte messages.

Large messages (>=4KB) continue through the daemon for shared memory
drop token tracking. Direct connections are only used for local
(loopback) receivers.

Key changes:
- New `node_to_node` message module with `DirectMessage` and `DirectRouteInfo`
- Receiver nodes bind a direct TCP listener and register it with daemon
- Sender nodes query routes from daemon and establish direct connections
- Daemon skips forwarding small Vec messages to direct-paired receivers
- Scheduler updated to not prioritize InputClosed over pending Input events

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
With direct node-to-node TCP connections continuously injecting data,
the try_recv loop in recv_async never breaks because new events are
always available. This prevents scheduler.next() from being called,
so Stop events sit in the scheduler queue and are never delivered.

Limit the drain to 100 events per recv_async call so the scheduler
can deliver prioritized non-input events (like Stop) in a timely manner.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@haixuanTao haixuanTao force-pushed the feat/direct-node-to-node-small-messages branch from 45c045e to 996f85f Compare May 25, 2026 13:41
@phil-opp
Collaborator

Benchmark: Main vs PR 1621 (2026-05-27)

Ran the examples/benchmark dataflow on the same machine (Linux, Zenoh 1.8.0, release build), 3 interleaved iterations each. Since PR 1621 was written, #1787 landed on main. It routes all data via Zenoh node-to-node (with express(true), lowlatency transport, callback subscribers), so the daemon is no longer in the data path.

Latency (p50, median of 3 runs)

| Size | Main (default, SHM ≥ 4 KB) | Main (SHM = 0, all sizes) | PR 1621 (direct TCP < 4 KB) |
|---|---|---|---|
| 0 B | 61.7 µs | 82.5 µs | 63.5 µs |
| 8 B | 72.0 µs | 71.1 µs | 60.7 µs |
| 64 B | 73.4 µs | 58.3 µs | 51.5 µs |
| 512 B | 58.5 µs | 57.8 µs | 48.4 µs |
| 2 KB | 56.9 µs | 54.9 µs | 52.3 µs |
| 4 KB | 73.5 µs | 58.2 µs | 95.5 µs |
| 16 KB | 71.2 µs | 57.3 µs | 106.0 µs |
| 40 KB | 63.8 µs | 63.1 µs | 107.7 µs |
| 400 KB | 91.8 µs | 95.1 µs | 108.8 µs |
| 3 MB | 277.8 µs | 306.1 µs | 314.2 µs |

Key findings

  1. PR 1621 still wins for sub-2 KB messages — ~15-30 % lower p50 latency vs default main. The direct TCP path avoids Zenoh's framing/routing/callback overhead.

  2. PR 1621 regresses for ≥ 4 KB messages — 20-70 % higher latency. It predates #1787 ("node: route all data via zenoh, switch to callback subscribers"), so large messages still take the old daemon-mediated TCP path instead of Zenoh SHM zero-copy.

  3. Lowering the SHM threshold to 0 on main narrows the gap for small messages (58 µs vs 52 µs at 64 B) and eliminates the 4 KB "cliff" (first-SHM-alloc spike), but hurts 0 B payloads due to page-alignment overhead.

  4. Unix socket transport (unixsock-stream instead of tcp/127.0.0.1) was also tested on main — no measurable improvement. The bottleneck is Zenoh protocol overhead, not the kernel transport layer.

  5. Throughput at small sizes favours PR 1621 (~73 K vs ~47 K msg/s at 64 B), but PR 1621 shows instability at the 4 KB boundary (stalls in the throughput phase).

Conclusion

The original 412 µs → 47 µs motivation is gone — express(true) + lowlatency + callback subscribers (#1787) already brought main down to ~60 µs. PR 1621's direct TCP still shaves ~10-20 µs for small messages, but at the cost of a second transport path and a regression for everything ≥ 4 KB. Rebasing on top of #1787 (so large messages still use Zenoh SHM) would be needed to avoid the regression, adding significant complexity for a modest gain.
