Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 87 additions & 4 deletions apis/c++/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use crate::ffi::MetadataValueType;

use chrono::DateTime;
use dora_node_api::{
self, Event, EventStream, Metadata as DoraMetadata,
MetadataParameters as DoraMetadataParameters, Parameter as DoraParameter, TryRecvError,
self,
arrow::array::{AsArray, UInt8Array},
merged::{MergeExternal, MergedEvent},
Event, EventStream, Metadata as DoraMetadata, MetadataParameters as DoraMetadataParameters,
Parameter as DoraParameter, TryRecvError,
};
use eyre::{Result as EyreResult, bail, eyre};
use eyre::{bail, eyre, Result as EyreResult};
use serde::Serialize;
use serde_json::Value as JsonValue;

Expand All @@ -24,7 +25,7 @@ pub use prelude::*;
pub mod prelude {
pub use dora_ros2_bridge::prelude::*;
}
use futures_lite::{Stream, StreamExt, stream};
use futures_lite::{stream, Stream, StreamExt};

#[cxx::bridge]
#[allow(clippy::needless_lifetimes)]
Expand All @@ -49,6 +50,15 @@ mod ffi {
/// `next_event_timeout` returned without an event because the
/// caller-supplied deadline elapsed first.
Timeout,
/// An upstream node has failed. Use `event_as_node_failed` to
/// extract the failed node id, the error message, and the list
/// of downstream inputs that will stop receiving data.
NodeFailed,
/// Hot-reload notification for an operator. Operator id is
/// available via the daemon's reload protocol; this variant
/// exists so C++ nodes can react to reloads (e.g. flushing
/// caches) rather than treating them as `Unknown`.
Reload,
}

struct DoraInput {
Expand All @@ -60,6 +70,17 @@ mod ffi {
error: String,
}

/// Payload of a `DoraEventType::NodeFailed` event.
struct DoraNodeFailed {
/// Inputs on this node that will stop receiving data because
/// the upstream node failed.
affected_input_ids: Vec<String>,
/// Human-readable error message from the failed node.
error: String,
/// Id of the node that failed.
source_node_id: String,
}

struct ArrowInputInfo {
id: String,
metadata: Box<Metadata>,
Expand Down Expand Up @@ -123,6 +144,15 @@ mod ffi {
fn drained_events_next(drained: &mut Box<DrainedEvents>) -> Box<DoraEvent>;
fn event_type(event: &Box<DoraEvent>) -> DoraEventType;
fn event_as_input(event: Box<DoraEvent>) -> Result<DoraInput>;
/// Extract the failure payload from a `NodeFailed` event.
fn event_as_node_failed(event: Box<DoraEvent>) -> Result<DoraNodeFailed>;
/// Selectively close one or more of this node's outputs without
/// shutting the whole node down. Subsequent downstream
/// subscribers see the corresponding `InputClosed` event.
fn close_outputs(
output_sender: &mut Box<OutputSender>,
output_ids: Vec<String>,
) -> DoraResult;
fn send_output(
output_sender: &mut Box<OutputSender>,
id: String,
Expand Down Expand Up @@ -359,6 +389,8 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType {
Event::Input { .. } => ffi::DoraEventType::Input,
Event::InputClosed { .. } => ffi::DoraEventType::InputClosed,
Event::Error(_) => ffi::DoraEventType::Error,
Event::NodeFailed { .. } => ffi::DoraEventType::NodeFailed,
Event::Reload { .. } => ffi::DoraEventType::Reload,
_ => ffi::DoraEventType::Unknown,
},
EventOrReason::Closed => ffi::DoraEventType::AllInputsClosed,
Expand Down Expand Up @@ -391,6 +423,57 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
})
}

fn event_as_node_failed(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraNodeFailed> {
let EventOrReason::Event(Event::NodeFailed {
affected_input_ids,
error,
source_node_id,
}) = event.0
else {
bail!("not a NodeFailed event");
};
Ok(ffi::DoraNodeFailed {
affected_input_ids: affected_input_ids
.into_iter()
.map(|id| id.to_string())
.collect(),
error,
source_node_id: source_node_id.to_string(),
})
}

fn close_outputs(
output_sender: &mut Box<OutputSender>,
output_ids: Vec<String>,
) -> ffi::DoraResult {
// Parse via `FromStr` instead of `From<String>` so invalid IDs
// surface as a `DoraResult.error` rather than panicking across
// the cxx::bridge. `DataId::from(String)` is documented as
// panicking on invalid characters (see
// `libraries/message/src/id.rs:186`, `# Panics`); calling it on
// caller-supplied input would mean a typo in a C++ string literal
// aborts the whole node.
let mut ids = Vec::with_capacity(output_ids.len());
for id in output_ids {
match id.parse::<dora_node_api::dora_core::config::DataId>() {
Ok(parsed) => ids.push(parsed),
Err(e) => {
return ffi::DoraResult {
error: format!("invalid output id '{id}': {e}"),
};
}
}
}
match output_sender.0.close_outputs(ids) {
Ok(()) => ffi::DoraResult {
error: String::new(),
},
Err(err) => ffi::DoraResult {
error: format!("{err:?}"),
},
}
}

unsafe fn event_as_arrow_input(
event: Box<DoraEvent>,
out_array: *mut u8,
Expand Down
81 changes: 80 additions & 1 deletion docs/api-cxx.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,44 @@ rust::Box<DoraEvent> next_event(rust::Box<Events>& events);

Both forms block until the next event arrives and return an owned `DoraEvent`.

### Non-blocking and timed receive

For nodes that need to do work between events, or react to deadlines, the `Events` stream also offers non-blocking and timed variants of `next_event()`:

```cpp
// Block up to `timeout_ms` milliseconds for the next event. Returns
// an event with `event_type() == Timeout` if the deadline elapses
// before one arrives, or `AllInputsClosed` if the stream closed.
rust::Box<DoraEvent> next_event_timeout(rust::Box<Events>& events, uint64_t timeout_ms);

// Non-blocking poll. Returns `event_type() == Empty` if no event is
// immediately available, or `AllInputsClosed` if the stream is closed.
rust::Box<DoraEvent> try_next_event(rust::Box<Events>& events);

// Hint: true when the event queue is currently empty. Treat as
// advisory only -- the daemon can produce a new event between the
// check and a subsequent receive.
bool events_is_empty(const rust::Box<Events>& events);
```

For draining buffered events into a snapshot (useful when you want to process a batch all at once without racing the daemon):

```cpp
// Take a snapshot of all currently-buffered events. Subsequent
// `next_event` / `try_next_event` calls only see events that arrive
// after this point.
rust::Box<DrainedEvents> drain_events(rust::Box<Events>& events);

size_t drained_events_len(const rust::Box<DrainedEvents>& drained);

// Pop the next event from a drained snapshot. Returns
// `event_type() == Empty` once the snapshot is exhausted.
rust::Box<DoraEvent> drained_events_next(rust::Box<DrainedEvents>& drained);
```

### DoraEvent

Opaque Rust type. Inspect its kind with `event_type()`, then downcast with `event_as_input()` or `event_as_arrow_input()`.
Opaque Rust type. Inspect its kind with `event_type()`, then downcast with one of the variant extractors below.

```cpp
// Determine the event kind.
Expand All @@ -75,6 +110,9 @@ ArrowInputInfo event_as_arrow_input_with_info(
rust::Box<DoraEvent> event,
uint8_t* out_array,
uint8_t* out_schema);

// Downcast to a NodeFailed payload. Throws if the event is not NodeFailed.
DoraNodeFailed event_as_node_failed(rust::Box<DoraEvent> event);
```

### DoraEventType
Expand All @@ -87,9 +125,15 @@ enum class DoraEventType : uint8_t {
Error, // an error occurred
Unknown, // unrecognized event variant
AllInputsClosed, // all inputs closed (stream ended)
Empty, // try_next_event / drained_events_next found no event ready
Timeout, // next_event_timeout deadline elapsed without an event
NodeFailed, // an upstream node failed (use event_as_node_failed to inspect)
Reload, // hot-reload notification for an operator
};
```

`Empty` is distinct from `Timeout`: `Empty` means "no event available right now" (the caller did not request a timeout); `Timeout` means "the caller-supplied deadline elapsed before an event arrived". A non-blocking poll never returns `Timeout`; a timed receive returns either `Timeout` or `AllInputsClosed` if no event arrived in time.

### DoraInput

Returned by `event_as_input()`. Contains raw bytes.
Expand All @@ -101,6 +145,29 @@ struct DoraInput {
};
```

### DoraNodeFailed

Returned by `event_as_node_failed()`. Carries the failure information from an upstream node that exited unexpectedly. Use it to log the cause, flush downstream state, or trigger graceful shutdown.

```cpp
struct DoraNodeFailed {
rust::Vec<rust::String> affected_input_ids; // inputs on this node that will stop receiving data
rust::String error; // human-readable error message from the failed node
rust::String source_node_id; // id of the node that failed
};
```

Example:

```cpp
auto event = next_event(dora_node.events);
if (event_type(event) == DoraEventType::NodeFailed) {
auto failed = event_as_node_failed(std::move(event));
std::cerr << "Upstream node `" << std::string(failed.source_node_id)
<< "` failed: " << std::string(failed.error) << std::endl;
}
```

### ArrowInputInfo

Returned by `event_as_arrow_input_with_info()`. Contains the input ID, metadata, and an error string.
Expand Down Expand Up @@ -170,6 +237,18 @@ DoraResult send_arrow_output(
rust::Box<Metadata> metadata);
```

#### close_outputs

Selectively close one or more of this node's outputs without shutting the whole node down. Downstream subscribers see the corresponding `InputClosed` event for each closed output.

```cpp
DoraResult close_outputs(
rust::Box<OutputSender>& sender,
rust::Vec<rust::String> output_ids);
```

`output_ids` are validated as `DataId`s; an invalid id (e.g. one containing characters not allowed by the data-id grammar) returns a `DoraResult` whose `error` names the offending id and the underlying validation message, instead of aborting the process.

#### log_message

Send a log message through the Dora logging system.
Expand Down
10 changes: 10 additions & 0 deletions examples/c++-dataflow/node-rust-api/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ int main()
return -1;
}
}
else if (ty == DoraEventType::NodeFailed)
{
auto failed = event_as_node_failed(std::move(event));
std::cerr << "Node failed: source=" << std::string(failed.source_node_id)
<< " error=" << std::string(failed.error) << std::endl;
}
else if (ty == DoraEventType::Reload)
{
std::cout << "Reload event received" << std::endl;
}
else
{
std::cerr << "Unknown event type " << static_cast<int>(ty) << std::endl;
Expand Down
Loading