Skip to content

Commit e58fa14

Browse files
committed
refactor: use shared packet slots
1 parent 87675de commit e58fa14

2 files changed

Lines changed: 152 additions & 27 deletions

File tree

README.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,26 @@ SharedTopicClient is a client module for multi-topic data sharing and transparen
1313
## 构造参数 / Constructor Arguments
1414

1515
- uart_name: 串口设备名 / UART device name (e.g., "uart_cdc")
16-
- buffer_size: 单个 Topic 串口包最大字节数 / Maximum bytes of one forwarded Topic packet (e.g., 256)
16+
- slot_count: 共享待发槽位数量。每个槽位的字节数由订阅 Topic 中最大的打包后长度自动计算。
17+
/ Number of shared pending slots. Each slot size is derived from the largest packed subscribed Topic.
1718
- topic_configs: 需要订阅并转发的 Topic 配置列表。每项可以只写 topic 名,也可以写
1819
`[topic, domain]`。/ Topic configs to subscribe and forward. Each item may be a
1920
topic name or `[topic, domain]`.
2021

2122
## 运行方式
2223

2324
`SharedTopicClient` 不创建发送线程。模块注册 Topic callback;每次 Topic 发布时,
24-
callback 内直接完成打包并把当前包写入 UART `write_port`。UART 写入仍走 libxr
25-
非阻塞写队列,包内容在 `Write()` 返回前已经复制到 UART 队列。
25+
callback 先从空槽位队列申请一个 packet 槽,完成打包后把 `{槽位, 长度}` 放入待发
26+
pool,然后尝试抢占当前 UART 发送服务:
27+
28+
- 所有 Topic 共用同一组固定 packet 槽位。
29+
- 空槽位用 `LockFreeQueue<uint32_t>` 管理;TX 消费完成后把槽位还回队列。
30+
- 待发 packet 用 `LockFreePool` 管理;message callback 打包完成后放入 pool。
31+
- packet 总数、空槽队列深度、待发 pool 深度相同。
32+
- 空槽申请失败时丢弃当前新 packet,并记录丢包计数;这是全局背压,不是同 Topic 覆盖。
33+
34+
写完成回调会继续调用 `TxService()`,直到 UART 写队列再次满或共享槽位池为空。
35+
这样不会让多个 Topic callback 同时推进 UART 服务,也不会为转发链路额外引入发送线程。
2636

2737
## Timestamp
2838

SharedTopicClient.hpp

Lines changed: 139 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module_description: SharedTopicClient 是一个多 Topic 数据共享与串口转发客户端模块。它用于通过 UART 将多个 Topic 的数据统一打包、发送,实现消息流的串口透明同步转发,适用于分布式系统的多主题数据同步或边缘数据采集。 / SharedTopicClient is a client module for multi-topic data sharing and transparent UART forwarding. It subscribes to multiple Topics, packs their updates, and transmits them via UART, enabling efficient and reliable message synchronization over serial connections—ideal for distributed systems or edge data acquisition.
66
constructor_args:
77
- uart_name: "uart_cdc"
8-
- buffer_size: 256
8+
- slot_count: 16
99
- topic_configs:
1010
- "topic1"
1111
- ["topic2", "libxr_def_domain"]
@@ -15,21 +15,33 @@ depends: []
1515
=== END MANIFEST === */
1616
// clang-format on
1717

18+
#include <atomic>
1819
#include <cstddef>
1920
#include <cstdint>
2021

2122
#include "app_framework.hpp"
23+
#include "lockfree_pool.hpp"
24+
#include "lockfree_queue.hpp"
2225
#include "message.hpp"
2326
#include "uart.hpp"
2427

2528
class SharedTopicClient : public LibXR::Application {
26-
public:
29+
private:
2730
struct CallbackInfo {
2831
SharedTopicClient* client;
2932
uint32_t topic_crc32;
30-
uint32_t index;
3133
};
3234

35+
struct PacketSlot {
36+
LibXR::RawData buffer;
37+
};
38+
39+
struct ReadyPacket {
40+
uint32_t slot_index = 0;
41+
size_t packet_size = 0;
42+
};
43+
44+
public:
3345
struct TopicConfig {
3446
const char* name;
3547
const char* domain = "libxr_def_domain";
@@ -42,15 +54,16 @@ class SharedTopicClient : public LibXR::Application {
4254

4355
SharedTopicClient(LibXR::HardwareContainer& hw,
4456
LibXR::ApplicationManager& app, const char* uart_name,
45-
uint32_t buffer_size,
57+
uint32_t slot_count,
4658
std::initializer_list<TopicConfig> topic_configs)
4759
: uart_(hw.template Find<LibXR::UART>(uart_name)) {
4860
ASSERT(uart_ != nullptr);
4961
ASSERT(uart_->write_port_ != nullptr);
5062
ASSERT(uart_->write_port_->queue_data_ != nullptr);
63+
ASSERT(uart_->write_port_->Writable());
64+
ASSERT(topic_configs.size() > 0);
65+
ASSERT(slot_count > 0);
5166

52-
topics_pack_buffer_ = new LibXR::RawData[topic_configs.size()];
53-
uint32_t i = 0;
5467
size_t max_packet_size = 0;
5568

5669
for (auto config : topic_configs) {
@@ -62,44 +75,146 @@ class SharedTopicClient : public LibXR::Application {
6275
}
6376
const size_t packet_size =
6477
ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE;
65-
ASSERT(packet_size <= buffer_size);
6678
max_packet_size = LibXR::max(max_packet_size, packet_size);
67-
topics_pack_buffer_[i] = LibXR::RawData(
68-
new uint8_t[packet_size], packet_size);
79+
}
80+
81+
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
6982

83+
packets_ = new PacketSlot[slot_count];
84+
free_slots_ = new LibXR::LockFreeQueue<uint32_t>(slot_count);
85+
ready_packets_ = new LibXR::LockFreePool<ReadyPacket>(slot_count);
86+
for (uint32_t i = 0; i < slot_count; i++) {
87+
packets_[i].buffer =
88+
LibXR::RawData(new uint8_t[max_packet_size], max_packet_size);
89+
auto ans = free_slots_->Push(i);
90+
UNUSED(ans);
91+
ASSERT(ans == LibXR::ErrorCode::OK);
92+
}
93+
94+
tx_callback_ = LibXR::Callback<LibXR::ErrorCode>::CreateGuarded(
95+
[](bool in_isr, SharedTopicClient* self, LibXR::ErrorCode status) {
96+
self->OnWriteDone(in_isr, status);
97+
},
98+
this);
99+
tx_op_ = LibXR::WriteOperation(tx_callback_);
100+
101+
for (auto config : topic_configs) {
102+
auto domain = LibXR::Topic::Domain(config.domain);
103+
auto ans = LibXR::Topic::Find(config.name, &domain);
104+
ASSERT(ans != nullptr);
70105
void (*func)(bool, CallbackInfo, LibXR::MicrosecondTimestamp,
71106
LibXR::ConstRawData&) =
72107
[](bool in_isr, CallbackInfo info,
73108
LibXR::MicrosecondTimestamp timestamp, LibXR::ConstRawData& data) {
74-
auto& buffer = info.client->topics_pack_buffer_[info.index];
75-
ASSERT(data.size_ + LibXR::Topic::PACK_BASE_SIZE <= buffer.size_);
76-
LibXR::Topic::PackData(info.topic_crc32, buffer, timestamp, data);
77-
78-
LibXR::WriteOperation op;
79-
auto ans = info.client->uart_->Write(
80-
{buffer.addr_, data.size_ + LibXR::Topic::PACK_BASE_SIZE}, op,
81-
in_isr);
82-
UNUSED(ans);
109+
info.client->OnTopic(in_isr, info, timestamp, data);
83110
};
84111

85112
auto msg_cb = LibXR::Topic::Callback::Create(
86-
func, CallbackInfo{this, ans->data_.crc32, i});
113+
func, CallbackInfo{this, ans->data_.crc32});
87114

88115
LibXR::Topic topic(ans);
89116

90117
topic.RegisterCallback(msg_cb);
91-
92-
i++;
93118
}
94119

95-
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
96-
97120
app.Register(*this);
98121
}
99122

100123
void OnMonitor() override {}
101124

102125
private:
126+
void OnTopic(bool in_isr, CallbackInfo info,
127+
LibXR::MicrosecondTimestamp timestamp,
128+
LibXR::ConstRawData& data) {
129+
const size_t packet_size = data.size_ + LibXR::Topic::PACK_BASE_SIZE;
130+
uint32_t slot_index = 0;
131+
132+
if (free_slots_->Pop(slot_index) != LibXR::ErrorCode::OK) {
133+
dropped_packets_.fetch_add(1U, std::memory_order_relaxed);
134+
return;
135+
}
136+
137+
auto& slot = packets_[slot_index];
138+
ASSERT(packet_size <= slot.buffer.size_);
139+
LibXR::Topic::PackData(info.topic_crc32, slot.buffer, timestamp, data);
140+
141+
auto ans = ready_packets_->Put(ReadyPacket{slot_index, packet_size});
142+
UNUSED(ans);
143+
ASSERT(ans == LibXR::ErrorCode::OK);
144+
TxService(in_isr);
145+
}
146+
147+
void TxService(bool in_isr) {
148+
tx_pend_.store(1U, std::memory_order_release);
149+
150+
uint32_t expected = 0U;
151+
if (!tx_lock_.compare_exchange_strong(expected, 1U,
152+
std::memory_order_acquire,
153+
std::memory_order_relaxed)) {
154+
return;
155+
}
156+
157+
while (true) {
158+
tx_pend_.store(0U, std::memory_order_release);
159+
bool blocked = false;
160+
161+
while (true) {
162+
ReadyPacket packet = retry_packet_;
163+
if (!retry_pending_) {
164+
if (ready_packets_->Get(packet) != LibXR::ErrorCode::OK) {
165+
break;
166+
}
167+
}
168+
169+
auto& slot = packets_[packet.slot_index];
170+
auto ans = uart_->Write(
171+
LibXR::ConstRawData{slot.buffer.addr_, packet.packet_size}, tx_op_,
172+
in_isr);
173+
if (static_cast<int8_t>(ans) < 0) {
174+
retry_packet_ = packet;
175+
retry_pending_ = true;
176+
blocked = true;
177+
break;
178+
}
179+
180+
retry_pending_ = false;
181+
ReturnFreeSlot(packet.slot_index);
182+
}
183+
184+
tx_lock_.store(0U, std::memory_order_release);
185+
if (blocked || tx_pend_.load(std::memory_order_acquire) == 0U) {
186+
return;
187+
}
188+
189+
expected = 0U;
190+
if (!tx_lock_.compare_exchange_strong(expected, 1U,
191+
std::memory_order_acquire,
192+
std::memory_order_relaxed)) {
193+
return;
194+
}
195+
}
196+
}
197+
198+
void OnWriteDone(bool in_isr, LibXR::ErrorCode status) {
199+
UNUSED(status);
200+
TxService(in_isr);
201+
}
202+
203+
void ReturnFreeSlot(uint32_t slot_index) {
204+
auto ans = free_slots_->Push(slot_index);
205+
UNUSED(ans);
206+
ASSERT(ans == LibXR::ErrorCode::OK);
207+
}
208+
103209
LibXR::UART* uart_;
104-
LibXR::RawData* topics_pack_buffer_;
210+
PacketSlot* packets_ = nullptr;
211+
LibXR::LockFreeQueue<uint32_t>* free_slots_ = nullptr;
212+
LibXR::LockFreePool<ReadyPacket>* ready_packets_ = nullptr;
213+
ReadyPacket retry_packet_;
214+
bool retry_pending_ = false;
215+
std::atomic<uint32_t> tx_lock_{0};
216+
std::atomic<uint32_t> tx_pend_{0};
217+
std::atomic<uint32_t> dropped_packets_{0};
218+
LibXR::Callback<LibXR::ErrorCode> tx_callback_;
219+
LibXR::WriteOperation tx_op_;
105220
};

0 commit comments

Comments
 (0)