feature: service (request-reply pattern) implementation #1578
…Ext), test nodes tweaks
…, but basic pipeline setup test succeeded)
…ted example test to test timeout implementation
…w, blocking for client operator seems counterproductive), implemented tests for operators too
Thanks for this PR @MinPoe, and for the thorough write-up; the operator-as-client constraint you flagged (operators can't block in
Closing this PR because the service (request-reply) pattern has shipped in main, but via a structurally different design than this PR proposes. Issue #390 was closed when that work landed.
Design comparison
The shipped design is documented at
Why the design diverged
Main's design is deliberately minimal: no new event variants, no synthetic channels, no daemon-level service mapping tables. The service pattern is just pub/sub with a
Your design's main differentiator is synchronous
Neither design is "right"; they're different points on the latency-vs-control trade-off curve. Main happened to land on the non-blocking point of that curve.
What's still valuable from this PR
Won't-apply note
The PR's daemon plumbing (synthetic channel injection in
Thanks again for the contribution and especially for the issue-level thinking. Apologies for the slow turn-around. If you have follow-up ideas for the service pattern (a docs recipe for blocking wrappers, a typed-service API on top of the existing metadata-based one, or an explicit critique of main's design that you think should change it), please feel free to open a fresh discussion or PR against current main.
🤖 Closing comment from Claude on behalf of the maintainers as part of the backlog triage pass.
Closes #390
Summary
Implements the services (request-reply) pattern for dora-rs, allowing nodes and operators to perform synchronous request-reply communication built on top of the existing pub-sub infrastructure.
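As a rough illustration of how a blocking request-reply call can sit on top of pub-sub, here is a minimal sketch. It is not the PR's code: `ToyClient`, `echo_server`, and the dict-shaped messages are hypothetical stand-ins, in-process queues play the role of the pub-sub channels, and `uuid4` is used where the PR describes UUID v7 correlation IDs.

```python
import queue
import threading
import time
import uuid
from collections import deque


class ToyClient:
    """Hypothetical client: publishes a request, then blocks for the matching reply."""

    def __init__(self, outbox, inbox):
        self.outbox = outbox      # queue standing in for the request channel
        self.inbox = inbox        # queue on which every incoming event arrives
        self.buffered = deque()   # unrelated events seen while waiting for a reply

    def send_request(self, data, timeout=None):
        # Assumption: uuid4 replaces the UUID v7 correlation IDs named in the PR.
        correlation_id = str(uuid.uuid4())
        self.outbox.put({"type": "request", "id": correlation_id, "data": data})
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            remaining = None if deadline is None else deadline - time.monotonic()
            if remaining is not None and remaining <= 0:
                raise TimeoutError("no reply before the deadline")
            try:
                event = self.inbox.get(timeout=remaining)
            except queue.Empty:
                raise TimeoutError("no reply before the deadline") from None
            if event.get("type") == "reply" and event.get("id") == correlation_id:
                return event["data"]
            # Not our reply: keep it so a later recv() can still deliver it.
            self.buffered.append(event)

    def recv(self, timeout=None):
        # Events buffered during send_request() are returned first, in order.
        if self.buffered:
            return self.buffered.popleft()
        return self.inbox.get(timeout=timeout)


def echo_server(requests, replies):
    """Hypothetical echo server: emits one unrelated event, then the reply."""
    request = requests.get()
    replies.put({"type": "input", "id": "tick", "data": None})
    replies.put({"type": "reply", "id": request["id"], "data": request["data"]})


if __name__ == "__main__":
    requests, replies = queue.Queue(), queue.Queue()
    threading.Thread(target=echo_server, args=(requests, replies), daemon=True).start()
    client = ToyClient(requests, replies)
    print(client.send_request([1, 2, 3], timeout=2.0))
    print(client.recv(timeout=1.0)["id"])
```

The caller only ever sees its own reply from `send_request()`; anything else that arrived in the meantime is replayed by `recv()`, so no events are lost while the call blocks.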
NOTE: The service implementation does not support operators acting as clients, because an operator's on_event callback must not block. Client support for operators could be added in a future implementation (e.g., via an async callback or a separate on_service_reply handler).
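One possible shape for a non-blocking client, which is what an operator would need, is to register a callback per correlation ID and dispatch replies from the normal event path. The sketch below is purely illustrative: `NonBlockingClient`, its `publish` callable, and the dict-shaped events are hypothetical and do not correspond to anything in this PR or in main.

```python
import uuid


class NonBlockingClient:
    """Hypothetical client that never blocks inside its event callback."""

    def __init__(self, publish):
        self.publish = publish   # callable that sends a message toward the server
        self.pending = {}        # correlation id -> callback awaiting the reply

    def send_request(self, data, on_reply):
        # Register the callback first, then publish; returns immediately.
        correlation_id = str(uuid.uuid4())
        self.pending[correlation_id] = on_reply
        self.publish({"type": "request", "id": correlation_id, "data": data})
        return correlation_id

    def on_event(self, event):
        """Return True if the event was a reply consumed by a pending callback."""
        if event.get("type") != "reply":
            return False
        callback = self.pending.pop(event.get("id"), None)
        if callback is None:
            return False         # unknown or already-answered correlation id
        callback(event["data"])
        return True


if __name__ == "__main__":
    sent, results = [], []
    client = NonBlockingClient(sent.append)
    cid = client.send_request(41, results.append)
    client.on_event({"type": "reply", "id": cid, "data": 42})
    print(results)
```

The trade-off against a blocking call is that the caller's logic is split across the request site and the callback, in exchange for never stalling the event loop.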
Changes
YAML Dataflow Configuration
- services block on nodes and operators with type: server and type: client (with server: node_id/service_name reference)
- ServiceEndpoint definition
Rust Node API (apis/rust/node/)
- DoraNode::send_request() -- blocking call that sends a request and waits for a matching reply using correlation IDs (UUID v7). Supports optional timeout. Events received while waiting are buffered and returned by subsequent recv() calls.
- DoraNode::send_service_reply() -- sends a reply to a service request
- Event::ServiceRequest variant for server-side event handling
- ServiceReply struct returned by send_request()
- recv_raw() and recv_raw_timeout() on EventStream to bypass the scheduler during blocking request calls
Python Node API (apis/python/node/)
- node.send_request(service_name, data, metadata=None, timeout=None) -- blocking, returns dict with value and metadata
- node.send_service_reply(service_name, data, metadata=None)
- SERVICE_REQUEST event type delivered to the Python event loop
- __init__.pyi
Daemon (binaries/daemon/)
- __service_request_ and __service_reply_ channels auto-created from service declarations
- node_inputs() injects synthetic inputs for both custom nodes and operators
- CoreNodeKindExt::run_config() injects synthetic outputs for both custom nodes and operators
- DaemonRequest::SendServiceReply added to the control channel
- send_output_to_local_receivers() routes service messages through dedicated mapping tables
Event Stream (apis/rust/node/src/event_stream/)
- event_stream_loop converts incoming __service_request_* inputs into NodeEvent::ServiceRequest events with the prefix stripped
- convert_event_item handles NodeEvent::ServiceRequest to Event::ServiceRequest conversion
Runtime / Operator Support (binaries/runtime/)
- ServiceRequest events routed to operators with operator prefix stripped
- SendServiceReplyCallback added to the Python operator event dict as send_service_reply key (backward compatible, no signature change to on_event)
- OperatorEvent::ServiceReply forwarded through the runtime to the daemon with correct output id formatting
Examples (examples/request-reply/)
- dataflow.yml -- custom node service example
- dataflow_operator.yml -- operator service example
- requester_node.py -- client node testing data types (int64, float64, uint8, string, bool, int32, empty), event buffering, and timeout
- service_node.py -- custom node echo server
- service_operator.py -- operator echo server
Testing
Both dataflows pass all tests: