Skip to content

Commit e11ff1a

Browse files
committed
fix(aggregation): target-filtered per-entity fan-out
Per-entity collection endpoints (/components/{id}/logs, /apps/{id}/data, /areas/{id}/faults, /functions/{id}/logs, etc.) unconditionally fanned out req.path to every healthy peer. Peers that do not host the entity returned 404, the response was flagged as partial with those peers in failed_peers, even though nothing is broken. Fan-out is now directed at the peers that actually host or contribute to the entity: - AggregationManager tracks a peer_contributors_by_entity_ map alongside the existing routing_table_. gateway_node rebuilds both after every discovery cycle by walking contributors on the merged entities and stripping the 'peer:' prefix. - get_peer_contributors(id) unions the routing-table owner (routed leaf, collision-renamed peer-only entity) with the per-entity contributor map (merged Areas/Functions, hierarchical parent Components that were stripped from the routing table during classification). - fan_out_get accepts an optional target_peers list; when supplied, only matching healthy peers are queried. Non-contributors never see the request, so they cannot appear in failed_peers. - merge_peer_items extracts the entity id from per-entity paths, asks AggregationManager for contributors, and passes the list as the target filter. Local-only entities produce an empty list and skip fan-out entirely. Global collection endpoints without an entity id keep fan-out-to-all behavior. Covers the common cases: local-only entity (no fan-out), routed leaf (only the owning peer queried), merged/hierarchical entity (only contributing peers queried), partial failure isolated to the specific contributor that failed. Updates aggregation.rst design doc and extends unit tests for AggregationManager (has_peer_contributors, get_peer_contributors, empty-entry pruning) and fan-out helpers (owner- only, contributor-only, partial-only-on-contributor-failure).
1 parent 48ece75 commit e11ff1a

9 files changed

Lines changed: 751 additions & 11 deletions

File tree

src/ros2_medkit_gateway/design/aggregation.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,33 @@ peers fail, the response body includes ``x-medkit.partial: true`` and
424424
``X-Medkit-No-Fan-Out`` header to prevent recursive loops when peers have
425425
bidirectional aggregation.
426426

427+
**Target-filtered fan-out.** For per-entity paths, ``merge_peer_items()``
428+
asks ``AggregationManager::get_peer_contributors(id)`` for the list of
429+
peers that host or contribute to the entity, and passes it as a filter to
430+
``fan_out_get()``. Requests reach only those peers; non-contributors are
431+
never queried so they cannot appear in ``failed_peers``. The set unions:
432+
433+
- The routing table (remote leaves, collision-renamed peer-only entities).
434+
- A ``peer_contributors_by_entity_`` map maintained alongside the routing
435+
table. ``gateway_node`` rebuilds both after every discovery cycle by
436+
walking ``contributors`` on the merged Areas/Components/Apps/Functions,
437+
stripping the ``"peer:"`` prefix and accumulating peer names per id.
438+
Merged Areas/Functions with ID collisions and hierarchical parent
439+
Components - both deliberately stripped from the routing table - still
440+
reach their peers through this map.
441+
442+
When the resolved list is empty (local-only entity), fan-out is skipped:
443+
no peer hosts the entity, so hitting peers would only produce spurious
444+
``partial: true`` / ``failed_peers``. Global endpoints (paths with no
445+
entity id, e.g. ``GET /api/v1/faults``) pass a ``nullptr`` filter and keep
446+
fan-out-to-all-healthy behavior.
447+
448+
Entities freshly announced on a peer but not yet reflected in the local
449+
routing/contributor tables (a brief window between discovery cycles) are
450+
treated as local-only: their per-entity fan-out is deferred until the
451+
next cycle rebuilds the tables. This is a deliberate trade-off against
452+
re-enabling the spurious ``partial: true`` path.
453+
427454
.. warning::
428455

429456
Fan-out is synchronous on the httplib handler thread. Each request blocks

src/ros2_medkit_gateway/include/ros2_medkit_gateway/aggregation/aggregation_manager.hpp

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,43 @@ class AggregationManager {
203203
*/
204204
std::optional<std::string> find_peer_for_entity(const std::string & entity_id) const;
205205

206+
/**
207+
* @brief Replace the map of per-entity peer contributors.
208+
*
209+
* Covers both routed leaf entities (entity fully hosted by a peer - also
210+
* tracked via the routing table) and merged/hierarchical entities (served
211+
* locally with contributions from one or more peers - e.g. a Component
212+
* aggregating subcomponents from multiple gateways, or an Area/Function
213+
* with an ID shared across peers).
214+
*
215+
* Fan-out helpers consult this map to target per-entity collection
216+
* requests only at the peers that actually host / contribute to a given
217+
* entity, avoiding spurious 404s from non-contributing peers.
218+
*
219+
* @param contributors New map: entity id -> list of peer names that
220+
* contribute to it. Entries with empty lists are discarded.
221+
*/
222+
void update_peer_contributors(std::unordered_map<std::string, std::vector<std::string>> contributors);
223+
224+
/**
225+
* @brief Check whether an entity id has at least one peer contributor.
226+
*
227+
* Returns true when the entity is either routed to a peer (remote leaf) or
228+
* served locally but aggregated from peers (hierarchical / merged). Returns
229+
* false for local-only entities and for unknown IDs. Thread-safe.
230+
*/
231+
bool has_peer_contributors(const std::string & entity_id) const;
232+
233+
/**
234+
* @brief List the peers that host / contribute to a given entity.
235+
*
236+
* Combines information from the routing table (routed leaves) and the
237+
* per-entity contributors map (merged / hierarchical entities). Duplicates
238+
* are removed while preserving first-seen order. Returns an empty vector
239+
* for local-only entities and unknown IDs. Thread-safe.
240+
*/
241+
std::vector<std::string> get_peer_contributors(const std::string & entity_id) const;
242+
206243
/**
207244
* @brief Get the URL for a known peer by name
208245
* @param peer_name Name of the peer
@@ -223,17 +260,22 @@ class AggregationManager {
223260
void forward_request(const std::string & peer_name, const httplib::Request & req, httplib::Response & res);
224261

225262
/**
226-
* @brief Fan-out a GET request to all healthy peers in parallel
263+
* @brief Fan-out a GET request to healthy peers in parallel.
227264
*
228-
* Sends GET requests to all healthy peers concurrently via std::async,
229-
* merges the "items" arrays from their responses. Returns partial results
230-
* if some peers fail.
265+
* Sends GET requests concurrently via std::async, merges the "items"
266+
* arrays from their responses. Returns partial results if some peers fail.
231267
*
232268
* @param path Request path (e.g., "/api/v1/components")
233269
* @param auth_header Authorization header value (empty to omit)
234-
* @return FanOutResult with merged items and failure info
270+
* @param target_peers When non-null, only peers whose name is in this
271+
* list AND are currently healthy are queried. A non-null but empty
272+
* list means "no matching peers" and produces an empty result with
273+
* no fan-out. When null (default), all healthy peers are queried,
274+
* preserving the pre-filtering behavior for global endpoints.
275+
* @return FanOutResult with merged items and failure info.
235276
*/
236-
FanOutResult fan_out_get(const std::string & path, const std::string & auth_header);
277+
FanOutResult fan_out_get(const std::string & path, const std::string & auth_header,
278+
const std::vector<std::string> * target_peers = nullptr);
237279

238280
/**
239281
* @brief Get peer status for /health endpoint
@@ -248,6 +290,7 @@ class AggregationManager {
248290
mutable std::shared_mutex mutex_; // Declared before data it protects (destruction order)
249291
std::vector<std::shared_ptr<PeerClient>> peers_;
250292
std::unordered_map<std::string, std::string> routing_table_;
293+
std::unordered_map<std::string, std::vector<std::string>> peer_contributors_by_entity_;
251294
std::vector<LeafCollisionWarning> leaf_warnings_;
252295

253296
/**

src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/fan_out_helpers.hpp

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
#pragma once
1616

17+
#include <array>
18+
#include <optional>
1719
#include <string>
20+
#include <string_view>
1821

1922
#include <httplib.h>
2023
#include <nlohmann/json.hpp>
@@ -45,6 +48,43 @@ inline std::string url_encode_param(const std::string & value) {
4548
return result;
4649
}
4750

51+
// Extract the entity id from a per-entity collection path like
52+
// `/api/v1/components/<id>/logs` or `/api/v1/apps/<id>/data/<sub>`.
53+
// Matching is intentionally permissive: any path containing
54+
// `/components|apps|areas|functions/<id>/<anything>` is treated as
55+
// per-entity, regardless of what the segment after `<id>` is. This keeps
56+
// future per-entity sub-endpoints eligible for target-filtered fan-out
57+
// without requiring a whitelist bump every time.
58+
//
59+
// Returns std::nullopt for:
60+
// - global endpoints (`/api/v1/faults`, `/api/v1/health`, ...),
61+
// - entity-detail endpoints (`/api/v1/components/<id>` with no trailing
62+
// `/...` - these go through the forwarding path, not fan-out),
63+
// - malformed paths with an empty entity segment (`/components//logs`).
64+
inline std::optional<std::string> extract_entity_id_for_fan_out(const std::string & path) {
65+
static constexpr std::array<std::string_view, 4> kEntityCollections = {"/components/", "/apps/", "/areas/",
66+
"/functions/"};
67+
for (const auto & prefix : kEntityCollections) {
68+
auto start = path.find(prefix);
69+
if (start == std::string::npos) {
70+
continue;
71+
}
72+
start += prefix.size();
73+
auto end = path.find('/', start);
74+
if (end == std::string::npos) {
75+
// Entity detail endpoint like `/components/<id>` (no trailing collection).
76+
// These are routed to peers via a different mechanism; not a fan-out case.
77+
return std::nullopt;
78+
}
79+
std::string id = path.substr(start, end - start);
80+
if (id.empty()) {
81+
return std::nullopt;
82+
}
83+
return id;
84+
}
85+
return std::nullopt;
86+
}
87+
4888
inline std::string build_fan_out_path(const httplib::Request & req) {
4989
if (req.params.empty()) {
5090
return req.path;
@@ -76,8 +116,23 @@ inline void merge_peer_items(AggregationManager * agg, const httplib::Request &
76116
if (agg->healthy_peer_count() == 0) {
77117
return;
78118
}
119+
// For per-entity collection paths, target only the peers that host or
120+
// contribute to the entity (routed leaves and merged / hierarchical
121+
// entities). Local-only entities produce an empty target list and skip
122+
// fan-out entirely, avoiding spurious `partial: true` / `failed_peers`
123+
// from peers that do not own the entity. Global collection endpoints
124+
// (paths without an entity id) keep fan-out-to-all behavior.
125+
std::optional<std::vector<std::string>> contributors_buffer;
126+
const std::vector<std::string> * target_peers = nullptr;
127+
if (auto entity_id = extract_entity_id_for_fan_out(req.path); entity_id.has_value()) {
128+
contributors_buffer = agg->get_peer_contributors(*entity_id);
129+
if (contributors_buffer->empty()) {
130+
return; // local-only: no peer hosts this entity
131+
}
132+
target_peers = &contributors_buffer.value();
133+
}
79134
auto fan_path = build_fan_out_path(req);
80-
auto fan_result = agg->fan_out_get(fan_path, req.get_header_value("Authorization"));
135+
auto fan_result = agg->fan_out_get(fan_path, req.get_header_value("Authorization"), target_peers);
81136
if (fan_result.merged_items.is_array() && !fan_result.merged_items.empty()) {
82137
if (!result.contains("items") || !result["items"].is_array()) {
83138
result["items"] = nlohmann::json::array();

src/ros2_medkit_gateway/src/aggregation/aggregation_manager.cpp

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,50 @@ std::optional<std::string> AggregationManager::find_peer_for_entity(const std::s
547547
return std::nullopt;
548548
}
549549

550+
void AggregationManager::update_peer_contributors(
551+
std::unordered_map<std::string, std::vector<std::string>> contributors) {
552+
// Drop entries with empty contributor lists - they would be indistinguishable
553+
// from "local-only" lookups and only waste space.
554+
for (auto it = contributors.begin(); it != contributors.end();) {
555+
if (it->second.empty()) {
556+
it = contributors.erase(it);
557+
} else {
558+
++it;
559+
}
560+
}
561+
std::unique_lock<std::shared_mutex> lock(mutex_);
562+
peer_contributors_by_entity_ = std::move(contributors);
563+
}
564+
565+
bool AggregationManager::has_peer_contributors(const std::string & entity_id) const {
566+
std::shared_lock<std::shared_mutex> lock(mutex_);
567+
if (routing_table_.count(entity_id) > 0u) {
568+
return true;
569+
}
570+
auto it = peer_contributors_by_entity_.find(entity_id);
571+
return it != peer_contributors_by_entity_.end() && !it->second.empty();
572+
}
573+
574+
std::vector<std::string> AggregationManager::get_peer_contributors(const std::string & entity_id) const {
575+
std::shared_lock<std::shared_mutex> lock(mutex_);
576+
std::vector<std::string> result;
577+
// Routed-leaf entry wins ordering: it always reflects the authoritative
578+
// owner of the entity's runtime state.
579+
auto rt_it = routing_table_.find(entity_id);
580+
if (rt_it != routing_table_.end()) {
581+
result.push_back(rt_it->second);
582+
}
583+
auto pc_it = peer_contributors_by_entity_.find(entity_id);
584+
if (pc_it != peer_contributors_by_entity_.end()) {
585+
for (const auto & name : pc_it->second) {
586+
if (std::find(result.begin(), result.end(), name) == result.end()) {
587+
result.push_back(name);
588+
}
589+
}
590+
}
591+
return result;
592+
}
593+
550594
std::string AggregationManager::get_peer_url(const std::string & peer_name) const {
551595
std::shared_lock<std::shared_mutex> lock(mutex_);
552596

@@ -609,22 +653,39 @@ void AggregationManager::forward_request(const std::string & peer_name, const ht
609653
}
610654

611655
AggregationManager::FanOutResult AggregationManager::fan_out_get(const std::string & path,
612-
const std::string & auth_header) {
656+
const std::string & auth_header,
657+
const std::vector<std::string> * target_peers) {
613658
// Per-peer result collected by each async task
614659
struct PeerResult {
615660
std::string peer_name;
616661
bool success{false};
617662
nlohmann::json items = nlohmann::json::array();
618663
};
619664

665+
// A non-null, empty target list means "no peers match" - skip fan-out
666+
// entirely so callers don't see spurious partial flags. Keep the
667+
// merged_items invariant (always a JSON array, never null) for downstream
668+
// .is_array() checks.
669+
if (target_peers != nullptr && target_peers->empty()) {
670+
FanOutResult empty_result;
671+
empty_result.merged_items = nlohmann::json::array();
672+
return empty_result;
673+
}
674+
620675
// Snapshot healthy peers under lock, release before network I/O.
676+
// When target_peers is non-null, restrict the snapshot to matching peers.
621677
std::vector<std::shared_ptr<PeerClient>> snapshot;
622678
{
623679
std::shared_lock<std::shared_mutex> lock(mutex_);
624680
for (const auto & peer : peers_) {
625-
if (peer->is_healthy()) {
626-
snapshot.push_back(peer);
681+
if (!peer->is_healthy()) {
682+
continue;
627683
}
684+
if (target_peers != nullptr &&
685+
std::find(target_peers->begin(), target_peers->end(), peer->name()) == target_peers->end()) {
686+
continue;
687+
}
688+
snapshot.push_back(peer);
628689
}
629690
}
630691

src/ros2_medkit_gateway/src/gateway_node.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
#include <cinttypes>
2121
#include <mutex>
2222
#include <set>
23+
#include <string_view>
24+
#include <unordered_map>
2325
#include <unordered_set>
26+
#include <vector>
2427

2528
#include "ros2_medkit_gateway/aggregation/network_utils.hpp"
2629
#include "ros2_medkit_gateway/entity_validation.hpp"
@@ -1682,6 +1685,41 @@ void GatewayNode::refresh_cache() {
16821685
peer_routing_table = std::move(merged.routing_table);
16831686
aggregation_mgr_->update_routing_table(peer_routing_table);
16841687
aggregation_mgr_->set_leaf_warnings(std::move(merged.leaf_warnings));
1688+
1689+
// Track per-entity peer contributors: entity id -> list of peer
1690+
// names that host or contribute to it. Covers both routed leaves
1691+
// (also tracked via routing_table) and merged / hierarchical entities
1692+
// (Areas, Functions, parent Components). Fan-out helpers use this map
1693+
// to target per-entity collection requests at the peers that actually
1694+
// own the entity, avoiding spurious 404s from non-contributing peers.
1695+
//
1696+
// The "peer:" prefix is stripped from the contributors list produced
1697+
// by EntityMerger. Entries starting with anything else (including
1698+
// "local") are ignored.
1699+
std::unordered_map<std::string, std::vector<std::string>> peer_contributors;
1700+
static constexpr std::string_view kPeerPrefix = "peer:";
1701+
auto collect_contributors = [&peer_contributors](const auto & entities) {
1702+
for (const auto & e : entities) {
1703+
for (const auto & c : e.contributors) {
1704+
if (c.rfind(kPeerPrefix, 0) != 0) {
1705+
continue;
1706+
}
1707+
std::string peer_name = c.substr(kPeerPrefix.size());
1708+
if (peer_name.empty()) {
1709+
continue;
1710+
}
1711+
auto & list = peer_contributors[e.id];
1712+
if (std::find(list.begin(), list.end(), peer_name) == list.end()) {
1713+
list.push_back(std::move(peer_name));
1714+
}
1715+
}
1716+
}
1717+
};
1718+
collect_contributors(areas);
1719+
collect_contributors(all_components);
1720+
collect_contributors(apps);
1721+
collect_contributors(functions);
1722+
aggregation_mgr_->update_peer_contributors(std::move(peer_contributors));
16851723
}
16861724

16871725
// Inject plugin entities (non-hybrid) and refresh entity ownership (all modes).

src/ros2_medkit_gateway/src/updates/update_manager.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,17 +153,24 @@ tl::expected<void, UpdateError> UpdateManager::delete_update(const std::string &
153153
states_.erase(id);
154154
} else {
155155
// Rollback sentinel on failure
156+
const auto & err = result.error();
156157
std::lock_guard<std::mutex> lock(mutex_);
157158
if (had_state) {
158159
auto it = states_.find(id);
159160
if (it != states_.end() && it->second) {
161+
// Mark the package as failed consistently across all surfaces
162+
// exposed via GET /updates/{id}/status: the SOVD `status` field,
163+
// the `x-medkit-phase` vendor extension, and `error_message`.
164+
// Without updating all three, clients see payloads like
165+
// {status:pending, x-medkit-phase:failed} with no error context.
166+
it->second->status.status = UpdateStatus::Failed;
160167
it->second->status.phase = UpdatePhase::Failed;
168+
it->second->status.error_message = err.message;
161169
}
162170
} else {
163171
// Remove the sentinel we created - package never had state before
164172
states_.erase(id);
165173
}
166-
const auto & err = result.error();
167174
switch (err.code) {
168175
case UpdateBackendError::NotFound:
169176
return tl::make_unexpected(UpdateError{UpdateErrorCode::NotFound, err.message});

0 commit comments

Comments
 (0)