55module_description: SharedTopicClient 是一个多 Topic 数据共享与串口转发客户端模块。它用于通过 UART 将多个 Topic 的数据统一打包、发送,实现消息流的串口透明同步转发,适用于分布式系统的多主题数据同步或边缘数据采集。 / SharedTopicClient is a client module for multi-topic data sharing and transparent UART forwarding. It subscribes to multiple Topics, packs their updates, and transmits them via UART, enabling efficient and reliable message synchronization over serial connections—ideal for distributed systems or edge data acquisition.
66constructor_args:
77 - uart_name: "uart_cdc"
8- - buffer_size: 256
8+ - slot_count: 16
99 - topic_configs:
1010 - "topic1"
1111 - ["topic2", "libxr_def_domain"]
@@ -19,17 +19,28 @@ depends: []
1919#include < cstdint>
2020
2121#include " app_framework.hpp"
22+ #include " lockfree_pool.hpp"
23+ #include " lockfree_queue.hpp"
2224#include " message.hpp"
2325#include " uart.hpp"
2426
2527class SharedTopicClient : public LibXR ::Application {
26- public :
28+ private :
2729 struct CallbackInfo {
2830 SharedTopicClient* client;
2931 uint32_t topic_crc32;
30- uint32_t index;
3132 };
3233
34+ struct PacketSlot {
35+ LibXR::RawData buffer;
36+ };
37+
38+ struct ReadyPacket {
39+ uint32_t slot_index = 0 ;
40+ size_t packet_size = 0 ;
41+ };
42+
43+ public:
3344 struct TopicConfig {
3445 const char * name;
3546 const char * domain = " libxr_def_domain" ;
@@ -42,64 +53,121 @@ class SharedTopicClient : public LibXR::Application {
4253
4354 SharedTopicClient (LibXR::HardwareContainer& hw,
4455 LibXR::ApplicationManager& app, const char * uart_name,
45- uint32_t buffer_size ,
56+ uint32_t slot_count ,
4657 std::initializer_list<TopicConfig> topic_configs)
47- : uart_(hw.template Find<LibXR::UART>(uart_name)) {
48- ASSERT (uart_ != nullptr );
58+ : uart_(hw.template FindOrExit<LibXR::UART>({uart_name})) {
4959 ASSERT (uart_->write_port_ != nullptr );
5060 ASSERT (uart_->write_port_ ->queue_data_ != nullptr );
61+ ASSERT (uart_->write_port_ ->Writable ());
62+ ASSERT (topic_configs.size () > 0 );
63+ ASSERT (slot_count > 0 );
5164
52- topics_pack_buffer_ = new LibXR::RawData[topic_configs.size ()];
53- uint32_t i = 0 ;
5465 size_t max_packet_size = 0 ;
5566
5667 for (auto config : topic_configs) {
5768 auto domain = LibXR::Topic::Domain (config.domain );
58- auto ans = LibXR::Topic::Find (config.name , &domain);
59- if (ans == nullptr ) {
69+ auto topic = LibXR::Topic::Find (config.name , &domain);
70+ if (topic == nullptr ) {
6071 XR_LOG_ERROR (" Topic not found: %s/%s" , config.domain , config.name );
6172 ASSERT (false );
6273 }
6374 const size_t packet_size =
64- ans->data_ .max_length + LibXR::Topic::PACK_BASE_SIZE;
65- ASSERT (packet_size <= buffer_size);
75+ topic->data_ .max_length + LibXR::Topic::PACK_BASE_SIZE;
6676 max_packet_size = LibXR::max (max_packet_size, packet_size);
67- topics_pack_buffer_[i] = LibXR::RawData (
68- new uint8_t [packet_size], packet_size);
77+ }
6978
79+ ASSERT (max_packet_size <= uart_->write_port_ ->queue_data_ ->MaxSize ());
80+
81+ packets_ = new PacketSlot[slot_count];
82+ free_slots_ = new LibXR::LockFreeQueue<uint32_t >(slot_count + 1 );
83+ ready_packets_ = new LibXR::LockFreePool<ReadyPacket>(slot_count);
84+ for (uint32_t i = 0 ; i < slot_count; i++) {
85+ packets_[i].buffer =
86+ LibXR::RawData (new uint8_t [max_packet_size], max_packet_size);
87+ ASSERT (free_slots_->Push (i) == LibXR::ErrorCode::OK);
88+ }
89+
90+ tx_callback_ = LibXR::Callback<LibXR::ErrorCode>::CreateGuarded (
91+ [](bool in_isr, SharedTopicClient* self, LibXR::ErrorCode status) {
92+ self->OnWriteDone (in_isr, status);
93+ },
94+ this );
95+ tx_op_ = LibXR::WriteOperation (tx_callback_);
96+
97+ for (auto config : topic_configs) {
98+ auto domain = LibXR::Topic::Domain (config.domain );
99+ auto topic_handle = LibXR::Topic::Find (config.name , &domain);
100+ ASSERT (topic_handle != nullptr );
70101 void (*func)(bool , CallbackInfo, LibXR::MicrosecondTimestamp,
71102 LibXR::ConstRawData&) =
72103 [](bool in_isr, CallbackInfo info,
73104 LibXR::MicrosecondTimestamp timestamp, LibXR::ConstRawData& data) {
74- auto & buffer = info.client ->topics_pack_buffer_ [info.index ];
75- ASSERT (data.size_ + LibXR::Topic::PACK_BASE_SIZE <= buffer.size_ );
76- LibXR::Topic::PackData (info.topic_crc32 , buffer, timestamp, data);
77-
78- LibXR::WriteOperation op;
79- auto ans = info.client ->uart_ ->Write (
80- {buffer.addr_ , data.size_ + LibXR::Topic::PACK_BASE_SIZE}, op,
81- in_isr);
82- UNUSED (ans);
105+ info.client ->OnTopic (in_isr, info, timestamp, data);
83106 };
84107
85108 auto msg_cb = LibXR::Topic::Callback::Create (
86- func, CallbackInfo{this , ans ->data_ .crc32 , i });
109+ func, CallbackInfo{this , topic_handle ->data_ .crc32 });
87110
88- LibXR::Topic topic (ans );
111+ LibXR::Topic topic (topic_handle );
89112
90113 topic.RegisterCallback (msg_cb);
91-
92- i++;
93114 }
94115
95- ASSERT (max_packet_size <= uart_->write_port_ ->queue_data_ ->MaxSize ());
96-
97116 app.Register (*this );
98117 }
99118
100119 void OnMonitor () override {}
101120
102121 private:
122+ void OnTopic (bool in_isr, CallbackInfo info,
123+ LibXR::MicrosecondTimestamp timestamp,
124+ LibXR::ConstRawData& data) {
125+ const size_t packet_size = data.size_ + LibXR::Topic::PACK_BASE_SIZE;
126+ uint32_t slot_index = 0 ;
127+
128+ if (free_slots_->Pop (slot_index) != LibXR::ErrorCode::OK) {
129+ return ;
130+ }
131+
132+ auto & slot = packets_[slot_index];
133+ ASSERT (packet_size <= slot.buffer .size_ );
134+ LibXR::Topic::PackData (info.topic_crc32 , slot.buffer , timestamp, data);
135+
136+ ASSERT (ready_packets_->Put (ReadyPacket{slot_index, packet_size}) ==
137+ LibXR::ErrorCode::OK);
138+ TxService (in_isr);
139+ }
140+
141+ void TxService (bool in_isr) {
142+ ReadyPacket packet;
143+ if (ready_packets_->Get (packet) != LibXR::ErrorCode::OK) {
144+ return ;
145+ }
146+
147+ auto & slot = packets_[packet.slot_index ];
148+ auto write_status = uart_->Write (
149+ LibXR::ConstRawData{slot.buffer .addr_ , packet.packet_size }, tx_op_,
150+ in_isr);
151+ if (static_cast <int8_t >(write_status) < 0 ) {
152+ ASSERT (ready_packets_->Put (packet) == LibXR::ErrorCode::OK);
153+ return ;
154+ }
155+
156+ ReturnFreeSlot (packet.slot_index );
157+ }
158+
159+ void OnWriteDone (bool in_isr, LibXR::ErrorCode) {
160+ TxService (in_isr);
161+ }
162+
163+ void ReturnFreeSlot (uint32_t slot_index) {
164+ ASSERT (free_slots_->Push (slot_index) == LibXR::ErrorCode::OK);
165+ }
166+
103167 LibXR::UART* uart_;
104- LibXR::RawData* topics_pack_buffer_;
168+ PacketSlot* packets_ = nullptr ;
169+ LibXR::LockFreeQueue<uint32_t >* free_slots_ = nullptr ;
170+ LibXR::LockFreePool<ReadyPacket>* ready_packets_ = nullptr ;
171+ LibXR::Callback<LibXR::ErrorCode> tx_callback_;
172+ LibXR::WriteOperation tx_op_;
105173};
0 commit comments