Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,42 @@
import yaml


def parse_repeat_transient_local_topics(values):
repeat_topics = {}
if not values:
return repeat_topics

for value in values:
topic = value
depth = 1
if '=' in value:
topic, parsed_depth = value.split('=', 1)
if not parsed_depth:
raise ValueError(
f'Invalid value for --repeat-transient-local: "{value}". '
'Expected format <topic> or <topic>=<depth>.')
try:
depth = int(parsed_depth)
except ValueError as exc:
raise ValueError(
f'Invalid depth for --repeat-transient-local: "{value}". '
'Depth must be a positive integer.') from exc

if not topic:
raise ValueError(
f'Invalid value for --repeat-transient-local: "{value}". '
'Topic name must not be empty.')

if depth <= 0:
raise ValueError(
f'Invalid depth for --repeat-transient-local: "{value}". '
'Depth must be greater than 0.')

repeat_topics[topic] = depth

return repeat_topics


def add_recorder_arguments(parser: ArgumentParser) -> None:
parser.formatter_class = SplitLineFormatter
writer_choices = get_registered_writers()
Expand Down Expand Up @@ -190,6 +226,11 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
help='Enable snapshot mode. Messages will not be written to the bagfile until '
'the "/rosbag2_recorder/snapshot" service is called. e.g. \n '
'ros2 service call /rosbag2_recorder/snapshot rosbag2_interfaces/Snapshot')
parser.add_argument(
'--repeat-transient-local', type=str, default=[], metavar='Topic[=Depth]', nargs='+',
help='Space-delimited list of transient-local topics whose last messages should be '
'prepended on bag split and snapshot writes. Format: <topic> or <topic>=<depth>. '
'Default depth is 1 when omitted.')
parser.add_argument(
'--log-level', type=str, default='info',
choices=['debug', 'info', 'warn', 'error', 'fatal'],
Expand Down Expand Up @@ -304,6 +345,12 @@ def validate_parsed_arguments(args, uri) -> str:
if args.compression_queue_size < 0:
return print_error('Compression queue size must be at least 0.')

try:
args.repeat_transient_local_messages = parse_repeat_transient_local_topics(
args.repeat_transient_local)
except ValueError as exc:
return print_error(str(exc))

return None


Expand Down Expand Up @@ -394,6 +441,7 @@ def main(self, *, args): # noqa: D102
record_options.ignore_leaf_topics = args.ignore_leaf_topics
record_options.use_sim_time = args.use_sim_time
record_options.disable_keyboard_controls = args.disable_keyboard_controls
record_options.repeat_transient_local_messages = args.repeat_transient_local_messages

recorder = Recorder(storage_options, record_options, args.log_level, args.node_name)

Expand Down
30 changes: 30 additions & 0 deletions ros2bag/test/test_recorder_args_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,36 @@ def test_recorder_custom_data_list_argument(test_arguments_parser):
assert output_path.as_posix() == args.output


def test_recorder_repeat_transient_local_list_argument(test_arguments_parser):
"""Test recorder --repeat-transient-local list argument parser."""
output_path = RESOURCES_PATH / 'ros2bag_tmp_file'
args = test_arguments_parser.parse_args(
['--repeat-transient-local', '/map=2', '/tf_static', '--all-topics',
'--output', output_path.as_posix()]
)
assert ['/map=2', '/tf_static'] == args.repeat_transient_local

uri = args.output or datetime.datetime.now().strftime('rosbag2_%Y_%m_%d-%H_%M_%S')
error_str = validate_parsed_arguments(args, uri)
assert error_str is None
assert {'/map': 2, '/tf_static': 1} == args.repeat_transient_local_messages


def test_recorder_repeat_transient_local_invalid_depth_argument(test_arguments_parser):
"""Test recorder --repeat-transient-local invalid depth fails validation."""
output_path = RESOURCES_PATH / 'ros2bag_tmp_file'
args = test_arguments_parser.parse_args(
['--repeat-transient-local', '/map=0', '--all-topics', '--output', output_path.as_posix()]
)

uri = args.output or datetime.datetime.now().strftime('rosbag2_%Y_%m_%d-%H_%M_%S')
error_str = validate_parsed_arguments(args, uri)
assert error_str is not None
expected_output = 'Depth must be greater than 0'
matches = expected_output in error_str
assert matches, ERROR_STRING_MSG.format(expected_output, error_str)


def test_recorder_validate_exclude_regex_needs_inclusive_args(test_arguments_parser):
"""Test that --exclude-regex needs inclusive arguments."""
output_path = RESOURCES_PATH / 'ros2bag_tmp_file'
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ add_library(${PROJECT_NAME} SHARED
src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp
src/rosbag2_cpp/cache/message_cache.cpp
src/rosbag2_cpp/cache/circular_message_cache.cpp
src/rosbag2_cpp/cache/transient_local_messages_cache.cpp
src/rosbag2_cpp/clocks/time_controller_clock.cpp
src/rosbag2_cpp/converter.cpp
src/rosbag2_cpp/info.cpp
Expand Down Expand Up @@ -208,6 +209,11 @@ if(BUILD_TESTING)
target_link_libraries(test_circular_message_cache ${PROJECT_NAME})
endif()

ament_add_gmock(test_transient_local_messages_cache
test/rosbag2_cpp/test_transient_local_messages_cache.cpp)
if(TARGET test_transient_local_messages_cache)
target_link_libraries(test_transient_local_messages_cache ${PROJECT_NAME})
endif()

# If compiling with gcc, run this test with sanitizers enabled
ament_add_gmock(test_ros2_message
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2025 Dexory
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE__TRANSIENT_LOCAL_MESSAGES_CACHE_HPP_
#define ROSBAG2_CPP__CACHE__TRANSIENT_LOCAL_MESSAGES_CACHE_HPP_

#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

namespace rosbag2_cpp
{
namespace cache
{

class ROSBAG2_CPP_PUBLIC TransientLocalMessagesCache
{
public:
using MessageSharedPtr = std::shared_ptr<const rosbag2_storage::SerializedBagMessage>;

void add_topic(const std::string & topic_name, size_t queue_depth);
void remove_topic(const std::string & topic_name);
bool has_topic(const std::string & topic_name) const;

void push(const std::string & topic_name, MessageSharedPtr message);

std::vector<MessageSharedPtr> get_messages_sorted_by_timestamp() const;

/// Comparator for ordering serialized bag messages by (recv_timestamp, send_timestamp, topic).
static bool message_timestamp_less(
const MessageSharedPtr & left, const MessageSharedPtr & right);

void clear();
size_t size() const;

private:
struct TopicQueue
{
size_t max_depth{0};
std::deque<MessageSharedPtr> messages;
};

mutable std::mutex mutex_;
std::unordered_map<std::string, TopicQueue> topic_queues_;
};

} // namespace cache
} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__CACHE__TRANSIENT_LOCAL_MESSAGES_CACHE_HPP_
10 changes: 10 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_CPP__WRITER_HPP_
#define ROSBAG2_CPP__WRITER_HPP_

#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -116,6 +117,15 @@ class ROSBAG2_CPP_PUBLIC Writer
const rosbag2_storage::TopicMetadata & topic_with_type,
const rosbag2_storage::MessageDefinition & message_definition);

void create_transient_local_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
size_t num_last_messages);

void create_transient_local_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
size_t num_last_messages,
const rosbag2_storage::MessageDefinition & message_definition);

/**
* Trigger a snapshot when snapshot mode is enabled.
* \returns true if snapshot is successful, false if snapshot fails or is not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_CPP__WRITER_INTERFACES__BASE_WRITER_INTERFACE_HPP_
#define ROSBAG2_CPP__WRITER_INTERFACES__BASE_WRITER_INTERFACE_HPP_

#include <cstddef>
#include <memory>

#include "rosbag2_cpp/bag_events.hpp"
Expand Down Expand Up @@ -48,6 +49,15 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
const rosbag2_storage::TopicMetadata & topic_with_type,
const rosbag2_storage::MessageDefinition & message_definition) = 0;

virtual void create_transient_local_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
size_t num_last_messages) = 0;

virtual void create_transient_local_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
size_t num_last_messages,
const rosbag2_storage::MessageDefinition & message_definition) = 0;

virtual void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) = 0;

virtual void write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message) = 0;
Expand Down
22 changes: 22 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "rosbag2_cpp/cache/circular_message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/cache/transient_local_messages_cache.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/message_definitions/local_message_definition_source.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
Expand Down Expand Up @@ -107,6 +108,15 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
const rosbag2_storage::TopicMetadata & topic_with_type,
const rosbag2_storage::MessageDefinition & message_definition) override;

void create_transient_local_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
size_t num_last_messages) override;

void create_transient_local_topic(
const rosbag2_storage::TopicMetadata & topic_with_type,
size_t num_last_messages,
const rosbag2_storage::MessageDefinition & message_definition) override;

/**
* \brief Removes a new topic in the underlying storage.
* \details Expected to be used if creation of subscription fails and cleanup is needed.
Expand Down Expand Up @@ -152,9 +162,14 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
bool use_cache_ {false};
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;
std::shared_ptr<rosbag2_cpp::cache::TransientLocalMessagesCache> transient_local_cache_;

std::string split_bagfile_local(bool execute_callbacks = true);

void prepend_transient_local_messages(
rcutils_time_point_value_t recv_timestamp,
rcutils_time_point_value_t send_timestamp);

void execute_bag_split_callbacks(
const std::string & closed_file, const std::string & opened_file);

Expand Down Expand Up @@ -206,12 +221,19 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
get_writeable_message(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message);

std::shared_ptr<rosbag2_storage::SerializedBagMessage> copy_with_timestamps(
const std::shared_ptr<const rosbag2_storage::SerializedBagMessage> & message,
rcutils_time_point_value_t recv_timestamp,
rcutils_time_point_value_t send_timestamp) const;

private:
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
bool is_first_message_ {true};
std::atomic_bool is_open_{false};
rcutils_time_point_value_t last_received_timestamp_{0};
rcutils_time_point_value_t last_sent_timestamp_{0};

bag_events::EventCallbackManager callback_manager_;
};
Expand Down
Loading