22
33#include " behaviortree_cpp/xml_parsing.h"
44
5+ #include < array>
6+ #include < condition_variable>
7+ #include < cstring>
8+ #include < deque>
9+ #include < fstream>
10+ #include < mutex>
11+ #include < thread>
12+
513#include " flatbuffers/base.h"
614
715namespace BT
@@ -15,22 +23,26 @@ int64_t ToUsec(Duration ts)
1523}
1624} // namespace
1725
18- struct FileLogger2 ::PImpl
26+ // Define the private implementation struct
27+ struct FileLogger2 ::Pimpl
1928{
2029 std::ofstream file_stream;
30+ std::mutex file_mutex; // Protects file_stream access from multiple threads
2131
2232 Duration first_timestamp = {};
2333
24- std::deque<Transition> transitions_queue;
34+ std::deque<FileLogger2:: Transition> transitions_queue;
2535 std::condition_variable queue_cv;
2636 std::mutex queue_mutex;
2737
2838 std::thread writer_thread;
2939 std::atomic_bool loop = true ;
40+ std::atomic_bool writer_ready = false ; // Signals writer thread is in wait loop
3041};
3142
3243FileLogger2::FileLogger2 (const BT::Tree& tree, std::filesystem::path const & filepath)
33- : StatusChangeLogger(tree.rootNode()), _p(new PImpl)
44+ : StatusChangeLogger() // Deferred subscription
45+ , _p(std::make_unique<Pimpl>())
3446{
3547 if (filepath.filename ().extension () != " .btlog" )
3648 {
@@ -71,6 +83,13 @@ FileLogger2::FileLogger2(const BT::Tree& tree, std::filesystem::path const& file
7183 _p->file_stream .write (write_buffer.data (), 8 );
7284
7385 _p->writer_thread = std::thread (&FileLogger2::writerLoop, this );
86+
87+ // Wait for writer thread to signal readiness (under mutex for proper synchronization)
88+ {
89+ std::unique_lock lock (_p->queue_mutex );
90+ _p->queue_cv .wait (lock, [this ]() { return _p->writer_ready .load (); });
91+ }
92+ subscribeToTreeChanges (tree.rootNode ());
7493}
7594
7695FileLogger2::~FileLogger2 ()
@@ -97,6 +116,7 @@ void FileLogger2::callback(Duration timestamp, const TreeNode& node,
97116
98117void FileLogger2::flush ()
99118{
119+ const std::scoped_lock lock (_p->file_mutex );
100120 _p->file_stream .flush ();
101121}
102122
@@ -105,29 +125,39 @@ void FileLogger2::writerLoop()
105125 // local buffer in this thread
106126 std::deque<Transition> transitions;
107127
128+ // Signal readiness while holding the lock (establishes happens-before with constructor)
129+ {
130+ std::scoped_lock lock (_p->queue_mutex );
131+ _p->writer_ready .store (true , std::memory_order_release);
132+ }
133+ _p->queue_cv .notify_one ();
134+
108135 while (_p->loop )
109136 {
110137 transitions.clear ();
111138 {
112139 std::unique_lock lock (_p->queue_mutex );
113140 _p->queue_cv .wait_for (lock, std::chrono::milliseconds (10 ), [this ]() {
114- return !_p->transitions_queue .empty () && _p->loop ;
141+ return !_p->transitions_queue .empty () || ! _p->loop ;
115142 });
116143 // simple way to pop all the transitions from _p->transitions_queue into transitions
117144 std::swap (transitions, _p->transitions_queue );
118145 }
119- while (!transitions.empty ())
120146 {
121- const auto trans = transitions.front ();
122- std::array<char , 9 > write_buffer{};
123- std::memcpy (write_buffer.data (), &trans.timestamp_usec , 6 );
124- std::memcpy (write_buffer.data () + 6 , &trans.node_uid , 2 );
125- std::memcpy (write_buffer.data () + 8 , &trans.status , 1 );
126-
127- _p->file_stream .write (write_buffer.data (), 9 );
128- transitions.pop_front ();
147+ const std::scoped_lock file_lock (_p->file_mutex );
148+ while (!transitions.empty ())
149+ {
150+ const auto trans = transitions.front ();
151+ std::array<char , 9 > write_buffer{};
152+ std::memcpy (write_buffer.data (), &trans.timestamp_usec , 6 );
153+ std::memcpy (write_buffer.data () + 6 , &trans.node_uid , 2 );
154+ std::memcpy (write_buffer.data () + 8 , &trans.status , 1 );
155+
156+ _p->file_stream .write (write_buffer.data (), 9 );
157+ transitions.pop_front ();
158+ }
159+ _p->file_stream .flush ();
129160 }
130- _p->file_stream .flush ();
131161 }
132162}
133163
0 commit comments