diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 97840cb4a5..6fbbf5503f 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -17,6 +17,8 @@ #include +#include +#include #include #include #include @@ -29,7 +31,9 @@ #include "rclcpp/context.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/time.hpp" #include "rclcpp/type_support_decl.hpp" #include "tracetools/tracetools.h" @@ -70,6 +74,7 @@ class SubscriptionIntraProcess using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; + using StatsHandlerFn = std::function; SubscriptionIntraProcess( AnySubscriptionCallback callback, @@ -77,7 +82,8 @@ class SubscriptionIntraProcess rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + StatsHandlerFn stats_handler = nullptr) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -85,7 +91,8 @@ class SubscriptionIntraProcess topic_name, qos_profile, buffer_type), - any_callback_(callback) + any_callback_(callback), + stats_handler_(std::move(stats_handler)) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, @@ -197,6 +204,18 @@ class SubscriptionIntraProcess msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; + const auto nanos = std::chrono::time_point_cast( + std::chrono::system_clock::now()); + if (stats_handler_) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger("rclcpp"), + "Intra-process communication does not support accurate message age statistics"); + // Set source_timestamp to "now" so that message_age reports 0ms rather than + // an invalid value taken from an un-initialised timestamp. IPC delivery + // has little/no transport latency by definition, so near-zero age is expected. + msg_info.source_timestamp = nanos.time_since_epoch().count(); + } + auto shared_ptr = std::static_pointer_cast>( data); @@ -208,9 +227,14 @@ class SubscriptionIntraProcess any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); } shared_ptr.reset(); + + if (stats_handler_) { + stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count())); + } } AnySubscriptionCallback any_callback_; + StatsHandlerFn stats_handler_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 9a0bf00cf9..01549ce3f3 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -164,6 +164,18 @@ class Subscription : public SubscriptionBase ROSMessageT, AllocatorT>; + // Build a type-erased stats handler to avoid a circular include chain + // via publisher.hpp and callback_group.hpp + typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr; + if (subscription_topic_statistics) { + stats_handler = + [subscription_topic_statistics]( + const rmw_message_info_t & info, const rclcpp::Time & time) + { + subscription_topic_statistics->handle_message(info, time); + }; + } + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( @@ -172,7 +184,8 @@ class Subscription : public SubscriptionBase context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, - resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); + resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), + std::move(stats_handler)); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), diff --git a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp index 0137caf036..439fc685e3 100644 --- a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp @@ -101,8 +101,9 @@ class PublisherNode : public rclcpp::Node public: PublisherNode( const std::string & name, const std::string & topic, - const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) - : Node(name) + const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}, + bool use_intra_process_comms = false) + : Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process_comms)) { publisher_ = create_publisher(topic, 10); publish_timer_ = this->create_wall_timer( @@ -181,8 +182,9 @@ class SubscriberWithTopicStatistics : public rclcpp::Node public: SubscriberWithTopicStatistics( const std::string & name, const std::string & topic, - std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod) - : Node(name) + std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod, + bool use_intra_process_comms = false) + : Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process_comms)) { // Manually enable topic statistics via options auto options = rclcpp::SubscriptionOptions(); @@ -195,7 +197,7 @@ class SubscriberWithTopicStatistics : public rclcpp::Node subscription_ = create_subscription>( topic, - rclcpp::QoS(rclcpp::KeepAll()), + use_intra_process_comms ? rclcpp::QoS(10) : rclcpp::QoS(rclcpp::KeepAll()), callback, options); } @@ -421,3 +423,58 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window } } } + +/** + * Test topic statistics are collected when use_intra_process_comms is enabled. + * This validates a fix for ros2/rclcpp#2911 where IPC subscriptions never called the + * stat handler causing all statistics to report NaN values. + * Also verifies message_age is non-NaN, validating that source_timestamp is set correctly. + */ +TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_with_intra_process_comms) +{ + auto empty_publisher = std::make_shared>( + kTestPubNodeName, + kTestSubStatsEmptyTopic, + std::chrono::milliseconds{100}, + true); + + auto statistics_listener = std::make_shared( + "test_ipc_stats_listener", + "/statistics", + kNumExpectedMessages); + + auto empty_subscriber = std::make_shared>( + kTestSubNodeName, + kTestSubStatsEmptyTopic, + defaultStatisticsPublishPeriod, + true); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(empty_publisher); + ex.add_node(statistics_listener); + ex.add_node(empty_subscriber); + + ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout); + + const auto received_messages = statistics_listener->GetReceivedMessages(); + EXPECT_EQ(kNumExpectedMessages, received_messages.size()); + + uint64_t message_age_count{0}; + uint64_t message_period_count{0}; + + for (const auto & msg : received_messages) { + if (msg.metrics_source == kMessageAgeSourceLabel) { + message_age_count++; + // Verify message_age stats are non-NaN to validates source_timestamp fix + for (const auto & stats_point : msg.statistics) { + EXPECT_FALSE(std::isnan(stats_point.data)); + } + } + if (msg.metrics_source == kMessagePeriodSourceLabel) { + message_period_count++; + } + } + + EXPECT_EQ(kNumExpectedMessageAgeMessages, message_age_count); + EXPECT_EQ(kNumExpectedMessagePeriodMessages, message_period_count); +}