Skip to content

Commit 73e55e4

Browse files
committed
feat(gateway,#380): expose entity context on /faults/stream SSE events
Each `GET /api/v1/faults/stream` event payload now carries an optional `x-medkit` SOVD payload-extension object with `entity_type` and `entity_id` fields when the gateway can resolve the fault's first reporting source back to a SOVD entity. Consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly instead of enumerating apps + components and HEAD-probing each one. Nested under `x-medkit` per the SOVD payload-extension convention (matches `x-medkit.aggregation_level`, `x-medkit.phase`, etc.). Flat `x-medkit-*` names are reserved for endpoint paths (`/x-medkit-graph`) and error codes, not payload fields. Resolution chain (mirrors gateway_node's node_resolver lambda for the manifest path): - Manifest / hybrid: cache's node-to-app linking index, both with and without the leading slash on the FQN. - Runtime fallback: FQN's last segment, validated against `cache.has_app()` so we never point consumers at a 404. - Runtime collision fallback: `<ns_prefix>_<name>` form (slashes in the namespace replaced with `_`), matching the disambiguation rule in ros2_runtime_introspection.cpp for nodes that share a name across namespaces. - No match: `x-medkit` object omitted entirely; an `RCLCPP_DEBUG` trace records the unresolved source so operators can diagnose consumers stuck on the discovery fallback path. Backward-compatible for existing SOVD consumers. Entity resolution is snapshotted in `on_fault_event` (before the queue lock is acquired) and stored as `std::optional<EntityContext>` on the buffered event. A discovery refresh between enqueue and stream-out cannot retroactively flip the entity reported to consumers, and the format path stays lock-free with respect to the entity cache.
1 parent 09c73ac commit 73e55e4

6 files changed

Lines changed: 324 additions & 20 deletions

File tree

docs/api/rest.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2329,7 +2329,12 @@ Other extensions beyond SOVD:
23292329

23302330
- Vendor extension fields using ``x-medkit`` prefix (per SOVD extension mechanism)
23312331
- ``DELETE /faults`` - Clear all faults globally
2332-
- ``GET /faults/stream`` - SSE real-time fault notifications
2332+
- ``GET /faults/stream`` - SSE real-time fault notifications. Each event payload carries an
2333+
optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id``
2334+
fields when the gateway can resolve the fault's first reporting source back to an entity,
2335+
so consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly
2336+
without enumerating entities. Resolution is snapshotted at event arrival; the entire
2337+
``x-medkit`` object is omitted when no entity can be resolved.
23332338
- ``/health`` - Health check with discovery pipeline diagnostics
23342339
- ``/version-info`` - Gateway version information
23352340
- ``/docs`` - OpenAPI capability description

src/ros2_medkit_gateway/CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Unreleased
1111

1212
**Features:**
1313

14+
* ``GET /api/v1/faults/stream`` event payloads now carry an optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id`` fields. When the gateway can resolve the fault's first reporting source back to a SOVD entity (via the manifest-mode linking index, or a runtime-mode last-segment match against an existing App), consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly instead of HEAD-probing every entity. Resolution is snapshotted at event arrival, so a discovery refresh between enqueue and stream-out cannot retroactively change the entity reported to consumers. The ``x-medkit`` object is omitted entirely when no entity can be resolved, so existing SSE consumers ignore the addition (`#380 <https://github.com/selfpatch/ros2_medkit/issues/380>`_)
1415
* Plugin API version bumped to v7. Adds ``PluginContext::notify_entities_changed(EntityChangeScope)`` lifecycle hook for plugins that mutate the entity surface at runtime; default no-op keeps v6 source code compiling unchanged against v7 headers. Binary compatibility is not provided: the plugin loader uses a strict equality check on ``plugin_api_version()``, so out-of-tree plugins must be recompiled (`#376 <https://github.com/selfpatch/ros2_medkit/issues/376>`_)
1516
* New ``discovery.manifest.fragments_dir`` parameter: gateway scans the directory for ``*.yaml`` / ``*.yml`` fragment files on every manifest load / reload and merges apps, components, and functions on top of the base manifest. Fragments are forbidden from declaring top-level ``areas``, ``metadata``, ``discovery``, ``scripts``, ``capabilities``, or ``lock_overrides`` - those stay in the base manifest. Presence of any forbidden key (including empty-valued ones like ``areas: []``) is reported as a ``FRAGMENT_FORBIDDEN_FIELD`` validation error that fails the load / reload. Unknown top-level keys (typos such as ``app:`` vs ``apps:``) are ignored with a warning log. Files merged in alphabetical order for deterministic duplicate-id errors (`#376 <https://github.com/selfpatch/ros2_medkit/issues/376>`_)
1617
* Fragment files are size-capped at 1 MiB (``ManifestParser::kMaxFragmentBytes``) before being read into memory, and any symlink resolving outside the canonical ``fragments_dir`` is skipped with a warning, so misconfigurations or symlink-based escapes cannot hand arbitrary bytes to the YAML parser (`#376 <https://github.com/selfpatch/ros2_medkit/issues/376>`_)

src/ros2_medkit_gateway/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ Real-time fault event stream using Server-Sent Events (SSE). Clients receive ins
764764
- **Automatic reconnection**: Supports `Last-Event-ID` header for seamless reconnection
765765
- **Keepalive**: Sends `:keepalive` comment every 30 seconds to prevent timeouts
766766
- **Event buffer**: Buffers up to 100 recent events for reconnecting clients
767+
- **Entity context (SOVD payload extension)**: When the gateway can resolve the fault's first reporting source back to an entity, the payload carries an `x-medkit` object with `entity_type` and `entity_id` fields so consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly without enumerating entities
767768

768769
**Event Types:**
769770
- `fault_confirmed` - Fault transitioned to CONFIRMED status
@@ -781,13 +782,15 @@ curl -N http://localhost:8080/api/v1/faults/stream
781782
782783
id: 1
783784
event: fault_confirmed
784-
data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123}
785+
data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}}
785786
786787
id: 2
787788
event: fault_cleared
788-
data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456}
789+
data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}}
789790
```
790791

792+
**SOVD payload extension `x-medkit.entity_*`** (non-standard, SOVD-compatible): Resolution is best-effort and snapshotted at event arrival. Manifest and hybrid discovery use the linking result; runtime-only discovery falls back to the FQN's last segment and only emits the fields when an App with that ID exists in the cache. When no entity can be resolved (no reporting sources, orphan source, etc.), the entire `x-medkit` object is omitted and the consumer must fall back to discovery.
793+
791794
**Response (503 Service Unavailable - Client Limit Reached):**
792795
```json
793796
{

src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
#include <deque>
2121
#include <memory>
2222
#include <mutex>
23+
#include <optional>
2324
#include <string>
2425

26+
#include <nlohmann/json.hpp>
27+
2528
#include "rclcpp/rclcpp.hpp"
2629
#include "ros2_medkit_gateway/core/http/sse_client_tracker.hpp"
2730
#include "ros2_medkit_gateway/http/handlers/handler_context.hpp"
@@ -111,8 +114,33 @@ class SSEFaultHandler {
111114
/// Callback for fault events from ROS 2 topic
112115
void on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg);
113116

117+
/// Resolved owning entity for a fault. Populates the ``x-medkit`` SOVD
118+
/// payload-extension object on outgoing SSE events.
119+
struct EntityContext {
120+
std::string type;
121+
std::string id;
122+
};
123+
124+
/// Buffered queue entry. ``entity`` is resolved at enqueue time so a
125+
/// discovery refresh between enqueue and stream-out cannot retroactively
126+
/// flip the entity reported to consumers.
127+
struct QueuedEvent {
128+
uint64_t id;
129+
ros2_medkit_msgs::msg::FaultEvent event;
130+
std::optional<EntityContext> entity;
131+
};
132+
114133
/// Format a fault event as SSE message
115-
static std::string format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id);
134+
static std::string format_sse_event(const QueuedEvent & queued);
135+
136+
/// Resolve the owning entity for a fault, snapshotting the cache. Manifest
137+
/// / hybrid mode uses the cache's node-to-app index; runtime mode falls
138+
/// back to the FQN's last segment, and for collision-disambiguated runtime
139+
/// apps also to ``<ns_prefix>_<name>`` per
140+
/// ros2_runtime_introspection.cpp's renaming rule. Only emits a value when
141+
/// an App with the resolved id actually exists in the cache - returns
142+
/// ``std::nullopt`` otherwise so the consumer falls back to discovery.
143+
std::optional<EntityContext> resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const;
116144

117145
HandlerContext & ctx_;
118146
std::shared_ptr<SSEClientTracker> client_tracker_;
@@ -123,7 +151,7 @@ class SSEFaultHandler {
123151
/// Event queue for broadcasting to clients
124152
mutable std::mutex queue_mutex_;
125153
std::condition_variable queue_cv_;
126-
std::deque<std::pair<uint64_t, ros2_medkit_msgs::msg::FaultEvent>> event_queue_;
154+
std::deque<QueuedEvent> event_queue_;
127155

128156
/// Monotonically increasing event ID for Last-Event-ID support
129157
std::atomic<uint64_t> next_event_id_{1};

src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,17 @@ void SSEFaultHandler::request_shutdown() {
8686
void SSEFaultHandler::on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg) {
8787
uint64_t event_id = next_event_id_.fetch_add(1);
8888

89+
// Snapshot entity context before acquiring the queue lock so cache state
90+
// is pinned to the fault arrival timestamp and the formatting path stays
91+
// lock-free with respect to the cache.
92+
auto entity = resolve_entity_context(msg->fault);
93+
8994
std::size_t dropped_this_call = 0;
9095
{
9196
std::lock_guard<std::mutex> lock(queue_mutex_);
9297

93-
// Add event to queue
94-
event_queue_.emplace_back(event_id, *msg);
98+
// Add event to queue with resolved entity context
99+
event_queue_.push_back(QueuedEvent{event_id, *msg, std::move(entity)});
95100

96101
// Trim old events if buffer is full
97102
while (event_queue_.size() > kMaxBufferedEvents) {
@@ -156,13 +161,13 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo
156161
// First, send any buffered events the client missed (for reconnection)
157162
{
158163
std::lock_guard<std::mutex> lock(queue_mutex_);
159-
for (const auto & [id, event] : event_queue_) {
160-
if (id > last_event_id) {
161-
std::string sse_msg = format_sse_event(event, id);
164+
for (const auto & queued : event_queue_) {
165+
if (queued.id > last_event_id) {
166+
std::string sse_msg = format_sse_event(queued);
162167
if (!sink.write(sse_msg.data(), sse_msg.size())) {
163168
return false; // Client disconnected
164169
}
165-
last_event_id = id;
170+
last_event_id = queued.id;
166171
}
167172
}
168173
}
@@ -198,15 +203,15 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo
198203

199204
// Check for new events
200205
bool found_new = false;
201-
for (const auto & [id, event] : event_queue_) {
202-
if (id > last_event_id) {
203-
std::string sse_msg = format_sse_event(event, id);
206+
for (const auto & queued : event_queue_) {
207+
if (queued.id > last_event_id) {
208+
std::string sse_msg = format_sse_event(queued);
204209
lock.unlock();
205210
if (!sink.write(sse_msg.data(), sse_msg.size())) {
206211
return false; // Client disconnected
207212
}
208213
lock.lock();
209-
last_event_id = id;
214+
last_event_id = queued.id;
210215
found_new = true;
211216
}
212217
}
@@ -230,24 +235,89 @@ size_t SSEFaultHandler::connected_clients() const {
230235
return client_tracker_->connected_clients();
231236
}
232237

233-
std::string SSEFaultHandler::format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id) {
234-
const auto sanitized_event_type = sanitize_sse_event_type(event.event_type);
238+
std::string SSEFaultHandler::format_sse_event(const QueuedEvent & queued) {
239+
const auto sanitized_event_type = sanitize_sse_event_type(queued.event.event_type);
235240

236241
nlohmann::json json_event;
237242
json_event["event_type"] = sanitized_event_type;
238-
json_event["fault"] = ros2::conversions::fault_to_json(event.fault);
243+
json_event["fault"] = ros2::conversions::fault_to_json(queued.event.fault);
239244

240245
// Convert timestamp to seconds with nanosecond precision
241-
double timestamp_sec = static_cast<double>(event.timestamp.sec) + static_cast<double>(event.timestamp.nanosec) * 1e-9;
246+
double timestamp_sec =
247+
static_cast<double>(queued.event.timestamp.sec) + static_cast<double>(queued.event.timestamp.nanosec) * 1e-9;
242248
json_event["timestamp"] = timestamp_sec;
243249

250+
// SOVD payload extension: nest ``entity_type`` / ``entity_id`` under the
251+
// ``x-medkit`` response-extension object so global-stream consumers can
252+
// hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}``
253+
// directly instead of HEAD-probing every entity. Flat ``x-medkit-*``
254+
// names are reserved for endpoint paths (``/x-medkit-graph``) and error
255+
// codes, not payload fields.
256+
if (queued.entity) {
257+
json_event["x-medkit"] = {{"entity_type", queued.entity->type}, {"entity_id", queued.entity->id}};
258+
}
259+
244260
std::ostringstream sse;
245-
sse << "id: " << event_id << "\n";
261+
sse << "id: " << queued.id << "\n";
246262
sse << "event: " << sanitized_event_type << "\n";
247263
sse << "data: " << json_event.dump() << "\n\n";
248264

249265
return sse.str();
250266
}
251267

268+
std::optional<SSEFaultHandler::EntityContext>
269+
SSEFaultHandler::resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const {
270+
if (fault.reporting_sources.empty()) {
271+
return std::nullopt;
272+
}
273+
const auto & raw_fqn = fault.reporting_sources.front();
274+
if (raw_fqn.empty()) {
275+
return std::nullopt;
276+
}
277+
278+
const auto & cache = ctx_.node()->get_thread_safe_cache();
279+
280+
// Manifest / hybrid mode: the linking step populated node_to_app with the
281+
// ROS FQN -> manifest app id mapping. Try both FQN forms (with and without
282+
// the leading '/'), mirroring gateway_node's node_resolver lambda.
283+
std::string entity_id = cache.resolve_node_to_app(raw_fqn);
284+
if (entity_id.empty() && raw_fqn.front() == '/') {
285+
entity_id = cache.resolve_node_to_app(raw_fqn.substr(1));
286+
}
287+
288+
// Runtime fallback: synthetic apps are created with id = ROS node name
289+
// (the FQN's last segment) when there is no namespace collision, or
290+
// ``<ns_prefix>_<name>`` (slashes in the namespace replaced with '_') when
291+
// multiple nodes share the same name. See ros2_runtime_introspection.cpp.
292+
// Only accept a candidate that actually exists as an App in the cache so
293+
// we never point consumers at a 404.
294+
if (entity_id.empty()) {
295+
auto last_slash = raw_fqn.rfind('/');
296+
auto name = (last_slash != std::string::npos) ? raw_fqn.substr(last_slash + 1) : raw_fqn;
297+
if (!name.empty() && cache.has_app(name)) {
298+
entity_id = std::move(name);
299+
} else if (last_slash != std::string::npos && last_slash > 0) {
300+
// Try the collision-disambiguated form: ns prefix (sans leading '/'),
301+
// slashes replaced with '_', then '_' + name.
302+
auto ns_prefix = raw_fqn.substr(1, last_slash - 1);
303+
std::replace(ns_prefix.begin(), ns_prefix.end(), '/', '_');
304+
auto namespaced = ns_prefix + "_" + name;
305+
if (cache.has_app(namespaced)) {
306+
entity_id = std::move(namespaced);
307+
}
308+
}
309+
}
310+
311+
if (entity_id.empty()) {
312+
RCLCPP_DEBUG(HandlerContext::logger(),
313+
"SSE fault event: no entity match for reporting source '%s' (fault_code='%s'); "
314+
"omitting x-medkit extension",
315+
raw_fqn.c_str(), fault.fault_code.c_str());
316+
return std::nullopt;
317+
}
318+
319+
return EntityContext{"apps", std::move(entity_id)};
320+
}
321+
252322
} // namespace handlers
253323
} // namespace ros2_medkit_gateway

0 commit comments

Comments
 (0)