diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index fac2be183b..7dc261f2bb 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -9,12 +9,13 @@ use crate::ffi::MetadataValueType; use chrono::DateTime; use dora_node_api::{ - self, Event, EventStream, Metadata as DoraMetadata, - MetadataParameters as DoraMetadataParameters, Parameter as DoraParameter, TryRecvError, + self, arrow::array::{AsArray, UInt8Array}, merged::{MergeExternal, MergedEvent}, + Event, EventStream, Metadata as DoraMetadata, MetadataParameters as DoraMetadataParameters, + Parameter as DoraParameter, TryRecvError, }; -use eyre::{Result as EyreResult, bail, eyre}; +use eyre::{bail, eyre, Result as EyreResult}; use serde::Serialize; use serde_json::Value as JsonValue; @@ -24,7 +25,7 @@ pub use prelude::*; pub mod prelude { pub use dora_ros2_bridge::prelude::*; } -use futures_lite::{Stream, StreamExt, stream}; +use futures_lite::{stream, Stream, StreamExt}; #[cxx::bridge] #[allow(clippy::needless_lifetimes)] @@ -49,6 +50,15 @@ mod ffi { /// `next_event_timeout` returned without an event because the /// caller-supplied deadline elapsed first. Timeout, + /// An upstream node has failed. Use `event_as_node_failed` to + /// extract the failed node id, the error message, and the list + /// of downstream inputs that will stop receiving data. + NodeFailed, + /// Hot-reload notification for an operator. Operator id is + /// available via the daemon's reload protocol; this variant + /// exists so C++ nodes can react to reloads (e.g. flushing + /// caches) rather than treating them as `Unknown`. + Reload, } struct DoraInput { @@ -60,6 +70,17 @@ mod ffi { error: String, } + /// Payload of a `DoraEventType::NodeFailed` event. + struct DoraNodeFailed { + /// Inputs on this node that will stop receiving data because + /// the upstream node failed. + affected_input_ids: Vec, + /// Human-readable error message from the failed node. + error: String, + /// Id of the node that failed. 
+ source_node_id: String, + } + struct ArrowInputInfo { id: String, metadata: Box, @@ -123,6 +144,15 @@ mod ffi { fn drained_events_next(drained: &mut Box) -> Box; fn event_type(event: &Box) -> DoraEventType; fn event_as_input(event: Box) -> Result; + /// Extract the failure payload from a `NodeFailed` event. + fn event_as_node_failed(event: Box) -> Result; + /// Selectively close one or more of this node's outputs without + /// shutting the whole node down. Subsequent downstream + /// subscribers see the corresponding `InputClosed` event. + fn close_outputs( + output_sender: &mut Box, + output_ids: Vec, + ) -> DoraResult; fn send_output( output_sender: &mut Box, id: String, @@ -359,6 +389,8 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType { Event::Input { .. } => ffi::DoraEventType::Input, Event::InputClosed { .. } => ffi::DoraEventType::InputClosed, Event::Error(_) => ffi::DoraEventType::Error, + Event::NodeFailed { .. } => ffi::DoraEventType::NodeFailed, + Event::Reload { .. } => ffi::DoraEventType::Reload, _ => ffi::DoraEventType::Unknown, }, EventOrReason::Closed => ffi::DoraEventType::AllInputsClosed, @@ -391,6 +423,57 @@ fn event_as_input(event: Box) -> eyre::Result { }) } +fn event_as_node_failed(event: Box) -> eyre::Result { + let EventOrReason::Event(Event::NodeFailed { + affected_input_ids, + error, + source_node_id, + }) = event.0 + else { + bail!("not a NodeFailed event"); + }; + Ok(ffi::DoraNodeFailed { + affected_input_ids: affected_input_ids + .into_iter() + .map(|id| id.to_string()) + .collect(), + error, + source_node_id: source_node_id.to_string(), + }) +} + +fn close_outputs( + output_sender: &mut Box, + output_ids: Vec, +) -> ffi::DoraResult { + // Parse via `FromStr` instead of `From` so invalid IDs + // surface as a `DoraResult.error` rather than panicking across + // the cxx::bridge. 
`DataId::from(String)` is documented as + // panicking on invalid characters (see the `# Panics` section of + // that impl in `libraries/message/src/id.rs`); calling it on + // caller-supplied input would mean a typo in a C++ string literal + // aborts the whole node. + let mut ids = Vec::with_capacity(output_ids.len()); + for id in output_ids { + match id.parse::() { + Ok(parsed) => ids.push(parsed), + Err(e) => { + return ffi::DoraResult { + error: format!("invalid output id '{id}': {e}"), + }; + } + } + } + match output_sender.0.close_outputs(ids) { + Ok(()) => ffi::DoraResult { + error: String::new(), + }, + Err(err) => ffi::DoraResult { + error: format!("{err:?}"), + }, + } +} + unsafe fn event_as_arrow_input( event: Box, out_array: *mut u8, diff --git a/docs/api-cxx.md b/docs/api-cxx.md index 2d9f0a1cf4..685f407366 100644 --- a/docs/api-cxx.md +++ b/docs/api-cxx.md @@ -51,9 +51,44 @@ rust::Box next_event(rust::Box& events); Both forms block until the next event arrives and return an owned `DoraEvent`. +### Non-blocking and timed receive + +For nodes that need to do work between events, or react to deadlines, the `Events` stream also offers non-blocking and timed variants of `next_event()`: + +```cpp +// Block up to `timeout_ms` milliseconds for the next event. Returns +// an event with `event_type() == Timeout` if the deadline elapses +// before one arrives, or `AllInputsClosed` if the stream closed. +rust::Box next_event_timeout(rust::Box& events, uint64_t timeout_ms); + +// Non-blocking poll. Returns `event_type() == Empty` if no event is +// immediately available, or `AllInputsClosed` if the stream is closed. +rust::Box try_next_event(rust::Box& events); + +// Hint: true when the event queue is currently empty. Treat as +// advisory only -- the daemon can produce a new event between the +// check and a subsequent receive. 
+bool events_is_empty(const rust::Box& events); +``` + +For draining buffered events into a snapshot (useful when you want to process a batch all at once without racing the daemon): + +```cpp +// Take a snapshot of all currently-buffered events. Subsequent +// `next_event` / `try_next_event` calls only see events that arrive +// after this point. +rust::Box drain_events(rust::Box& events); + +size_t drained_events_len(const rust::Box& drained); + +// Pop the next event from a drained snapshot. Returns +// `event_type() == Empty` once the snapshot is exhausted. +rust::Box drained_events_next(rust::Box& drained); +``` + ### DoraEvent -Opaque Rust type. Inspect its kind with `event_type()`, then downcast with `event_as_input()` or `event_as_arrow_input()`. +Opaque Rust type. Inspect its kind with `event_type()`, then downcast with one of the variant extractors below. ```cpp // Determine the event kind. @@ -75,6 +110,9 @@ ArrowInputInfo event_as_arrow_input_with_info( rust::Box event, uint8_t* out_array, uint8_t* out_schema); + +// Downcast to a NodeFailed payload. Throws if the event is not NodeFailed. +DoraNodeFailed event_as_node_failed(rust::Box event); ``` ### DoraEventType @@ -87,9 +125,15 @@ enum class DoraEventType : uint8_t { Error, // an error occurred Unknown, // unrecognized event variant AllInputsClosed, // all inputs closed (stream ended) + Empty, // try_next_event / drained_events_next found no event ready + Timeout, // next_event_timeout deadline elapsed without an event + NodeFailed, // an upstream node failed (use event_as_node_failed to inspect) + Reload, // hot-reload notification for an operator }; ``` +`Empty` is distinct from `Timeout`: `Empty` means "no event available right now" (the caller did not request a timeout); `Timeout` means "the caller-supplied deadline elapsed before an event arrived". A non-blocking poll (`try_next_event`) never returns `Timeout`; a timed receive (`next_event_timeout`) that gets no event in time returns `Timeout` if the deadline elapsed, or `AllInputsClosed` if the stream closed. 
+ ### DoraInput Returned by `event_as_input()`. Contains raw bytes. @@ -101,6 +145,29 @@ struct DoraInput { }; ``` +### DoraNodeFailed + +Returned by `event_as_node_failed()`. Carries the failure information from an upstream node that exited unexpectedly. Use it to log the cause, flush downstream state, or trigger graceful shutdown. + +```cpp +struct DoraNodeFailed { + rust::Vec affected_input_ids; // inputs on this node that will stop receiving data + rust::String error; // human-readable error message from the failed node + rust::String source_node_id; // id of the node that failed +}; +``` + +Example: + +```cpp +auto event = next_event(dora_node.events); +if (event_type(event) == DoraEventType::NodeFailed) { + auto failed = event_as_node_failed(std::move(event)); + std::cerr << "Upstream node `" << std::string(failed.source_node_id) + << "` failed: " << std::string(failed.error) << std::endl; +} +``` + ### ArrowInputInfo Returned by `event_as_arrow_input_with_info()`. Contains the input ID, metadata, and an error string. @@ -170,6 +237,18 @@ DoraResult send_arrow_output( rust::Box metadata); ``` +#### close_outputs + +Selectively close one or more of this node's outputs without shutting the whole node down. Downstream subscribers see the corresponding `InputClosed` event for each closed output. + +```cpp +DoraResult close_outputs( + rust::Box& sender, + rust::Vec output_ids); +``` + +`output_ids` are validated as `DataId`s; an invalid id (e.g. one containing characters not allowed by the data-id grammar) returns a `DoraResult` whose `error` names the offending id and the underlying validation message, instead of aborting the process. All ids are validated before anything is closed, so if any id is invalid no output is closed. On success `error` is empty. + #### log_message Send a log message through the Dora logging system. 
diff --git a/examples/c++-dataflow/node-rust-api/main.cc b/examples/c++-dataflow/node-rust-api/main.cc index 9407650a66..93c19a6bd2 100644 --- a/examples/c++-dataflow/node-rust-api/main.cc +++ b/examples/c++-dataflow/node-rust-api/main.cc @@ -38,6 +38,16 @@ int main() return -1; } } + else if (ty == DoraEventType::NodeFailed) + { + auto failed = event_as_node_failed(std::move(event)); + std::cerr << "Node failed: source=" << std::string(failed.source_node_id) + << " error=" << std::string(failed.error) << std::endl; + } + else if (ty == DoraEventType::Reload) + { + std::cout << "Reload event received" << std::endl; + } else { std::cerr << "Unknown event type " << static_cast(ty) << std::endl;