Commit 86b69da

feat(plugin_context): aggregate peer faults via SOVD service interface
The ROS 2 SOVD service interface (plugin context) returned only local fault-manager faults, while the HTTP API already fans out to aggregation peers. Wire list_entity_faults and list_all_faults to the aggregation manager (fan_out_get) so a gateway with aggregation.enabled exposes peer faults over the ROS 2 service path too, consistent with the HTTP /api/v1/faults and /api/v1/<entity>/faults endpoints.

- list_entity_faults: fans out /api/v1/<type>/<id>/faults to peers and merges.
- list_all_faults: fans out /api/v1/faults and merges.
- Aggregation is gated on get_aggregation_manager() != nullptr (no behavior change when aggregation is disabled).

Extract entity -> source-FQN resolution and fault scoping into a shared neutral helper (core/faults/fault_scope) used by both the HTTP fault handlers and the plugin-context fault path. list_entity_faults now scopes its local faults to the entity's resolved App-FQN set (APP, COMPONENT, AREA, FUNCTION) exactly as the HTTP per-entity handler does, instead of returning a bare fault-manager envelope.

Log partial peer fan-out and document that the ROS 2 service path queries peers without an Authorization header. Recompute list_all_faults "count" after merging peer faults so it does not undercount.

Add test_plugin_context_aggregation: no-aggregation baseline, live-mock-peer fan-out, and local-fault-path coverage with a fake fault transport (scoped local fault, component/area/function resolution, local+peer merge, and a peer that fails the fan-out while local faults survive).

Closes #418
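The merge-and-recount behavior described in the message can be illustrated with a small sketch. This is not the gateway's code: `Fault`, `FaultList`, `PeerSource`, and `merge_with_peers` are hypothetical stand-ins (the real path works on JSON envelopes and the aggregation manager), and a null `PeerSource` pointer is assumed here to model the "aggregation disabled" gate.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins; the gateway itself works on JSON envelopes.
struct Fault {
  std::string code;
  std::string origin;  // "local" or a peer name
};

struct FaultList {
  std::vector<Fault> faults;
  std::size_t count = 0;
};

// Hypothetical peer source holding whatever the reachable peers reported.
struct PeerSource {
  std::vector<Fault> peer_faults;
};

// Merge peer faults into the local list. A null `peers` models a gateway
// with aggregation disabled: the local list is returned unchanged.
FaultList merge_with_peers(FaultList local, const PeerSource * peers) {
  if (peers == nullptr) {
    return local;
  }
  local.faults.insert(local.faults.end(), peers->peer_faults.begin(), peers->peer_faults.end());
  // Recompute after merging so the count also covers peer faults.
  local.count = local.faults.size();
  assert(local.count >= peers->peer_faults.size());
  return local;
}
```

Recomputing `count` from the merged list, rather than carrying over the local value, is what keeps the count from undercounting once peer faults are appended.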
1 parent be1b03a commit 86b69da

7 files changed

Lines changed: 1074 additions & 147 deletions

src/ros2_medkit_gateway/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
@@ -526,6 +526,10 @@ if(BUILD_TESTING)
   target_link_libraries(test_plugin_notify_integration gateway_ros2)
   medkit_set_test_domain(test_plugin_notify_integration)

+  ament_add_gtest(test_plugin_context_aggregation test/test_plugin_context_aggregation.cpp)
+  target_link_libraries(test_plugin_context_aggregation gateway_ros2)
+  medkit_set_test_domain(test_plugin_context_aggregation)
+
   ament_add_gtest(test_auth_manager test/test_auth_manager.cpp)
   target_link_libraries(test_auth_manager gateway_ros2)

@@ -1022,6 +1026,7 @@ if(BUILD_TESTING)
     test_stream_proxy
     test_mdns_discovery
     test_network_utils
+    test_plugin_context_aggregation
   )
   foreach(_target ${_test_targets})
     target_compile_options(${_target} PRIVATE --coverage -O0 -g)
Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+// Copyright 2026 bburda
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <nlohmann/json.hpp>
+#include <set>
+#include <string>
+
+#include "ros2_medkit_gateway/core/models/entity_types.hpp"
+
+namespace ros2_medkit_gateway {
+
+class ThreadSafeEntityCache;
+
+namespace faults {
+
+/// Resolve the set of App effective-FQNs that fall within an entity's scope by
+/// walking the entity cache:
+/// - APP: the app's own effective FQN
+/// - COMPONENT: every hosted app's FQN
+/// - AREA: every app under the area and its (recursive) subareas
+/// - FUNCTION: every app hosted directly or via a hosted component
+/// Returns an empty set for SERVER / UNKNOWN.
+///
+/// Shared by the HTTP fault handlers (`GET /{entity}/faults`) and the ROS 2
+/// plugin-context fault path so both agree on entity -> source-set resolution.
+std::set<std::string> resolve_entity_source_fqns(const ThreadSafeEntityCache & cache, SovdEntityType type,
+                                                 const std::string & entity_id);
+
+/// True when `fault` has at least one reporting source and *every* reporting
+/// source is within `source_fqns` (exact match or a path-boundary prefix).
+/// An empty scope set or an empty/absent source list returns false.
+bool fault_in_source_scope(const nlohmann::json & fault, const std::set<std::string> & source_fqns);
+
+/// Subset of `faults_array` whose faults satisfy `fault_in_source_scope`.
+nlohmann::json filter_faults_by_sources(const nlohmann::json & faults_array, const std::set<std::string> & source_fqns);
+
+} // namespace faults
+} // namespace ros2_medkit_gateway
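The scope rule documented for `fault_in_source_scope` (every reporting source must match a scope FQN exactly or at a path boundary) can be tried out without the JSON library. The sketch below is a simplified stand-in, not the header's API: `source_in_scope`, `all_sources_in_scope`, and `scope_demo` are hypothetical names, and reporting sources are assumed to be plain strings rather than a JSON array.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// True when `src` equals a scope FQN or is a path-child of one ("<fqn>/...").
bool source_in_scope(const std::string & src, const std::set<std::string> & scope) {
  for (const auto & fqn : scope) {
    if (src == fqn) {
      return true;
    }
    // The byte after the prefix must be '/', so "/ns/node_extra" does not
    // match a scope entry "/ns/node".
    if (src.size() > fqn.size() && src.compare(0, fqn.size(), fqn) == 0 && src[fqn.size()] == '/') {
      return true;
    }
  }
  return false;
}

// All-sources rule: an empty scope or an empty source list is out of scope,
// and a single out-of-scope source rejects the whole fault.
bool all_sources_in_scope(const std::vector<std::string> & sources, const std::set<std::string> & scope) {
  if (scope.empty() || sources.empty()) {
    return false;
  }
  for (const auto & src : sources) {
    if (!source_in_scope(src, scope)) {
      return false;
    }
  }
  return true;
}

// Small demonstration of the boundary cases.
void scope_demo() {
  const std::set<std::string> scope = {"/ns/node"};
  assert(all_sources_in_scope({"/ns/node"}, scope));
  assert(all_sources_in_scope({"/ns/node/child"}, scope));
  assert(!all_sources_in_scope({"/ns/node_extra"}, scope));
  assert(!all_sources_in_scope({"/ns/node", "/other"}, scope));
}
```

Requiring every source to be in scope is the stricter of the two obvious choices: a fault reported jointly by an in-scope and an out-of-scope node is excluded rather than attributed to the entity.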
Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
+// Copyright 2026 bburda
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ros2_medkit_gateway/core/faults/fault_scope.hpp"
+
+#include <utility>
+#include <vector>
+
+#include "ros2_medkit_gateway/core/models/thread_safe_entity_cache.hpp"
+
+namespace ros2_medkit_gateway {
+namespace faults {
+
+namespace {
+
+void collect_app_fqn(const ThreadSafeEntityCache & cache, const std::string & app_id, std::set<std::string> & out) {
+  auto app = cache.get_app(app_id);
+  if (!app) {
+    return;
+  }
+  auto fqn = app->effective_fqn();
+  if (fqn.empty()) {
+    return;
+  }
+  out.insert(std::move(fqn));
+}
+
+void collect_component_app_fqns(const ThreadSafeEntityCache & cache, const std::string & comp_id,
+                                std::set<std::string> & out) {
+  for (const auto & app_id : cache.get_apps_for_component(comp_id)) {
+    collect_app_fqn(cache, app_id, out);
+  }
+}
+
+void collect_area_app_fqns(const ThreadSafeEntityCache & cache, const std::string & area_id,
+                           std::set<std::string> & out) {
+  // BFS over (area, subareas...) so a top-level area whose components live in
+  // nested subareas still resolves to the union of every descendant's apps.
+  // Without the recursion, e.g. `/areas/powertrain/...` returns an empty set
+  // when components are attached to `engine` (subarea of `powertrain`).
+  std::vector<std::string> pending = {area_id};
+  std::set<std::string> visited;
+  while (!pending.empty()) {
+    auto current = std::move(pending.back());
+    pending.pop_back();
+    if (!visited.insert(current).second) {
+      continue;
+    }
+    for (const auto & comp_id : cache.get_components_for_area(current)) {
+      collect_component_app_fqns(cache, comp_id, out);
+    }
+    for (const auto & sub_id : cache.get_subareas(current)) {
+      pending.push_back(sub_id);
+    }
+  }
+}
+
+void collect_function_app_fqns(const ThreadSafeEntityCache & cache, const std::string & function_id,
+                               std::set<std::string> & out) {
+  // Function.hosts can contain either App IDs or Component IDs; the indexed
+  // lookups in the cache only resolve the App-host case (function_to_apps_),
+  // so we walk the raw `hosts` list and dispatch per host kind ourselves.
+  auto func = cache.get_function(function_id);
+  if (!func) {
+    return;
+  }
+  for (const auto & host_id : func->hosts) {
+    if (cache.get_app(host_id)) {
+      collect_app_fqn(cache, host_id, out);
+    } else if (cache.get_component(host_id)) {
+      collect_component_app_fqns(cache, host_id, out);
+    }
+    // Unknown host - silently skip; it would have been flagged by manifest validation.
+  }
+}
+
+bool source_matches_scope(const std::string & src, const std::set<std::string> & scope_fqns) {
+  for (const auto & fqn : scope_fqns) {
+    if (src == fqn) {
+      return true;
+    }
+    if (src.size() > fqn.size() && src.compare(0, fqn.size(), fqn) == 0 && src[fqn.size()] == '/') {
+      return true;
+    }
+  }
+  return false;
+}
+
+} // namespace
+
+std::set<std::string> resolve_entity_source_fqns(const ThreadSafeEntityCache & cache, SovdEntityType type,
+                                                 const std::string & entity_id) {
+  std::set<std::string> fqns;
+  switch (type) {
+    case SovdEntityType::APP:
+      collect_app_fqn(cache, entity_id, fqns);
+      break;
+    case SovdEntityType::COMPONENT:
+      collect_component_app_fqns(cache, entity_id, fqns);
+      break;
+    case SovdEntityType::AREA:
+      collect_area_app_fqns(cache, entity_id, fqns);
+      break;
+    case SovdEntityType::FUNCTION:
+      collect_function_app_fqns(cache, entity_id, fqns);
+      break;
+    case SovdEntityType::SERVER:
+    case SovdEntityType::UNKNOWN:
+      break;
+  }
+  return fqns;
+}
+
+bool fault_in_source_scope(const nlohmann::json & fault, const std::set<std::string> & source_fqns) {
+  if (source_fqns.empty()) {
+    return false;
+  }
+  if (!fault.contains("reporting_sources") || !fault["reporting_sources"].is_array()) {
+    return false;
+  }
+  const auto & sources = fault["reporting_sources"];
+  if (sources.empty()) {
+    return false;
+  }
+  for (const auto & src : sources) {
+    if (!src.is_string()) {
+      return false;
+    }
+    if (!source_matches_scope(src.get<std::string>(), source_fqns)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+nlohmann::json filter_faults_by_sources(const nlohmann::json & faults_array,
+                                        const std::set<std::string> & source_fqns) {
+  nlohmann::json filtered = nlohmann::json::array();
+  for (const auto & fault : faults_array) {
+    if (fault_in_source_scope(fault, source_fqns)) {
+      filtered.push_back(fault);
+    }
+  }
+  return filtered;
+}
+
+} // namespace faults
+} // namespace ros2_medkit_gateway
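The area walk in `collect_area_app_fqns` can be exercised in isolation with a toy cache. Everything below is a hypothetical stand-in: `ToyCache`, `lookup`, `area_app_fqns`, and `area_demo` are not part of the gateway, and the three maps only approximate what `get_subareas`, `get_components_for_area`, and the per-component app lookup return. The worklist is popped from the back, as in the diff, so the visit order is depth-first; because the result is a set union, the order does not change the output.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using IdMap = std::map<std::string, std::vector<std::string>>;

// Hypothetical in-memory stand-in for the entity cache lookups.
struct ToyCache {
  IdMap subareas;               // area id -> direct subarea ids
  IdMap components_in_area;     // area id -> component ids
  IdMap app_fqns_in_component;  // component id -> app FQNs
};

// Returns the mapped list, or an empty list when the key is absent.
const std::vector<std::string> & lookup(const IdMap & m, const std::string & key) {
  static const std::vector<std::string> empty;
  auto it = m.find(key);
  return it == m.end() ? empty : it->second;
}

// Union of app FQNs under `area_id` and all of its descendant subareas.
std::set<std::string> area_app_fqns(const ToyCache & cache, const std::string & area_id) {
  std::set<std::string> out;
  std::vector<std::string> pending = {area_id};
  std::set<std::string> visited;  // guards against cyclic subarea links
  while (!pending.empty()) {
    std::string current = std::move(pending.back());
    pending.pop_back();
    if (!visited.insert(current).second) {
      continue;  // already expanded
    }
    for (const auto & comp : lookup(cache.components_in_area, current)) {
      for (const auto & fqn : lookup(cache.app_fqns_in_component, comp)) {
        out.insert(fqn);
      }
    }
    for (const auto & sub : lookup(cache.subareas, current)) {
      pending.push_back(sub);
    }
  }
  return out;
}

// Components attached only to the `engine` subarea still resolve for `powertrain`.
void area_demo() {
  ToyCache cache;
  cache.subareas["powertrain"] = {"engine"};
  cache.subareas["engine"] = {"powertrain"};  // deliberate cycle
  cache.components_in_area["engine"] = {"ecu"};
  cache.app_fqns_in_component["ecu"] = {"/powertrain/engine/ecu_node"};
  const auto fqns = area_app_fqns(cache, "powertrain");
  assert(fqns.size() == 1 && fqns.count("/powertrain/engine/ecu_node") == 1);
}
```

The `visited` set is what makes the walk terminate on malformed input such as the cycle in `area_demo`; without it the loop would bounce between the two areas indefinitely.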

src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp

Lines changed: 8 additions & 56 deletions
@@ -27,6 +27,7 @@
 #include <vector>

 #include "ros2_medkit_gateway/aggregation/aggregation_manager.hpp"
+#include "ros2_medkit_gateway/core/faults/fault_scope.hpp"
 #include "ros2_medkit_gateway/core/http/entity_path_utils.hpp"
 #include "ros2_medkit_gateway/core/http/error_codes.hpp"
 #include "ros2_medkit_gateway/core/http/fan_out_helpers.hpp"
@@ -106,39 +107,6 @@ tl::expected<FaultStatusFilter, ErrorInfo> read_fault_status_filter(const http::
 // SOVD-compliant response helpers (legacy free functions kept verbatim)
 // =============================================================================

-/// Check if a ROS node FQN falls within the entity's source FQN set.
-///
-/// A node is in scope iff it equals one of the entity's owned FQNs, OR is a
-/// strict path-child of one (i.e., `<owned-fqn>/<...>`). We deliberately do
-/// NOT use raw `rfind(prefix, 0)` because that would let `/ns/node_extra`
-/// pass for an entity owning `/ns/node`. Path boundary is enforced by
-/// requiring the byte after the prefix to be `/`.
-bool source_matches_scope(const std::string & src, const std::set<std::string> & scope_fqns) {
-  for (const auto & fqn : scope_fqns) {
-    if (src == fqn) {
-      return true;
-    }
-    if (src.size() > fqn.size() && src.compare(0, fqn.size(), fqn) == 0 && src[fqn.size()] == '/') {
-      return true;
-    }
-  }
-  return false;
-}
-
-/// Filter a faults JSON array down to faults whose every reporting source is
-/// in the entity's scope. Shares the same all-sources / path-boundary
-/// semantics as `FaultHandlers::fault_in_source_scope` so per-entity
-/// collection routes and per-fault routes agree on what counts as "in scope".
-json filter_faults_by_sources(const json & faults_array, const std::set<std::string> & source_fqns) {
-  json filtered = json::array();
-  for (const auto & fault : faults_array) {
-    if (FaultHandlers::fault_in_source_scope(fault, source_fqns)) {
-      filtered.push_back(fault);
-    }
-  }
-  return filtered;
-}
-
 /// Build SOVD status object from fault status string
 /// Maps ROS 2 medkit status (PREFAILED, PREPASSED, CONFIRMED, HEALED, CLEARED)
 /// to SOVD aggregated status (active, passive, cleared)
@@ -249,25 +217,9 @@ dto::FaultDetailResult wrap_detail_result(json payload) {
 } // namespace

 bool FaultHandlers::fault_in_source_scope(const json & fault, const std::set<std::string> & source_fqns) {
-  if (source_fqns.empty()) {
-    return false;
-  }
-  if (!fault.contains("reporting_sources") || !fault["reporting_sources"].is_array()) {
-    return false;
-  }
-  const auto & sources = fault["reporting_sources"];
-  if (sources.empty()) {
-    return false;
-  }
-  for (const auto & src : sources) {
-    if (!src.is_string()) {
-      return false;
-    }
-    if (!source_matches_scope(src.get<std::string>(), source_fqns)) {
-      return false;
-    }
-  }
-  return true;
+  // Thin wrapper preserving the public static API; the scope logic now lives in
+  // the neutral core helper shared with the ROS 2 plugin-context fault path.
+  return faults::fault_in_source_scope(fault, source_fqns);
 }

 // Static method: Build SOVD-compliant fault response from transport-supplied JSON.
@@ -553,7 +505,7 @@ http::Result<dto::FaultListResult> FaultHandlers::list_faults(const http::TypedR
     const auto & cache = ctx_.node()->get_thread_safe_cache();
     auto source_fqns = HandlerContext::resolve_entity_source_fqns(cache, entity_info);

-    json filtered_faults = filter_faults_by_sources(result.data["faults"], source_fqns);
+    json filtered_faults = faults::filter_faults_by_sources(result.data["faults"], source_fqns);
     json response = {{"items", filtered_faults}};

     // x-medkit extension (typed DTO)
@@ -600,7 +552,7 @@ http::Result<dto::FaultListResult> FaultHandlers::list_faults(const http::TypedR
     const auto & cache = ctx_.node()->get_thread_safe_cache();
     auto app_fqns = HandlerContext::resolve_entity_source_fqns(cache, entity_info);

-    json filtered_faults = filter_faults_by_sources(result.data["faults"], app_fqns);
+    json filtered_faults = faults::filter_faults_by_sources(result.data["faults"], app_fqns);
     json response = {{"items", filtered_faults}};

     dto::FaultListAggXMedkit xm;
@@ -639,7 +591,7 @@ http::Result<dto::FaultListResult> FaultHandlers::list_faults(const http::TypedR
         json{{"details", result.error_message}, {entity_info.id_field, entity_id}}));
   }

-  json filtered_faults = filter_faults_by_sources(result.data["faults"], app_fqns);
+  json filtered_faults = faults::filter_faults_by_sources(result.data["faults"], app_fqns);
   json response = {{"items", filtered_faults}};

   // x-medkit extension for ros2_medkit-specific fields (typed DTO)
@@ -981,7 +933,7 @@ http::Result<http::NoContent> FaultHandlers::clear_all_faults(const http::TypedR
   }
   const auto & cache = ctx_.node()->get_thread_safe_cache();
   auto entity_fqns = HandlerContext::resolve_entity_source_fqns(cache, entity_info);
-  json faults_to_clear = filter_faults_by_sources(result.data["faults"], entity_fqns);
+  json faults_to_clear = faults::filter_faults_by_sources(result.data["faults"], entity_fqns);

   // Clear each matching fault. Use `skip_correlation_auto_clear=true` for
   // the same reason as the single-fault DELETE: keep this entity's clear
