Skip to content

Commit f19a5a1

Browse files
Add tests for --content-filter option on echo, hz, and bw
Integration tests: - test_echo_content_filter: matching and non-matching DDS content filter expressions - test_echo_content_filter_once: --content-filter combined with --once - test_echo_content_filter_combined_with_filter: DDS content filter combined with Python --filter (dual filtering) - test_hz_content_filter / test_bw_content_filter: happy path - test_hz_content_filter_no_match / test_bw_content_filter_no_match: non-matching filter produces no rate/bandwidth output Contract tests (test_content_filter_contract.py): - Argument parsing for --content-filter and --content-filter-params across echo, hz, and bw verbs - ContentFilterOptions construction from parsed arguments - None propagation when --content-filter is not provided Signed-off-by: Pavel Guzenfeld <67074795+PavelGuzenfeld@users.noreply.github.com>
1 parent 0128201 commit f19a5a1

3 files changed

Lines changed: 461 additions & 0 deletions

File tree

ros2topic/test/test_bw_delay_hz.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
from rclpy.qos import ReliabilityPolicy
4444
from rclpy.utilities import get_rmw_implementation_identifier
4545

46+
from std_msgs.msg import String
47+
4648

4749
# Skip cli tests on Windows while they exhibit pathological behavior
4850
# https://github.com/ros2/build_farmer/issues/248
@@ -504,3 +506,197 @@ def test_bw_both_all_and_topics_error(self, launch_service, proc_info, proc_outp
504506
assert 'Cannot specify both --all/-a and topic names' in command.output, (
505507
'bw command did not print expected error message'
506508
)
509+
510+
@launch_testing.markers.retry_on_failure(times=5)
511+
def test_hz_content_filter(self, launch_service, proc_info, proc_output):
512+
topic = '/clitest/topic/hz_content_filter'
513+
publisher = self.node.create_publisher(String, topic, 10)
514+
assert publisher
515+
516+
def publish_message():
517+
publisher.publish(String(data='hello'))
518+
519+
publish_timer = self.node.create_timer(0.5, publish_message)
520+
521+
# Wait for the publisher to be discovered
522+
publisher_count = 0
523+
timeout_count = 0
524+
while publisher_count == 0 and timeout_count < 10:
525+
self.executor.spin_once(timeout_sec=0.1)
526+
publisher_count = self.node.count_publishers(topic)
527+
timeout_count += 1
528+
assert publisher_count > 0, 'Publisher was not discovered'
529+
530+
try:
531+
command_action = ExecuteProcess(
532+
cmd=['ros2', 'topic', 'hz',
533+
'--content-filter', "data = 'hello'",
534+
topic],
535+
additional_env={
536+
'PYTHONUNBUFFERED': '1'
537+
},
538+
output='screen'
539+
)
540+
with launch_testing.tools.launch_process(
541+
launch_service, command_action, proc_info, proc_output,
542+
output_filter=launch_testing_ros.tools.basic_output_filter(
543+
filtered_rmw_implementation=get_rmw_implementation_identifier()
544+
)
545+
) as command:
546+
# The future won't complete - we will hit the timeout
547+
self.executor.spin_until_future_complete(
548+
rclpy.task.Future(), timeout_sec=5
549+
)
550+
command.wait_for_shutdown(timeout=10)
551+
assert command.output, 'hz with content filter printed no output'
552+
assert re.search(
553+
r'^average rate: [0-9\.]+$', command.output, flags=re.MULTILINE
554+
), 'hz with content filter did not print expected rate'
555+
finally:
556+
self.node.destroy_timer(publish_timer)
557+
self.node.destroy_publisher(publisher)
558+
559+
@launch_testing.markers.retry_on_failure(times=5)
560+
def test_bw_content_filter(self, launch_service, proc_info, proc_output):
561+
topic = '/clitest/topic/bw_content_filter'
562+
publisher = self.node.create_publisher(String, topic, 10)
563+
assert publisher
564+
565+
def publish_message():
566+
publisher.publish(String(data='hello'))
567+
568+
publish_timer = self.node.create_timer(0.5, publish_message)
569+
570+
# Wait for the publisher to be discovered
571+
publisher_count = 0
572+
timeout_count = 0
573+
while publisher_count == 0 and timeout_count < 10:
574+
self.executor.spin_once(timeout_sec=0.1)
575+
publisher_count = self.node.count_publishers(topic)
576+
timeout_count += 1
577+
assert publisher_count > 0, 'Publisher was not discovered'
578+
579+
try:
580+
command_action = ExecuteProcess(
581+
cmd=['ros2', 'topic', 'bw',
582+
'--content-filter', "data = 'hello'",
583+
topic],
584+
additional_env={
585+
'PYTHONUNBUFFERED': '1'
586+
},
587+
output='screen'
588+
)
589+
with launch_testing.tools.launch_process(
590+
launch_service, command_action, proc_info, proc_output,
591+
output_filter=launch_testing_ros.tools.basic_output_filter(
592+
filtered_rmw_implementation=get_rmw_implementation_identifier()
593+
)
594+
) as command:
595+
# The future won't complete - we will hit the timeout
596+
self.executor.spin_until_future_complete(
597+
rclpy.task.Future(), timeout_sec=5
598+
)
599+
command.wait_for_shutdown(timeout=10)
600+
assert command.output, 'bw with content filter printed no output'
601+
assert re.search(
602+
r'^[0-9]+ B/s from [0-9]+ messages$', command.output, flags=re.MULTILINE
603+
), 'bw with content filter did not print expected bandwidth'
604+
finally:
605+
self.node.destroy_timer(publish_timer)
606+
self.node.destroy_publisher(publisher)
607+
608+
@launch_testing.markers.retry_on_failure(times=5)
609+
def test_hz_content_filter_no_match(self, launch_service, proc_info, proc_output):
610+
topic = '/clitest/topic/hz_cfilter_nomatch'
611+
publisher = self.node.create_publisher(String, topic, 10)
612+
assert publisher
613+
614+
def publish_message():
615+
publisher.publish(String(data='hello'))
616+
617+
publish_timer = self.node.create_timer(0.5, publish_message)
618+
619+
# Wait for the publisher to be discovered
620+
publisher_count = 0
621+
timeout_count = 0
622+
while publisher_count == 0 and timeout_count < 10:
623+
self.executor.spin_once(timeout_sec=0.1)
624+
publisher_count = self.node.count_publishers(topic)
625+
timeout_count += 1
626+
assert publisher_count > 0, 'Publisher was not discovered'
627+
628+
try:
629+
command_action = ExecuteProcess(
630+
cmd=['ros2', 'topic', 'hz',
631+
'--content-filter', "data = 'NOMATCH'",
632+
topic],
633+
additional_env={
634+
'PYTHONUNBUFFERED': '1'
635+
},
636+
output='screen'
637+
)
638+
with launch_testing.tools.launch_process(
639+
launch_service, command_action, proc_info, proc_output,
640+
output_filter=launch_testing_ros.tools.basic_output_filter(
641+
filtered_rmw_implementation=get_rmw_implementation_identifier()
642+
)
643+
) as command:
644+
self.executor.spin_until_future_complete(
645+
rclpy.task.Future(), timeout_sec=5
646+
)
647+
command.wait_for_shutdown(timeout=10)
648+
# No messages should match, so no rate should be reported
649+
assert not re.search(
650+
r'^average rate:', command.output, flags=re.MULTILINE
651+
), 'hz should not report rate when content filter rejects all messages'
652+
finally:
653+
self.node.destroy_timer(publish_timer)
654+
self.node.destroy_publisher(publisher)
655+
656+
@launch_testing.markers.retry_on_failure(times=5)
657+
def test_bw_content_filter_no_match(self, launch_service, proc_info, proc_output):
658+
topic = '/clitest/topic/bw_cfilter_nomatch'
659+
publisher = self.node.create_publisher(String, topic, 10)
660+
assert publisher
661+
662+
def publish_message():
663+
publisher.publish(String(data='hello'))
664+
665+
publish_timer = self.node.create_timer(0.5, publish_message)
666+
667+
# Wait for the publisher to be discovered
668+
publisher_count = 0
669+
timeout_count = 0
670+
while publisher_count == 0 and timeout_count < 10:
671+
self.executor.spin_once(timeout_sec=0.1)
672+
publisher_count = self.node.count_publishers(topic)
673+
timeout_count += 1
674+
assert publisher_count > 0, 'Publisher was not discovered'
675+
676+
try:
677+
command_action = ExecuteProcess(
678+
cmd=['ros2', 'topic', 'bw',
679+
'--content-filter', "data = 'NOMATCH'",
680+
topic],
681+
additional_env={
682+
'PYTHONUNBUFFERED': '1'
683+
},
684+
output='screen'
685+
)
686+
with launch_testing.tools.launch_process(
687+
launch_service, command_action, proc_info, proc_output,
688+
output_filter=launch_testing_ros.tools.basic_output_filter(
689+
filtered_rmw_implementation=get_rmw_implementation_identifier()
690+
)
691+
) as command:
692+
self.executor.spin_until_future_complete(
693+
rclpy.task.Future(), timeout_sec=5
694+
)
695+
command.wait_for_shutdown(timeout=10)
696+
# No messages should match, so no bandwidth should be reported
697+
assert not re.search(
698+
r'^[0-9]+ B/s', command.output, flags=re.MULTILINE
699+
), 'bw should not report bandwidth when content filter rejects all messages'
700+
finally:
701+
self.node.destroy_timer(publish_timer)
702+
self.node.destroy_publisher(publisher)
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright 2026 Open Source Robotics Foundation, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Contract tests for --content-filter argument handling.
16+
17+
These are lightweight unit tests that verify the interface contract between
18+
CLI argument parsing and ContentFilterOptions construction, without requiring
19+
a running ROS system.
20+
"""
21+
22+
import argparse
23+
import unittest
24+
from unittest.mock import MagicMock
25+
from unittest.mock import patch
26+
27+
from rclpy.subscription_content_filter_options import ContentFilterOptions
28+
29+
30+
class TestContentFilterArgParsing(unittest.TestCase):
31+
"""Verify that each verb correctly parses --content-filter args."""
32+
33+
def _parse_echo_args(self, args_list):
34+
from ros2topic.verb.echo import EchoVerb
35+
verb = EchoVerb()
36+
parser = argparse.ArgumentParser()
37+
verb.add_arguments(parser, 'ros2 topic')
38+
return parser.parse_args(args_list)
39+
40+
def _parse_hz_args(self, args_list):
41+
from ros2topic.verb.hz import HzVerb
42+
verb = HzVerb()
43+
parser = argparse.ArgumentParser()
44+
verb.add_arguments(parser, 'ros2 topic')
45+
return parser.parse_args(args_list)
46+
47+
def _parse_bw_args(self, args_list):
48+
from ros2topic.verb.bw import BwVerb
49+
verb = BwVerb()
50+
parser = argparse.ArgumentParser()
51+
verb.add_arguments(parser, 'ros2 topic')
52+
return parser.parse_args(args_list)
53+
54+
def test_echo_content_filter_arg_present(self):
55+
args = self._parse_echo_args([
56+
'/topic', '--content-filter', "data = 'hello'"])
57+
assert args.content_filter_expr == "data = 'hello'"
58+
assert args.content_filter_params == []
59+
60+
def test_echo_content_filter_arg_absent(self):
61+
args = self._parse_echo_args(['/topic'])
62+
assert args.content_filter_expr is None
63+
assert args.content_filter_params == []
64+
65+
def test_echo_content_filter_with_params(self):
66+
args = self._parse_echo_args([
67+
'/topic', '--content-filter', 'data = %0',
68+
'--content-filter-params', 'hello'])
69+
assert args.content_filter_expr == 'data = %0'
70+
assert args.content_filter_params == ['hello']
71+
72+
def test_echo_content_filter_with_multiple_params(self):
73+
args = self._parse_echo_args([
74+
'/topic', '--content-filter', 'data BETWEEN %0 AND %1',
75+
'--content-filter-params', '10', '20'])
76+
assert args.content_filter_expr == 'data BETWEEN %0 AND %1'
77+
assert args.content_filter_params == ['10', '20']
78+
79+
def test_hz_content_filter_arg_present(self):
80+
args = self._parse_hz_args([
81+
'/topic', '--content-filter', "data = 'hello'"])
82+
assert args.content_filter_expr == "data = 'hello'"
83+
assert args.content_filter_params == []
84+
85+
def test_hz_content_filter_arg_absent(self):
86+
args = self._parse_hz_args(['/topic'])
87+
assert args.content_filter_expr is None
88+
assert args.content_filter_params == []
89+
90+
def test_bw_content_filter_arg_present(self):
91+
args = self._parse_bw_args([
92+
'/topic', '--content-filter', "data = 'hello'"])
93+
assert args.content_filter_expr == "data = 'hello'"
94+
assert args.content_filter_params == []
95+
96+
def test_bw_content_filter_arg_absent(self):
97+
args = self._parse_bw_args(['/topic'])
98+
assert args.content_filter_expr is None
99+
assert args.content_filter_params == []
100+
101+
102+
class TestContentFilterOptionsConstruction(unittest.TestCase):
103+
"""Verify ContentFilterOptions is constructed correctly from parsed args."""
104+
105+
def test_options_with_expression_only(self):
106+
opts = ContentFilterOptions(
107+
filter_expression="data = 'hello'",
108+
expression_parameters=[])
109+
assert opts.filter_expression == "data = 'hello'"
110+
assert opts.expression_parameters == []
111+
112+
def test_options_with_expression_and_params(self):
113+
opts = ContentFilterOptions(
114+
filter_expression='data = %0',
115+
expression_parameters=['hello'])
116+
assert opts.filter_expression == 'data = %0'
117+
assert opts.expression_parameters == ['hello']
118+
119+
def test_options_none_when_no_filter(self):
120+
"""Contract: when content_filter_expr is None, no options are created."""
121+
content_filter_expr = None
122+
content_filter_options = None
123+
if content_filter_expr:
124+
content_filter_options = ContentFilterOptions(
125+
filter_expression=content_filter_expr,
126+
expression_parameters=[])
127+
assert content_filter_options is None

0 commit comments

Comments
 (0)