Skip to content

Commit 6c87466

Browse files
committed
fix(gateway): harden fault transport RPC handling
Centralise the seven fault service-call paths in invoke_fault_service<Service> to address three latent issues: - Clamp negative service_timeout_sec to zero before spinning. A misconfigured value would otherwise treat spin_until_future_complete as 'spin forever' under executor_mutex_, stalling every fault RPC. - Call remove_pending_request() on timeout, matching ros2_service_transport and ros2_action_transport. Abandoned futures no longer leak into the client's pending-request map on repeated timeouts. - Run wait_for_service() outside executor_mutex_. The graph listener that backs it is a per-context thread independent of any user executor, so blocking the lock for the full discovery window stalled unrelated fault RPCs unnecessarily. Local shared_ptr copies of the client and executor keep rclcpp state alive even if the transport destructor races the caller. The helper drops ~120 lines of duplicate boilerplate across the seven methods.
1 parent 02de6df commit 6c87466

1 file changed

Lines changed: 89 additions & 112 deletions

File tree

src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp

Lines changed: 89 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,53 @@
2626

2727
namespace ros2_medkit_gateway::ros2 {
2828

29+
namespace {
30+
31+
// Synchronous fault-manager RPC under the transport's private executor.
32+
//
33+
// Mirrors the pattern in ros2_service_transport.cpp / ros2_action_transport.cpp
34+
// (clamp negative timeouts, drop abandoned pending requests on timeout) and
35+
// adapts it to the private-node / private-executor design from issue #399.
36+
//
37+
// wait_for_service runs outside `executor_mutex` because the graph listener
38+
// that backs it is a per-context thread independent of any user executor; the
39+
// local shared_ptr copies of `client` and `executor` keep the rclcpp state
40+
// alive even if the transport destructor races the caller.
41+
template <typename Service>
42+
typename Service::Response::SharedPtr invoke_fault_service(
43+
// SharedPtrs are taken by value on purpose: they keep the rclcpp client
44+
// and executor alive for the duration of the call even if the transport
45+
// destructor concurrently resets the corresponding member shared_ptrs.
46+
typename rclcpp::Client<Service>::SharedPtr client, // NOLINT(performance-unnecessary-value-param)
47+
typename Service::Request::SharedPtr request, // NOLINT(performance-unnecessary-value-param)
48+
std::shared_ptr<rclcpp::executors::SingleThreadedExecutor> executor, // NOLINT(performance-unnecessary-value-param)
49+
std::mutex & executor_mutex, std::chrono::duration<double> timeout, const char * op_name, std::string & error_out) {
50+
const auto clamped = std::chrono::duration<double>(std::max(timeout.count(), 0.0));
51+
52+
if (!client || !executor) {
53+
error_out = std::string(op_name) + " transport not initialised";
54+
return nullptr;
55+
}
56+
57+
if (!client->wait_for_service(clamped)) {
58+
error_out = std::string(op_name) + " service not available";
59+
return nullptr;
60+
}
61+
62+
std::lock_guard<std::mutex> lock(executor_mutex);
63+
auto future = client->async_send_request(request);
64+
if (executor->spin_until_future_complete(future, clamped) != rclcpp::FutureReturnCode::SUCCESS) {
65+
// Drop the abandoned slot from the client's pending-request map; without
66+
// this, repeated timeouts leak entries until the client is destroyed.
67+
client->remove_pending_request(future.request_id);
68+
error_out = std::string(op_name) + " service call timed out";
69+
return nullptr;
70+
}
71+
return future.get();
72+
}
73+
74+
} // namespace
75+
2976
Ros2FaultServiceTransport::Ros2FaultServiceTransport(rclcpp::Node * node) : node_(node) {
3077
// Pick up configurable timeout. GatewayNode declares this parameter up front,
3178
// but unit tests may construct the transport with a plain rclcpp::Node.
@@ -93,7 +140,6 @@ bool Ros2FaultServiceTransport::is_available() const {
93140
FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_code, uint8_t severity,
94141
const std::string & description, const std::string & source_id) {
95142
FaultResult result;
96-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
97143

98144
auto request = std::make_shared<ros2_medkit_msgs::srv::ReportFault::Request>();
99145
request->fault_code = fault_code;
@@ -102,21 +148,12 @@ FaultResult Ros2FaultServiceTransport::report_fault(const std::string & fault_co
102148
request->description = description;
103149
request->source_id = source_id;
104150

105-
ros2_medkit_msgs::srv::ReportFault::Response::SharedPtr response;
106-
{
107-
std::lock_guard<std::mutex> lock(executor_mutex_);
108-
if (!report_fault_client_->wait_for_service(timeout)) {
109-
result.success = false;
110-
result.error_message = "ReportFault service not available";
111-
return result;
112-
}
113-
auto future = report_fault_client_->async_send_request(request);
114-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
115-
result.success = false;
116-
result.error_message = "ReportFault service call timed out";
117-
return result;
118-
}
119-
response = future.get();
151+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::ReportFault>(
152+
report_fault_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
153+
"ReportFault", result.error_message);
154+
if (!response) {
155+
result.success = false;
156+
return result;
120157
}
121158

122159
result.success = response->accepted;
@@ -132,7 +169,6 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id
132169
bool include_confirmed, bool include_cleared, bool include_healed,
133170
bool include_muted, bool include_clusters) {
134171
FaultResult result;
135-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
136172

137173
auto request = std::make_shared<ros2_medkit_msgs::srv::ListFaults::Request>();
138174
request->filter_by_severity = false;
@@ -155,21 +191,12 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id
155191
request->include_muted = include_muted;
156192
request->include_clusters = include_clusters;
157193

158-
ros2_medkit_msgs::srv::ListFaults::Response::SharedPtr response;
159-
{
160-
std::lock_guard<std::mutex> lock(executor_mutex_);
161-
if (!list_faults_client_->wait_for_service(timeout)) {
162-
result.success = false;
163-
result.error_message = "ListFaults service not available";
164-
return result;
165-
}
166-
auto future = list_faults_client_->async_send_request(request);
167-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
168-
result.success = false;
169-
result.error_message = "ListFaults service call timed out";
170-
return result;
171-
}
172-
response = future.get();
194+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::ListFaults>(
195+
list_faults_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
196+
"ListFaults", result.error_message);
197+
if (!response) {
198+
result.success = false;
199+
return result;
173200
}
174201

175202
// Filter by source_id if provided (uses prefix matching)
@@ -234,26 +261,16 @@ FaultResult Ros2FaultServiceTransport::list_faults(const std::string & source_id
234261
FaultWithEnvJsonResult Ros2FaultServiceTransport::get_fault_with_env(const std::string & fault_code,
235262
const std::string & source_id) {
236263
FaultWithEnvJsonResult result;
237-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
238264

239265
auto request = std::make_shared<ros2_medkit_msgs::srv::GetFault::Request>();
240266
request->fault_code = fault_code;
241267

242-
ros2_medkit_msgs::srv::GetFault::Response::SharedPtr response;
243-
{
244-
std::lock_guard<std::mutex> lock(executor_mutex_);
245-
if (!get_fault_client_->wait_for_service(timeout)) {
246-
result.success = false;
247-
result.error_message = "GetFault service not available";
248-
return result;
249-
}
250-
auto future = get_fault_client_->async_send_request(request);
251-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
252-
result.success = false;
253-
result.error_message = "GetFault service call timed out";
254-
return result;
255-
}
256-
response = future.get();
268+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::GetFault>(
269+
get_fault_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
270+
"GetFault", result.error_message);
271+
if (!response) {
272+
result.success = false;
273+
return result;
257274
}
258275

259276
if (!response->success) {
@@ -301,27 +318,17 @@ FaultResult Ros2FaultServiceTransport::get_fault(const std::string & fault_code,
301318

302319
FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_code, bool skip_correlation_auto_clear) {
303320
FaultResult result;
304-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
305321

306322
auto request = std::make_shared<ros2_medkit_msgs::srv::ClearFault::Request>();
307323
request->fault_code = fault_code;
308324
request->skip_correlation_auto_clear = skip_correlation_auto_clear;
309325

310-
ros2_medkit_msgs::srv::ClearFault::Response::SharedPtr response;
311-
{
312-
std::lock_guard<std::mutex> lock(executor_mutex_);
313-
if (!clear_fault_client_->wait_for_service(timeout)) {
314-
result.success = false;
315-
result.error_message = "ClearFault service not available";
316-
return result;
317-
}
318-
auto future = clear_fault_client_->async_send_request(request);
319-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
320-
result.success = false;
321-
result.error_message = "ClearFault service call timed out";
322-
return result;
323-
}
324-
response = future.get();
326+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::ClearFault>(
327+
clear_fault_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
328+
"ClearFault", result.error_message);
329+
if (!response) {
330+
result.success = false;
331+
return result;
325332
}
326333

327334
result.success = response->success;
@@ -339,27 +346,17 @@ FaultResult Ros2FaultServiceTransport::clear_fault(const std::string & fault_cod
339346

340347
FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_code, const std::string & topic) {
341348
FaultResult result;
342-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
343349

344350
auto request = std::make_shared<ros2_medkit_msgs::srv::GetSnapshots::Request>();
345351
request->fault_code = fault_code;
346352
request->topic = topic;
347353

348-
ros2_medkit_msgs::srv::GetSnapshots::Response::SharedPtr response;
349-
{
350-
std::lock_guard<std::mutex> lock(executor_mutex_);
351-
if (!get_snapshots_client_->wait_for_service(timeout)) {
352-
result.success = false;
353-
result.error_message = "GetSnapshots service not available";
354-
return result;
355-
}
356-
auto future = get_snapshots_client_->async_send_request(request);
357-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
358-
result.success = false;
359-
result.error_message = "GetSnapshots service call timed out";
360-
return result;
361-
}
362-
response = future.get();
354+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::GetSnapshots>(
355+
get_snapshots_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
356+
"GetSnapshots", result.error_message);
357+
if (!response) {
358+
result.success = false;
359+
return result;
363360
}
364361

365362
result.success = response->success;
@@ -383,26 +380,16 @@ FaultResult Ros2FaultServiceTransport::get_snapshots(const std::string & fault_c
383380

384381
FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code) {
385382
FaultResult result;
386-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
387383

388384
auto request = std::make_shared<ros2_medkit_msgs::srv::GetRosbag::Request>();
389385
request->fault_code = fault_code;
390386

391-
ros2_medkit_msgs::srv::GetRosbag::Response::SharedPtr response;
392-
{
393-
std::lock_guard<std::mutex> lock(executor_mutex_);
394-
if (!get_rosbag_client_->wait_for_service(timeout)) {
395-
result.success = false;
396-
result.error_message = "GetRosbag service not available";
397-
return result;
398-
}
399-
auto future = get_rosbag_client_->async_send_request(request);
400-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
401-
result.success = false;
402-
result.error_message = "GetRosbag service call timed out";
403-
return result;
404-
}
405-
response = future.get();
387+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::GetRosbag>(
388+
get_rosbag_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
389+
"GetRosbag", result.error_message);
390+
if (!response) {
391+
result.success = false;
392+
return result;
406393
}
407394

408395
result.success = response->success;
@@ -421,26 +408,16 @@ FaultResult Ros2FaultServiceTransport::get_rosbag(const std::string & fault_code
421408

422409
FaultResult Ros2FaultServiceTransport::list_rosbags(const std::string & entity_fqn) {
423410
FaultResult result;
424-
const auto timeout = std::chrono::duration<double>(service_timeout_sec_);
425411

426412
auto request = std::make_shared<ros2_medkit_msgs::srv::ListRosbags::Request>();
427413
request->entity_fqn = entity_fqn;
428414

429-
ros2_medkit_msgs::srv::ListRosbags::Response::SharedPtr response;
430-
{
431-
std::lock_guard<std::mutex> lock(executor_mutex_);
432-
if (!list_rosbags_client_->wait_for_service(timeout)) {
433-
result.success = false;
434-
result.error_message = "ListRosbags service not available";
435-
return result;
436-
}
437-
auto future = list_rosbags_client_->async_send_request(request);
438-
if (executor_->spin_until_future_complete(future, timeout) != rclcpp::FutureReturnCode::SUCCESS) {
439-
result.success = false;
440-
result.error_message = "ListRosbags service call timed out";
441-
return result;
442-
}
443-
response = future.get();
415+
auto response = invoke_fault_service<ros2_medkit_msgs::srv::ListRosbags>(
416+
list_rosbags_client_, request, executor_, executor_mutex_, std::chrono::duration<double>(service_timeout_sec_),
417+
"ListRosbags", result.error_message);
418+
if (!response) {
419+
result.success = false;
420+
return result;
444421
}
445422

446423
result.success = response->success;

0 commit comments

Comments
 (0)