feat: direct node-to-node TCP for small messages #1621
…ization
Python: 770us -> 440us (median, 4KB)
Rust: 650us -> 380us (median, 4KB)
C++: ~unchanged (still routes through arrow FFI + daemon)

Tested with dora-rs/dora#1621 (feat/direct-node-to-node-small-messages).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@phil-opp I think it's slightly different from #1378, because this is really just about the non-shared-memory messages (below the threshold) going directly through TCP to the receiver without any middleware. It doesn't touch the shared memory layer. The Zenoh data does look good though. We can even keep everything on the stack for those messages. I ran tests on iox2, and iox2, which is used by Zenoh (as I heard), could be even faster on performance-only metrics.
What I meant is that #1378 routes all data messages independently of the daemon. So it also bypasses the daemon for small messages, just like this PR. Shared memory is only used for bigger messages.
@phil-opp Ah ok, I see. I'm not against keeping the Copilot version with Zenoh then. It's a bit confusing that the message definition still lives in daemon_to_node.rs. Also, I can't find the part where the daemon exchanges the SocketAddr between nodes, but maybe it's somewhere within it?
Code Review (correctness focus)

Posted by Claude Code on behalf of the reviewer.

Overview

Adds a fast path for sender→receiver loopback TCP, bypassing the daemon for small

Correctness issues (blocking)

1. Mixed direct + non-direct receivers are starved: data loss

In

    let sent_directly = if let Some(DataMessage::Vec(ref vec_data)) = data {
        if let Some(connections) = self.direct_connections.get_mut(&output_id) {
            // ... iterate over direct connections only
            all_ok
        } else { false }
    } else { false };
    if !sent_directly && !zenoh_published {
        self.control_channel.send_message(...)
    }

When at least one direct connection exists but other receivers don't, the sender skips

Fix: fall through to the daemon whenever any receiver of that

2. Per-stream reordering between small and large messages

A node that emits both

At minimum this needs to be documented as a constraint; ideally the direct path should be disabled (or messages sequenced) when an output ever produces SHM data.

3. No size threshold despite PR description

The PR title/description say "<4KB" / "small messages", but the code never checks size:

    let sent_directly = if let Some(DataMessage::Vec(ref vec_data)) = data {
        if let Some(connections) = self.direct_connections.get_mut(&output_id) {

A 50 MB

4. Startup race silently degrades to daemon path

The sender queries

Not data loss (assuming issue 1 above is fixed), but the optimization becomes non-deterministic. The PR description's step 3 ("After all nodes are ready, sender queries the daemon") is not actually enforced in code.

Other issues

5. Hacky 1 ms sleep on Drop

    for conn in conns { let _ = conn.stream.shutdown(std::net::Shutdown::Write); }
    std::thread::sleep(std::time::Duration::from_millis(1));
    self.direct_connections.clear();

6. Doc-comment copy-paste bug

In

    /// Allows sending outputs and retrieving node information.
    ///
    /// The main purpose of this struct is to send outputs via Dora. There are also functions available
    /// for retrieving the node configuration.
    /// A direct TCP connection to a receiver node for small message delivery.
    struct DirectConnection {

The first three lines belong to

7. Unbounded thread spawning per accepted connection

8. Scheduler "fix" semantics changed silently

The unbounded

9.
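
Issues 1 and 3 in the review both concern the sender's routing decision, so one way to picture the suggested behaviour is a small pure function. This is only a sketch under stated assumptions: `choose_route`, `ReceiverState`, `Route`, and `DIRECT_THRESHOLD_BYTES` are hypothetical names that do not appear in the PR, and the 4096-byte limit is taken from the PR description's "<4KB" wording rather than from the code. It also assumes the simplest reading of the fix, where a message goes to the daemon whenever any receiver lacks a direct connection.

```rust
/// Hypothetical size limit for the direct path (assumed from the "<4KB" wording).
const DIRECT_THRESHOLD_BYTES: usize = 4096;

/// Where a single output message should be sent.
#[derive(Debug, PartialEq, Eq)]
enum Route {
    /// Payload is small and every receiver has a direct connection.
    DirectOnly,
    /// The payload is too large, or at least one receiver needs the daemon.
    Daemon,
}

/// Hypothetical per-receiver state: whether a direct TCP connection exists.
struct ReceiverState {
    has_direct_connection: bool,
}

/// Picks the direct path only when it cannot starve any receiver.
fn choose_route(payload_len: usize, receivers: &[ReceiverState]) -> Route {
    let small = payload_len < DIRECT_THRESHOLD_BYTES;
    // An output with no known receivers is left to the daemon as well.
    let all_direct =
        !receivers.is_empty() && receivers.iter().all(|r| r.has_direct_connection);
    if small && all_direct {
        Route::DirectOnly
    } else {
        Route::Daemon
    }
}
```

With this shape, a 64-byte message to one direct and one non-direct receiver yields `Route::Daemon`, which is the mixed case the review flags as data loss.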
Claude Code in a Dev Container: Ran the benchmark on this branch vs

Latency p50 (lower is better)

Direction matches the PR's hypothesis: removing the daemon hop saves ~25–35µs on the small-message path. The 412µs → 47µs number from the PR description doesn't reproduce here; my

Throughput msg/s (higher is better)

Throughput on the smallest messages looks slightly worse on the PR, possibly because each send is now its own TCP write rather than batched through the daemon, or because of the new "prioritize non-input events" scheduling change. Might be worth a closer look or a confirmation that the trade-off is intended.

Side observation

`main` runs only reported 5 of 10 size brackets (latency drops at 2KB); the PR runs reported all 10. The sink prints a bracket when it sees the next bracket's data, so this suggests input-queue drops on `main` due to daemon load, and less daemon load on the PR path lets the sink keep up. Anecdotal but a nice secondary signal.

Two iterations isn't statistically tight (8B p50 varied 41µs vs 73µs across PR runs); a tighter comparison would want 5+ runs on a quieter host.
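
To make the "two iterations isn't statistically tight" remark concrete, here is a minimal sketch of a p50 computation. The function name `p50` and the choice of microseconds as `u64` are assumptions for illustration, not the benchmark's actual code. With only two per-run values, the median is just their midpoint, so either run can pull it a long way.

```rust
/// Returns the median (p50) of the samples, or `None` if there are none.
/// Samples are assumed to be latencies in microseconds.
fn p50(samples: &[u64]) -> Option<f64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    let mid = sorted.len() / 2;
    Some(if sorted.len() % 2 == 0 {
        // Even count: average the two middle values.
        (sorted[mid - 1] + sorted[mid]) as f64 / 2.0
    } else {
        // Odd count: take the middle value.
        sorted[mid] as f64
    })
}
```

Applied to the two 8B values quoted above, `p50(&[41, 73])` gives 57.0, a figure that neither run actually produced, which is why a larger number of runs gives a more trustworthy median.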
Bypass the daemon for small messages (<4KB) by having sender nodes connect directly to receiver nodes via TCP. This eliminates one TCP hop from the data path, reducing latency from ~412µs to ~47µs (p50) on Linux for 64-byte messages.

Large messages (>=4KB) continue through the daemon for shared memory drop token tracking. Direct connections are only used for local (loopback) receivers.

Key changes:
- New `node_to_node` message module with `DirectMessage` and `DirectRouteInfo`
- Receiver nodes bind a direct TCP listener and register it with daemon
- Sender nodes query routes from daemon and establish direct connections
- Daemon skips forwarding small Vec messages to direct-paired receivers
- Scheduler updated to not prioritize InputClosed over pending Input events

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
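
The two conditions stated in this commit message (payload under 4KB, receiver on loopback) can be sketched as a single eligibility check. The function `is_direct_eligible` and the constant `SMALL_MESSAGE_LIMIT` are hypothetical names chosen for illustration; whether the PR's code performs such a check is not confirmed by the text here. `IpAddr::is_loopback` is the standard-library call used.

```rust
use std::net::SocketAddr;

/// Assumed limit, taken from the "<4KB" wording in the commit message.
const SMALL_MESSAGE_LIMIT: usize = 4 * 1024;

/// Returns true when a message may use the direct TCP path:
/// the payload is below the limit and the receiver listens on loopback.
fn is_direct_eligible(payload_len: usize, receiver_addr: SocketAddr) -> bool {
    payload_len < SMALL_MESSAGE_LIMIT && receiver_addr.ip().is_loopback()
}
```

For example, a 64-byte payload addressed to `127.0.0.1:5000` would be eligible, while the same payload addressed to a non-loopback address, or a 4096-byte payload to loopback, would fall back to the daemon path.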
With direct node-to-node TCP connections continuously injecting data, the try_recv loop in recv_async never breaks because new events are always available. This prevents scheduler.next() from being called, so Stop events sit in the scheduler queue and are never delivered.

Limit the drain to 100 events per recv_async call so the scheduler can deliver prioritized non-input events (like Stop) in a timely manner.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
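
The bounded drain described in this commit message might look roughly like the following. This is a sketch, not the PR's code: it uses `std::sync::mpsc` as a stand-in for whatever channel `recv_async` actually reads from, and `drain_bounded` and `MAX_DRAIN_PER_CALL` are hypothetical names. Only the cap of 100 comes from the commit message.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Cap from the commit message; the constant name is an assumption.
const MAX_DRAIN_PER_CALL: usize = 100;

/// Moves at most `MAX_DRAIN_PER_CALL` pending events from `rx` into `queue`
/// and returns how many were moved. Stopping at the cap lets the caller
/// return to the scheduler even if the sender never stops producing.
fn drain_bounded<T>(rx: &Receiver<T>, queue: &mut Vec<T>) -> usize {
    let mut moved = 0;
    while moved < MAX_DRAIN_PER_CALL {
        match rx.try_recv() {
            Ok(event) => {
                queue.push(event);
                moved += 1;
            }
            // Nothing pending, or the sender is gone: stop draining.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    moved
}
```

With 250 events already queued, three successive calls would move 100, 100, and 50 events, so the caller gets a chance to deliver a pending Stop between each batch.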
45c045e to 996f85f
Benchmark: Main vs PR 1621 (2026-05-27)

Ran the

Latency (p50, median of 3 runs)

Key findings

Conclusion

The original 412 µs → 47 µs motivation is gone …
Summary
Benchmark results (64-byte payload, 100 samples)
How it works
Key changes
Test plan