From 5b39f72fa1394544c9b585d2be337b683c63b250 Mon Sep 17 00:00:00 2001 From: Yong Date: Sun, 17 May 2026 15:15:31 -0500 Subject: [PATCH 1/3] feat(c++/node): add close_outputs, NodeFailed and Reload event types Rescue of #1410 (PavelGuzenfeld). Second in the 4-PR rescue stack for the C++ API parity work; stacked on top of #1846 (event receive variants). New FFI surface =============== * Two new `DoraEventType` variants: - `NodeFailed` -- an upstream node has failed. Inputs that depended on the failed node will stop receiving data; the payload (via `event_as_node_failed`) names the failed node, the error message, and the affected input ids. - `Reload` -- hot-reload notification for an operator. C++ nodes can now react (e.g. flush caches) instead of treating it as `Unknown`. * `event_as_node_failed(event) -> DoraNodeFailed` -- extracts `{ affected_input_ids, error, source_node_id }` from a NodeFailed event. * `close_outputs(sender, output_ids)` -- selectively close one or more of this node's outputs without shutting the whole node down. Downstream subscribers see the corresponding `InputClosed` event. Implementation notes ==================== * `event_as_node_failed` destructures via `EventOrReason::Event(...)` rather than the original PR's `event.event` (which assumed the named-field DoraEvent shape from #1409 that we never adopted). * `close_outputs` returns `DoraResult` like the existing `send_output` family for ergonomic consistency on the C++ side. Closes #1410. 
Co-Authored-By: Pavel Guzenfeld Co-Authored-By: Claude Opus 4.7 (1M context) --- apis/c++/node/src/lib.rs | 75 +++++++++++++++++++-- examples/c++-dataflow/node-rust-api/main.cc | 10 +++ 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index fac2be183..f7481efcb 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -9,12 +9,13 @@ use crate::ffi::MetadataValueType; use chrono::DateTime; use dora_node_api::{ - self, Event, EventStream, Metadata as DoraMetadata, - MetadataParameters as DoraMetadataParameters, Parameter as DoraParameter, TryRecvError, + self, arrow::array::{AsArray, UInt8Array}, merged::{MergeExternal, MergedEvent}, + Event, EventStream, Metadata as DoraMetadata, MetadataParameters as DoraMetadataParameters, + Parameter as DoraParameter, TryRecvError, }; -use eyre::{Result as EyreResult, bail, eyre}; +use eyre::{bail, eyre, Result as EyreResult}; use serde::Serialize; use serde_json::Value as JsonValue; @@ -24,7 +25,7 @@ pub use prelude::*; pub mod prelude { pub use dora_ros2_bridge::prelude::*; } -use futures_lite::{Stream, StreamExt, stream}; +use futures_lite::{stream, Stream, StreamExt}; #[cxx::bridge] #[allow(clippy::needless_lifetimes)] @@ -49,6 +50,15 @@ mod ffi { /// `next_event_timeout` returned without an event because the /// caller-supplied deadline elapsed first. Timeout, + /// An upstream node has failed. Use `event_as_node_failed` to + /// extract the failed node id, the error message, and the list + /// of downstream inputs that will stop receiving data. + NodeFailed, + /// Hot-reload notification for an operator. Operator id is + /// available via the daemon's reload protocol; this variant + /// exists so C++ nodes can react to reloads (e.g. flushing + /// caches) rather than treating them as `Unknown`. + Reload, } struct DoraInput { @@ -60,6 +70,17 @@ mod ffi { error: String, } + /// Payload of a `DoraEventType::NodeFailed` event. 
+ struct DoraNodeFailed { + /// Inputs on this node that will stop receiving data because + /// the upstream node failed. + affected_input_ids: Vec, + /// Human-readable error message from the failed node. + error: String, + /// Id of the node that failed. + source_node_id: String, + } + struct ArrowInputInfo { id: String, metadata: Box, @@ -123,6 +144,15 @@ mod ffi { fn drained_events_next(drained: &mut Box) -> Box; fn event_type(event: &Box) -> DoraEventType; fn event_as_input(event: Box) -> Result; + /// Extract the failure payload from a `NodeFailed` event. + fn event_as_node_failed(event: Box) -> Result; + /// Selectively close one or more of this node's outputs without + /// shutting the whole node down. Subsequent downstream + /// subscribers see the corresponding `InputClosed` event. + fn close_outputs( + output_sender: &mut Box, + output_ids: Vec, + ) -> DoraResult; fn send_output( output_sender: &mut Box, id: String, @@ -359,6 +389,8 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType { Event::Input { .. } => ffi::DoraEventType::Input, Event::InputClosed { .. } => ffi::DoraEventType::InputClosed, Event::Error(_) => ffi::DoraEventType::Error, + Event::NodeFailed { .. } => ffi::DoraEventType::NodeFailed, + Event::Reload { .. 
} => ffi::DoraEventType::Reload, _ => ffi::DoraEventType::Unknown, }, EventOrReason::Closed => ffi::DoraEventType::AllInputsClosed, @@ -391,6 +423,41 @@ fn event_as_input(event: Box) -> eyre::Result { }) } +fn event_as_node_failed(event: Box) -> eyre::Result { + let EventOrReason::Event(Event::NodeFailed { + affected_input_ids, + error, + source_node_id, + }) = event.0 + else { + bail!("not a NodeFailed event"); + }; + Ok(ffi::DoraNodeFailed { + affected_input_ids: affected_input_ids + .into_iter() + .map(|id| id.to_string()) + .collect(), + error, + source_node_id: source_node_id.to_string(), + }) +} + +fn close_outputs( + output_sender: &mut Box, + output_ids: Vec, +) -> ffi::DoraResult { + let ids: Vec = + output_ids.into_iter().map(Into::into).collect(); + match output_sender.0.close_outputs(ids) { + Ok(()) => ffi::DoraResult { + error: String::new(), + }, + Err(err) => ffi::DoraResult { + error: format!("{err:?}"), + }, + } +} + unsafe fn event_as_arrow_input( event: Box, out_array: *mut u8, diff --git a/examples/c++-dataflow/node-rust-api/main.cc b/examples/c++-dataflow/node-rust-api/main.cc index 9407650a6..93c19a6bd 100644 --- a/examples/c++-dataflow/node-rust-api/main.cc +++ b/examples/c++-dataflow/node-rust-api/main.cc @@ -38,6 +38,16 @@ int main() return -1; } } + else if (ty == DoraEventType::NodeFailed) + { + auto failed = event_as_node_failed(std::move(event)); + std::cerr << "Node failed: source=" << std::string(failed.source_node_id) + << " error=" << std::string(failed.error) << std::endl; + } + else if (ty == DoraEventType::Reload) + { + std::cout << "Reload event received" << std::endl; + } else { std::cerr << "Unknown event type " << static_cast(ty) << std::endl; From 41d9b43a03269c084bd393eda85aa3f80d409676 Mon Sep 17 00:00:00 2001 From: Yong Date: Sun, 17 May 2026 16:01:22 -0500 Subject: [PATCH 2/3] fix(c++/node): parse close_outputs ids via FromStr to avoid FFI panic Review finding on #1847 head 5b39f72f: `close_outputs` converted 
caller-supplied `Vec<String>` to `Vec<DataId>` via `Into::into`, which calls `From<String> for DataId` -- documented as panicking on invalid characters at `libraries/message/src/id.rs:186` with an explicit `# Panics` notice. A typo in a C++ string literal would have aborted the whole node instead of returning `DoraResult.error` like the rest of the C++ API surface does. Same panic-across-cxx::bridge category as the `Instant::now() + Duration::from_millis(u64::MAX)` overflow on #1846 -- a public C++ API accepting unconstrained input must be panic-safe for the entire input domain. Fix: parse each id via `id.parse::<DataId>()` (`impl FromStr`, which is the safe path documented at `libraries/message/src/id.rs:167`) and on the first parse error return a `DoraResult` whose `error` names the offending id and the underlying validation error. The well-formed-id case is unchanged; only the previously-panicking error path now surfaces cleanly. Pattern bank update for me (second in this PR series): "public API accepts type T via FFI" + "T has a `From` impl that panics on invalid input" = "use `FromStr` / `TryFrom`, never the panicking `From`". The standard library's `String::from` is total, so it doesn't share this gotcha, but dora's id types deliberately panic in `From` to keep the happy path concise -- which is the right call for trusted Rust call sites and the wrong call for FFI boundaries. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- apis/c++/node/src/lib.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index f7481efcb..7dc261f2b 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -446,8 +446,24 @@ fn close_outputs( output_sender: &mut Box, output_ids: Vec, ) -> ffi::DoraResult { - let ids: Vec = - output_ids.into_iter().map(Into::into).collect(); + // Parse via `FromStr` instead of `From` so invalid IDs + // surface as a `DoraResult.error` rather than panicking across + // the cxx::bridge. `DataId::from(String)` is documented as + // panicking on invalid characters (see + // `libraries/message/src/id.rs:186`, `# Panics`); calling it on + // caller-supplied input would mean a typo in a C++ string literal + // aborts the whole node. + let mut ids = Vec::with_capacity(output_ids.len()); + for id in output_ids { + match id.parse::() { + Ok(parsed) => ids.push(parsed), + Err(e) => { + return ffi::DoraResult { + error: format!("invalid output id '{id}': {e}"), + }; + } + } + } match output_sender.0.close_outputs(ids) { Ok(()) => ffi::DoraResult { error: String::new(), From 532a17a864be2549c984e11a4e1d7e0df71dca4c Mon Sep 17 00:00:00 2001 From: Yong Date: Sun, 17 May 2026 16:07:04 -0500 Subject: [PATCH 3/3] docs(api-cxx): document close_outputs, NodeFailed/Reload, event_as_node_failed Review residual: docs/api-cxx.md was stale for the new C++ node API surface this PR adds (close_outputs, NodeFailed, Reload, DoraNodeFailed, event_as_node_failed). Also catches up on the #1846 additions that merged earlier in the stack (Empty, Timeout, try_next_event, next_event_timeout, events_is_empty, drain_events, DrainedEvents) since the DoraEventType enum and the downcast-helper section can't be accurate without them. 
Additions ========= * New "Non-blocking and timed receive" subsection under Events documenting try_next_event, next_event_timeout, events_is_empty, and the drain_events / DrainedEvents family. * DoraEvent downcast helpers gain event_as_node_failed. * DoraEventType enum now lists all 10 variants currently on main (was 6) with one-line descriptions, plus a paragraph explaining the Empty vs Timeout distinction. * New DoraNodeFailed struct section after DoraInput, with a short usage example. * New OutputSender::close_outputs subsection, noting that ids are validated and return DoraResult.error on invalid input (matching the FromStr parse fix landed earlier on this PR). Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/api-cxx.md | 81 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/docs/api-cxx.md b/docs/api-cxx.md index 2d9f0a1cf..685f40736 100644 --- a/docs/api-cxx.md +++ b/docs/api-cxx.md @@ -51,9 +51,44 @@ rust::Box next_event(rust::Box& events); Both forms block until the next event arrives and return an owned `DoraEvent`. +### Non-blocking and timed receive + +For nodes that need to do work between events, or react to deadlines, the `Events` stream also offers non-blocking and timed variants of `next_event()`: + +```cpp +// Block up to `timeout_ms` milliseconds for the next event. Returns +// an event with `event_type() == Timeout` if the deadline elapses +// before one arrives, or `AllInputsClosed` if the stream closed. +rust::Box next_event_timeout(rust::Box& events, uint64_t timeout_ms); + +// Non-blocking poll. Returns `event_type() == Empty` if no event is +// immediately available, or `AllInputsClosed` if the stream is closed. +rust::Box try_next_event(rust::Box& events); + +// Hint: true when the event queue is currently empty. Treat as +// advisory only -- the daemon can produce a new event between the +// check and a subsequent receive. 
+bool events_is_empty(const rust::Box& events); +``` + +For draining buffered events into a snapshot (useful when you want to process a batch all at once without racing the daemon): + +```cpp +// Take a snapshot of all currently-buffered events. Subsequent +// `next_event` / `try_next_event` calls only see events that arrive +// after this point. +rust::Box drain_events(rust::Box& events); + +size_t drained_events_len(const rust::Box& drained); + +// Pop the next event from a drained snapshot. Returns +// `event_type() == Empty` once the snapshot is exhausted. +rust::Box drained_events_next(rust::Box& drained); +``` + ### DoraEvent -Opaque Rust type. Inspect its kind with `event_type()`, then downcast with `event_as_input()` or `event_as_arrow_input()`. +Opaque Rust type. Inspect its kind with `event_type()`, then downcast with one of the variant extractors below. ```cpp // Determine the event kind. @@ -75,6 +110,9 @@ ArrowInputInfo event_as_arrow_input_with_info( rust::Box event, uint8_t* out_array, uint8_t* out_schema); + +// Downcast to a NodeFailed payload. Throws if the event is not NodeFailed. +DoraNodeFailed event_as_node_failed(rust::Box event); ``` ### DoraEventType @@ -87,9 +125,15 @@ enum class DoraEventType : uint8_t { Error, // an error occurred Unknown, // unrecognized event variant AllInputsClosed, // all inputs closed (stream ended) + Empty, // try_next_event / drained_events_next found no event ready + Timeout, // next_event_timeout deadline elapsed without an event + NodeFailed, // an upstream node failed (use event_as_node_failed to inspect) + Reload, // hot-reload notification for an operator }; ``` +`Empty` is distinct from `Timeout`: `Empty` means "no event available right now" (the caller did not request a timeout); `Timeout` means "the caller-supplied deadline elapsed before an event arrived". A non-blocking poll never returns `Timeout`; a timed receive returns either `Timeout` or `AllInputsClosed` if no event arrived in time. 
+ ### DoraInput Returned by `event_as_input()`. Contains raw bytes. @@ -101,6 +145,29 @@ struct DoraInput { }; ``` +### DoraNodeFailed + +Returned by `event_as_node_failed()`. Carries the failure information from an upstream node that exited unexpectedly. Use it to log the cause, flush downstream state, or trigger graceful shutdown. + +```cpp +struct DoraNodeFailed { + rust::Vec affected_input_ids; // inputs on this node that will stop receiving data + rust::String error; // human-readable error message from the failed node + rust::String source_node_id; // id of the node that failed +}; +``` + +Example: + +```cpp +auto event = next_event(dora_node.events); +if (event_type(event) == DoraEventType::NodeFailed) { + auto failed = event_as_node_failed(std::move(event)); + std::cerr << "Upstream node `" << std::string(failed.source_node_id) + << "` failed: " << std::string(failed.error) << std::endl; +} +``` + ### ArrowInputInfo Returned by `event_as_arrow_input_with_info()`. Contains the input ID, metadata, and an error string. @@ -170,6 +237,18 @@ DoraResult send_arrow_output( rust::Box metadata); ``` +#### close_outputs + +Selectively close one or more of this node's outputs without shutting the whole node down. Downstream subscribers see the corresponding `InputClosed` event for each closed output. + +```cpp +DoraResult close_outputs( + rust::Box& sender, + rust::Vec output_ids); +``` + +`output_ids` are validated as `DataId`s; an invalid id (e.g. one containing characters not allowed by the data-id grammar) returns a `DoraResult` whose `error` names the offending id and the underlying validation message, instead of aborting the process. + #### log_message Send a log message through the Dora logging system.