Skip to content

Commit 237c9d9

Browse files
Add --content-filter support to ros2 topic echo|hz|bw
Add DDS content filter expression support to `ros2 topic echo`, `ros2 topic hz`, and `ros2 topic bw` commands via new `--content-filter` and `--content-filter-params` CLI arguments. Content filters are applied at the middleware level, so only matching messages are delivered to the subscriber — reducing bandwidth and CPU usage compared to client-side Python filtering. Closes #1126 Signed-off-by: Pavel Guzenfeld <me@pavelguzenfeld.com> Signed-off-by: Pavel Guzenfeld <pavelguzenfeld@gmail.com>
1 parent ef39b60 commit 237c9d9

3 files changed

Lines changed: 61 additions & 9 deletions

File tree

ros2topic/ros2topic/verb/bw.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import rclpy
4141

4242
from rclpy.executors import ExternalShutdownException
43+
from rclpy.subscription_content_filter_options import ContentFilterOptions
4344

4445
from ros2cli.helpers import interactive_select
4546
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
@@ -96,6 +97,12 @@ def add_arguments(self, parser, cli_name):
9697
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
9798
help='maximum window size, in # of messages, for calculating rate '
9899
f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW')
100+
parser.add_argument(
101+
'--content-filter', dest='content_filter_expr', default=None,
102+
help='DDS content filter expression applied at the middleware level')
103+
parser.add_argument(
104+
'--content-filter-params', dest='content_filter_params', nargs='*', default=[],
105+
help='Parameters for the content filter expression')
99106
add_direct_node_arguments(parser)
100107

101108
def main(self, *, args):
@@ -127,6 +134,12 @@ def main(args):
127134

128135
topics = args.topic_name
129136

137+
content_filter_options = None
138+
if args.content_filter_expr:
139+
content_filter_options = ContentFilterOptions(
140+
filter_expression=args.content_filter_expr,
141+
expression_parameters=args.content_filter_params)
142+
130143
with DirectNode(args) as node:
131144
# Get all available topics at this moment
132145
if args.all_topics:
@@ -139,7 +152,8 @@ def main(args):
139152
return
140153
print(f'Subscribing to all {len(topics)} available topics...')
141154
return _rostopic_bw(node.node, topics, qos_args=args, window_size=args.window_size,
142-
all_topics=args.all_topics)
155+
all_topics=args.all_topics,
156+
content_filter_options=content_filter_options)
143157

144158

145159
class ROSTopicBandwidth(object):
@@ -293,7 +307,8 @@ def _get_ascii_table(header, cols):
293307
return table
294308

295309

296-
def _rostopic_bw(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, all_topics=False):
310+
def _rostopic_bw(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, all_topics=False,
311+
content_filter_options=None):
297312
"""Periodically print the received bandwidth of topics to console until shutdown."""
298313
# pause bw until topic is published
299314
rt = ROSTopicBandwidth(node, window_size)
@@ -314,7 +329,8 @@ def _rostopic_bw(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, all_to
314329
topic,
315330
functools.partial(rt.callback, topic=topic),
316331
qos_profile,
317-
raw=True
332+
raw=True,
333+
content_filter_options=content_filter_options
318334
)
319335
print('Subscribed to [%s]' % topic)
320336

ros2topic/ros2topic/verb/echo.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from rclpy.event_handler import UnsupportedEventTypeError
2222
from rclpy.node import Node
2323
from rclpy.qos import QoSProfile
24+
from rclpy.subscription_content_filter_options import ContentFilterOptions
2425
from rclpy.task import Future
2526

2627
from ros2cli.helpers import interactive_select
@@ -107,6 +108,15 @@ def add_arguments(self, parser, cli_name):
107108
'--filter', dest='filter_expr', help='Python expression to filter messages that '
108109
'are printed. Expression can use Python builtins '
109110
'as well as m (the message).')
111+
parser.add_argument(
112+
'--content-filter', dest='content_filter_expr', default=None,
113+
help='DDS content filter expression applied at the middleware level '
114+
'(e.g. "data > 100"). Only messages matching the filter are '
115+
'delivered to the subscriber. Use %%0, %%1, ... as parameter '
116+
'placeholders with --content-filter-params.')
117+
parser.add_argument(
118+
'--content-filter-params', dest='content_filter_params', nargs='*', default=[],
119+
help='Parameters for the content filter expression')
110120
parser.add_argument(
111121
'--once', action='store_true', help='Print the first message received and then exit.')
112122
parser.add_argument(
@@ -192,13 +202,20 @@ def main(self, *, args):
192202
if args.timeout is not None:
193203
self.timer = node.create_timer(args.timeout, self._timed_out)
194204

205+
content_filter_options = None
206+
if args.content_filter_expr:
207+
content_filter_options = ContentFilterOptions(
208+
filter_expression=args.content_filter_expr,
209+
expression_parameters=args.content_filter_params)
210+
195211
self.subscribe_and_spin(
196212
node,
197213
args.topic_name,
198214
message_type,
199215
qos_profile,
200216
args.no_lost_messages,
201-
args.raw)
217+
args.raw,
218+
content_filter_options=content_filter_options)
202219

203220
def subscribe_and_spin(
204221
self,
@@ -208,6 +225,7 @@ def subscribe_and_spin(
208225
qos_profile: QoSProfile,
209226
no_report_lost_messages: bool,
210227
raw: bool,
228+
content_filter_options: Optional[ContentFilterOptions] = None,
211229
) -> Optional[str]:
212230
"""Initialize a node with a single subscription and spin."""
213231
event_callbacks = None
@@ -221,7 +239,8 @@ def subscribe_and_spin(
221239
self._subscriber_callback,
222240
qos_profile,
223241
event_callbacks=event_callbacks,
224-
raw=raw)
242+
raw=raw,
243+
content_filter_options=content_filter_options)
225244
except UnsupportedEventTypeError:
226245
assert not no_report_lost_messages
227246
node.create_subscription(
@@ -230,7 +249,8 @@ def subscribe_and_spin(
230249
self._subscriber_callback,
231250
qos_profile,
232251
event_callbacks=None,
233-
raw=raw)
252+
raw=raw,
253+
content_filter_options=content_filter_options)
234254

235255
if self.future is not None:
236256
rclpy.spin_until_future_complete(node, self.future)

ros2topic/ros2topic/verb/hz.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from rclpy.clock import Clock
4242
from rclpy.clock import ClockType
4343
from rclpy.executors import ExternalShutdownException
44+
from rclpy.subscription_content_filter_options import ContentFilterOptions
4445

4546
from ros2cli.helpers import interactive_select
4647
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
@@ -95,6 +96,12 @@ def add_arguments(self, parser, cli_name):
9596
dest='use_wtime', default=False, action='store_true',
9697
help='calculates rate using wall time which can be helpful'
9798
' when clock is not published during simulation')
99+
parser.add_argument(
100+
'--content-filter', dest='content_filter_expr', default=None,
101+
help='DDS content filter expression applied at the middleware level')
102+
parser.add_argument(
103+
'--content-filter-params', dest='content_filter_params', nargs='*', default=[],
104+
help='Parameters for the content filter expression')
98105
add_direct_node_arguments(parser)
99106

100107
def main(self, *, args):
@@ -134,6 +141,12 @@ def eval_fn(m):
134141
else:
135142
filter_expr = None
136143

144+
content_filter_options = None
145+
if args.content_filter_expr:
146+
content_filter_options = ContentFilterOptions(
147+
filter_expression=args.content_filter_expr,
148+
expression_parameters=args.content_filter_params)
149+
137150
with DirectNode(args) as node:
138151
# Get all available topics at this moment
139152
if args.all_topics:
@@ -149,7 +162,8 @@ def eval_fn(m):
149162
return _rostopic_hz(
150163
node.node, topics, qos_args=args, window_size=args.window_size,
151164
filter_expr=filter_expr, use_wtime=args.use_wtime,
152-
all_topics=args.all_topics)
165+
all_topics=args.all_topics,
166+
content_filter_options=content_filter_options)
153167

154168

155169
class ROSTopicHz(object):
@@ -337,7 +351,7 @@ def _get_ascii_table(header, cols):
337351

338352

339353
def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None,
340-
use_wtime=False, all_topics=False):
354+
use_wtime=False, all_topics=False, content_filter_options=None):
341355
"""
342356
Periodically print the publishing rate of a topic to console until shutdown.
343357
@@ -346,6 +360,7 @@ def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter
346360
:param window_size: number of messages to average over, -1 for infinite, ``int``
347361
:param filter_expr: Python filter expression that is called with m, the message instance
348362
:param all_topics: whether all topics are being monitored, ``bool``
363+
:param content_filter_options: DDS content filter options, ``ContentFilterOptions``
349364
"""
350365
# pause hz until topic is published
351366
rt = ROSTopicHz(node, window_size, filter_expr=filter_expr, use_wtime=use_wtime)
@@ -367,7 +382,8 @@ def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter
367382
topic,
368383
functools.partial(rt.callback_hz, topic=topic),
369384
qos_profile,
370-
raw=filter_expr is None)
385+
raw=filter_expr is None,
386+
content_filter_options=content_filter_options)
371387
if topics_len > 1:
372388
print('Subscribed to [%s]' % topic)
373389

0 commit comments

Comments
 (0)