From 8f3a19c752caf55d359aefdf97b9f5d3ad155256 Mon Sep 17 00:00:00 2001 From: Tomoya Fujita Date: Fri, 9 Jan 2026 17:01:34 +0900 Subject: [PATCH 01/13] 1st draft bring up for "ros2 log watch" sub-command. Signed-off-by: Tomoya Fujita clean up the package and implementation. Signed-off-by: Tomoya Fujita fix ros2log test_watch.py. Signed-off-by: Tomoya Fujita support QoS configuration argument for ros2 log watch. Signed-off-by: Tomoya Fujita Call content filtering API for logger name and level if available. Signed-off-by: Tomoya Fujita add test_cli.py to test "ros2 log watch". Signed-off-by: Tomoya Fujita remove ros2log/README.md. Signed-off-by: Tomoya Fujita :construction: add list subcommand Signed-off-by: Decwest bug: fix test Signed-off-by: Decwest :recycle: delete non-necessary fixture node for test Signed-off-by: Decwest :bug: fix to print one by one Signed-off-by: Decwest support "ros2 log levels". Signed-off-by: Tomoya Fujita :zap: add get and set subcommand Signed-off-by: Decwest :bug: align logger's initial state during test Signed-off-by: Decwest FIX: add sleep to reduce cpu consumption Signed-off-by: Decwest FIX: rewrite waiting acync process Signed-off-by: Decwest :FIX: delete --include-hidden-nodes option Signed-off-by: Decwest FIX: import from api Signed-off-by: Decwest FIX: add ValueError when empty string is inputted in get_absolute_node_name Signed-off-by: Decwest Signed-off-by: Tomoya.Fujita --- ros2log/package.xml | 33 ++ ros2log/resource/ros2log | 0 ros2log/ros2log/__init__.py | 0 ros2log/ros2log/api/__init__.py | 215 ++++++++++ ros2log/ros2log/command/__init__.py | 0 ros2log/ros2log/command/log.py | 45 +++ ros2log/ros2log/py.typed | 0 ros2log/ros2log/verb/__init__.py | 44 ++ ros2log/ros2log/verb/get.py | 102 +++++ ros2log/ros2log/verb/levels.py | 101 +++++ ros2log/ros2log/verb/list.py | 33 ++ ros2log/ros2log/verb/set.py | 122 ++++++ ros2log/ros2log/verb/watch.py | 271 +++++++++++++ ros2log/setup.py | 54 +++ ros2log/test/fixtures/listener_node.py | 56 +++ ros2log/test/fixtures/set_logger_level.py | 102 +++++ ros2log/test/fixtures/talker_node.py | 61 +++ ros2log/test/test_api.py | 113 ++++++ ros2log/test/test_cli.py | 466 ++++++++++++++++++++++ ros2log/test/test_copyright.py | 26 ++ ros2log/test/test_flake8.py | 25 ++ ros2log/test/test_get.py | 142 +++++++ ros2log/test/test_levels.py | 171 ++++++++ ros2log/test/test_pep257.py | 23 ++ ros2log/test/test_set.py | 151 +++++++ ros2log/test/test_watch.py | 246 ++++++++++++ ros2log/test/test_xmllint.py | 23 ++ 27 files changed, 2625 insertions(+) create mode 100644 ros2log/package.xml create mode 100644 ros2log/resource/ros2log create mode 100644 ros2log/ros2log/__init__.py create mode 100644 ros2log/ros2log/api/__init__.py create mode 100644 ros2log/ros2log/command/__init__.py create mode 100644 ros2log/ros2log/command/log.py create mode 100644 ros2log/ros2log/py.typed create mode 100644 ros2log/ros2log/verb/__init__.py create mode 100644 ros2log/ros2log/verb/get.py create mode 100644 ros2log/ros2log/verb/levels.py create mode 100644 ros2log/ros2log/verb/list.py create mode 100644 ros2log/ros2log/verb/set.py create mode 100644 ros2log/ros2log/verb/watch.py create mode 100644 ros2log/setup.py create mode 100644 ros2log/test/fixtures/listener_node.py create mode 100644 ros2log/test/fixtures/set_logger_level.py create mode 100644 ros2log/test/fixtures/talker_node.py create mode 100644 ros2log/test/test_api.py create mode 100644 ros2log/test/test_cli.py create mode 100644 ros2log/test/test_copyright.py create mode 100644 ros2log/test/test_flake8.py create mode 100644 ros2log/test/test_get.py create mode 100644 ros2log/test/test_levels.py create mode 100644 ros2log/test/test_pep257.py create mode 100644 ros2log/test/test_set.py create mode 100644 ros2log/test/test_watch.py create mode 100644 ros2log/test/test_xmllint.py diff --git a/ros2log/package.xml b/ros2log/package.xml new file mode 100644 index 000000000..ee7f3747b --- /dev/null +++ b/ros2log/package.xml @@ -0,0 +1,33 @@ + + + + ros2log + 0.40.4 + The log command for ROS 2 command line tools. + Tomoya Fujita + Fumiya Ohnishi + Apache License 2.0 + + Tomoya Fujita + Fumiya Ohnishi + + python3-argcomplete + python3-packaging + python3-psutil + rcl_interfaces + rclpy + ros2cli + rosgraph_msgs + + ament_copyright + ament_flake8 + ament_pep257 + ament_xmllint + python3-pytest + python3-pytest-timeout + test_msgs + + + ament_python + + diff --git a/ros2log/resource/ros2log b/ros2log/resource/ros2log new file mode 100644 index 000000000..e69de29bb diff --git a/ros2log/ros2log/__init__.py b/ros2log/ros2log/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ros2log/ros2log/api/__init__.py b/ros2log/ros2log/api/__init__.py new file mode 100644 index 000000000..f882d5f32 --- /dev/null +++ b/ros2log/ros2log/api/__init__.py @@ -0,0 +1,215 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Iterator +from collections.abc import Sequence + +from rcl_interfaces.msg import LoggerLevel +from rcl_interfaces.msg import SetLoggerLevelsResult +from rcl_interfaces.srv import GetLoggerLevels +from rcl_interfaces.srv import SetLoggerLevels +import rclpy + +from ros2node.api import get_absolute_node_name +from ros2node.api import get_node_names +from ros2node.api import NodeName + + +LOGGER_GET_SERVICE_SUFFIX = '/get_logger_levels' +LOGGER_SET_SERVICE_SUFFIX = '/set_logger_levels' +LOGGER_GET_SERVICE_TYPE = 'rcl_interfaces/srv/GetLoggerLevels' +LOGGER_SET_SERVICE_TYPE = 'rcl_interfaces/srv/SetLoggerLevels' + + +def _require_absolute_node_name(node_name: str) -> str: + absolute_node_name = get_absolute_node_name(node_name) + if absolute_node_name is None: + raise ValueError('node_name must not be empty') + return absolute_node_name + + +def get_get_logger_levels_service_name(node_name: str) -> str: + """Return the get-logger-levels service name for a node.""" + return f'{_require_absolute_node_name(node_name)}{LOGGER_GET_SERVICE_SUFFIX}' + + +def get_set_logger_levels_service_name(node_name: str) -> str: + """Return the set-logger-levels service name for a node.""" + return f'{_require_absolute_node_name(node_name)}{LOGGER_SET_SERVICE_SUFFIX}' + + +def get_logger_name_for_node(node_name: str) -> str: + """ + Convert a fully qualified node name into its root logger name. + + ROS node logger names use dot-separated namespaces, e.g. `/demo/talker` + becomes `demo.talker`. + """ + absolute_node_name = _require_absolute_node_name(node_name) + return absolute_node_name.lstrip('/').replace('/', '.') + + +def format_logger_service_unavailable_error(node_name: str) -> str: + """Return the user-facing error for nodes without logger services.""" + absolute_node_name = _require_absolute_node_name(node_name) + return ( + f"Logger service not available for node '{absolute_node_name}'.\n" + "The 'enable_logger_service' node option must be enabled for this node." + ) + + +def iter_logger_service_nodes( + *, + node, +) -> Iterator[NodeName]: + """Yield nodes that expose the logger get/set services as they are discovered.""" + for node_name in get_node_names(node=node): + if node_has_logger_services(node, node_name): + yield node_name + + +def get_logger_service_nodes(*, node) -> list[NodeName]: + """Return all nodes that expose the logger get/set services.""" + return list(iter_logger_service_nodes(node=node)) + + +def get_target_node_names( + *, + node, + node_name: str | None = None, + all_nodes: bool = False, +) -> tuple[list[str] | None, str | None]: + """Resolve the node names targeted by a get/set command.""" + logger_service_nodes = get_logger_service_nodes(node=node) + logger_service_node_names = { + logger_node.full_name for logger_node in logger_service_nodes + } + + if all_nodes: + return sorted(logger_service_node_names), None + + absolute_node_name = get_absolute_node_name(node_name) + all_node_names = { + discovered_node.full_name for discovered_node in get_node_names(node=node) + } + + if absolute_node_name not in all_node_names: + return None, 'Node not found' + + if absolute_node_name not in logger_service_node_names: + return None, format_logger_service_unavailable_error(absolute_node_name) + + return [absolute_node_name], None + + +def node_has_logger_services(node, node_name: NodeName) -> bool: + """Check if a node provides both get/set logger level services.""" + services = node.get_service_names_and_types_by_node(node_name.name, node_name.namespace) + service_map = dict(services) + + expected_get = get_get_logger_levels_service_name(node_name.full_name) + expected_set = get_set_logger_levels_service_name(node_name.full_name) + + return ( + _service_has_type(service_map, expected_get, LOGGER_GET_SERVICE_TYPE) and + _service_has_type(service_map, expected_set, LOGGER_SET_SERVICE_TYPE) + ) + + +def call_get_logger_levels( + *, + node, + logger_names_by_node: dict[str, Sequence[str]], + timeout_sec: float = 5.0, +) -> dict[str, list[LoggerLevel] | Exception]: + """Call the get-logger-levels service for one or more nodes.""" + clients = {} + futures = {} + results = {} + + for node_name, logger_names in logger_names_by_node.items(): + client = node.create_client( + GetLoggerLevels, + get_get_logger_levels_service_name(node_name), + ) + clients[node_name] = client + if not client.wait_for_service(timeout_sec=timeout_sec): + results[node_name] = RuntimeError( + 'Wait for service timed out waiting for logger services ' + f'for node {node_name}' + ) + continue + + request = GetLoggerLevels.Request() + request.names = list(logger_names) + futures[node_name] = client.call_async(request) + + while futures and not all(future.done() for future in futures.values()): + rclpy.spin_once(node, timeout_sec=0.1) + + for node_name, future in futures.items(): + if future.result() is not None: + results[node_name] = list(future.result().levels) + else: + results[node_name] = future.exception() + + return results + + +def call_set_logger_levels( + *, + node, + levels_by_node: dict[str, Sequence[LoggerLevel]], + timeout_sec: float = 5.0, +) -> dict[str, list[SetLoggerLevelsResult] | Exception]: + """Call the set-logger-levels service for one or more nodes.""" + clients = {} + futures = {} + results = {} + + for node_name, levels in levels_by_node.items(): + client = node.create_client( + SetLoggerLevels, + get_set_logger_levels_service_name(node_name), + ) + clients[node_name] = client + if not client.wait_for_service(timeout_sec=timeout_sec): + results[node_name] = RuntimeError( + 'Wait for service timed out waiting for logger services ' + f'for node {node_name}' + ) + continue + + request = SetLoggerLevels.Request() + request.levels = list(levels) + futures[node_name] = client.call_async(request) + + while futures and not all(future.done() for future in futures.values()): + rclpy.spin_once(node, timeout_sec=0.1) + + for node_name, future in futures.items(): + if future.result() is not None: + results[node_name] = list(future.result().results) + else: + results[node_name] = future.exception() + + return results + + +def _service_has_type(service_map, service_name: str, service_type: str) -> bool: + """Check if a service exists and matches the expected type.""" + types = service_map.get(service_name) + if not types: + return False + return service_type in types diff --git a/ros2log/ros2log/command/__init__.py b/ros2log/ros2log/command/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ros2log/ros2log/command/log.py b/ros2log/ros2log/command/log.py new file mode 100644 index 000000000..b52c14bc8 --- /dev/null +++ b/ros2log/ros2log/command/log.py @@ -0,0 +1,45 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ros2cli.command import add_subparsers_on_demand +from ros2cli.command import CommandExtension + + +class LogCommand(CommandExtension): + """Various log related sub-commands.""" + + def add_arguments(self, parser, cli_name): + self._subparser = parser + + # Add global debug flag for all log subcommands + parser.add_argument( + '--debug', + action='store_true', + default=False, + help='Enable debug output for verbose information') + + # add arguments and sub-commands of verbs + add_subparsers_on_demand( + parser, cli_name, '_verb', 'ros2log.verb', required=False) + + def main(self, *, parser, args): + if not hasattr(args, '_verb'): + # in case no verb was passed + self._subparser.print_help() + return 0 + + extension = getattr(args, '_verb') + + # call the verb's main method + return extension.main(args=args) diff --git a/ros2log/ros2log/py.typed b/ros2log/ros2log/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/ros2log/ros2log/verb/__init__.py b/ros2log/ros2log/verb/__init__.py new file mode 100644 index 000000000..25b4b6a71 --- /dev/null +++ b/ros2log/ros2log/verb/__init__.py @@ -0,0 +1,44 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ros2cli.plugin_system import PLUGIN_SYSTEM_VERSION +from ros2cli.plugin_system import satisfies_version + + +class VerbExtension: + """ + The extension point for 'log' verb extensions. + + The following properties must be defined: + * `NAME` (will be set to the entry point name) + + The following methods can be defined: + * `main` - handles CLI invocation + + The following methods can be defined: + * `add_arguments` + """ + + NAME = None + EXTENSION_POINT_VERSION = '0.1' + + def __init__(self): + super(VerbExtension, self).__init__() + satisfies_version(PLUGIN_SYSTEM_VERSION, '^0.1') + + def add_arguments(self, parser, cli_name): + pass + + def main(self, *, args): + raise NotImplementedError() diff --git a/ros2log/ros2log/verb/get.py b/ros2log/ros2log/verb/get.py new file mode 100644 index 000000000..b60854cd0 --- /dev/null +++ b/ros2log/ros2log/verb/get.py @@ -0,0 +1,102 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from ros2cli.node.direct import DirectNode +from ros2cli.node.strategy import add_arguments +from ros2cli.node.strategy import NodeStrategy + +from ros2log.api import call_get_logger_levels +from ros2log.api import get_logger_name_for_node +from ros2log.api import get_target_node_names +from ros2log.verb import VerbExtension +from ros2log.verb.levels import LEVEL_VALUE_TO_NAME +from ros2node.api import NodeNameCompleter + + +class GetVerb(VerbExtension): + """Get a node's current log level.""" + + def add_arguments(self, parser, cli_name): # noqa: D102 + add_arguments(parser) + parser.add_argument( + '--all', '-a', action='store_true', + help='Get log levels for all nodes with logger services enabled') + arg = parser.add_argument( + 'node_name', nargs='?', + help='Name of the ROS node') + arg.completer = NodeNameCompleter() + + def main(self, *, args): # noqa: D102 + validation_error = _validate_arguments(args) + if validation_error: + return validation_error + + with NodeStrategy(args) as node: + nodes_to_query, error = get_target_node_names( + node=node, + node_name=args.node_name, + all_nodes=args.all, + ) + + if error: + return error + + if not nodes_to_query: + return 'No nodes with logger services found' + + logger_names_by_node = { + node_name: [get_logger_name_for_node(node_name)] + for node_name in nodes_to_query + } + + with DirectNode(args) as node: + levels_by_node = call_get_logger_levels( + node=node, + logger_names_by_node=logger_names_by_node, + ) + + had_error = False + for node_name in nodes_to_query: + levels = levels_by_node[node_name] + if isinstance(levels, Exception): + message = f"Exception while calling service of node '{node_name}': {levels}" + if args.all: + print(message, file=sys.stderr) + had_error = True + continue + return message + + if len(levels) != 1: + message = f"Unexpected response while getting logger level for node '{node_name}'" + if args.all: + print(message, file=sys.stderr) + had_error = True + continue + return message + + level_name = LEVEL_VALUE_TO_NAME.get(levels[0].level, str(levels[0].level)) + prefix = f'{node_name}: ' if args.all else '' + print(prefix + level_name) + + return 1 if had_error else 0 + + +def _validate_arguments(args) -> str | None: + if args.all and args.node_name is not None: + return 'Node name cannot be used with --all' + if not args.all and args.node_name is None: + return 'Either a node name or --all must be specified' + return None diff --git a/ros2log/ros2log/verb/levels.py b/ros2log/ros2log/verb/levels.py new file mode 100644 index 000000000..42f7ca864 --- /dev/null +++ b/ros2log/ros2log/verb/levels.py @@ -0,0 +1,101 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from rcl_interfaces.msg import LoggerLevel + +from ros2log.verb import VerbExtension + +# Mapping from LoggerLevel constant names (LOG_LEVEL_*) to display names. +# LOG_LEVEL_UNKNOWN is displayed as UNSET to match rcutils convention. +_LOGGER_LEVEL_DISPLAY_NAMES = { + 'LOG_LEVEL_UNKNOWN': 'UNSET', + 'LOG_LEVEL_DEBUG': 'DEBUG', + 'LOG_LEVEL_INFO': 'INFO', + 'LOG_LEVEL_WARN': 'WARN', + 'LOG_LEVEL_ERROR': 'ERROR', + 'LOG_LEVEL_FATAL': 'FATAL', +} + +# Descriptions for each log level. +LOG_LEVEL_DESCRIPTIONS = { + 'UNSET': ( + 'The logger level is not set explicitly and will use ' + 'the default or inherited level.' + ), + 'DEBUG': ( + 'Debug is for pedantic information, which is useful ' + 'when debugging issues.' + ), + 'INFO': ( + 'Info is the standard informational level and is used ' + 'to report expected information.' + ), + 'WARN': ( + 'Warning is for information that may potentially cause ' + 'issues or possibly unexpected behavior.' + ), + 'ERROR': ( + 'Error is for information that this node cannot resolve.' + ), + 'FATAL': ( + 'Information about an impending node shutdown.' + ), +} + + +def get_log_levels_from_msg(): + """ + Extract log level constants from rcl_interfaces.msg.LoggerLevel. + + Returns a sorted list of (name, value) tuples, where name is the + display name (e.g. 'DEBUG') rather than the constant name (e.g. + 'LOG_LEVEL_DEBUG'). + """ + levels = [] + for attr in dir(LoggerLevel): + if attr not in _LOGGER_LEVEL_DISPLAY_NAMES: + continue + val = getattr(LoggerLevel, attr) + display_name = _LOGGER_LEVEL_DISPLAY_NAMES[attr] + levels.append((display_name, val)) + + levels.sort(key=lambda x: x[1]) + return levels + + +# Build the level map once at import time for reuse by other verbs (get, set). +_LOG_LEVELS = get_log_levels_from_msg() +LEVEL_NAME_TO_VALUE = dict(_LOG_LEVELS) +LEVEL_VALUE_TO_NAME = {value: name for name, value in _LOG_LEVELS} + + +class LevelsVerb(VerbExtension): + """Show all valid log level values.""" + + def add_arguments(self, parser, cli_name): + parser.add_argument( + '--value', '-v', + action='store_true', + default=False, + help='Show numeric values alongside level names') + + def main(self, *, args): + levels = get_log_levels_from_msg() + for name, value in levels: + desc = LOG_LEVEL_DESCRIPTIONS.get(name, '') + if args.value: + print(f'{name:<7} ({value:>2}) : {desc}') + else: + print(f'{name:<7} : {desc}') + return 0 diff --git a/ros2log/ros2log/verb/list.py b/ros2log/ros2log/verb/list.py new file mode 100644 index 000000000..45a5c69bd --- /dev/null +++ b/ros2log/ros2log/verb/list.py @@ -0,0 +1,33 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ros2cli.node.strategy import add_arguments +from ros2cli.node.strategy import NodeStrategy +from ros2log.api import iter_logger_service_nodes +from ros2log.verb import VerbExtension + + +class ListVerb(VerbExtension): + """Output a list of nodes with logger services enabled.""" + + def add_arguments(self, parser, cli_name): + """Add CLI arguments for the list verb.""" + add_arguments(parser) + + def main(self, *, args): + """Execute the list verb.""" + with NodeStrategy(args) as node: + for node_name in iter_logger_service_nodes(node=node): + print(node_name.full_name) + return 0 diff --git a/ros2log/ros2log/verb/set.py b/ros2log/ros2log/verb/set.py new file mode 100644 index 000000000..c9c07b993 --- /dev/null +++ b/ros2log/ros2log/verb/set.py @@ -0,0 +1,122 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from rcl_interfaces.msg import LoggerLevel + +from ros2cli.node.direct import DirectNode +from ros2cli.node.strategy import add_arguments +from ros2cli.node.strategy import NodeStrategy + +from ros2log.api import call_set_logger_levels +from ros2log.api import get_logger_name_for_node +from ros2log.api import get_target_node_names +from ros2log.verb import VerbExtension +from ros2log.verb.levels import LEVEL_NAME_TO_VALUE +from ros2node.api import NodeNameCompleter + + +class SetVerb(VerbExtension): + """Set a node's log level.""" + + def add_arguments(self, parser, cli_name): # noqa: D102 + add_arguments(parser) + parser.add_argument( + '--all', '-a', action='store_true', + help='Set log levels for all nodes with logger services enabled') + arg = parser.add_argument( + 'node_name', nargs='?', + help='Name of the ROS node') + arg.completer = NodeNameCompleter() + parser.add_argument( + 'level', + type=str.upper, + choices=list(LEVEL_NAME_TO_VALUE.keys()), + help='The log level to set') + + def main(self, *, args): # noqa: D102 + validation_error = _validate_arguments(args) + if validation_error: + return validation_error + + with NodeStrategy(args) as node: + nodes_to_query, error = get_target_node_names( + node=node, + node_name=args.node_name, + all_nodes=args.all, + ) + + if error: + return error + + if not nodes_to_query: + return 'No nodes with logger services found' + + requested_level = LEVEL_NAME_TO_VALUE[args.level] + levels_by_node = {} + for node_name in nodes_to_query: + logger_level = LoggerLevel() + logger_level.name = get_logger_name_for_node(node_name) + logger_level.level = requested_level + levels_by_node[node_name] = [logger_level] + + with DirectNode(args) as node: + results_by_node = call_set_logger_levels( + node=node, + levels_by_node=levels_by_node, + ) + + had_error = False + for node_name in nodes_to_query: + results = results_by_node[node_name] + if isinstance(results, Exception): + message = f"Exception while calling service of node '{node_name}': {results}" + if args.all: + print(message, file=sys.stderr) + had_error = True + continue + return message + + if len(results) != 1: + message = f"Unexpected response while setting logger level for node '{node_name}'" + if args.all: + print(message, file=sys.stderr) + had_error = True + continue + return message + + result = results[0] + message = 'Set logger level successful' + if not result.successful: + message = 'Setting logger level failed' + if result.reason: + message += ': ' + result.reason + + prefix = f'{node_name}: ' if args.all else '' + if result.successful: + print(prefix + message) + else: + print(prefix + message, file=sys.stderr) + had_error = True + + return 1 if had_error else 0 + + +def _validate_arguments(args) -> str | None: + if args.all and args.node_name is not None: + return 'Node name cannot be used with --all' + if not args.all and args.node_name is None: + return 'Either a node name or --all must be specified' + return None diff --git a/ros2log/ros2log/verb/watch.py b/ros2log/ros2log/verb/watch.py new file mode 100644 index 000000000..aaf6454ce --- /dev/null +++ b/ros2log/ros2log/verb/watch.py @@ -0,0 +1,271 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +import sys +from typing import Optional + +from rcl_interfaces.msg import Log + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_rosout_default + +from ros2cli.node.direct import DirectNode +from ros2cli.qos import add_qos_arguments +from ros2cli.qos import choose_qos + +from ros2log.verb import VerbExtension + + +# Log level mapping +LOG_LEVELS = { + 'DEBUG': Log.DEBUG, + 'INFO': Log.INFO, + 'WARN': Log.WARN, + 'ERROR': Log.ERROR, + 'FATAL': Log.FATAL, +} + +LOG_LEVEL_NAMES = { + Log.DEBUG: 'DEBUG', + Log.INFO: 'INFO', + Log.WARN: 'WARN', + Log.ERROR: 'ERROR', + Log.FATAL: 'FATAL', +} + +# ANSI color codes +COLOR_RESET = '\033[0m' +COLOR_DEBUG = '\033[37m' # White +COLOR_INFO = '\033[32m' # Green +COLOR_WARN = '\033[33m' # Yellow +COLOR_ERROR = '\033[31m' # Red +COLOR_FATAL = '\033[35m' # Magenta + +LOG_LEVEL_COLORS = { + Log.DEBUG: COLOR_DEBUG, + Log.INFO: COLOR_INFO, + Log.WARN: COLOR_WARN, + Log.ERROR: COLOR_ERROR, + Log.FATAL: COLOR_FATAL, +} + + +class WatchVerb(VerbExtension): + """Monitor and display logs in real-time.""" + + def add_arguments(self, parser, cli_name): + parser.add_argument( + '--level', + type=str, + choices=['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'], + help='Show only logs at or above the specified severity level') + parser.add_argument( + '--logger', + type=str, + help='Filter logs by logger name') + parser.add_argument( + '--regex', + type=str, + help='Filter log messages matching the specified regular expression pattern. ' + 'e.g. "topic.*(/\\w+)"') + parser.add_argument( + '--no-color', + action='store_true', + default=False, + help='Disable colorized output') + parser.add_argument( + '--no-timestamp', + action='store_true', + default=False, + help='Disable timestamp display') + parser.add_argument( + '--function-detail', + action='store_true', + default=False, + help='Output function name, file, and line number') + add_qos_arguments( + parser, + entity_type='subscribe', + default_profile_str='rosout_default') + + def main(self, *, args): + with DirectNode(args) as node: + # Configure QoS profile based on arguments and available publishers + qos_profile = choose_qos(node, '/rosout', args) + LogWatcher( + node, + level_filter=args.level, + logger_filter=args.logger, + regex_filter=args.regex, + enable_color=not args.no_color, + show_timestamp=not args.no_timestamp, + show_function_detail=args.function_detail, + qos_profile=qos_profile, + debug=args.debug, + ) + + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + + return 0 + + +class LogWatcher: + """Helper class to watch and filter logs from /rosout topic.""" + + def __init__( + self, + node: Node, + level_filter: Optional[str] = None, + logger_filter: Optional[str] = None, + regex_filter: Optional[str] = None, + enable_color: bool = True, + show_timestamp: bool = True, + show_function_detail: bool = False, + qos_profile=qos_profile_rosout_default, + enable_content_filter: bool = True, + debug: bool = False, + ): + self.node = node + self.enable_color = enable_color + self.show_timestamp = show_timestamp + self.show_function_detail = show_function_detail + self.debug = debug + + # Set up level filter + self.min_level = LOG_LEVELS.get(level_filter, Log.DEBUG) if level_filter else Log.DEBUG + + # Set up logger filter + self.logger_filter = logger_filter + + # Set up regex filter + self.regex_pattern = None + if regex_filter: + try: + self.regex_pattern = re.compile(regex_filter) + except re.error as e: + node.get_logger().error(f'Invalid regex pattern: {e}') + sys.exit(1) + + # Create subscription to /rosout + self.subscription = node.create_subscription( + Log, + '/rosout', + self._log_callback, + qos_profile + ) + + # Try to set content filter for performance optimization + # This filters at RMW level if supported, reducing network traffic and CPU usage + if enable_content_filter: + self._setup_content_filter() + + def _setup_content_filter(self): + """ + Set up content filter for log level and logger name if supported by RMW. + + Content filtering is a DDS feature that filters messages at the middleware level, + reducing network traffic and CPU usage. Currently, only DDS-based RMW implementations + (e.g., FastDDS, Connext DDS) support this feature. DDS specifications limit the number + of expression parameters to 100 at a time. If content filtering is not supported or + fails, the implementation automatically falls back to client-side filtering. + """ + filter_expressions = [] + expression_parameters = [] + + # Add level filter expression + if self.min_level > Log.DEBUG: + filter_expressions.append('level >= %0') + expression_parameters.append(str(self.min_level)) + + # Add logger name filter expression + if self.logger_filter: + param_index = len(expression_parameters) + filter_expressions.append(f'name = %{param_index}') + # String parameters need to be quoted for DDS SQL filter + expression_parameters.append(f"'{self.logger_filter}'") + + # Only set content filter if we have filter expressions + if filter_expressions: + filter_expression = ' AND '.join(filter_expressions) + try: + self.subscription.set_content_filter(filter_expression, expression_parameters) + # Check if content filtering was successfully enabled + if self.subscription.is_cft_enabled: + if self.debug: + print( + f'Content filter enabled: {filter_expression} with parameters ' + f'{expression_parameters}') + else: + if self.debug: + print( + 'Content filtering not supported by RMW implementation. ' + 'Falling back to client-side filtering.') + except Exception as ex: + # Content filtering may not be supported by the RMW implementation + if self.debug: + print( + f'Failed to set content filter: {ex}. ' + 'Falling back to client-side filtering.') + + def _log_callback(self, msg: Log): + """Process log messages.""" + # Apply level filter + if msg.level < self.min_level: + return + + # Apply logger filter + if self.logger_filter and msg.name != self.logger_filter: + return + + # Apply regex filter on message text + if self.regex_pattern and not self.regex_pattern.search(msg.msg): + return + + # Format and print the log message + self._print_log(msg) + + def _print_log(self, msg: Log): + """Format and print a log message.""" + parts = [] + + # Add timestamp if enabled + if self.show_timestamp: + timestamp_sec = msg.stamp.sec + msg.stamp.nanosec / 1e9 + parts.append(f'[{timestamp_sec:.6f}]') + + # Add log level with color + level_name = LOG_LEVEL_NAMES.get(msg.level, 'UNKNOWN') + if self.enable_color: + color = LOG_LEVEL_COLORS.get(msg.level, COLOR_RESET) + parts.append(f'{color}[{level_name}]{COLOR_RESET}') + else: + parts.append(f'[{level_name}]') + + # Add logger name + parts.append(f'[{msg.name}]') + + # Add function details if enabled + if self.show_function_detail: + parts.append(f'[{msg.function}@{msg.file}:{msg.line}]') + + # Add message + parts.append(f': {msg.msg}') + + # Print the formatted message + print(' '.join(parts), flush=True) diff --git a/ros2log/setup.py b/ros2log/setup.py new file mode 100644 index 000000000..6d88c0900 --- /dev/null +++ b/ros2log/setup.py @@ -0,0 +1,54 @@ +from setuptools import find_packages +from setuptools import setup + +package_name = 'ros2log' + +setup( + name=package_name, + version='0.40.4', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/' + package_name, ['package.xml']), + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ], + package_data={'': ['py.typed']}, + install_requires=['ros2cli'], + zip_safe=True, + author='Tomoya Fujita, Fumiya Ohnishi', + author_email='tomoya.fujita825@gmail.com, fumiya-onishi@keio.jp', + maintainer='Tomoya Fujita', + maintainer_email='tomoya.fujita825@gmail.com', + url='https://github.com/ros2/ros2cli/tree/master/ros2log', + download_url='https://github.com/ros2/ros2cli/releases', + keywords=[], + classifiers=[ + 'Environment :: Console', + 'Intended Audience :: Developers', + 'Programming Language :: Python', + ], + description='The log command for ROS 2 command line tools.', + long_description="""\ +The package provides the log command for the ROS 2 command line tools.""", + license='Apache License, Version 2.0', + extras_require={ + 'test': [ + 'pytest', + ], + }, + entry_points={ + 'ros2cli.command': [ + 'log = ros2log.command.log:LogCommand', + ], + 'ros2cli.extension_point': [ + 'ros2log.verb = ros2log.verb:VerbExtension', + ], + 'ros2log.verb': [ + 'get = ros2log.verb.get:GetVerb', + 'levels = ros2log.verb.levels:LevelsVerb', + 'list = ros2log.verb.list:ListVerb', + 'set = ros2log.verb.set:SetVerb', + 'watch = ros2log.verb.watch:WatchVerb', + ], + }, +) diff --git a/ros2log/test/fixtures/listener_node.py b/ros2log/test/fixtures/listener_node.py new file mode 100644 index 000000000..c36da4b8c --- /dev/null +++ b/ros2log/test/fixtures/listener_node.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Listener node fixture that subscribes and generates logs.""" + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String + + +class ListenerNode(Node): + """Test node that subscribes to messages and generates logs.""" + + def __init__(self): + super().__init__('listener', enable_logger_service=False) + self.subscription = self.create_subscription( + String, + 'chatter', + self.listener_callback, + 10) + self.get_logger().info('Listener node started') + + def listener_callback(self, msg): + """Process received messages and log them.""" + self.get_logger().debug(f'Debug message: {msg.data}') + self.get_logger().info(f'Info message: {msg.data}') + self.get_logger().warning(f'Warning message: {msg.data}') + self.get_logger().error(f'Error message: {msg.data}') + + +def main(args=None): + rclpy.init(args=args) + node = ListenerNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2log/test/fixtures/set_logger_level.py b/ros2log/test/fixtures/set_logger_level.py new file mode 100644 index 000000000..756c26492 --- /dev/null +++ b/ros2log/test/fixtures/set_logger_level.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test helper that sets a node logger level from a fresh ROS process.""" + +import sys + +from rcl_interfaces.msg import LoggerLevel +from rcl_interfaces.srv import SetLoggerLevels +import rclpy + +from ros2log.api import get_logger_name_for_node +from ros2node.api import get_absolute_node_name + + +LEVEL_NAME_TO_VALUE = { + 'UNSET': LoggerLevel.LOG_LEVEL_UNKNOWN, + 'DEBUG': LoggerLevel.LOG_LEVEL_DEBUG, + 'INFO': LoggerLevel.LOG_LEVEL_INFO, + 'WARN': LoggerLevel.LOG_LEVEL_WARN, + 'ERROR': LoggerLevel.LOG_LEVEL_ERROR, + 'FATAL': LoggerLevel.LOG_LEVEL_FATAL, +} + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + if len(argv) != 2: + print('usage: set_logger_level.py NODE_NAME LEVEL', file=sys.stderr) + return 2 + + node_name, level_name = argv + if level_name not in LEVEL_NAME_TO_VALUE: + print(f'unknown level: {level_name}', file=sys.stderr) + return 2 + + absolute_node_name = get_absolute_node_name(node_name) + logger_level = LoggerLevel() + logger_level.name = get_logger_name_for_node(absolute_node_name) + logger_level.level = LEVEL_NAME_TO_VALUE[level_name] + + rclpy.init(args=[]) + node = rclpy.create_node('set_logger_level_helper') + try: + client = node.create_client( + SetLoggerLevels, + f'{absolute_node_name}/set_logger_levels', + ) + if not client.wait_for_service(timeout_sec=10.0): + print( + 'Wait for service timed out waiting for logger services ' + f'for node {absolute_node_name}', + file=sys.stderr, + ) + return 1 + + request = SetLoggerLevels.Request() + request.levels = [logger_level] + future = client.call_async(request) + rclpy.spin_until_future_complete(node, future) + + response = future.result() + if response is None: + print(future.exception(), file=sys.stderr) + return 1 + if len(response.results) != 1: + print( + 'Unexpected response while setting logger level for node ' + f"'{absolute_node_name}'", + file=sys.stderr, + ) + return 1 + + result = response.results[0] + if not result.successful: + message = 'Setting logger level failed' + if result.reason: + message += f': {result.reason}' + print(message, file=sys.stderr) + return 1 + + return 0 + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/ros2log/test/fixtures/talker_node.py b/ros2log/test/fixtures/talker_node.py new file mode 100644 index 000000000..5db21657c --- /dev/null +++ b/ros2log/test/fixtures/talker_node.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Talker node fixture that publishes log messages at different levels.""" + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String + + +class TalkerNode(Node): + """Test node that publishes messages and generates logs.""" + + def __init__(self): + super().__init__('talker', enable_logger_service=True) + self.publisher_ = self.create_publisher(String, 'chatter', 10) + self.timer = self.create_timer(0.5, self.timer_callback) + self.count = 0 + self.get_logger().info('Talker node started') + + def timer_callback(self): + """Publish messages and generate various log levels.""" + msg = String() + msg.data = f'Publishing: Hello World {self.count}' + self.publisher_.publish(msg) + + # Generate different log levels for testing + self.get_logger().debug(f'Debug message {msg.data}') + self.get_logger().info(f'Info message: {msg.data}') + self.get_logger().warning(f'Warning message {msg.data}') + self.get_logger().error(f'Error message {msg.data}') + + self.count += 1 + + +def main(args=None): + rclpy.init(args=args) + node = TalkerNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2log/test/test_api.py b/ros2log/test/test_api.py new file mode 100644 index 000000000..685387283 --- /dev/null +++ b/ros2log/test/test_api.py @@ -0,0 +1,113 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import patch + +from ros2log.api import format_logger_service_unavailable_error +from ros2log.api import get_get_logger_levels_service_name +from ros2log.api import get_logger_name_for_node +from ros2log.api import get_set_logger_levels_service_name +from ros2log.api import get_target_node_names +from ros2node.api import NodeName + + +class TestLoggerServiceNameHelpers(unittest.TestCase): + """Test logger service name helpers.""" + + def test_empty_node_name_raises_value_error_for_get_service_name(self): + with self.assertRaisesRegex(ValueError, 'node_name must not be empty'): + get_get_logger_levels_service_name('') + + def test_empty_node_name_raises_value_error_for_set_service_name(self): + with self.assertRaisesRegex(ValueError, 'node_name must not be empty'): + get_set_logger_levels_service_name('') + + def test_empty_node_name_raises_value_error_for_unavailable_error(self): + with self.assertRaisesRegex(ValueError, 'node_name must not be empty'): + format_logger_service_unavailable_error('') + + +class TestGetLoggerNameForNode(unittest.TestCase): + """Test node-name to logger-name conversion helpers.""" + + def test_empty_node_name_raises_value_error(self): + with self.assertRaisesRegex(ValueError, 'node_name must not be empty'): + get_logger_name_for_node('') + + def test_root_namespace(self): + self.assertEqual(get_logger_name_for_node('/talker'), 'talker') + + def test_relative_node_name(self): + self.assertEqual(get_logger_name_for_node('talker'), 'talker') + + def test_namespaced_node(self): + self.assertEqual(get_logger_name_for_node('/demo/talker'), 'demo.talker') + + +class TestGetTargetNodeNames(unittest.TestCase): + """Test node-resolution for logger service verbs.""" + + @patch('ros2log.api.get_logger_service_nodes') + @patch('ros2log.api.get_node_names') + def test_all_nodes_returns_sorted_logger_service_nodes( + self, + mock_get_node_names, + mock_get_logger_service_nodes, + ): + mock_get_node_names.return_value = [] + mock_get_logger_service_nodes.return_value = [ + NodeName('b', '/', '/b'), + NodeName('a', '/', '/a'), + ] + + node_names, error = get_target_node_names(node=object(), all_nodes=True) + + self.assertEqual(['/a', '/b'], node_names) + self.assertIsNone(error) + + @patch('ros2log.api.get_logger_service_nodes') + @patch('ros2log.api.get_node_names') + def test_missing_node_returns_not_found( + self, + mock_get_node_names, + mock_get_logger_service_nodes, + ): + mock_get_node_names.return_value = [] + mock_get_logger_service_nodes.return_value = [] + + node_names, error = get_target_node_names(node=object(), node_name='/missing') + + self.assertIsNone(node_names) + self.assertEqual('Node not found', error) + + @patch('ros2log.api.get_logger_service_nodes') + @patch('ros2log.api.get_node_names') + def test_node_without_logger_service_returns_specific_error( + self, + mock_get_node_names, + mock_get_logger_service_nodes, + ): + mock_get_node_names.return_value = [ + NodeName('listener', '/', '/listener'), + ] + mock_get_logger_service_nodes.return_value = [] + + node_names, error = get_target_node_names(node=object(), node_name='/listener') + + self.assertIsNone(node_names) + self.assertEqual( + format_logger_service_unavailable_error('/listener'), + error, + ) diff --git a/ros2log/test/test_cli.py b/ros2log/test/test_cli.py new file mode 100644 index 000000000..5534233dc --- /dev/null +++ b/ros2log/test/test_cli.py @@ -0,0 +1,466 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import functools +import os +import re +import sys +import time +import unittest +import xmlrpc + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import RegisterEventHandler +from launch.actions import ResetEnvironment +from launch.actions import SetEnvironmentVariable +from launch.event_handlers import OnShutdown + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.actions +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +from launch_testing_ros.actions import EnableRmwIsolation +import launch_testing_ros.tools + +import pytest + +import rclpy +from rclpy.utilities import get_available_rmw_implementations +from ros2cli.helpers import get_rmw_additional_env +from ros2cli.node.strategy import NodeStrategy + +from ros2node.api import get_node_names + + +# Skip cli tests on Windows while they exhibit pathological behavior +# https://github.com/ros2/build_farmer/issues/248 +if sys.platform.startswith('win'): + pytest.skip( + 'CLI tests can block for a pathological amount of time on Windows.', + allow_module_level=True) + + +TEST_TIMEOUT = 20.0 +DISCOVERY_POLL_INTERVAL = 0.1 + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation): + path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures') + additional_env = get_rmw_additional_env(rmw_implementation) + additional_env['PYTHONUNBUFFERED'] = '1' + set_env_actions = [SetEnvironmentVariable(k, v) for k, v in additional_env.items()] + + path_to_talker_node_script = os.path.join(path_to_fixtures, 'talker_node.py') + path_to_listener_node_script = os.path.join(path_to_fixtures, 'listener_node.py') + + talker_node_action = Node( + executable=sys.executable, + arguments=[path_to_talker_node_script], + name='talker', + ) + + listener_node_action = Node( + executable=sys.executable, + arguments=[path_to_listener_node_script], + name='listener', + ) + + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + *set_env_actions, + EnableRmwIsolation(), + RegisterEventHandler(OnShutdown(on_shutdown=[ + # Stop daemon in isolated environment with proper ROS_DOMAIN_ID + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop-isolated', + # Use the same isolated environment + additional_env=dict(additional_env), + ), + # This must be done after stopping the daemon in the isolated environment + ResetEnvironment(), + ])), + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + talker_node_action, + listener_node_action, + launch_testing.actions.ReadyToTest(), + ], + ) + ] + ), + ]) + + +class TestROS2LogCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + cls.path_to_set_logger_level_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'set_logger_level.py') + rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter( + filtered_patterns=['WARNING:.*'], + filtered_rmw_implementation=rmw_implementation + ) + + @contextlib.contextmanager + def launch_process_command(self, command, *, name): + command_action = ExecuteProcess( + cmd=command, + name=name, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) as command_process: + yield command_process + cls.launch_process_command = launch_process_command + + @contextlib.contextmanager + def launch_log_command(self, arguments): + with self.launch_process_command( + ['ros2', 'log', *arguments], + name='ros2log-cli', + ) as log_command: + yield log_command + cls.launch_log_command = launch_log_command + + def setUp(self): + start_time = time.time() + timed_out = True + with NodeStrategy(None) as node: + while (time.time() - start_time) < TEST_TIMEOUT: + try: + node_names = { + discovered_node.full_name + for discovered_node in get_node_names(node=node) + } + talker_services = node.get_service_names_and_types_by_node('talker', '/') + except rclpy.node.NodeNameNonExistentError: + time.sleep(DISCOVERY_POLL_INTERVAL) + continue + except ConnectionRefusedError: + time.sleep(DISCOVERY_POLL_INTERVAL) + continue + except xmlrpc.client.Fault as exc: + if 'NodeNameNonExistentError' in exc.faultString: + time.sleep(DISCOVERY_POLL_INTERVAL) + continue + raise + + talker_service_names = {name for name, _ in talker_services} + if ( + '/talker' in node_names and + '/listener' in node_names and + '/talker/get_logger_levels' in talker_service_names and + '/talker/set_logger_levels' in talker_service_names + ): + timed_out = False + break + time.sleep(DISCOVERY_POLL_INTERVAL) + + if timed_out: + self.fail(f'CLI daemon failed to find test nodes after {TEST_TIMEOUT} seconds') + + self._set_logger_level_directly('/talker', 'UNSET') + + def _set_logger_level_directly(self, node_name, level_name): + # Launch a fresh helper process so each parametrized RMW test gets a + # clean rclpy context instead of reusing the one from a previous run. + with self.launch_process_command( + [sys.executable, self.path_to_set_logger_level_script, node_name, level_name], + name='ros2log-set-level-helper', + ) as helper_command: + assert helper_command.wait_for_shutdown(timeout=TEST_TIMEOUT) + + if helper_command.exit_code != launch_testing.asserts.EXIT_OK: + self.fail( + 'Failed to set logger level directly:\n' + f'{helper_command.output}' + ) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_basic(self): + """Test basic ros2 log watch command.""" + with self.launch_log_command(arguments=['watch']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[(talker|listener)\] : Info message:'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_level_filter(self): + """Test ros2 log watch with level filter.""" + with self.launch_log_command( + arguments=['watch', '--level', 'ERROR'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[ERROR\].*\[(talker|listener)\] : Error message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_logger_filter(self): + """Test ros2 log watch with logger name filter.""" + with self.launch_log_command( + arguments=['watch', '--logger', 'talker'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[talker\] : Info message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_regex_filter(self): + """Test ros2 log watch with regex filter.""" + with self.launch_log_command( + arguments=['watch', '--regex', 'Publishing.*'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*Publishing: Hello World'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_no_color(self): + """Test ros2 log watch with color disabled.""" + with self.launch_log_command( + arguments=['watch', '--no-color'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[(talker|listener)\] : Info message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + # Check that no ANSI escape codes are present + assert '\033[' not in log_command.output + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_no_timestamp(self): + """Test ros2 log watch with timestamp disabled.""" + with self.launch_log_command( + arguments=['watch', '--no-timestamp'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[(talker|listener)\] : Info message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_function_detail(self): + """Test ros2 log watch with function details enabled.""" + with self.launch_log_command( + arguments=['watch', '--function-detail'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[(talker|listener)\] \[.*@.*:\d+\] : Info message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_combined_filters(self): + """Test ros2 log watch with multiple filters.""" + with self.launch_log_command( + arguments=[ + 'watch', + '--level', 'INFO', + '--logger', 'talker', + '--no-color', + '--no-timestamp' + ] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[talker\] : Info message:'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + # Should have no ANSI codes + assert '\033[' not in log_command.output + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_with_debug_flag(self): + """Test ros2 log watch with global debug flag.""" + with self.launch_log_command( + arguments=['--debug', 'watch', '--logger', 'talker'] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[talker\] : Info message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_watch_with_qos_options(self): + """Test ros2 log watch with QoS options.""" + with self.launch_log_command( + arguments=[ + 'watch', + '--qos-reliability', 'reliable', + '--qos-durability', 'transient_local' + ] + ) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'.*\[INFO\].*\[(talker|listener)\] : Info message'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_levels_basic(self): + """Test ros2 log levels command.""" + with self.launch_log_command(arguments=['levels']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^UNSET\s+:.*'), + re.compile(r'^DEBUG\s+:.*'), + re.compile(r'^INFO\s+:.*'), + re.compile(r'^WARN\s+:.*'), + re.compile(r'^ERROR\s+:.*'), + re.compile(r'^FATAL\s+:.*'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_levels_with_value(self): + """Test ros2 log levels --value command.""" + with self.launch_log_command(arguments=['levels', '--value']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^UNSET\s+\(\s*\d+\)\s*:.*'), + re.compile(r'^DEBUG\s+\(\s*\d+\)\s*:.*'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_list_logger_service_nodes(self): + """Test ros2 log list command.""" + with self.launch_log_command(arguments=['list']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^/talker$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + assert '/no_logger_service' not in log_command.output + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_get_single_node(self): + """Test ros2 log get for a single node.""" + with self.launch_log_command(arguments=['get', '/talker']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^UNSET$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_get_all_nodes(self): + """Test ros2 log get --all.""" + with self.launch_log_command(arguments=['get', '--all']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^/talker: UNSET$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + assert '/listener:' not in log_command.output + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_get_reports_missing_logger_service(self): + """Test ros2 log get on a node without logger services.""" + with self.launch_log_command(arguments=['get', '/listener']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r"Logger service not available for node '/listener'\."), + re.compile(r'.*enable_logger_service.*'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_set_single_node(self): + """Test ros2 log set for a single node.""" + with self.launch_log_command(arguments=['set', '/talker', 'DEBUG']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^Set logger level successful$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + with self.launch_log_command(arguments=['get', '/talker']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^DEBUG$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + @launch_testing.markers.retry_on_failure(times=2, delay=1) + def test_set_all_nodes(self): + """Test ros2 log set --all.""" + with self.launch_log_command(arguments=['set', '--all', 'ERROR']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^/talker: Set logger level successful$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) + + with self.launch_log_command(arguments=['get', '--all']) as log_command: + assert log_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'^/talker: ERROR$'), + ], strict=False + ), timeout=10) + assert log_command.wait_for_shutdown(timeout=10) diff --git a/ros2log/test/test_copyright.py b/ros2log/test/test_copyright.py new file mode 100644 index 000000000..1458d2745 --- /dev/null +++ b/ros2log/test/test_copyright.py @@ -0,0 +1,26 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_copyright.main import main +import pytest + + +# Remove the formatter when ament_copyright is being tested +# since it adds a format comment to the top of the file which +# can fail the copyright test +@pytest.mark.copyright +@pytest.mark.linter +def test_copyright(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found errors' diff --git a/ros2log/test/test_flake8.py b/ros2log/test/test_flake8.py new file mode 100644 index 000000000..b156a4a10 --- /dev/null +++ b/ros2log/test/test_flake8.py @@ -0,0 +1,25 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_flake8.main import main_with_errors +import pytest + + +@pytest.mark.flake8 +@pytest.mark.linter +def test_flake8(): + rc, errors = main_with_errors(argv=[]) + assert rc == 0, \ + 'Found %d code style errors / warnings:\n' % len(errors) + \ + '\n'.join(errors) diff --git a/ros2log/test/test_get.py b/ros2log/test/test_get.py new file mode 100644 index 000000000..2e6b88627 --- /dev/null +++ b/ros2log/test/test_get.py @@ -0,0 +1,142 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from io import StringIO +import sys +from types import SimpleNamespace +import unittest +from unittest.mock import MagicMock +from unittest.mock import patch + +from rcl_interfaces.msg import LoggerLevel + +from ros2log.verb.get import GetVerb + + +def _make_context_manager(value): + context_manager = MagicMock() + context_manager.__enter__.return_value = value + context_manager.__exit__.return_value = False + return context_manager + + +class TestGetVerb(unittest.TestCase): + """Test the get verb.""" + + def test_requires_node_name_or_all(self): + verb = GetVerb() + args = SimpleNamespace(all=False, node_name=None) + + self.assertEqual( + 'Either a node name or --all must be specified', + verb.main(args=args), + ) + + def test_rejects_node_name_with_all(self): + verb = GetVerb() + args = SimpleNamespace(all=True, node_name='/talker') + + self.assertEqual( + 'Node name cannot be used with --all', + verb.main(args=args), + ) + + @patch('ros2log.verb.get.call_get_logger_levels') + @patch('ros2log.verb.get.get_target_node_names') + @patch('ros2log.verb.get.DirectNode') + @patch('ros2log.verb.get.NodeStrategy') + def test_single_node_output( + self, + mock_node_strategy, + mock_direct_node, + mock_get_target_node_names, + mock_call_get_logger_levels, + ): + mock_node_strategy.return_value = _make_context_manager(object()) + mock_direct_node.return_value = _make_context_manager(object()) + mock_get_target_node_names.return_value = (['/talker'], None) + mock_call_get_logger_levels.return_value = { + '/talker': [LoggerLevel(name='talker', level=20)], + } + + verb = GetVerb() + args = SimpleNamespace(all=False, node_name='/talker') + + captured = StringIO() + sys.stdout = captured + try: + result = verb.main(args=args) + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(0, result) + self.assertEqual('INFO\n', captured.getvalue()) + + @patch('ros2log.verb.get.call_get_logger_levels') + @patch('ros2log.verb.get.get_target_node_names') + @patch('ros2log.verb.get.DirectNode') + @patch('ros2log.verb.get.NodeStrategy') + def test_all_nodes_prefixes_output( + self, + mock_node_strategy, + mock_direct_node, + mock_get_target_node_names, + mock_call_get_logger_levels, + ): + mock_node_strategy.return_value = _make_context_manager(object()) + mock_direct_node.return_value = _make_context_manager(object()) + mock_get_target_node_names.return_value = (['/talker', '/worker'], None) + mock_call_get_logger_levels.return_value = { + '/talker': [LoggerLevel(name='talker', level=0)], + '/worker': [LoggerLevel(name='worker', level=40)], + } + + verb = GetVerb() + args = SimpleNamespace(all=True, node_name=None) + + captured = StringIO() + sys.stdout = captured + try: + result = verb.main(args=args) + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(0, result) + self.assertEqual('/talker: UNSET\n/worker: ERROR\n', captured.getvalue()) + + @patch('ros2log.verb.get.call_get_logger_levels') + @patch('ros2log.verb.get.get_target_node_names') + @patch('ros2log.verb.get.DirectNode') + @patch('ros2log.verb.get.NodeStrategy') + def test_single_node_service_exception_returns_message( + self, + mock_node_strategy, + mock_direct_node, + mock_get_target_node_names, + mock_call_get_logger_levels, + ): + mock_node_strategy.return_value = _make_context_manager(object()) + mock_direct_node.return_value = _make_context_manager(object()) + mock_get_target_node_names.return_value = (['/talker'], None) + mock_call_get_logger_levels.return_value = { + '/talker': RuntimeError('boom'), + } + + verb = GetVerb() + args = SimpleNamespace(all=False, node_name='/talker') + + self.assertEqual( + "Exception while calling service of node '/talker': boom", + verb.main(args=args), + ) diff --git a/ros2log/test/test_levels.py b/ros2log/test/test_levels.py new file mode 100644 index 000000000..14d6bbe09 --- /dev/null +++ b/ros2log/test/test_levels.py @@ -0,0 +1,171 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from io import StringIO +import sys +import unittest + +from ros2log.verb.levels import get_log_levels_from_msg +from ros2log.verb.levels import LEVEL_NAME_TO_VALUE +from ros2log.verb.levels import LEVEL_VALUE_TO_NAME +from ros2log.verb.levels import LevelsVerb +from ros2log.verb.levels import LOG_LEVEL_DESCRIPTIONS + + +class TestGetLogLevelsFromMsg(unittest.TestCase): + """Test the get_log_levels_from_msg helper function.""" + + def test_returns_list_of_tuples(self): + """Test that the function returns a list of (name, value) tuples.""" + levels = get_log_levels_from_msg() + self.assertIsInstance(levels, list) + for item in levels: + self.assertIsInstance(item, tuple) + self.assertEqual(len(item), 2) + self.assertIsInstance(item[0], str) + self.assertIsInstance(item[1], int) + + def test_contains_expected_levels(self): + """Test that all expected log levels are present.""" + levels = get_log_levels_from_msg() + level_names = [name for name, _ in levels] + for expected in ['UNSET', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']: + self.assertIn(expected, level_names) + + def test_sorted_by_value(self): + """Test that levels are sorted by numeric value.""" + levels = get_log_levels_from_msg() + values = [value for _, value in levels] + self.assertEqual(values, sorted(values)) + + def test_unset_is_zero(self): + """Test that UNSET has value 0.""" + levels = get_log_levels_from_msg() + level_dict = dict(levels) + self.assertEqual(level_dict['UNSET'], 0) + + def test_level_ordering(self): + """Test that levels are in expected severity order.""" + levels = get_log_levels_from_msg() + level_dict = dict(levels) + self.assertLess(level_dict['UNSET'], level_dict['DEBUG']) + self.assertLess(level_dict['DEBUG'], level_dict['INFO']) + self.assertLess(level_dict['INFO'], level_dict['WARN']) + self.assertLess(level_dict['WARN'], level_dict['ERROR']) + self.assertLess(level_dict['ERROR'], level_dict['FATAL']) + + +class TestLevelMaps(unittest.TestCase): + """Test the module-level level mapping dictionaries.""" + + def test_name_to_value_contains_all_levels(self): + """Test that LEVEL_NAME_TO_VALUE contains all expected levels.""" + for name in ['UNSET', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']: + self.assertIn(name, LEVEL_NAME_TO_VALUE) + + def test_value_to_name_contains_all_levels(self): + """Test that LEVEL_VALUE_TO_NAME contains all expected levels.""" + for name in ['UNSET', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']: + value = LEVEL_NAME_TO_VALUE[name] + self.assertIn(value, LEVEL_VALUE_TO_NAME) + self.assertEqual(LEVEL_VALUE_TO_NAME[value], name) + + def test_round_trip(self): + """Test that name->value->name round-trips correctly.""" + for name, value in LEVEL_NAME_TO_VALUE.items(): + self.assertEqual(LEVEL_VALUE_TO_NAME[value], name) + + +class TestLogLevelDescriptions(unittest.TestCase): + """Test the LOG_LEVEL_DESCRIPTIONS dictionary.""" + + def test_all_levels_have_descriptions(self): + """Test that every level returned by get_log_levels_from_msg has a description.""" + levels = get_log_levels_from_msg() + for name, _ in levels: + self.assertIn(name, LOG_LEVEL_DESCRIPTIONS) + self.assertTrue(len(LOG_LEVEL_DESCRIPTIONS[name]) > 0) + + +class TestLevelsVerb(unittest.TestCase): + """Test the LevelsVerb class.""" + + def _run_verb(self, value=False): + """Run the LevelsVerb and capture its output.""" + verb = LevelsVerb() + + class Args: + pass + + args = Args() + args.value = value + + captured = StringIO() + sys.stdout = captured + try: + result = verb.main(args=args) + finally: + sys.stdout = sys.__stdout__ + + return result, captured.getvalue() + + def test_main_returns_zero(self): + """Test that main() returns 0.""" + result, _ = self._run_verb() + self.assertEqual(result, 0) + + def test_output_contains_all_level_names(self): + """Test that output contains all expected level names.""" + _, output = self._run_verb() + for name in ['UNSET', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']: + self.assertIn(name, output) + + def test_output_contains_descriptions(self): + """Test that output contains the descriptions.""" + _, output = self._run_verb() + for desc in LOG_LEVEL_DESCRIPTIONS.values(): + self.assertIn(desc, output) + + def test_output_without_values(self): + """Test output format without --value flag.""" + _, output = self._run_verb(value=False) + lines = output.strip().split('\n') + self.assertGreater(len(lines), 0) + for line in lines: + # Should not contain parenthesized numeric values + self.assertNotRegex(line, r'\(\s*\d+\)') + + def test_output_with_values(self): + """Test output format with --value flag.""" + _, output = self._run_verb(value=True) + lines = output.strip().split('\n') + self.assertGreater(len(lines), 0) + for line in lines: + # Each line should contain a parenthesized numeric value + self.assertRegex(line, r'\(\s*\d+\)') + + def test_output_levels_in_order(self): + """Test that levels are printed in ascending severity order.""" + _, output = self._run_verb() + lines = output.strip().split('\n') + level_names_in_output = [] + for line in lines: + name = line.split(':')[0].strip().split()[0] + level_names_in_output.append(name) + expected_order = ['UNSET', 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'] + self.assertEqual(level_names_in_output, expected_order) + + +if __name__ == '__main__': + unittest.main() diff --git a/ros2log/test/test_pep257.py b/ros2log/test/test_pep257.py new file mode 100644 index 000000000..d81c0d31a --- /dev/null +++ b/ros2log/test/test_pep257.py @@ -0,0 +1,23 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_pep257.main import main +import pytest + + +@pytest.mark.linter +@pytest.mark.pep257 +def test_pep257(): + rc = main(argv=['.', 'test']) + assert rc == 0, 'Found code style errors / warnings' diff --git a/ros2log/test/test_set.py b/ros2log/test/test_set.py new file mode 100644 index 000000000..5685d49ea --- /dev/null +++ b/ros2log/test/test_set.py @@ -0,0 +1,151 @@ +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from io import StringIO +import sys +from types import SimpleNamespace +import unittest +from unittest.mock import MagicMock +from unittest.mock import patch + +from rcl_interfaces.msg import SetLoggerLevelsResult + +from ros2log.verb.set import SetVerb + + +def _make_context_manager(value): + context_manager = MagicMock() + context_manager.__enter__.return_value = value + context_manager.__exit__.return_value = False + return context_manager + + +class TestSetVerb(unittest.TestCase): + """Test the set verb.""" + + def test_requires_node_name_or_all(self): + verb = SetVerb() + args = SimpleNamespace(all=False, node_name=None, level='INFO') + + self.assertEqual( + 'Either a node name or --all must be specified', + verb.main(args=args), + ) + + def test_rejects_node_name_with_all(self): + verb = SetVerb() + args = SimpleNamespace(all=True, node_name='/talker', level='INFO') + + self.assertEqual( + 'Node name cannot be used with --all', + verb.main(args=args), + ) + + @patch('ros2log.verb.set.call_set_logger_levels') + @patch('ros2log.verb.set.get_target_node_names') + @patch('ros2log.verb.set.DirectNode') + @patch('ros2log.verb.set.NodeStrategy') + def test_single_node_success_output( + self, + mock_node_strategy, + mock_direct_node, + mock_get_target_node_names, + mock_call_set_logger_levels, + ): + mock_node_strategy.return_value = _make_context_manager(object()) + mock_direct_node.return_value = _make_context_manager(object()) + mock_get_target_node_names.return_value = (['/talker'], None) + mock_call_set_logger_levels.return_value = { + '/talker': [SetLoggerLevelsResult(successful=True, reason='')], + } + + verb = SetVerb() + args = SimpleNamespace(all=False, node_name='/talker', level='DEBUG') + + captured = StringIO() + sys.stdout = captured + try: + result = verb.main(args=args) + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(0, result) + self.assertEqual('Set logger level successful\n', captured.getvalue()) + + @patch('ros2log.verb.set.call_set_logger_levels') + @patch('ros2log.verb.set.get_target_node_names') + @patch('ros2log.verb.set.DirectNode') + @patch('ros2log.verb.set.NodeStrategy') + def test_all_nodes_prefixes_success_output( + self, + mock_node_strategy, + mock_direct_node, + mock_get_target_node_names, + mock_call_set_logger_levels, + ): + mock_node_strategy.return_value = _make_context_manager(object()) + mock_direct_node.return_value = _make_context_manager(object()) + mock_get_target_node_names.return_value = (['/talker', '/worker'], None) + mock_call_set_logger_levels.return_value = { + '/talker': [SetLoggerLevelsResult(successful=True, reason='')], + '/worker': [SetLoggerLevelsResult(successful=True, reason='')], + } + + verb = SetVerb() + args = SimpleNamespace(all=True, node_name=None, level='WARN') + + captured = StringIO() + sys.stdout = captured + try: + result = verb.main(args=args) + finally: + sys.stdout = sys.__stdout__ + + self.assertEqual(0, result) + self.assertEqual( + '/talker: Set logger level successful\n' + '/worker: Set logger level successful\n', + captured.getvalue(), + ) + + @patch('ros2log.verb.set.call_set_logger_levels') + @patch('ros2log.verb.set.get_target_node_names') + @patch('ros2log.verb.set.DirectNode') + @patch('ros2log.verb.set.NodeStrategy') + def test_failure_is_reported_to_stderr( + self, + mock_node_strategy, + mock_direct_node, + mock_get_target_node_names, + mock_call_set_logger_levels, + ): + mock_node_strategy.return_value = _make_context_manager(object()) + mock_direct_node.return_value = _make_context_manager(object()) + mock_get_target_node_names.return_value = (['/talker'], None) + mock_call_set_logger_levels.return_value = { + '/talker': [SetLoggerLevelsResult(successful=False, reason='denied')], + } + + verb = SetVerb() + args = SimpleNamespace(all=False, node_name='/talker', level='ERROR') + + captured = StringIO() + sys.stderr = captured + try: + result = verb.main(args=args) + finally: + sys.stderr = sys.__stderr__ + + self.assertEqual(1, result) + self.assertEqual('Setting logger level failed: denied\n', captured.getvalue()) diff --git a/ros2log/test/test_watch.py b/ros2log/test/test_watch.py new file mode 100644 index 000000000..9599306d1 --- /dev/null +++ b/ros2log/test/test_watch.py @@ -0,0 +1,246 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from io import StringIO +import sys +import unittest + +from rcl_interfaces.msg import Log +import rclpy +from rclpy.node import Node + +from ros2log.verb.watch import LogWatcher + + +class TestWatchVerb(unittest.TestCase): + """Test the watch verb functionality.""" + + @classmethod + def setUpClass(cls): + """Set up test fixtures.""" + rclpy.init() + + @classmethod + def tearDownClass(cls): + """Tear down test fixtures.""" + rclpy.shutdown() + + def setUp(self): + """Set up each test.""" + self.node = Node('test_log_watch_node') + + def tearDown(self): + """Clean up after each test.""" + self.node.destroy_node() + + def test_level_filter(self): + """Test that level filtering works correctly.""" + watcher = LogWatcher( + self.node, + level_filter='ERROR', + enable_color=False, + show_timestamp=False, + enable_content_filter=False, + ) + + # Create test log messages + debug_msg = Log(level=Log.DEBUG, name='test_logger', msg='Debug message') + info_msg = Log(level=Log.INFO, name='test_logger', msg='Info message') + warn_msg = Log(level=Log.WARN, name='test_logger', msg='Warn message') + error_msg = Log(level=Log.ERROR, name='test_logger', msg='Error message') + fatal_msg = Log(level=Log.FATAL, name='test_logger', msg='Fatal message') + + # Capture stdout + captured_output = StringIO() + sys.stdout = captured_output + + # Send messages through callback + watcher._log_callback(debug_msg) + watcher._log_callback(info_msg) + watcher._log_callback(warn_msg) + watcher._log_callback(error_msg) + watcher._log_callback(fatal_msg) + + # Restore stdout + sys.stdout = sys.__stdout__ + + # Check that only ERROR message was printed + output = captured_output.getvalue() + self.assertNotIn('Debug message', output) + self.assertNotIn('Info message', output) + self.assertNotIn('Warn message', output) + self.assertIn('Error message', output) + self.assertIn('Fatal message', output) + + def test_logger_filter(self): + """Test that logger name filtering works correctly.""" + watcher = LogWatcher( + self.node, + logger_filter='my_logger', + enable_color=False, + show_timestamp=False, + enable_content_filter=False, + ) + + # Create test log messages + msg1 = Log(level=Log.INFO, name='my_logger', msg='Message from my_logger') + msg2 = Log(level=Log.INFO, name='other_logger', msg='Message from other_logger') + + # Capture stdout + captured_output = StringIO() + sys.stdout = captured_output + + # Send messages through callback + watcher._log_callback(msg1) + watcher._log_callback(msg2) + + # Restore stdout + sys.stdout = sys.__stdout__ + + # Check that only my_logger message was printed + output = captured_output.getvalue() + self.assertIn('Message from my_logger', output) + self.assertNotIn('Message from other_logger', output) + + def test_regex_filter(self): + """Test that regex filtering works correctly.""" + watcher = LogWatcher( + self.node, + regex_filter='sensor.*timeout', + enable_color=False, + show_timestamp=False, + enable_content_filter=False, + ) + + # Create test log messages + msg1 = Log(level=Log.INFO, name='test', msg='sensor camera timeout') + msg2 = Log(level=Log.INFO, name='test', msg='sensor lidar timeout') + msg3 = Log(level=Log.INFO, name='test', msg='motor timeout') + + # Capture stdout + captured_output = StringIO() + sys.stdout = captured_output + + # Send messages through callback + watcher._log_callback(msg1) + watcher._log_callback(msg2) + watcher._log_callback(msg3) + + # Restore stdout + sys.stdout = sys.__stdout__ + + # Check that only messages matching the pattern were printed + output = captured_output.getvalue() + self.assertIn('sensor camera timeout', output) + self.assertIn('sensor lidar timeout', output) + self.assertNotIn('motor timeout', output) + + def test_combined_filters(self): + """Test that multiple filters work together.""" + watcher = LogWatcher( + self.node, + level_filter='ERROR', + logger_filter='my_logger', + regex_filter='camera', + enable_color=False, + show_timestamp=False, + enable_content_filter=False, + ) + + # Create test log messages + msg1 = Log(level=Log.ERROR, name='my_logger', msg='camera error') + msg2 = Log(level=Log.ERROR, name='my_logger', msg='lidar error') + msg3 = Log(level=Log.ERROR, name='other_logger', msg='camera error') + msg4 = Log(level=Log.INFO, name='my_logger', msg='camera info') + + # Capture stdout + captured_output = StringIO() + sys.stdout = captured_output + + # Send messages through callback + watcher._log_callback(msg1) + watcher._log_callback(msg2) + watcher._log_callback(msg3) + watcher._log_callback(msg4) + + # Restore stdout + sys.stdout = sys.__stdout__ + + # Check that only the message matching all filters was printed + output = captured_output.getvalue() + self.assertIn('camera error', output) + # Verify the output appears exactly once (only msg1) + self.assertEqual(output.count('camera error'), 1) + + def test_no_color_output(self): + """Test that no-color option removes ANSI codes.""" + watcher = LogWatcher( + self.node, + enable_color=False, + show_timestamp=False, + enable_content_filter=False, + ) + + msg = Log(level=Log.ERROR, name='test', msg='Test message') + + # Capture stdout + captured_output = StringIO() + sys.stdout = captured_output + + watcher._log_callback(msg) + + # Restore stdout + sys.stdout = sys.__stdout__ + + output = captured_output.getvalue() + # Check that no ANSI escape codes are present + self.assertNotIn('\033[', output) + + def test_function_detail_output(self): + """Test that function detail option includes function info.""" + watcher = LogWatcher( + self.node, + enable_color=False, + show_timestamp=False, + show_function_detail=True, + enable_content_filter=False, + ) + + msg = Log( + level=Log.INFO, + name='test', + msg='Test message', + function='my_function', + file='test.py', + line=42 + ) + + # Capture stdout + captured_output = StringIO() + sys.stdout = captured_output + + watcher._log_callback(msg) + + # Restore stdout + sys.stdout = sys.__stdout__ + + output = captured_output.getvalue() + # Check that function details are included + self.assertIn('my_function', output) + self.assertIn('test.py', output) + self.assertIn('42', output) + + +if __name__ == '__main__': + unittest.main() diff --git a/ros2log/test/test_xmllint.py b/ros2log/test/test_xmllint.py new file mode 100644 index 000000000..a392d8e2c --- /dev/null +++ b/ros2log/test/test_xmllint.py @@ -0,0 +1,23 @@ +# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ament_xmllint.main import main +import pytest + + +@pytest.mark.linter +@pytest.mark.xmllint +def test_xmllint(): + rc = main(argv=[]) + assert rc == 0, 'Found errors' From 0ee0e4d99736ce1dd8fb61191a586798ccf6d1af Mon Sep 17 00:00:00 2001 From: Decwest Date: Mon, 30 Mar 2026 22:41:24 +0900 Subject: [PATCH 02/13] fix copyright year Signed-off-by: Decwest --- ros2log/ros2log/command/log.py | 2 +- ros2log/ros2log/verb/__init__.py | 2 +- ros2log/ros2log/verb/list.py | 2 +- ros2log/ros2log/verb/watch.py | 2 +- ros2log/test/fixtures/listener_node.py | 2 +- ros2log/test/fixtures/talker_node.py | 2 +- ros2log/test/test_cli.py | 2 +- ros2log/test/test_copyright.py | 2 +- ros2log/test/test_flake8.py | 2 +- ros2log/test/test_pep257.py | 2 +- ros2log/test/test_watch.py | 2 +- ros2log/test/test_xmllint.py | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ros2log/ros2log/command/log.py b/ros2log/ros2log/command/log.py index b52c14bc8..c59b5bb09 100644 --- a/ros2log/ros2log/command/log.py +++ b/ros2log/ros2log/command/log.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/ros2log/verb/__init__.py b/ros2log/ros2log/verb/__init__.py index 25b4b6a71..a599ba9f5 100644 --- a/ros2log/ros2log/verb/__init__.py +++ b/ros2log/ros2log/verb/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/ros2log/verb/list.py b/ros2log/ros2log/verb/list.py index 45a5c69bd..675fda612 100644 --- a/ros2log/ros2log/verb/list.py +++ b/ros2log/ros2log/verb/list.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/ros2log/verb/watch.py b/ros2log/ros2log/verb/watch.py index aaf6454ce..cad165e8d 100644 --- a/ros2log/ros2log/verb/watch.py +++ b/ros2log/ros2log/verb/watch.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/fixtures/listener_node.py b/ros2log/test/fixtures/listener_node.py index c36da4b8c..9ca088edd 100644 --- a/ros2log/test/fixtures/listener_node.py +++ b/ros2log/test/fixtures/listener_node.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/fixtures/talker_node.py b/ros2log/test/fixtures/talker_node.py index 5db21657c..73df9e609 100644 --- a/ros2log/test/fixtures/talker_node.py +++ b/ros2log/test/fixtures/talker_node.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/test_cli.py b/ros2log/test/test_cli.py index 5534233dc..5b1bf385e 100644 --- a/ros2log/test/test_cli.py +++ b/ros2log/test/test_cli.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/test_copyright.py b/ros2log/test/test_copyright.py index 1458d2745..dbd37a6ba 100644 --- a/ros2log/test/test_copyright.py +++ b/ros2log/test/test_copyright.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/test_flake8.py b/ros2log/test/test_flake8.py index b156a4a10..941099c88 100644 --- a/ros2log/test/test_flake8.py +++ b/ros2log/test/test_flake8.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/test_pep257.py b/ros2log/test/test_pep257.py index d81c0d31a..8cdc95217 100644 --- a/ros2log/test/test_pep257.py +++ b/ros2log/test/test_pep257.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/test_watch.py b/ros2log/test/test_watch.py index 9599306d1..8ea8ada8d 100644 --- a/ros2log/test/test_watch.py +++ b/ros2log/test/test_watch.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/ros2log/test/test_xmllint.py b/ros2log/test/test_xmllint.py index a392d8e2c..b9526f746 100644 --- a/ros2log/test/test_xmllint.py +++ b/ros2log/test/test_xmllint.py @@ -1,4 +1,4 @@ -# Copyright 2025 Tomoya Fujita, Fumiya Ohnishi +# Copyright 2026 Tomoya Fujita, Fumiya Ohnishi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 934503e4efa262e7ad3a0caffb95ac79d8c24d52 Mon Sep 17 00:00:00 2001 From: Decwest Date: Mon, 30 Mar 2026 23:09:26 +0900 Subject: [PATCH 03/13] fix to destroy clients Signed-off-by: Decwest --- ros2log/ros2log/api/__init__.py | 104 +++++++++++++++++--------------- 1 file changed, 56 insertions(+), 48 deletions(-) diff --git a/ros2log/ros2log/api/__init__.py b/ros2log/ros2log/api/__init__.py index f882d5f32..96f825ff0 100644 --- a/ros2log/ros2log/api/__init__.py +++ b/ros2log/ros2log/api/__init__.py @@ -138,33 +138,37 @@ def call_get_logger_levels( futures = {} results = {} - for node_name, logger_names in logger_names_by_node.items(): - client = node.create_client( - GetLoggerLevels, - get_get_logger_levels_service_name(node_name), - ) - clients[node_name] = client - if not client.wait_for_service(timeout_sec=timeout_sec): - results[node_name] = RuntimeError( - 'Wait for service timed out waiting for logger services ' - f'for node {node_name}' + try: + for node_name, logger_names in logger_names_by_node.items(): + client = node.create_client( + GetLoggerLevels, + get_get_logger_levels_service_name(node_name), ) - continue + clients[node_name] = client + if not client.wait_for_service(timeout_sec=timeout_sec): + results[node_name] = RuntimeError( + 'Wait for service timed out waiting for logger services ' + f'for node {node_name}' + ) + continue - request = GetLoggerLevels.Request() - request.names = list(logger_names) - futures[node_name] = client.call_async(request) + request = GetLoggerLevels.Request() + request.names = list(logger_names) + futures[node_name] = client.call_async(request) - while futures and not all(future.done() for future in futures.values()): - rclpy.spin_once(node, timeout_sec=0.1) + while futures and not all(future.done() for future in futures.values()): + rclpy.spin_once(node, timeout_sec=0.1) - for node_name, future in futures.items(): - if future.result() is not None: - results[node_name] = list(future.result().levels) - else: - results[node_name] = future.exception() + for node_name, future in futures.items(): + if future.result() is not None: + results[node_name] = list(future.result().levels) + else: + results[node_name] = future.exception() - return results + return results + finally: + for client in clients.values(): + node.destroy_client(client) def call_set_logger_levels( @@ -178,33 +182,37 @@ def call_set_logger_levels( futures = {} results = {} - for node_name, levels in levels_by_node.items(): - client = node.create_client( - SetLoggerLevels, - get_set_logger_levels_service_name(node_name), - ) - clients[node_name] = client - if not client.wait_for_service(timeout_sec=timeout_sec): - results[node_name] = RuntimeError( - 'Wait for service timed out waiting for logger services ' - f'for node {node_name}' + try: + for node_name, levels in levels_by_node.items(): + client = node.create_client( + SetLoggerLevels, + get_set_logger_levels_service_name(node_name), ) - continue - - request = SetLoggerLevels.Request() - request.levels = list(levels) - futures[node_name] = client.call_async(request) - - while futures and not all(future.done() for future in futures.values()): - rclpy.spin_once(node, timeout_sec=0.1) - - for node_name, future in futures.items(): - if future.result() is not None: - results[node_name] = list(future.result().results) - else: - results[node_name] = future.exception() - - return results + clients[node_name] = client + if not client.wait_for_service(timeout_sec=timeout_sec): + results[node_name] = RuntimeError( + 'Wait for service timed out waiting for logger services ' + f'for node {node_name}' + ) + continue + + request = SetLoggerLevels.Request() + request.levels = list(levels) + futures[node_name] = client.call_async(request) + + while futures and not all(future.done() for future in futures.values()): + rclpy.spin_once(node, timeout_sec=0.1) + + for node_name, future in futures.items(): + if future.result() is not None: + results[node_name] = list(future.result().results) + else: + results[node_name] = future.exception() + + return results + finally: + for client in clients.values(): + node.destroy_client(client) def _service_has_type(service_map, service_name: str, service_type: str) -> bool: From 10cdadb3a52660226e4f568670ca9df28fd1aa07 Mon Sep 17 00:00:00 2001 From: Decwest Date: Mon, 30 Mar 2026 23:12:54 +0900 Subject: [PATCH 04/13] fix to remove dependencies Signed-off-by: Decwest --- ros2log/package.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/ros2log/package.xml b/ros2log/package.xml index ee7f3747b..fdee4c1d9 100644 --- a/ros2log/package.xml +++ b/ros2log/package.xml @@ -12,8 +12,6 @@ Fumiya Ohnishi python3-argcomplete - python3-packaging - python3-psutil rcl_interfaces rclpy ros2cli From 2254ca5ce4b68daad4dfaff4c4dda9e45dbb7faf Mon Sep 17 00:00:00 2001 From: Decwest Date: Mon, 30 Mar 2026 23:21:27 +0900 Subject: [PATCH 05/13] add missing dependencies Signed-off-by: Decwest --- ros2log/package.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ros2log/package.xml b/ros2log/package.xml index fdee4c1d9..8f18f002a 100644 --- a/ros2log/package.xml +++ b/ros2log/package.xml @@ -15,12 +15,17 @@ rcl_interfaces rclpy ros2cli + ros2node rosgraph_msgs ament_copyright ament_flake8 ament_pep257 ament_xmllint + launch + launch_ros + launch_testing + launch_testing_ros python3-pytest python3-pytest-timeout test_msgs From d5bdf970aaf627fd4166dcab3f50bca8d0bf4742 Mon Sep 17 00:00:00 2001 From: Decwest Date: Wed, 1 Apr 2026 01:12:13 +0900 Subject: [PATCH 06/13] fix type annotation Signed-off-by: Decwest --- ros2log/ros2log/api/__init__.py | 10 ++++++---- ros2log/ros2log/verb/get.py | 3 ++- ros2log/ros2log/verb/set.py | 3 ++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ros2log/ros2log/api/__init__.py b/ros2log/ros2log/api/__init__.py index 96f825ff0..8fa41413e 100644 --- a/ros2log/ros2log/api/__init__.py +++ b/ros2log/ros2log/api/__init__.py @@ -14,6 +14,8 @@ from collections.abc import Iterator from collections.abc import Sequence +from typing import Optional +from typing import Union from rcl_interfaces.msg import LoggerLevel from rcl_interfaces.msg import SetLoggerLevelsResult @@ -87,9 +89,9 @@ def get_logger_service_nodes(*, node) -> list[NodeName]: def get_target_node_names( *, node, - node_name: str | None = None, + node_name: Optional[str] = None, all_nodes: bool = False, -) -> tuple[list[str] | None, str | None]: +) -> tuple[Optional[list[str]], Optional[str]]: """Resolve the node names targeted by a get/set command.""" logger_service_nodes = get_logger_service_nodes(node=node) logger_service_node_names = { @@ -132,7 +134,7 @@ def call_get_logger_levels( node, logger_names_by_node: dict[str, Sequence[str]], timeout_sec: float = 5.0, -) -> dict[str, list[LoggerLevel] | Exception]: +) -> dict[str, Union[list[LoggerLevel], Exception]]: """Call the get-logger-levels service for one or more nodes.""" clients = {} futures = {} @@ -176,7 +178,7 @@ def call_set_logger_levels( node, levels_by_node: dict[str, Sequence[LoggerLevel]], timeout_sec: float = 5.0, -) -> dict[str, list[SetLoggerLevelsResult] | Exception]: +) -> dict[str, Union[list[SetLoggerLevelsResult], Exception]]: """Call the set-logger-levels service for one or more nodes.""" clients = {} futures = {} diff --git a/ros2log/ros2log/verb/get.py b/ros2log/ros2log/verb/get.py index b60854cd0..c55df3793 100644 --- a/ros2log/ros2log/verb/get.py +++ b/ros2log/ros2log/verb/get.py @@ -13,6 +13,7 @@ # limitations under the License. import sys +from typing import Optional from ros2cli.node.direct import DirectNode from ros2cli.node.strategy import add_arguments @@ -94,7 +95,7 @@ def main(self, *, args): # noqa: D102 return 1 if had_error else 0 -def _validate_arguments(args) -> str | None: +def _validate_arguments(args) -> Optional[str]: if args.all and args.node_name is not None: return 'Node name cannot be used with --all' if not args.all and args.node_name is None: diff --git a/ros2log/ros2log/verb/set.py b/ros2log/ros2log/verb/set.py index c9c07b993..b3c987293 100644 --- a/ros2log/ros2log/verb/set.py +++ b/ros2log/ros2log/verb/set.py @@ -13,6 +13,7 @@ # limitations under the License. import sys +from typing import Optional from rcl_interfaces.msg import LoggerLevel @@ -114,7 +115,7 @@ def main(self, *, args): # noqa: D102 return 1 if had_error else 0 -def _validate_arguments(args) -> str | None: +def _validate_arguments(args) -> Optional[str]: if args.all and args.node_name is not None: return 'Node name cannot be used with --all' if not args.all and args.node_name is None: From d4df5d061593c9b0e5583034826946d5ab8c79cd Mon Sep 17 00:00:00 2001 From: Decwest Date: Fri, 3 Apr 2026 20:53:27 +0900 Subject: [PATCH 07/13] add type hints to node Signed-off-by: Decwest --- ros2log/ros2log/api/__init__.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ros2log/ros2log/api/__init__.py b/ros2log/ros2log/api/__init__.py index 8fa41413e..4e6624b18 100644 --- a/ros2log/ros2log/api/__init__.py +++ b/ros2log/ros2log/api/__init__.py @@ -23,6 +23,7 @@ from rcl_interfaces.srv import SetLoggerLevels import rclpy +from rclpy.node import Node from ros2node.api import get_absolute_node_name from ros2node.api import get_node_names from ros2node.api import NodeName @@ -73,7 +74,7 @@ def format_logger_service_unavailable_error(node_name: str) -> str: def iter_logger_service_nodes( *, - node, + node: Node, ) -> Iterator[NodeName]: """Yield nodes that expose the logger get/set services as they are discovered.""" for node_name in get_node_names(node=node): @@ -81,14 +82,14 @@ def iter_logger_service_nodes( yield node_name -def get_logger_service_nodes(*, node) -> list[NodeName]: +def get_logger_service_nodes(*, node: Node) -> list[NodeName]: """Return all nodes that expose the logger get/set services.""" return list(iter_logger_service_nodes(node=node)) def get_target_node_names( *, - node, + node: Node, node_name: Optional[str] = None, all_nodes: bool = False, ) -> tuple[Optional[list[str]], Optional[str]]: @@ -115,7 +116,7 @@ def get_target_node_names( return [absolute_node_name], None -def node_has_logger_services(node, node_name: NodeName) -> bool: +def node_has_logger_services(node: Node, node_name: NodeName) -> bool: """Check if a node provides both get/set logger level services.""" services = node.get_service_names_and_types_by_node(node_name.name, node_name.namespace) service_map = dict(services) @@ -131,7 +132,7 @@ def node_has_logger_services(node, node_name: NodeName) -> bool: def call_get_logger_levels( *, - node, + node: Node, logger_names_by_node: dict[str, Sequence[str]], timeout_sec: float = 5.0, ) -> dict[str, Union[list[LoggerLevel], Exception]]: @@ -175,7 +176,7 @@ def call_get_logger_levels( def call_set_logger_levels( *, - node, + node: Node, levels_by_node: dict[str, Sequence[LoggerLevel]], timeout_sec: float = 5.0, ) -> dict[str, Union[list[SetLoggerLevelsResult], Exception]]: From 81e4cb4a96381a5cf298af7f5b96d9642f8164f6 Mon Sep 17 00:00:00 2001 From: Fumiya Ohnishi Date: Fri, 3 Apr 2026 20:57:40 +0900 Subject: [PATCH 08/13] Update ros2log/ros2log/verb/watch.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alejandro Hernández Cordero Signed-off-by: Fumiya Ohnishi --- ros2log/ros2log/verb/watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ros2log/ros2log/verb/watch.py b/ros2log/ros2log/verb/watch.py index cad165e8d..cd80363d5 100644 --- a/ros2log/ros2log/verb/watch.py +++ b/ros2log/ros2log/verb/watch.py @@ -137,7 +137,7 @@ def __init__( enable_color: bool = True, show_timestamp: bool = True, show_function_detail: bool = False, - qos_profile=qos_profile_rosout_default, + qos_profile: QoSProfile = qos_profile_rosout_default, enable_content_filter: bool = True, debug: bool = False, ): From 35099fadba12de924f15002c2f55d33305ec6ca6 Mon Sep 17 00:00:00 2001 From: Fumiya Ohnishi Date: Fri, 3 Apr 2026 20:58:14 +0900 Subject: [PATCH 09/13] Update ros2log/ros2log/verb/watch.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alejandro Hernández Cordero Signed-off-by: Fumiya Ohnishi --- ros2log/ros2log/verb/watch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ros2log/ros2log/verb/watch.py b/ros2log/ros2log/verb/watch.py index cd80363d5..152f3a977 100644 --- a/ros2log/ros2log/verb/watch.py +++ b/ros2log/ros2log/verb/watch.py @@ -21,6 +21,7 @@ import rclpy from rclpy.node import Node from rclpy.qos import qos_profile_rosout_default +from rclpy.qos import QoSProfile from ros2cli.node.direct import DirectNode from ros2cli.qos import add_qos_arguments From c74d9b9fbdae73e21d3ec151d48b472026c397be Mon Sep 17 00:00:00 2001 From: Fumiya Ohnishi Date: Fri, 3 Apr 2026 20:58:54 +0900 Subject: [PATCH 10/13] Update ros2log/ros2log/verb/watch.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alejandro Hernández Cordero Signed-off-by: Fumiya Ohnishi --- ros2log/ros2log/verb/watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ros2log/ros2log/verb/watch.py b/ros2log/ros2log/verb/watch.py index 152f3a977..6bd41b297 100644 --- a/ros2log/ros2log/verb/watch.py +++ b/ros2log/ros2log/verb/watch.py @@ -141,7 +141,7 @@ def __init__( qos_profile: QoSProfile = qos_profile_rosout_default, enable_content_filter: bool = True, debug: bool = False, - ): + ) -> None: self.node = node self.enable_color = enable_color self.show_timestamp = show_timestamp From ce868df2b825add2aaac71124b9233b0e32caf2a Mon Sep 17 00:00:00 2001 From: Fumiya Ohnishi Date: Fri, 3 Apr 2026 20:59:36 +0900 Subject: [PATCH 11/13] Update ros2log/ros2log/api/__init__.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alejandro Hernández Cordero Signed-off-by: Fumiya Ohnishi --- ros2log/ros2log/api/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ros2log/ros2log/api/__init__.py b/ros2log/ros2log/api/__init__.py index 4e6624b18..7345a503d 100644 --- a/ros2log/ros2log/api/__init__.py +++ b/ros2log/ros2log/api/__init__.py @@ -137,6 +137,8 @@ def call_get_logger_levels( timeout_sec: float = 5.0, ) -> dict[str, Union[list[LoggerLevel], Exception]]: """Call the get-logger-levels service for one or more nodes.""" + if timeout_sec <= 0: + raise ValueError(f'timeout_sec must be positive, got {timeout_sec}') clients = {} futures = {} results = {} From d55e3722c42cc5ef0d586748579992474fcc714c Mon Sep 17 00:00:00 2001 From: Decwest Date: Fri, 3 Apr 2026 21:19:08 +0900 Subject: [PATCH 12/13] add checking negative timeout_sec Signed-off-by: Decwest --- ros2log/ros2log/api/__init__.py | 9 +++++-- ros2log/test/test_api.py | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/ros2log/ros2log/api/__init__.py b/ros2log/ros2log/api/__init__.py index 7345a503d..d6592fd8b 100644 --- a/ros2log/ros2log/api/__init__.py +++ b/ros2log/ros2log/api/__init__.py @@ -35,6 +35,11 @@ LOGGER_SET_SERVICE_TYPE = 'rcl_interfaces/srv/SetLoggerLevels' +def _validate_timeout_sec(timeout_sec: float) -> None: + if timeout_sec < 0: + raise ValueError(f'timeout_sec must be non-negative, got {timeout_sec}') + + def _require_absolute_node_name(node_name: str) -> str: absolute_node_name = get_absolute_node_name(node_name) if absolute_node_name is None: @@ -137,8 +142,7 @@ def call_get_logger_levels( timeout_sec: float = 5.0, ) -> dict[str, Union[list[LoggerLevel], Exception]]: """Call the get-logger-levels service for one or more nodes.""" - if timeout_sec <= 0: - raise ValueError(f'timeout_sec must be positive, got {timeout_sec}') + _validate_timeout_sec(timeout_sec) clients = {} futures = {} results = {} @@ -183,6 +187,7 @@ def call_set_logger_levels( timeout_sec: float = 5.0, ) -> dict[str, Union[list[SetLoggerLevelsResult], Exception]]: """Call the set-logger-levels service for one or more nodes.""" + _validate_timeout_sec(timeout_sec) clients = {} futures = {} results = {} diff --git a/ros2log/test/test_api.py b/ros2log/test/test_api.py index 685387283..d6d507916 100644 --- a/ros2log/test/test_api.py +++ b/ros2log/test/test_api.py @@ -15,6 +15,8 @@ import unittest from unittest.mock import patch +from ros2log.api import call_get_logger_levels +from ros2log.api import call_set_logger_levels from ros2log.api import format_logger_service_unavailable_error from ros2log.api import get_get_logger_levels_service_name from ros2log.api import get_logger_name_for_node @@ -111,3 +113,49 @@ def test_node_without_logger_service_returns_specific_error( format_logger_service_unavailable_error('/listener'), error, ) + + +class TestLoggerLevelServiceCalls(unittest.TestCase): + """Test logger-level service call helpers.""" + + def test_call_get_logger_levels_rejects_negative_timeout(self): + with self.assertRaisesRegex( + ValueError, + 'timeout_sec must be non-negative, got -1.0', + ): + call_get_logger_levels( + node=object(), + logger_names_by_node={}, + timeout_sec=-1.0, + ) + + def test_call_get_logger_levels_allows_zero_timeout(self): + self.assertEqual( + {}, + call_get_logger_levels( + node=object(), + logger_names_by_node={}, + timeout_sec=0.0, + ), + ) + + def test_call_set_logger_levels_rejects_negative_timeout(self): + with self.assertRaisesRegex( + ValueError, + 'timeout_sec must be non-negative, got -1.0', + ): + call_set_logger_levels( + node=object(), + levels_by_node={}, + timeout_sec=-1.0, + ) + + def test_call_set_logger_levels_allows_zero_timeout(self): + self.assertEqual( + {}, + call_set_logger_levels( + node=object(), + levels_by_node={}, + timeout_sec=0.0, + ), + ) From 89b8337b304b63b32055e72d112006517393e040 Mon Sep 17 00:00:00 2001 From: Tomoya Fujita Date: Mon, 13 Apr 2026 20:49:35 +0900 Subject: [PATCH 13/13] LogWatcher constructor should raise the exception instead of sys.exit(). Signed-off-by: Tomoya Fujita --- ros2log/ros2log/verb/watch.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/ros2log/ros2log/verb/watch.py b/ros2log/ros2log/verb/watch.py index 6bd41b297..0c45a7bf0 100644 --- a/ros2log/ros2log/verb/watch.py +++ b/ros2log/ros2log/verb/watch.py @@ -106,17 +106,21 @@ def main(self, *, args): with DirectNode(args) as node: # Configure QoS profile based on arguments and available publishers qos_profile = choose_qos(node, '/rosout', args) - LogWatcher( - node, - level_filter=args.level, - logger_filter=args.logger, - regex_filter=args.regex, - enable_color=not args.no_color, - show_timestamp=not args.no_timestamp, - show_function_detail=args.function_detail, - qos_profile=qos_profile, - debug=args.debug, - ) + try: + watcher = LogWatcher( + node, + level_filter=args.level, + logger_filter=args.logger, + regex_filter=args.regex, + enable_color=not args.no_color, + show_timestamp=not args.no_timestamp, + show_function_detail=args.function_detail, + qos_profile=qos_profile, + debug=args.debug, + ) + except ValueError as e: + print(str(e), file=sys.stderr) + return 1 try: rclpy.spin(node) @@ -160,8 +164,7 @@ def __init__( try: self.regex_pattern = re.compile(regex_filter) except re.error as e: - node.get_logger().error(f'Invalid regex pattern: {e}') - sys.exit(1) + raise ValueError(f'Invalid regex pattern: {e}') from e # Create subscription to /rosout self.subscription = node.create_subscription(