Skip to content

Commit 7df6693

Browse files
committed
Fix topic statistics for IPC subscriptions
If use_intra_process_comms is enabled, messages are delivered via SubscriptionIntraProcess::execute_impl() which previously never called the topic statistics handler, this causes all statistics to report NaN values. The fix here puts a type-erased StatsHandlerFn in SubscriptionIntraProcess and wires it up from the Subscription via a lambda. Using std::function avoids a circular include chain that would occur if subscription_topic_statistics.hpp were included directly from subscription_intra_process.hpp via publisher.hpp/callback_group.hpp. The source_timestamp is set to the receive time so that message_age reports 0ms rather than an un-initialised value. IPC delivery has little/no expected transport latency, by definition so near-zero age is expected. Fixes #2911 Signed-off-by: jayyoung <jay.young@gmail.com>
1 parent 8369e04 commit 7df6693

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <rmw/types.h>
1919

20+
#include <functional>
2021
#include <memory>
2122
#include <stdexcept>
2223
#include <string>
@@ -29,7 +30,9 @@
2930
#include "rclcpp/context.hpp"
3031
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
3132
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
33+
#include "rclcpp/logging.hpp"
3234
#include "rclcpp/qos.hpp"
35+
#include "rclcpp/time.hpp"
3336
#include "rclcpp/type_support_decl.hpp"
3437
#include "tracetools/tracetools.h"
3538

@@ -70,22 +73,25 @@ class SubscriptionIntraProcess
7073
using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr;
7174
using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
7275
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
76+
using StatsHandlerFn = std::function<void(const rmw_message_info_t &, const rclcpp::Time &)>;
7377

7478
SubscriptionIntraProcess(
7579
AnySubscriptionCallback<MessageT, Alloc> callback,
7680
std::shared_ptr<Alloc> allocator,
7781
rclcpp::Context::SharedPtr context,
7882
const std::string & topic_name,
7983
const rclcpp::QoS & qos_profile,
80-
rclcpp::IntraProcessBufferType buffer_type)
84+
rclcpp::IntraProcessBufferType buffer_type,
85+
StatsHandlerFn stats_handler = nullptr)
8186
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
8287
SubscribedTypeDeleter, ROSMessageType>(
8388
std::make_shared<SubscribedTypeAlloc>(*allocator),
8489
context,
8590
topic_name,
8691
qos_profile,
8792
buffer_type),
88-
any_callback_(callback)
93+
any_callback_(callback),
94+
stats_handler_(std::move(stats_handler))
8995
{
9096
TRACETOOLS_TRACEPOINT(
9197
rclcpp_subscription_callback_added,
@@ -197,6 +203,19 @@ class SubscriptionIntraProcess
197203
msg_info.publisher_gid = {0, {0}};
198204
msg_info.from_intra_process = true;
199205

206+
std::chrono::time_point<std::chrono::system_clock> now;
207+
if (stats_handler_) {
208+
RCLCPP_WARN_ONCE(
209+
rclcpp::get_logger("rclcpp"),
210+
"Intra-process communication does not support accurate message age statistics");
211+
now = std::chrono::system_clock::now();
212+
// Set source_timestamp to "now" so that message_age reports 0ms rather than
213+
// an invalid value taken from an un-initialised timestamp. IPC delivery
214+
// has little/no transport latency by definition, so near-zero age is expected.
215+
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
216+
msg_info.source_timestamp = nanos.time_since_epoch().count();
217+
}
218+
200219
auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
201220
data);
202221

@@ -208,9 +227,15 @@ class SubscriptionIntraProcess
208227
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
209228
}
210229
shared_ptr.reset();
230+
231+
if (stats_handler_) {
232+
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
233+
stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count()));
234+
}
211235
}
212236

213237
AnySubscriptionCallback<MessageT, Alloc> any_callback_;
238+
StatsHandlerFn stats_handler_;
214239
};
215240

216241
} // namespace experimental

rclcpp/include/rclcpp/subscription.hpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ class Subscription : public SubscriptionBase
164164
ROSMessageT,
165165
AllocatorT>;
166166

167+
// Build a type-erased stats handler to avoid a circular include chain
168+
// via publisher.hpp and callback_group.hpp
169+
typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr;
170+
if (subscription_topic_statistics) {
171+
stats_handler =
172+
[subscription_topic_statistics](
173+
const rmw_message_info_t & info, const rclcpp::Time & time)
174+
{
175+
subscription_topic_statistics->handle_message(info, time);
176+
};
177+
}
178+
167179
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
168180
auto context = node_base->get_context();
169181
subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
@@ -172,7 +184,8 @@ class Subscription : public SubscriptionBase
172184
context,
173185
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
174186
qos_profile,
175-
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
187+
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback),
188+
std::move(stats_handler));
176189
TRACETOOLS_TRACEPOINT(
177190
rclcpp_subscription_init,
178191
static_cast<const void *>(get_subscription_handle().get()),

0 commit comments

Comments
 (0)