Skip to content

Commit 02caec1

Browse files
ParameterEventHandler support ContentFiltering (#2971)
* ParameterEventHandler support ContentFiltering Signed-off-by: Barry Xu <barry.xu@sony.com> * Address review comments Signed-off-by: Barry Xu <barry.xu@sony.com> --------- Signed-off-by: Barry Xu <barry.xu@sony.com>
1 parent 87a4cef commit 02caec1

3 files changed

Lines changed: 260 additions & 0 deletions

File tree

rclcpp/include/rclcpp/parameter_event_handler.hpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ class ParameterEventHandler
234234
/**
235235
* If a node_name is not provided, defaults to the current node.
236236
*
237+
* The configure_nodes_filter() function will affect the behavior of this function.
238+
* If the node specified in this function isn't included in the nodes specified in
239+
* configure_nodes_filter(), the callback will never be called.
240+
*
237241
* Note: if the returned callback handle smart pointer is not captured, the callback
238242
* is immediately unregistered. A compiler warning should be generated to warn
239243
* of this.
@@ -251,6 +255,31 @@ class ParameterEventHandler
251255
ParameterCallbackType callback,
252256
const std::string & node_name = "");
253257

258+
/// Configure which node parameter events will be received.
259+
/**
260+
* This function depends on rmw implementation support for content filtering.
261+
* If middleware doesn't support contentfilter, return false.
262+
*
263+
* If node_names is empty, the configured node filter will be cleared.
264+
*
265+
* If this function return true, only parameter events from the specified node will be received.
266+
* It affects the behavior of the following two functions.
267+
* - add_parameter_event_callback()
268+
* The callback will only be called for parameter events from the specified nodes which are
269+
* configured in this function.
270+
* - add_parameter_callback()
271+
* The callback will only be called for parameter events from the specified nodes which are
272+
* configured in this function and add_parameter_callback().
273+
* If the nodes specified in this function is different from the nodes specified in
274+
* add_parameter_callback(), the callback will never be called.
275+
*
276+
* \param[in] node_names Node names to filter parameter events from.
277+
* \returns true if configuring was successfully applied, false otherwise.
278+
* \throws rclcpp::exceptions::RCLError if internal error occurred when calling the rcl function.
279+
*/
280+
RCLCPP_PUBLIC
281+
bool configure_nodes_filter(const std::vector<std::string> & node_names);
282+
254283
/// Remove a parameter callback registered with add_parameter_callback.
255284
/**
256285
* The parameter name and node name are inspected from the callback handle. The callback handle

rclcpp/src/rclcpp/parameter_event_handler.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,38 @@ ParameterEventHandler::add_parameter_callback(
7474
return handle;
7575
}
7676

77+
bool
78+
ParameterEventHandler::configure_nodes_filter(const std::vector<std::string> & node_names)
79+
{
80+
if (node_names.empty()) {
81+
// Clear content filter
82+
event_subscription_->set_content_filter("");
83+
if (event_subscription_->is_cft_enabled()) {
84+
return false;
85+
}
86+
return true;
87+
}
88+
89+
std::string filter_expression;
90+
size_t total = node_names.size();
91+
for (size_t i = 0; i < total; ++i) {
92+
filter_expression += "node = %" + std::to_string(i);
93+
if (i < total - 1) {
94+
filter_expression += " OR ";
95+
}
96+
}
97+
98+
// Enclose each node name in "'".
99+
std::vector<std::string> quoted_node_names;
100+
for (const auto & name : node_names) {
101+
quoted_node_names.push_back("'" + resolve_path(name) + "'");
102+
}
103+
104+
event_subscription_->set_content_filter(filter_expression, quoted_node_names);
105+
106+
return event_subscription_->is_cft_enabled();
107+
}
108+
77109
void
78110
ParameterEventHandler::remove_parameter_callback(
79111
ParameterCallbackHandle::SharedPtr callback_handle)

rclcpp/test/rclcpp/test_parameter_event_handler.cpp

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <atomic>
1516
#include <chrono>
1617
#include <memory>
1718
#include <string>
@@ -20,6 +21,8 @@
2021
#include "gtest/gtest.h"
2122
#include "rclcpp/rclcpp.hpp"
2223

24+
using namespace std::chrono_literals;
25+
2326
class TestParameterEventHandler : public rclcpp::ParameterEventHandler
2427
{
2528
public:
@@ -443,3 +446,199 @@ TEST_F(TestNode, LastInFirstCallForParameterEventCallbacks)
443446
param_handler->remove_parameter_event_callback(h2);
444447
EXPECT_EQ(param_handler->num_event_callbacks(), 0UL);
445448
}
449+
450+
TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterEventCallback)
451+
{
452+
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
453+
if (rmw_implementation_str != "rmw_fastrtps_cpp" && rmw_implementation_str != "rmw_connextdds") {
454+
GTEST_SKIP() << "Content filter is now only supported in FastDDS and ConnextDDS.";
455+
}
456+
457+
const std::string remote_node_name1 = "remote_node1";
458+
const std::string remote_node_name2 = "remote_node2";
459+
rclcpp::NodeOptions options;
460+
options.enable_rosout(false);
461+
auto remote_node1 = std::make_shared<rclcpp::Node>(
462+
remote_node_name1, options);
463+
auto remote_node2 = std::make_shared<rclcpp::Node>(
464+
remote_node_name2, options);
465+
466+
const std::string remote_node1_param_name = "param_node1";
467+
const std::string remote_node2_param_name = "param_node2";
468+
remote_node1->declare_parameter(remote_node1_param_name, 10);
469+
remote_node2->declare_parameter(remote_node2_param_name, "Default");
470+
471+
std::atomic_bool received_from_remote_node1{false};
472+
std::atomic_bool received_from_remote_node2{false};
473+
474+
auto cb =
475+
[&remote_node_name1, &remote_node_name2, &received_from_remote_node1,
476+
&received_from_remote_node2]
477+
(const rcl_interfaces::msg::ParameterEvent & parm) {
478+
if (parm.node == "/" + remote_node_name1) {
479+
received_from_remote_node1 = true;
480+
} else if (parm.node == "/" + remote_node_name2) {
481+
received_from_remote_node2 = true;
482+
}
483+
};
484+
485+
// Configure to only receive parameter events from remote_node_name2
486+
ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2}));
487+
488+
auto handler = param_handler->add_parameter_event_callback(cb);
489+
490+
rclcpp::executors::SingleThreadedExecutor executor;
491+
executor.add_node(node);
492+
executor.add_node(remote_node1);
493+
executor.add_node(remote_node2);
494+
495+
auto wait_param_event = [&executor]
496+
(std::chrono::seconds timeout, std::function<bool()> condition = nullptr) {
497+
auto start = std::chrono::steady_clock::now();
498+
while (std::chrono::steady_clock::now() - start < timeout) {
499+
executor.spin_some();
500+
if (condition && condition()) {
501+
break;
502+
}
503+
std::this_thread::sleep_for(100ms);
504+
}
505+
};
506+
507+
{
508+
std::thread thread(wait_param_event, 1s, nullptr);
509+
std::this_thread::sleep_for(100ms);
510+
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20));
511+
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc"));
512+
thread.join();
513+
}
514+
515+
EXPECT_EQ(received_from_remote_node1, false);
516+
EXPECT_EQ(received_from_remote_node2, true);
517+
518+
// Clear node filter and all parameter events from remote nodes should be received
519+
ASSERT_TRUE(param_handler->configure_nodes_filter({}));
520+
521+
received_from_remote_node1 = false;
522+
received_from_remote_node2 = false;
523+
524+
{
525+
std::thread thread(
526+
wait_param_event,
527+
2s,
528+
[&received_from_remote_node1, &received_from_remote_node2]() {
529+
if (received_from_remote_node1 && received_from_remote_node2) {
530+
return true;
531+
}
532+
return false;
533+
});
534+
std::this_thread::sleep_for(100ms);
535+
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 30));
536+
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "def"));
537+
thread.join();
538+
}
539+
540+
EXPECT_EQ(received_from_remote_node1, true);
541+
EXPECT_EQ(received_from_remote_node2, true);
542+
}
543+
544+
TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterCallback)
545+
{
546+
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
547+
if (rmw_implementation_str != "rmw_fastrtps_cpp" && rmw_implementation_str != "rmw_connextdds") {
548+
GTEST_SKIP() << "Content filter is now only supported in FastDDS and ConnextDDS.";
549+
}
550+
551+
const std::string remote_node_name1 = "remote_node1";
552+
const std::string remote_node_name2 = "remote_node2";
553+
rclcpp::NodeOptions options;
554+
options.enable_rosout(false);
555+
auto remote_node1 = std::make_shared<rclcpp::Node>(
556+
remote_node_name1, options);
557+
auto remote_node2 = std::make_shared<rclcpp::Node>(
558+
remote_node_name2, options);
559+
560+
const std::string remote_node1_param_name = "param_node1";
561+
const std::string remote_node2_param_name = "param_node2";
562+
remote_node1->declare_parameter(remote_node1_param_name, 10);
563+
remote_node2->declare_parameter(remote_node2_param_name, "Default");
564+
565+
std::atomic_bool received_from_remote_node1{false};
566+
std::atomic_bool received_from_remote_node2{false};
567+
568+
auto cb_remote_node1 =
569+
[&remote_node1_param_name, &received_from_remote_node1]
570+
(const rclcpp::Parameter & parm) {
571+
if (parm.get_name() == remote_node1_param_name) {
572+
received_from_remote_node1 = true;
573+
}
574+
};
575+
576+
auto cb_remote_node2 =
577+
[&remote_node2_param_name, &received_from_remote_node2]
578+
(const rclcpp::Parameter & parm) {
579+
if (parm.get_name() == remote_node2_param_name) {
580+
received_from_remote_node2 = true;
581+
}
582+
};
583+
584+
// Configure to only receive parameter events from remote_node_name2
585+
ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2}));
586+
587+
auto handler1 = param_handler->add_parameter_callback(
588+
remote_node1_param_name, cb_remote_node1, remote_node_name1);
589+
auto handler2 = param_handler->add_parameter_callback(
590+
remote_node2_param_name, cb_remote_node2, remote_node_name2);
591+
592+
rclcpp::executors::SingleThreadedExecutor executor;
593+
executor.add_node(node);
594+
executor.add_node(remote_node1);
595+
executor.add_node(remote_node2);
596+
597+
auto wait_param_event = [&executor]
598+
(std::chrono::seconds timeout, std::function<bool()> condition = nullptr) {
599+
auto start = std::chrono::steady_clock::now();
600+
while (std::chrono::steady_clock::now() - start < timeout) {
601+
executor.spin_some();
602+
if (condition && condition()) {
603+
break;
604+
}
605+
std::this_thread::sleep_for(100ms);
606+
}
607+
};
608+
609+
{
610+
std::thread thread(wait_param_event, 1s, nullptr);
611+
std::this_thread::sleep_for(100ms);
612+
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20));
613+
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc"));
614+
thread.join();
615+
}
616+
617+
EXPECT_EQ(received_from_remote_node1, false);
618+
EXPECT_EQ(received_from_remote_node2, true);
619+
620+
// Clear node filter and all parameter events from remote nodes should be received
621+
ASSERT_TRUE(param_handler->configure_nodes_filter({}));
622+
623+
received_from_remote_node1 = false;
624+
received_from_remote_node2 = false;
625+
626+
{
627+
std::thread thread(
628+
wait_param_event,
629+
2s,
630+
[&received_from_remote_node1, &received_from_remote_node2]() {
631+
if (received_from_remote_node1 && received_from_remote_node2) {
632+
return true;
633+
}
634+
return false;
635+
});
636+
std::this_thread::sleep_for(100ms);
637+
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 30));
638+
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "def"));
639+
thread.join();
640+
}
641+
642+
EXPECT_EQ(received_from_remote_node1, true);
643+
EXPECT_EQ(received_from_remote_node2, true);
644+
}

0 commit comments

Comments
 (0)