Skip to content

Commit 9a6125b

Browse files
Add --content-filter support to ros2 topic echo|hz|bw
Add DDS content filter expression support to `ros2 topic echo`, `ros2 topic hz`, and `ros2 topic bw` commands via new `--content-filter` and `--content-filter-params` CLI arguments. Content filters are applied at the middleware level, so only matching messages are delivered to the subscriber — reducing bandwidth and CPU usage compared to client-side Python filtering. Closes #1126 Signed-off-by: Pavel Guzenfeld <pavelguzenfeld@gmail.com>
1 parent 2d90ccd commit 9a6125b

3 files changed

Lines changed: 60 additions & 9 deletions

File tree

ros2topic/ros2topic/verb/bw.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import rclpy
3737
from rclpy.qos import qos_profile_sensor_data
38+
from rclpy.subscription_content_filter_options import ContentFilterOptions
3839
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
3940
from ros2cli.node.direct import DirectNode
4041
from ros2topic.api import get_msg_class
@@ -77,11 +78,24 @@ def add_arguments(self, parser, cli_name):
7778
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
7879
help='maximum window size, in # of messages, for calculating rate '
7980
f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW')
81+
parser.add_argument(
82+
'--content-filter', dest='content_filter_expr', default=None,
83+
help='DDS content filter expression applied at the middleware level')
84+
parser.add_argument(
85+
'--content-filter-params', dest='content_filter_params', nargs='*', default=[],
86+
help='Parameters for the content filter expression')
8087
add_direct_node_arguments(parser)
8188

8289
def main(self, *, args):
90+
content_filter_options = None
91+
if args.content_filter_expr:
92+
content_filter_options = ContentFilterOptions(
93+
filter_expression=args.content_filter_expr,
94+
expression_parameters=args.content_filter_params)
8395
with DirectNode(args) as node:
84-
return _rostopic_bw(node.node, args.topic, window_size=args.window)
96+
return _rostopic_bw(
97+
node.node, args.topic, window_size=args.window,
98+
content_filter_options=content_filter_options)
8599

86100

87101
class ROSTopicBandwidth(object):
@@ -157,7 +171,7 @@ def print_bw(self):
157171
print(f'{bw} from {n} messages\n\tMessage size mean: {mean} min: {min_s} max: {max_s}')
158172

159173

160-
def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE):
174+
def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE, content_filter_options=None):
161175
"""Periodically print the received bandwidth of a topic to console until shutdown."""
162176
# pause bw until topic is published
163177
msg_class = get_msg_class(node, topic, blocking=True, include_hidden_topics=True)
@@ -171,7 +185,8 @@ def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE):
171185
topic,
172186
rt.callback,
173187
qos_profile_sensor_data,
174-
raw=True
188+
raw=True,
189+
content_filter_options=content_filter_options
175190
)
176191

177192
print(f'Subscribed to [{topic}]')

ros2topic/ros2topic/verb/echo.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from rclpy.qos import QoSPresetProfiles
2424
from rclpy.qos import QoSProfile
2525
from rclpy.qos import QoSReliabilityPolicy
26+
from rclpy.subscription_content_filter_options import ContentFilterOptions
2627
from rclpy.task import Future
2728
from ros2cli.helpers import unsigned_int
2829
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
@@ -99,6 +100,15 @@ def add_arguments(self, parser, cli_name):
99100
'--filter', dest='filter_expr', help='Python expression to filter messages that '
100101
'are printed. Expression can use Python builtins '
101102
'as well as m (the message).')
103+
parser.add_argument(
104+
'--content-filter', dest='content_filter_expr', default=None,
105+
help='DDS content filter expression applied at the middleware level '
106+
'(e.g. "data > 100"). Only messages matching the filter are '
107+
'delivered to the subscriber. Use %%0, %%1, ... as parameter '
108+
'placeholders with --content-filter-params.')
109+
parser.add_argument(
110+
'--content-filter-params', dest='content_filter_params', nargs='*', default=[],
111+
help='Parameters for the content filter expression')
102112
parser.add_argument(
103113
'--once', action='store_true', help='Print the first message received and then exit.')
104114
parser.add_argument(
@@ -216,13 +226,20 @@ def main(self, *, args):
216226
if args.timeout is not None:
217227
self.timer = node.create_timer(args.timeout, self._timed_out)
218228

229+
content_filter_options = None
230+
if args.content_filter_expr:
231+
content_filter_options = ContentFilterOptions(
232+
filter_expression=args.content_filter_expr,
233+
expression_parameters=args.content_filter_params)
234+
219235
self.subscribe_and_spin(
220236
node,
221237
args.topic_name,
222238
message_type,
223239
qos_profile,
224240
args.no_lost_messages,
225-
args.raw)
241+
args.raw,
242+
content_filter_options=content_filter_options)
226243

227244
def subscribe_and_spin(
228245
self,
@@ -232,6 +249,7 @@ def subscribe_and_spin(
232249
qos_profile: QoSProfile,
233250
no_report_lost_messages: bool,
234251
raw: bool,
252+
content_filter_options: Optional[ContentFilterOptions] = None,
235253
) -> Optional[str]:
236254
"""Initialize a node with a single subscription and spin."""
237255
event_callbacks = None
@@ -245,7 +263,8 @@ def subscribe_and_spin(
245263
self._subscriber_callback,
246264
qos_profile,
247265
event_callbacks=event_callbacks,
248-
raw=raw)
266+
raw=raw,
267+
content_filter_options=content_filter_options)
249268
except UnsupportedEventTypeError:
250269
assert not no_report_lost_messages
251270
node.create_subscription(
@@ -254,7 +273,8 @@ def subscribe_and_spin(
254273
self._subscriber_callback,
255274
qos_profile,
256275
event_callbacks=None,
257-
raw=raw)
276+
raw=raw,
277+
content_filter_options=content_filter_options)
258278

259279
if self.future is not None:
260280
rclpy.spin_until_future_complete(node, self.future)

ros2topic/ros2topic/verb/hz.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from rclpy.clock import Clock
4141
from rclpy.clock import ClockType
4242
from rclpy.qos import qos_profile_sensor_data
43+
from rclpy.subscription_content_filter_options import ContentFilterOptions
4344
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
4445
from ros2cli.node.direct import DirectNode
4546
from ros2topic.api import get_msg_class
@@ -80,6 +81,12 @@ def add_arguments(self, parser, cli_name):
8081
dest='use_wtime', default=False, action='store_true',
8182
help='calculates rate using wall time which can be helpful'
8283
' when clock is not published during simulation')
84+
parser.add_argument(
85+
'--content-filter', dest='content_filter_expr', default=None,
86+
help='DDS content filter expression applied at the middleware level')
87+
parser.add_argument(
88+
'--content-filter-params', dest='content_filter_params', nargs='*', default=[],
89+
help='Parameters for the content filter expression')
8390
add_direct_node_arguments(parser)
8491

8592
def main(self, *, args):
@@ -97,10 +104,16 @@ def eval_fn(m):
97104
else:
98105
filter_expr = None
99106

107+
content_filter_options = None
108+
if args.content_filter_expr:
109+
content_filter_options = ContentFilterOptions(
110+
filter_expression=args.content_filter_expr,
111+
expression_parameters=args.content_filter_params)
112+
100113
with DirectNode(args) as node:
101114
return _rostopic_hz(
102115
node.node, topic, window_size=args.window_size, filter_expr=filter_expr,
103-
use_wtime=args.use_wtime)
116+
use_wtime=args.use_wtime, content_filter_options=content_filter_options)
104117

105118

106119
class ROSTopicHz(object):
@@ -244,13 +257,15 @@ def print_hz(self, topic=None):
244257
return
245258

246259

247-
def _rostopic_hz(node, topic, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
260+
def _rostopic_hz(node, topic, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None,
261+
use_wtime=False, content_filter_options=None):
248262
"""
249263
Periodically print the publishing rate of a topic to console until shutdown.
250264
251265
:param topic: topic name, ``list`` of ``str``
252266
:param window_size: number of messages to average over, -1 for infinite, ``int``
253267
:param filter_expr: Python filter expression that is called with m, the message instance
268+
:param content_filter_options: DDS content filter options, ``ContentFilterOptions``
254269
"""
255270
# pause hz until topic is published
256271
msg_class = get_msg_class(
@@ -266,7 +281,8 @@ def _rostopic_hz(node, topic, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None,
266281
topic,
267282
functools.partial(rt.callback_hz, topic=topic),
268283
qos_profile_sensor_data,
269-
raw=filter_expr is None)
284+
raw=filter_expr is None,
285+
content_filter_options=content_filter_options)
270286

271287
while rclpy.ok():
272288
rclpy.spin_once(node)

0 commit comments

Comments
 (0)