Skip to content

Commit 72ae9b8

Browse files
committed
fix(tests,ci): stabilize shutdown-race flakes across distros
Integration-test CI has been flaky on Humble and Jazzy with a rotating victim (demo_calibration_service, demo_brake_actuator, demo_brake_pressure_sensor, demo_engine_temp_sensor, demo_lidar_sensor, fault_manager_node) dying during shutdown and failing test_exit_codes. Prior commits in this area (3adf71b, 1dc0a7e, 2f23c09 and others) added defensive cleanup to individual demo nodes; the crashes kept moving which was a sign the root cause was not in the nodes. This change addresses the six distinct bugs that were actually responsible. 1. SIGINT-during-rclcpp-init race in all demo nodes ----------------------------------------------------- If SIGINT fires between rclcpp::init() and the executor's guard condition being allocated, rclcpp's default signal handler invalidates the shared context mid-call; in-flight rcl_service_init / rcl_guard_condition_init throws RCLError, the exception escapes the bare demo main() and std::terminate aborts the process with SIGABRT. Fix: block SIGINT/SIGTERM with pthread_sigmask, let rclcpp::init() install its handler, construct the node, add it to an explicit SingleThreadedExecutor (so the guard condition is allocated while signals are still blocked), then restore the mask. A signal queued in that window is delivered to a fully-assembled executor and shuts the node down normally. Applied uniformly to all 10 demo nodes. The pre-existing std::set_terminate in long_calibration_action.cpp documents a separate rclcpp_action mid-goal-shutdown race and is kept. Local repro on Humble: 3 crashes / 180 iters unpatched vs 0 / 180 patched, in the 50-200 ms post-launch SIGINT window. 2. FastRTPS discovery-teardown use-after-free ----------------------------------------------- Humble ships FastRTPS 2.6 which has a use-after-free in its endpoint-discovery-protocol proxy tables on peer disappearance. When the gateway shuts down cleanly, its "writer leaving" messages are processed by peer nodes' FastRTPS listener thread which segfaults in EDP::unpairWriterProxy / PDP::removeWriterProxyData. Backtrace captured under stress via LD_PRELOAD=libSegFault.so confirms the crash is in libfastrtps.so, not in our code. Fix: set RMW_IMPLEMENTATION=rmw_cyclonedds_cpp in the CI test run environment (both build-and-test matrix covering humble/rolling and the jazzy-test job). CycloneDDS has a more robust discovery teardown path. Local repro: 1 crash / 6 iters with FastRTPS, 0 / 30 with CycloneDDS. Also install ros-${distro}-rmw-cyclonedds-cpp in the deps step. 3. Unhandled context-invalid throws in fault_manager ------------------------------------------------------ RosbagCapture::get_topic_type, RosbagCapture::resolve_topics, and SnapshotCapture::get_topic_type invoke node_->get_topic_names_and_types which throws std::runtime_error when the rcl context is invalidated mid-call. In RosbagCapture the call is reachable from a discovery retry timer that fires every 500 ms, and the race window widens under sanitizer load to the point where the TSan CI job hits it reliably (SIGABRT / exit -6). The gateway's cache refresh already catches and logs this gracefully. Apply the same approach at the three fault_manager call sites: wrap the rcl call in try/catch and treat invalid-context as "no topic info available right now" - callers already handle the empty return (skip subscribe, skip snapshot, retry next tick). 4. TSan-flagged race on RosbagCapture::post_fault_timer_ ---------------------------------------------------------- TSan flags a data race on the post_fault_timer_ member (rclcpp::TimerBase::SharedPtr): WRITE on_fault_confirmed() at rosbag_capture.cpp:208 (service callback thread) READ post_fault_timer_callback() at rosbag_capture.cpp:656/658 (main executor thread) Both paths touch the SharedPtr without synchronisation. Even outside TSan this is a real use-after-free waiting for the right scheduling. Fix: introduce std::mutex post_fault_timer_mutex_ and guard every access (on_fault_confirmed, post_fault_timer_callback, stop). Scope kept minimal so the executor is not blocked while the service callback creates the timer. 5. REST server start/stop race in GatewayNode ----------------------------------------------- GatewayNode::start_rest_server() signalled "server running" on the worker thread before calling rest_server_->start() (which is where cpp-httplib's listen() actually binds and enters the accept loop). For short-lived test fixtures that construct and immediately destroy a GatewayNode (e.g. the per-test SetUp/TearDown in test_plugin_notify_integration), the main thread can race ahead and call stop_rest_server() before the worker has reached listen(). In that order the stop() request is dropped, listen() subsequently blocks forever, and the destructor's condition-variable wait hangs until the 60s test timeout fires. Fix: drop the server_running_/server_cv_/server_mutex_ machinery and poll rest_server_->is_running() (which reflects cpp-httplib's actual accept-loop state). start_rest_server now returns only once the server is observably ready; stop_rest_server stops and joins directly. Added RESTServer::is_running() as a passthrough to HttpServerManager. 6. test_cross_ecu_fanout poll accepts pre-discovery responses --------------------------------------------------------------- test_data_include_peer_topics used `lambda d: d if d.get('items')` as its readiness predicate. The gateway's /data response contains a /rosout item as soon as LogManager subscribes, well before demo-node discovery completes; the poll therefore returned immediately with only {topic: '/rosout'} and the subsequent assertTrue(has_local) failed. Tighten the predicate to require both a /powertrain/engine/ item and a /chassis/brakes/ or /perception/lidar/ item before returning. Verification ------------ Local stress (Humble container under 16-CPU stress-ng pressure, LD_PRELOAD=libSegFault.so, RMW_IMPLEMENTATION=rmw_cyclonedds_cpp): - 200 iters of test_entity_listing: 0 fails - 30 iters of test_triggers_updates: 0 fails - fault_manager unit + integration tests: pass - test_plugin_notify_integration (Jazzy): 5/5 subtests pass - test_cross_ecu_fanout (Jazzy): pass
1 parent ccbddd7 commit 72ae9b8

19 files changed

Lines changed: 332 additions & 79 deletions

File tree

.github/workflows/ci.yml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ jobs:
5454
run: |
5555
apt-get update
5656
apt-get install -y ros-${{ matrix.ros_distro }}-test-msgs
57+
if [ "${{ matrix.ros_distro }}" = "humble" ]; then
58+
apt-get install -y ros-humble-rmw-cyclonedds-cpp
59+
fi
5760
source /opt/ros/${{ matrix.ros_distro }}/setup.bash
5861
rosdep update
5962
rosdep install --from-paths src --ignore-src -y
@@ -73,6 +76,15 @@ jobs:
7376
7477
- name: Run unit and integration tests
7578
timeout-minutes: 15
79+
env:
80+
# FastRTPS 2.6 on Humble has a known use-after-free in the
81+
# discovery-teardown path (EDP::unpairWriterProxy) that segfaults
82+
# peer nodes when the gateway shuts down, so we force CycloneDDS
83+
# there. Rolling + Jazzy ship a newer FastRTPS without that bug
84+
# and hit a separate iceoryx-shared-memory crash under CycloneDDS
85+
# during shutdown ("string capacity was zero for allocated data"),
86+
# so on those distros we keep the default FastRTPS.
87+
RMW_IMPLEMENTATION: ${{ matrix.ros_distro == 'humble' && 'rmw_cyclonedds_cpp' || '' }}
7688
run: |
7789
source /opt/ros/${{ matrix.ros_distro }}/setup.bash
7890
colcon test --return-code-on-test-failure \
@@ -185,7 +197,9 @@ jobs:
185197
- name: Install dependencies
186198
run: |
187199
apt-get update
188-
apt-get install -y ros-jazzy-test-msgs
200+
apt-get install -y \
201+
ros-jazzy-test-msgs \
202+
ros-jazzy-rmw-cyclonedds-cpp
189203
source /opt/ros/jazzy/setup.bash
190204
rosdep update
191205
rosdep install --from-paths src --ignore-src -y
@@ -199,6 +213,11 @@ jobs:
199213
run: tar xf jazzy-build.tar && rm jazzy-build.tar
200214

201215
- name: Run unit and integration tests
216+
env:
217+
# FastRTPS has a known use-after-free in discovery-teardown that
218+
# can segfault peer nodes when another node (typically the gateway)
219+
# shuts down. CycloneDDS avoids it.
220+
RMW_IMPLEMENTATION: rmw_cyclonedds_cpp
202221
run: |
203222
source /opt/ros/jazzy/setup.bash
204223
colcon test --return-code-on-test-failure \

src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/rosbag_capture.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ class RosbagCapture {
165165
/// Post-fault recording state
166166
std::string current_fault_code_;
167167
std::string current_bag_path_;
168+
/// Protects post_fault_timer_ against concurrent assignment in
169+
/// on_fault_confirmed() (service thread) and reset in
170+
/// post_fault_timer_callback() / stop() (executor thread).
171+
std::mutex post_fault_timer_mutex_;
168172
rclcpp::TimerBase::SharedPtr post_fault_timer_;
169173
std::atomic<bool> recording_post_fault_{false};
170174

src/ros2_medkit_fault_manager/src/rosbag_capture.cpp

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,12 @@ void RosbagCapture::stop() {
135135
running_.store(false);
136136

137137
// Cancel any pending post-fault timer
138-
if (post_fault_timer_) {
139-
post_fault_timer_->cancel();
140-
post_fault_timer_.reset();
138+
{
139+
std::lock_guard<std::mutex> lock(post_fault_timer_mutex_);
140+
if (post_fault_timer_) {
141+
post_fault_timer_->cancel();
142+
post_fault_timer_.reset();
143+
}
141144
}
142145

143146
if (discovery_retry_timer_) {
@@ -202,10 +205,13 @@ void RosbagCapture::on_fault_confirmed(const std::string & fault_code) {
202205

203206
// Create timer for post-fault recording
204207
auto duration = std::chrono::duration<double>(config_.duration_after_sec);
205-
post_fault_timer_ =
206-
node_->create_wall_timer(std::chrono::duration_cast<std::chrono::nanoseconds>(duration), [this]() {
207-
post_fault_timer_callback();
208-
});
208+
{
209+
std::lock_guard<std::mutex> lock(post_fault_timer_mutex_);
210+
post_fault_timer_ =
211+
node_->create_wall_timer(std::chrono::duration_cast<std::chrono::nanoseconds>(duration), [this]() {
212+
post_fault_timer_callback();
213+
});
214+
}
209215

210216
RCLCPP_DEBUG(node_->get_logger(), "Recording %.1fs more after fault confirmation", config_.duration_after_sec);
211217
} else {
@@ -436,14 +442,19 @@ std::vector<std::string> RosbagCapture::resolve_topics() const {
436442
}
437443
}
438444
} else if (config_.topics == "all" || config_.topics == "auto") {
439-
// Discover all available topics ("auto" is an alias for "all")
440-
auto topic_names_and_types = node_->get_topic_names_and_types();
441-
for (const auto & [topic, types] : topic_names_and_types) {
442-
// Skip internal ROS topics
443-
if (topic.find("/parameter_events") != std::string::npos || topic.find("/rosout") != std::string::npos) {
444-
continue;
445+
// Discover all available topics ("auto" is an alias for "all").
446+
// Skip gracefully if the context is invalidated mid-call (shutdown race).
447+
try {
448+
auto topic_names_and_types = node_->get_topic_names_and_types();
449+
for (const auto & [topic, types] : topic_names_and_types) {
450+
// Skip internal ROS topics
451+
if (topic.find("/parameter_events") != std::string::npos || topic.find("/rosout") != std::string::npos) {
452+
continue;
453+
}
454+
topics_set.insert(topic);
445455
}
446-
topics_set.insert(topic);
456+
} catch (const std::runtime_error &) {
457+
// context invalid during shutdown - no topics to add
447458
}
448459
} else if (config_.topics == "explicit") {
449460
// Explicit mode: use only include_topics, no topic derivation
@@ -476,10 +487,18 @@ std::vector<std::string> RosbagCapture::resolve_topics() const {
476487
}
477488

478489
std::string RosbagCapture::get_topic_type(const std::string & topic) const {
479-
auto topic_names_and_types = node_->get_topic_names_and_types();
480-
auto it = topic_names_and_types.find(topic);
481-
if (it != topic_names_and_types.end() && !it->second.empty()) {
482-
return it->second[0];
490+
// node_->get_topic_names_and_types() throws if the rcl context is
491+
// invalidated mid-call (e.g. SIGINT fires between the check and the rcl
492+
// call). During shutdown this is expected and not actionable - treat it
493+
// as "no topic info available right now".
494+
try {
495+
auto topic_names_and_types = node_->get_topic_names_and_types();
496+
auto it = topic_names_and_types.find(topic);
497+
if (it != topic_names_and_types.end() && !it->second.empty()) {
498+
return it->second[0];
499+
}
500+
} catch (const std::runtime_error &) {
501+
// context invalid - caller handles empty return
483502
}
484503
return "";
485504
}
@@ -640,9 +659,12 @@ void RosbagCapture::post_fault_timer_callback() {
640659
}
641660

642661
// Cancel timer (one-shot)
643-
if (post_fault_timer_) {
644-
post_fault_timer_->cancel();
645-
post_fault_timer_.reset();
662+
{
663+
std::lock_guard<std::mutex> lock(post_fault_timer_mutex_);
664+
if (post_fault_timer_) {
665+
post_fault_timer_->cancel();
666+
post_fault_timer_.reset();
667+
}
646668
}
647669

648670
// Stop post-fault recording (no more direct writes to bag)

src/ros2_medkit_fault_manager/src/snapshot_capture.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,18 @@ std::vector<std::string> SnapshotCapture::collect_all_configured_topics() const
348348
}
349349

350350
std::string SnapshotCapture::get_topic_type(const std::string & topic) const {
351-
auto topic_names_and_types = node_->get_topic_names_and_types();
352-
auto it = topic_names_and_types.find(topic);
353-
if (it != topic_names_and_types.end() && !it->second.empty()) {
354-
return it->second[0];
351+
// node_->get_topic_names_and_types() throws if the rcl context is
352+
// invalidated mid-call (e.g. SIGINT fires between the check and the rcl
353+
// call). During shutdown this is expected and not actionable - treat it
354+
// as "no topic info available right now".
355+
try {
356+
auto topic_names_and_types = node_->get_topic_names_and_types();
357+
auto it = topic_names_and_types.find(topic);
358+
if (it != topic_names_and_types.end() && !it->second.empty()) {
359+
return it->second[0];
360+
}
361+
} catch (const std::runtime_error &) {
362+
// context invalid - caller handles empty return
355363
}
356364
return "";
357365
}

src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@
1414

1515
#pragma once
1616

17-
#include <atomic>
18-
#include <condition_variable>
1917
#include <memory>
20-
#include <mutex>
2118
#include <rclcpp/rclcpp.hpp>
2219
#include <string>
2320
#include <thread>
@@ -296,9 +293,6 @@ class GatewayNode : public rclcpp::Node {
296293

297294
// REST server thread management
298295
std::unique_ptr<std::thread> server_thread_;
299-
std::atomic<bool> server_running_{false};
300-
std::mutex server_mutex_;
301-
std::condition_variable server_cv_;
302296
};
303297

304298
/**

src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/rest_server.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ class RESTServer {
8181
return http_server_ && http_server_->is_tls_enabled();
8282
}
8383

84+
/// True once cpp-httplib's listen() has reached its accept loop.
85+
/// GatewayNode uses this as the start-up readiness signal so shutdown
86+
/// cannot race a listen() that has not yet started.
87+
bool is_running() const {
88+
return http_server_ && http_server_->is_running();
89+
}
90+
8491
private:
8592
void setup_routes();
8693
void setup_pre_routing_handler();

src/ros2_medkit_gateway/src/gateway_node.cpp

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1827,46 +1827,36 @@ void GatewayNode::refresh_cache() {
18271827

18281828
void GatewayNode::start_rest_server() {
18291829
server_thread_ = std::make_unique<std::thread>([this]() {
1830-
{
1831-
std::lock_guard<std::mutex> lock(server_mutex_);
1832-
server_running_ = true;
1833-
}
1834-
server_cv_.notify_all();
1835-
18361830
try {
18371831
rest_server_->start();
18381832
} catch (const std::exception & e) {
18391833
RCLCPP_ERROR(get_logger(), "REST server failed to start: %s", e.what());
18401834
} catch (...) {
18411835
RCLCPP_ERROR(get_logger(), "REST server failed to start: unknown exception");
18421836
}
1843-
1844-
{
1845-
std::lock_guard<std::mutex> lock(server_mutex_);
1846-
server_running_ = false;
1847-
}
1848-
server_cv_.notify_all();
18491837
});
18501838

1851-
// Wait for server to start
1852-
std::unique_lock<std::mutex> lock(server_mutex_);
1853-
server_cv_.wait(lock, [this] {
1854-
return server_running_.load();
1855-
});
1839+
// Wait for the server to actually reach cpp-httplib's accept loop before
1840+
// returning. is_running() becomes true only after listen() has passed
1841+
// bind and entered its select/poll loop; using a "thread started" flag
1842+
// here is not enough because stop() called before listen() entered its
1843+
// loop could be missed, leaving listen() blocking forever and the
1844+
// subsequent join() hanging indefinitely.
1845+
using namespace std::chrono_literals;
1846+
const auto deadline = std::chrono::steady_clock::now() + 5s;
1847+
while (!rest_server_->is_running() && std::chrono::steady_clock::now() < deadline) {
1848+
std::this_thread::sleep_for(1ms);
1849+
}
1850+
if (!rest_server_->is_running()) {
1851+
RCLCPP_ERROR(get_logger(), "REST server did not become ready within 5s");
1852+
}
18561853
}
18571854

18581855
void GatewayNode::stop_rest_server() {
18591856
if (rest_server_) {
18601857
rest_server_->stop();
18611858
}
1862-
1863-
// Wait for server thread to finish
18641859
if (server_thread_ && server_thread_->joinable()) {
1865-
std::unique_lock<std::mutex> lock(server_mutex_);
1866-
server_cv_.wait(lock, [this] {
1867-
return !server_running_.load();
1868-
});
1869-
lock.unlock(); // Release before join to avoid deadlock
18701860
server_thread_->join();
18711861
}
18721862
}

src/ros2_medkit_integration_tests/demo_nodes/beacon_publisher.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <csignal>
1516
#include <diagnostic_msgs/msg/key_value.hpp>
1617
#include <mutex>
1718
#include <rclcpp/rclcpp.hpp>
@@ -84,9 +85,25 @@ class BeaconPublisher : public rclcpp::Node {
8485
};
8586

8687
int main(int argc, char * argv[]) {
88+
// Block SIGINT/SIGTERM until the executor has allocated its guard
89+
// condition; a signal arriving mid-init invalidates the rcl context and
90+
// causes rcl_* calls to throw RCLError. Unblocking after add_node() lets
91+
// any queued signal be handled as a normal shutdown.
92+
sigset_t mask, old;
93+
sigemptyset(&mask);
94+
sigaddset(&mask, SIGINT);
95+
sigaddset(&mask, SIGTERM);
96+
pthread_sigmask(SIG_BLOCK, &mask, &old);
97+
8798
rclcpp::init(argc, argv);
8899
auto node = std::make_shared<BeaconPublisher>();
89-
rclcpp::spin(node);
100+
rclcpp::executors::SingleThreadedExecutor executor;
101+
executor.add_node(node);
102+
103+
pthread_sigmask(SIG_SETMASK, &old, nullptr);
104+
105+
executor.spin();
106+
executor.remove_node(node);
90107
node.reset();
91108
rclcpp::shutdown();
92109
return 0;

src/ros2_medkit_integration_tests/demo_nodes/brake_actuator.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
*/
2424

2525
#include <chrono>
26+
#include <csignal>
2627
#include <mutex>
2728
#include <rclcpp/rclcpp.hpp>
2829
#include <std_msgs/msg/float32.hpp>
@@ -103,9 +104,25 @@ class BrakeActuator : public rclcpp::Node {
103104
};
104105

105106
int main(int argc, char * argv[]) {
107+
// Block SIGINT/SIGTERM until the executor has allocated its guard
108+
// condition; a signal arriving mid-init invalidates the rcl context and
109+
// causes rcl_* calls to throw RCLError. Unblocking after add_node() lets
110+
// any queued signal be handled as a normal shutdown.
111+
sigset_t mask, old;
112+
sigemptyset(&mask);
113+
sigaddset(&mask, SIGINT);
114+
sigaddset(&mask, SIGTERM);
115+
pthread_sigmask(SIG_BLOCK, &mask, &old);
116+
106117
rclcpp::init(argc, argv);
107118
auto node = std::make_shared<BrakeActuator>();
108-
rclcpp::spin(node);
119+
rclcpp::executors::SingleThreadedExecutor executor;
120+
executor.add_node(node);
121+
122+
pthread_sigmask(SIG_SETMASK, &old, nullptr);
123+
124+
executor.spin();
125+
executor.remove_node(node);
109126
node.reset();
110127
rclcpp::shutdown();
111128
return 0;

src/ros2_medkit_integration_tests/demo_nodes/brake_pressure_sensor.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <csignal>
1516
#include <mutex>
1617
#include <rclcpp/rclcpp.hpp>
1718
#include <std_msgs/msg/float32.hpp>
@@ -60,9 +61,25 @@ class BrakePressureSensor : public rclcpp::Node {
6061
};
6162

6263
int main(int argc, char ** argv) {
64+
// Block SIGINT/SIGTERM until the executor has allocated its guard
65+
// condition; a signal arriving mid-init invalidates the rcl context and
66+
// causes rcl_* calls to throw RCLError. Unblocking after add_node() lets
67+
// any queued signal be handled as a normal shutdown.
68+
sigset_t mask, old;
69+
sigemptyset(&mask);
70+
sigaddset(&mask, SIGINT);
71+
sigaddset(&mask, SIGTERM);
72+
pthread_sigmask(SIG_BLOCK, &mask, &old);
73+
6374
rclcpp::init(argc, argv);
6475
auto node = std::make_shared<BrakePressureSensor>();
65-
rclcpp::spin(node);
76+
rclcpp::executors::SingleThreadedExecutor executor;
77+
executor.add_node(node);
78+
79+
pthread_sigmask(SIG_SETMASK, &old, nullptr);
80+
81+
executor.spin();
82+
executor.remove_node(node);
6683
node.reset();
6784
rclcpp::shutdown();
6885
return 0;

0 commit comments

Comments
 (0)