Skip to content

Commit b6730f9

Browse files
authored
improve lookup time for matches_any_publishers(). (#3068)
Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
1 parent 9f79f40 commit b6730f9

3 files changed

Lines changed: 80 additions & 10 deletions

File tree

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,40 @@ class IntraProcessManager
386386
std::vector<uint64_t> take_ownership_subscriptions;
387387
};
388388

389+
/// Hash function for rmw_gid_t to enable use in unordered_map
390+
struct rmw_gid_hash
391+
{
392+
std::size_t operator()(const rmw_gid_t & gid) const noexcept
393+
{
394+
// Using the FNV-1a hash algorithm on the gid data
395+
constexpr std::size_t FNV_prime = 1099511628211u;
396+
std::size_t result = 14695981039346656037u;
397+
398+
for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
399+
result ^= gid.data[i];
400+
result *= FNV_prime;
401+
}
402+
return result;
403+
}
404+
};
405+
406+
/// Equality comparison for rmw_gid_t to enable use in unordered_map
407+
struct rmw_gid_equal
408+
{
409+
bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept
410+
{
411+
// Compare implementation identifier first for fast rejection
412+
if (lhs.implementation_identifier != rhs.implementation_identifier) {
413+
return false;
414+
}
415+
// Compare the data bytes
416+
return std::equal(
417+
std::begin(lhs.data),
418+
std::end(lhs.data),
419+
std::begin(rhs.data));
420+
}
421+
};
422+
389423
using SubscriptionMap =
390424
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
391425

@@ -398,6 +432,16 @@ class IntraProcessManager
398432
using PublisherToSubscriptionIdsMap =
399433
std::unordered_map<uint64_t, SplittedSubscriptions>;
400434

435+
/// Structure to store publisher information in GID lookup map
436+
struct PublisherInfo
437+
{
438+
uint64_t pub_id;
439+
rclcpp::PublisherBase::WeakPtr publisher;
440+
};
441+
442+
using GidToPublisherInfoMap =
443+
std::unordered_map<rmw_gid_t, PublisherInfo, rmw_gid_hash, rmw_gid_equal>;
444+
401445
RCLCPP_PUBLIC
402446
static
403447
uint64_t
@@ -640,6 +684,7 @@ class IntraProcessManager
640684
SubscriptionMap subscriptions_;
641685
PublisherMap publishers_;
642686
PublisherBufferMap publisher_buffers_;
687+
GidToPublisherInfoMap gid_to_publisher_info_;
643688

644689
mutable std::shared_timed_mutex mutex_;
645690
};

rclcpp/src/rclcpp/intra_process_manager.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ IntraProcessManager::add_publisher(
5151
}
5252
}
5353

54+
// Add GID to publisher info mapping for fast lookups (stores both ID and weak_ptr)
55+
gid_to_publisher_info_[publisher->get_gid()] = {pub_id, publisher};
56+
5457
// Initialize the subscriptions storage for this publisher.
5558
pub_to_subs_[pub_id] = SplittedSubscriptions();
5659

@@ -98,6 +101,15 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
98101
{
99102
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
100103

104+
// Remove GID to publisher info mapping
105+
auto pub_it = publishers_.find(intra_process_publisher_id);
106+
if (pub_it != publishers_.end()) {
107+
auto publisher = pub_it->second.lock();
108+
if (publisher) {
109+
gid_to_publisher_info_.erase(publisher->get_gid());
110+
}
111+
}
112+
101113
publishers_.erase(intra_process_publisher_id);
102114
publisher_buffers_.erase(intra_process_publisher_id);
103115
pub_to_subs_.erase(intra_process_publisher_id);
@@ -108,16 +120,15 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
108120
{
109121
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
110122

111-
for (auto & publisher_pair : publishers_) {
112-
auto publisher = publisher_pair.second.lock();
113-
if (!publisher) {
114-
continue;
115-
}
116-
if (*publisher.get() == id) {
117-
return true;
118-
}
123+
// Single O(1) hash map lookup - struct contains both ID and weak_ptr
124+
auto it = gid_to_publisher_info_.find(*id);
125+
if (it == gid_to_publisher_info_.end()) {
126+
return false;
119127
}
120-
return false;
128+
129+
// Verify the publisher still exists by checking the weak_ptr
130+
auto publisher = it->second.publisher.lock();
131+
return publisher != nullptr;
121132
}
122133

123134
size_t

rclcpp/test/rclcpp/test_intra_process_manager.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,14 @@ class PublisherBase
162162
explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos)
163163
: topic_name(topic),
164164
qos_profile(qos)
165-
{}
165+
{
166+
// Initialize a mock GID with unique data based on this pointer
167+
gid_.implementation_identifier = "mock_rmw";
168+
auto ptr_value = reinterpret_cast<std::uintptr_t>(this);
169+
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
170+
gid_.data[i] = static_cast<uint8_t>((ptr_value >> (i * 8)) & 0xFF);
171+
}
172+
}
166173

167174
virtual ~PublisherBase()
168175
{}
@@ -192,6 +199,12 @@ class PublisherBase
192199
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
193200
}
194201

202+
const rmw_gid_t &
203+
get_gid() const
204+
{
205+
return gid_;
206+
}
207+
195208
bool
196209
operator==([[maybe_unused]] const rmw_gid_t & gid) const
197210
{
@@ -210,6 +223,7 @@ class PublisherBase
210223
private:
211224
std::string topic_name;
212225
rclcpp::QoS qos_profile;
226+
rmw_gid_t gid_;
213227
};
214228

215229
template<typename T, typename Alloc = std::allocator<void>>

0 commit comments

Comments
 (0)