From 15caf827e1848ff3619068f36cec50f95193a25a Mon Sep 17 00:00:00 2001 From: jayyoung Date: Wed, 15 Apr 2026 11:36:24 +0000 Subject: [PATCH 1/4] Fix topic statistics for IPC subscriptions If use_intra_process_comms is enabled, messages are delivered via SubscriptionIntraProcess::execute_impl() which previously never called the topic statistics handler, this causes all statistics to report NaN values. The fix here puts a type-erased StatsHandlerFn in SubscriptionIntraProcess and wires it up from the Subscription via a lambda. Using std::function avoids a circular include chain that would occur if subscription_topic_statistics.hpp were included directly from subscription_intra_process.hpp via publisher.hpp/callback_group.hpp. The source_timestamp is set to the receive time so that message_age reports 0ms rather than an un-initialised value. IPC delivery has little/no expected transport latency, by definition so near-zero age is expected. Fixes #2911 Signed-off-by: jayyoung --- .../subscription_intra_process.hpp | 29 +++++++++++++++++-- rclcpp/include/rclcpp/subscription.hpp | 15 +++++++++- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 97840cb4a5..26d9d42065 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -29,7 +30,9 @@ #include "rclcpp/context.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/time.hpp" #include "rclcpp/type_support_decl.hpp" #include "tracetools/tracetools.h" @@ -70,6 +73,7 @@ class SubscriptionIntraProcess using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; + using StatsHandlerFn = std::function; SubscriptionIntraProcess( AnySubscriptionCallback callback, @@ -77,7 +81,8 @@ class SubscriptionIntraProcess rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + StatsHandlerFn stats_handler = nullptr) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -85,7 +90,8 @@ class SubscriptionIntraProcess topic_name, qos_profile, buffer_type), - any_callback_(callback) + any_callback_(callback), + stats_handler_(std::move(stats_handler)) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, @@ -197,6 +203,19 @@ class SubscriptionIntraProcess msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; + std::chrono::time_point now; + if (stats_handler_) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger("rclcpp"), + "Intra-process communication does not support accurate message age statistics"); + now = std::chrono::system_clock::now(); + // Set source_timestamp to "now" so that message_age reports 0ms rather than + // an invalid value taken from an un-initialised timestamp. IPC delivery + // has little/no transport latency by definition, so near-zero age is expected. + const auto nanos = std::chrono::time_point_cast(now); + msg_info.source_timestamp = nanos.time_since_epoch().count(); + } + auto shared_ptr = std::static_pointer_cast>( data); @@ -208,9 +227,15 @@ class SubscriptionIntraProcess any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); } shared_ptr.reset(); + + if (stats_handler_) { + const auto nanos = std::chrono::time_point_cast(now); + stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count())); + } } AnySubscriptionCallback any_callback_; + StatsHandlerFn stats_handler_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 9a0bf00cf9..01549ce3f3 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -164,6 +164,18 @@ class Subscription : public SubscriptionBase ROSMessageT, AllocatorT>; + // Build a type-erased stats handler to avoid a circular include chain + // via publisher.hpp and callback_group.hpp + typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr; + if (subscription_topic_statistics) { + stats_handler = + [subscription_topic_statistics]( + const rmw_message_info_t & info, const rclcpp::Time & time) + { + subscription_topic_statistics->handle_message(info, time); + }; + } + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( @@ -172,7 +184,8 @@ class Subscription : public SubscriptionBase context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, - resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); + resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), + std::move(stats_handler)); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), From f12988aadad889f20738410f461b0456f11b3a19 Mon Sep 17 00:00:00 2001 From: jayyoung Date: Thu, 16 Apr 2026 11:51:32 +0000 Subject: [PATCH 2/4] Address comment: declare nanos once outside stats_handler_ block and drop now variable. Signed-off-by: jayyoung --- .../rclcpp/experimental/subscription_intra_process.hpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 26d9d42065..229d0d7fb5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -203,16 +203,15 @@ class SubscriptionIntraProcess msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; - std::chrono::time_point now; + const auto nanos = std::chrono::time_point_cast( + std::chrono::system_clock::now()); if (stats_handler_) { RCLCPP_WARN_ONCE( rclcpp::get_logger("rclcpp"), "Intra-process communication does not support accurate message age statistics"); - now = std::chrono::system_clock::now(); // Set source_timestamp to "now" so that message_age reports 0ms rather than // an invalid value taken from an un-initialised timestamp. IPC delivery // has little/no transport latency by definition, so near-zero age is expected. - const auto nanos = std::chrono::time_point_cast(now); msg_info.source_timestamp = nanos.time_since_epoch().count(); } @@ -229,7 +228,6 @@ class SubscriptionIntraProcess shared_ptr.reset(); if (stats_handler_) { - const auto nanos = std::chrono::time_point_cast(now); stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count())); } } From e87ed41ce7296c8d7f4ed98adc50cf90d162121b Mon Sep 17 00:00:00 2001 From: jayyoung Date: Thu, 16 Apr 2026 11:52:49 +0000 Subject: [PATCH 3/4] Address comment: add missing include for chrono Signed-off-by: jayyoung --- .../include/rclcpp/experimental/subscription_intra_process.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 229d0d7fb5..6fbbf5503f 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -17,6 +17,7 @@ #include +#include #include #include #include From cdf6632da4c66a34916b0b2a8b8464756d95c161 Mon Sep 17 00:00:00 2001 From: jayyoung Date: Thu, 16 Apr 2026 14:15:45 +0000 Subject: [PATCH 4/4] Adds test_stats_with_intra_process_comms to verify that statistics are collected when use_intra_process_comms=true. Signed-off-by: jayyoung --- .../test_subscription_topic_statistics.cpp | 67 +++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp index 0137caf036..439fc685e3 100644 --- a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp @@ -101,8 +101,9 @@ class PublisherNode : public rclcpp::Node public: PublisherNode( const std::string & name, const std::string & topic, - const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) - : Node(name) + const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}, + bool use_intra_process_comms = false) + : Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process_comms)) { publisher_ = create_publisher(topic, 10); publish_timer_ = this->create_wall_timer( @@ -181,8 +182,9 @@ class SubscriberWithTopicStatistics : public rclcpp::Node public: SubscriberWithTopicStatistics( const std::string & name, const std::string & topic, - std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod) - : Node(name) + std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod, + bool use_intra_process_comms = false) + : Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process_comms)) { // Manually enable topic statistics via options auto options = rclcpp::SubscriptionOptions(); @@ -195,7 +197,7 @@ class SubscriberWithTopicStatistics : public rclcpp::Node subscription_ = create_subscription>( topic, - rclcpp::QoS(rclcpp::KeepAll()), + use_intra_process_comms ? rclcpp::QoS(10) : rclcpp::QoS(rclcpp::KeepAll()), callback, options); } @@ -421,3 +423,58 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window } } } + +/** + * Test topic statistics are collected when use_intra_process_comms is enabled. + * This validates a fix for ros2/rclcpp#2911 where IPC subscriptions never called the + * stat handler causing all statistics to report NaN values. + * Also verifies message_age is non-NaN, validating that source_timestamp is set correctly. + */ +TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_with_intra_process_comms) +{ + auto empty_publisher = std::make_shared>( + kTestPubNodeName, + kTestSubStatsEmptyTopic, + std::chrono::milliseconds{100}, + true); + + auto statistics_listener = std::make_shared( + "test_ipc_stats_listener", + "/statistics", + kNumExpectedMessages); + + auto empty_subscriber = std::make_shared>( + kTestSubNodeName, + kTestSubStatsEmptyTopic, + defaultStatisticsPublishPeriod, + true); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(empty_publisher); + ex.add_node(statistics_listener); + ex.add_node(empty_subscriber); + + ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout); + + const auto received_messages = statistics_listener->GetReceivedMessages(); + EXPECT_EQ(kNumExpectedMessages, received_messages.size()); + + uint64_t message_age_count{0}; + uint64_t message_period_count{0}; + + for (const auto & msg : received_messages) { + if (msg.metrics_source == kMessageAgeSourceLabel) { + message_age_count++; + // Verify message_age stats are non-NaN to validates source_timestamp fix + for (const auto & stats_point : msg.statistics) { + EXPECT_FALSE(std::isnan(stats_point.data)); + } + } + if (msg.metrics_source == kMessagePeriodSourceLabel) { + message_period_count++; + } + } + + EXPECT_EQ(kNumExpectedMessageAgeMessages, message_age_count); + EXPECT_EQ(kNumExpectedMessagePeriodMessages, message_period_count); +}