feature: service (request-reply pattern) implementation #1578

Closed
MinPoe wants to merge 7 commits into dora-rs:main from MinPoe:main

@MinPoe MinPoe commented Mar 31, 2026

Closes #390

Summary

Implements the services (request-reply) pattern for dora-rs, allowing nodes and operators to perform synchronous request-reply communication built on top of the existing pub-sub infrastructure.

NOTE: This implementation does not support operators as clients, because an operator's on_event callback must not block. Client support for operators could be added later (e.g., via an async callback or a separate on_service_reply handler).

Changes

YAML Dataflow Configuration

  • New services block on nodes and operators with type: server and type: client (with server: node_id/service_name reference)
  • JSON schema updated with ServiceEndpoint definition
  • YAML validation ensures client references point to valid server endpoints, including operator-level services
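
The reference check described above can be sketched in a few lines. This is a minimal illustration, not the PR's validation code: the dictionary layout of the parsed dataflow and the function name validate_services are assumptions made for the example; only the type: server / type: client fields and the node_id/service_name reference format come from the description.

```python
# Hypothetical in-memory form of a parsed dataflow. The field layout is an
# assumption based on the PR description, not the PR's actual schema.
dataflow = {
    "nodes": [
        {"id": "service_node", "services": {"echo": {"type": "server"}}},
        {
            "id": "requester",
            "services": {"echo": {"type": "client", "server": "service_node/echo"}},
        },
    ]
}


def validate_services(dataflow):
    """Return error strings for client endpoints that reference no known server."""
    # First pass: collect every declared server as "node_id/service_name".
    servers = set()
    for node in dataflow["nodes"]:
        for name, endpoint in node.get("services", {}).items():
            if endpoint["type"] == "server":
                servers.add(f"{node['id']}/{name}")

    # Second pass: every client must point at one of the collected servers.
    errors = []
    for node in dataflow["nodes"]:
        for name, endpoint in node.get("services", {}).items():
            if endpoint["type"] == "client" and endpoint.get("server") not in servers:
                errors.append(
                    f"{node['id']}: service '{name}' references unknown server "
                    f"'{endpoint.get('server')}'"
                )
    return errors
```

Calling validate_services(dataflow) on the sample above returns an empty list; changing the client's server value to a name that no node declares produces one error string.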

Rust Node API (apis/rust/node/)

  • DoraNode::send_request() -- blocking call that sends a request and waits for a matching reply using correlation IDs (UUID v7). Supports optional timeout. Events received while waiting are buffered and returned by subsequent recv() calls.
  • DoraNode::send_service_reply() -- sends a reply to a service request
  • Event::ServiceRequest variant for server-side event handling
  • ServiceReply struct returned by send_request()
  • recv_raw() and recv_raw_timeout() on EventStream to bypass the scheduler during blocking request calls
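
The interaction between the blocking call and event buffering is easier to see in a toy model. The sketch below is written in Python for brevity and does not use dora at all: FakeNode, its event dictionaries, and the simplified send_request(correlation_id) signature are hypothetical stand-ins, and uuid4 replaces UUID v7 so that it runs on any Python 3 version.

```python
import uuid
from collections import deque


class FakeNode:
    """Toy stand-in for a node; this is not dora's DoraNode."""

    def __init__(self, incoming):
        self._incoming = deque(incoming)  # events arriving from the "daemon"
        self._buffered = deque()          # events set aside during a blocking wait

    def recv(self):
        # Buffered events are drained first, in arrival order, so none are lost.
        if self._buffered:
            return self._buffered.popleft()
        return self._incoming.popleft() if self._incoming else None

    def send_request(self, correlation_id):
        # "Block" (here: loop) until the reply with our correlation ID arrives.
        while self._incoming:
            event = self._incoming.popleft()
            if event.get("type") == "REPLY" and event.get("correlation_id") == correlation_id:
                return event
            self._buffered.append(event)  # unrelated event: keep it for recv()
        raise TimeoutError("no matching reply")


cid = str(uuid.uuid4())
node = FakeNode([
    {"type": "INPUT", "id": "tick-1"},
    {"type": "REPLY", "correlation_id": cid, "value": 42},
    {"type": "INPUT", "id": "tick-2"},
])
reply = node.send_request(cid)
```

After the call, reply holds the matching event, and two further node.recv() calls return tick-1 and then tick-2: the tick that arrived before the reply was buffered rather than dropped.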

Python Node API (apis/python/node/)

  • node.send_request(service_name, data, metadata=None, timeout=None) -- blocking, returns dict with value and metadata
  • node.send_service_reply(service_name, data, metadata=None)
  • SERVICE_REQUEST event type delivered to the Python event loop
  • Type stubs updated in __init__.pyi

Daemon (binaries/daemon/)

  • Synthetic __service_request_ and __service_reply_ channels auto-created from service declarations
  • node_inputs() injects synthetic inputs for both custom nodes and operators
  • CoreNodeKindExt::run_config() injects synthetic outputs for both custom nodes and operators
  • Service mappings translate between client-side and server-side channel names (supporting operator-prefixed names)
  • DaemonRequest::SendServiceReply added to the control channel
  • send_output_to_local_receivers() routes service messages through dedicated mapping tables

Event Stream (apis/rust/node/src/event_stream/)

  • event_stream_loop converts incoming __service_request_* inputs into NodeEvent::ServiceRequest events with the prefix stripped
  • convert_event_item handles NodeEvent::ServiceRequest to Event::ServiceRequest conversion
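
As a rough illustration of the prefix handling described above, the following Python sketch converts a raw input ID into either a service-request event or a regular input event. The __service_request_ prefix and the SERVICE_REQUEST event type come from the description; the function name convert_input, the event dictionary keys, and the assumption that the service name directly follows the prefix are illustrative only.

```python
SERVICE_REQUEST_PREFIX = "__service_request_"


def convert_input(input_id, data):
    """Map a raw input to a service-request event or a regular input event."""
    if input_id.startswith(SERVICE_REQUEST_PREFIX):
        # Strip the synthetic prefix so the server sees only the service name.
        service_name = input_id[len(SERVICE_REQUEST_PREFIX):]
        return {"type": "SERVICE_REQUEST", "service": service_name, "value": data}
    # Anything else is passed through as an ordinary input.
    return {"type": "INPUT", "id": input_id, "value": data}
```

With this sketch, convert_input("__service_request_echo", b"hi") yields a SERVICE_REQUEST event for the service echo, while convert_input("tick", None) stays a plain INPUT event.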

Runtime / Operator Support (binaries/runtime/)

  • ServiceRequest events routed to operators with operator prefix stripped
  • SendServiceReplyCallback added to the Python operator event dict as send_service_reply key (backward compatible, no signature change to on_event)
  • OperatorEvent::ServiceReply forwarded through the runtime to the daemon with correct output id formatting

Examples (examples/request-reply/)

  • dataflow.yml -- custom node service example
  • dataflow_operator.yml -- operator service example
  • requester_node.py -- client node testing data types (int64, float64, uint8, string, bool, int32, empty), event buffering, and timeout
  • service_node.py -- custom node echo server
  • service_operator.py -- operator echo server

Testing

Both dataflows pass all tests:

  • 7 Arrow data types round-tripped correctly
  • Event buffering verified (ticks not lost during blocking requests)
  • Timeout on unanswered requests verified
  • Custom node and operator server paths both validated

@heyong4725
Collaborator

Thanks for this PR @MinPoe, and for the thorough write-up — the operator-as-client constraint you flagged (operators can't block in on_event, so they can only serve, not call) is exactly the kind of design observation that's hard to spot until you've actually built the thing. Saving that finding for the record below.

Closing this PR because the service (request-reply) pattern has shipped in main, but via a structurally different design than this PR proposes. Issue #390 was closed when that work landed.

Design comparison

| Aspect | This PR | Main (shipped) |
|---|---|---|
| YAML surface | new services block with type: server / type: client | none; standard inputs: / outputs: |
| Channel mechanism | synthetic __service_request_* / __service_reply_* channels + daemon routing tables | regular pub/sub with a REQUEST_ID metadata key |
| Event delivery | new Event::ServiceRequest variant + SERVICE_REQUEST Python event type | regular Event::Input carrying request_id in metadata |
| Client API | send_request(): blocking, buffers events during the wait | send_service_request(): non-blocking, returns the request_id |
| Reply matching | implicit (via the blocking wait) | explicit (caller maintains a HashMap<request_id, pending>) |
| Operator client support | not supported (by design: on_event can't block) | supported (no blocking) |
| Daemon plumbing | substantial (synthetic input injection, service mapping tables) | none; pub/sub handles it |

The shipped design is documented at docs/patterns.md and there's a Rust + Python example at examples/service-example/.

Why the design diverged

Main's design is deliberately minimal — no new event variants, no synthetic channels, no daemon-level service mapping tables. The service pattern is just pub/sub with a REQUEST_ID metadata key that callers correlate on. That keeps the daemon's input/output graph homogeneous and lets the same wire format carry topic, service, action, and streaming patterns (see docs/patterns.md).
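
For readers unfamiliar with caller-side correlation, here is a minimal Python sketch of the bookkeeping involved. It does not use dora's API: the PendingRequests class, the event dictionary shape, and the literal metadata key "request_id" are assumptions for illustration, and uuid4 stands in for UUID v7 to keep the example portable.

```python
import uuid


class PendingRequests:
    """Caller-side table mapping request_id -> context for in-flight requests."""

    def __init__(self):
        self._pending = {}

    def register(self, context):
        # Stand-in for the ID a non-blocking send call would return.
        request_id = str(uuid.uuid4())
        self._pending[request_id] = context
        return request_id

    def resolve(self, event):
        """Return (context, value) if the event answers a pending request, else None."""
        request_id = event.get("metadata", {}).get("request_id")
        context = self._pending.pop(request_id, None)
        if context is None:
            return None  # ordinary input, or a reply we are not waiting for
        return context, event["value"]


pending = PendingRequests()
rid = pending.register("lookup-for-frame-7")
reply_event = {"type": "INPUT", "value": 99, "metadata": {"request_id": rid}}
result = pending.resolve(reply_event)
```

Here result is ("lookup-for-frame-7", 99). Resolving the same event a second time returns None, because the entry is removed once it has been matched.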

Your design's main differentiator is synchronous send_request — caller blocks until the reply lands, with events that arrive in the meantime buffered for the next recv(). That's genuinely ergonomic for simple request-reply, but the dora team chose the non-blocking path because:

  1. Blocking in a node's event loop is risky in dataflow systems. A blocked node can't react to Stop, can't process upstream back-pressure signals, and can't observe input-closure events from other producers. Buffering papers over the symptom but doesn't fix the root cause — the node is still effectively offline for the duration of the request.
  2. Reordering across buffered events. Events received during the blocking wait are returned in arrival order, but they were delayed relative to the request-reply pair. For some use cases (timer ticks, watchdog signals) the delay matters; for others it doesn't. The non-blocking model lets the user decide per call.
  3. The operator-as-client gap your PR acknowledges is fundamental to the blocking approach — you can't extend it to operators without also rewriting on_event to be async, which has its own cost. The non-blocking approach works for operators out of the box.

Neither design is "right"; they're different points on the latency-vs-control trade-off curve. Main happened to land on the non-blocking point of that curve.

What's still valuable from this PR

  • The operator-as-client gotcha you documented in the PR description is a durable finding. If anyone ever wants to add a send_request_async helper to dora, they'll need to confront exactly this problem; preserving your note here means they won't rediscover it from scratch.
  • The request_id-as-UUID-v7 choice matches what main shipped (Self::new_request_id at apis/rust/node/src/node/mod.rs:1244 returns UUID v7). Convergent insight; nice that both designs landed on the same correlation ID format.
  • The synchronous-API ergonomics argument is real. Even though main intentionally doesn't ship a blocking wrapper, anyone building a higher-level Python framework on top of dora could write a send_request_blocking helper as a 30-line user-space wrapper around send_service_request + event-loop polling. Worth a docs recipe if anyone wants to file one.
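
A user-space blocking wrapper of the kind mentioned in the last bullet might look roughly like the sketch below. It is only an outline under stated assumptions: the Python signatures of send_service_request(output_id, data) and next(timeout=...), the "request_id" metadata key, and the EchoNode test double are all hypothetical and should be checked against the API on current main before use.

```python
import time
from collections import deque


def send_request_blocking(node, output_id, data, timeout=1.0):
    """Send a request, poll for its reply, return (reply, events_seen_meanwhile)."""
    request_id = node.send_service_request(output_id, data)
    deadline = time.monotonic() + timeout
    deferred = []
    while time.monotonic() < deadline:
        event = node.next(timeout=0.01)
        if event is None:
            continue
        if event.get("metadata", {}).get("request_id") == request_id:
            return event, deferred
        deferred.append(event)  # the caller decides what to do with these
    raise TimeoutError(f"no reply for request {request_id}")


class EchoNode:
    """Hypothetical stand-in for a node: echoes each request back as a reply."""

    def __init__(self):
        # One unrelated event is already queued ahead of any reply.
        self._queue = deque([{"type": "INPUT", "id": "tick", "value": None, "metadata": {}}])
        self._counter = 0

    def send_service_request(self, output_id, data):
        self._counter += 1
        request_id = f"req-{self._counter}"
        self._queue.append(
            {"type": "INPUT", "id": "reply", "value": data, "metadata": {"request_id": request_id}}
        )
        return request_id

    def next(self, timeout=None):
        return self._queue.popleft() if self._queue else None


reply, deferred = send_request_blocking(EchoNode(), "request", 42)
```

In this run, reply carries the echoed value 42 and deferred contains the single tick event that arrived first. Returning the deferred events to the caller, rather than hiding them in an internal buffer, leaves the decision about late ticks or Stop events in user code, in line with the per-call control the non-blocking design is meant to provide.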

Won't-apply note

The PR's daemon plumbing (synthetic channel injection in node_inputs() / run_config()) is against a daemon surface that was substantially refactored in the dora 1.0 consolidation (#1820 and many follow-ups). Even if the design were aligned, the PR wouldn't rebase cleanly — most of the daemon changes would need to be rewritten.

Thanks again for the contribution and especially for the issue-level thinking. Apologies for the slow turn-around. If you have follow-up ideas for the service pattern (a docs recipe for blocking wrappers, a typed-service API on top of the existing metadata-based one, or an explicit critique of main's design that you think should change it), please feel free to open a fresh discussion or PR against current main.


🤖 Closing comment from Claude on behalf of the maintainers as part of the backlog triage pass.

@heyong4725 heyong4725 closed this May 18, 2026