Skip to content

Commit ae51ea0

Browse files
committed
refactor: use shared pending slots
1 parent 87675de commit ae51ea0

2 files changed

Lines changed: 199 additions & 27 deletions

File tree

README.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,24 @@ SharedTopicClient is a client module for multi-topic data sharing and transparen
1313
## 构造参数 / Constructor Arguments
1414

1515
- uart_name: 串口设备名 / UART device name (e.g., "uart_cdc")
16-
- buffer_size: 单个 Topic 串口包最大字节数 / Maximum bytes of one forwarded Topic packet (e.g., 256)
16+
- slot_count: 共享待发槽位数量。每个槽位的字节数由订阅 Topic 中最大的打包后长度自动计算。
17+
/ Number of shared pending slots. Each slot size is derived from the largest packed subscribed Topic.
1718
- topic_configs: 需要订阅并转发的 Topic 配置列表。每项可以只写 topic 名,也可以写
1819
`[topic, domain]`。/ Topic configs to subscribe and forward. Each item may be a
1920
topic name or `[topic, domain]`.
2021

2122
## 运行方式
2223

2324
`SharedTopicClient` 不创建发送线程。模块注册 Topic callback;每次 Topic 发布时,
24-
callback 内直接完成打包并把当前包写入 UART `write_port`。UART 写入仍走 libxr
25-
非阻塞写队列,包内容在 `Write()` 返回前已经复制到 UART 队列。
25+
callback 内先完成打包并写入全局共享槽位池,然后尝试抢占当前 UART 发送服务:
26+
27+
- 所有 Topic 共用一组固定槽位,packet 按进入槽位池的顺序发送。
28+
- 发送空闲:`TxService()` 从共享槽位池取最早 packet 写入 UART。
29+
- 发送忙碌:新 packet 继续进入共享槽位池,不按 Topic 覆盖旧 packet。
30+
- 槽位池满:丢弃当前新 packet,并记录丢包计数;这是全局背压,不是同 Topic 覆盖。
31+
32+
写完成回调会继续调用 `TxService()`,直到 UART 写队列再次满或共享槽位池为空。
33+
这样不会让多个 Topic callback 同时推进 UART 服务,也不会为转发链路额外引入发送线程。
2634

2735
## Timestamp
2836

SharedTopicClient.hpp

Lines changed: 188 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module_description: SharedTopicClient 是一个多 Topic 数据共享与串口转发客户端模块。它用于通过 UART 将多个 Topic 的数据统一打包、发送,实现消息流的串口透明同步转发,适用于分布式系统的多主题数据同步或边缘数据采集。 / SharedTopicClient is a client module for multi-topic data sharing and transparent UART forwarding. It subscribes to multiple Topics, packs their updates, and transmits them via UART, enabling efficient and reliable message synchronization over serial connections—ideal for distributed systems or edge data acquisition.
66
constructor_args:
77
- uart_name: "uart_cdc"
8-
- buffer_size: 256
8+
- slot_count: 16
99
- topic_configs:
1010
- "topic1"
1111
- ["topic2", "libxr_def_domain"]
@@ -15,21 +15,107 @@ depends: []
1515
=== END MANIFEST === */
1616
// clang-format on
1717

18+
#include <atomic>
1819
#include <cstddef>
1920
#include <cstdint>
2021

2122
#include "app_framework.hpp"
23+
#include "libxr_mem.hpp"
2224
#include "message.hpp"
2325
#include "uart.hpp"
2426

2527
class SharedTopicClient : public LibXR::Application {
26-
public:
28+
private:
2729
struct CallbackInfo {
2830
SharedTopicClient* client;
2931
uint32_t topic_crc32;
30-
uint32_t index;
3132
};
3233

34+
struct PacketSlot {
35+
LibXR::RawData buffer;
36+
size_t packet_size = 0;
37+
std::atomic<uint32_t> sequence{0};
38+
};
39+
40+
class PacketQueue {
41+
public:
42+
PacketQueue(uint32_t slot_count, size_t slot_size)
43+
: slot_count_(slot_count), slot_size_(slot_size) {
44+
slots_ = new PacketSlot[slot_count_];
45+
for (uint32_t i = 0; i < slot_count_; i++) {
46+
slots_[i].buffer = LibXR::RawData(new uint8_t[slot_size_], slot_size_);
47+
slots_[i].sequence.store(i, std::memory_order_relaxed);
48+
}
49+
}
50+
51+
bool Push(uint32_t topic_crc32, LibXR::MicrosecondTimestamp timestamp,
52+
LibXR::ConstRawData& data, size_t packet_size) {
53+
uint32_t pos = write_pos_.load(std::memory_order_relaxed);
54+
55+
while (true) {
56+
auto& slot = slots_[pos % slot_count_];
57+
const uint32_t seq = slot.sequence.load(std::memory_order_acquire);
58+
const auto diff = static_cast<int32_t>(seq - pos);
59+
60+
if (diff == 0) {
61+
if (!write_pos_.compare_exchange_weak(pos, pos + 1U,
62+
std::memory_order_relaxed)) {
63+
continue;
64+
}
65+
66+
ASSERT(packet_size <= slot_size_);
67+
LibXR::Topic::PackData(topic_crc32, slot.buffer, timestamp, data);
68+
slot.packet_size = packet_size;
69+
slot.sequence.store(pos + 1U, std::memory_order_release);
70+
return true;
71+
}
72+
73+
if (diff < 0) {
74+
return false;
75+
}
76+
77+
pos = write_pos_.load(std::memory_order_relaxed);
78+
}
79+
}
80+
81+
bool PopTo(LibXR::RawData& out, size_t& packet_size) {
82+
uint32_t pos = read_pos_.load(std::memory_order_relaxed);
83+
84+
while (true) {
85+
auto& slot = slots_[pos % slot_count_];
86+
const uint32_t seq = slot.sequence.load(std::memory_order_acquire);
87+
const auto diff = static_cast<int32_t>(seq - (pos + 1U));
88+
89+
if (diff == 0) {
90+
if (!read_pos_.compare_exchange_weak(pos, pos + 1U,
91+
std::memory_order_relaxed)) {
92+
continue;
93+
}
94+
95+
packet_size = slot.packet_size;
96+
ASSERT(packet_size <= out.size_);
97+
LibXR::Memory::FastCopy(out.addr_, slot.buffer.addr_, packet_size);
98+
slot.sequence.store(pos + slot_count_, std::memory_order_release);
99+
return true;
100+
}
101+
102+
if (diff < 0) {
103+
return false;
104+
}
105+
106+
pos = read_pos_.load(std::memory_order_relaxed);
107+
}
108+
}
109+
110+
private:
111+
PacketSlot* slots_ = nullptr;
112+
uint32_t slot_count_ = 0;
113+
size_t slot_size_ = 0;
114+
std::atomic<uint32_t> write_pos_{0};
115+
std::atomic<uint32_t> read_pos_{0};
116+
};
117+
118+
public:
33119
struct TopicConfig {
34120
const char* name;
35121
const char* domain = "libxr_def_domain";
@@ -42,15 +128,16 @@ class SharedTopicClient : public LibXR::Application {
42128

43129
SharedTopicClient(LibXR::HardwareContainer& hw,
44130
LibXR::ApplicationManager& app, const char* uart_name,
45-
uint32_t buffer_size,
131+
uint32_t slot_count,
46132
std::initializer_list<TopicConfig> topic_configs)
47133
: uart_(hw.template Find<LibXR::UART>(uart_name)) {
48134
ASSERT(uart_ != nullptr);
49135
ASSERT(uart_->write_port_ != nullptr);
50136
ASSERT(uart_->write_port_->queue_data_ != nullptr);
137+
ASSERT(uart_->write_port_->Writable());
138+
ASSERT(topic_configs.size() > 0);
139+
ASSERT(slot_count > 0);
51140

52-
topics_pack_buffer_ = new LibXR::RawData[topic_configs.size()];
53-
uint32_t i = 0;
54141
size_t max_packet_size = 0;
55142

56143
for (auto config : topic_configs) {
@@ -62,44 +149,121 @@ class SharedTopicClient : public LibXR::Application {
62149
}
63150
const size_t packet_size =
64151
ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE;
65-
ASSERT(packet_size <= buffer_size);
66152
max_packet_size = LibXR::max(max_packet_size, packet_size);
67-
topics_pack_buffer_[i] = LibXR::RawData(
68-
new uint8_t[packet_size], packet_size);
153+
}
154+
155+
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
69156

157+
tx_buffer_ = LibXR::RawData(new uint8_t[max_packet_size], max_packet_size);
158+
pending_packets_ = new PacketQueue(slot_count, max_packet_size);
159+
160+
tx_callback_ = LibXR::Callback<LibXR::ErrorCode>::CreateGuarded(
161+
[](bool in_isr, SharedTopicClient* self, LibXR::ErrorCode status) {
162+
self->OnWriteDone(in_isr, status);
163+
},
164+
this);
165+
tx_op_ = LibXR::WriteOperation(tx_callback_);
166+
167+
for (auto config : topic_configs) {
168+
auto domain = LibXR::Topic::Domain(config.domain);
169+
auto ans = LibXR::Topic::Find(config.name, &domain);
170+
ASSERT(ans != nullptr);
70171
void (*func)(bool, CallbackInfo, LibXR::MicrosecondTimestamp,
71172
LibXR::ConstRawData&) =
72173
[](bool in_isr, CallbackInfo info,
73174
LibXR::MicrosecondTimestamp timestamp, LibXR::ConstRawData& data) {
74-
auto& buffer = info.client->topics_pack_buffer_[info.index];
75-
ASSERT(data.size_ + LibXR::Topic::PACK_BASE_SIZE <= buffer.size_);
76-
LibXR::Topic::PackData(info.topic_crc32, buffer, timestamp, data);
77-
78-
LibXR::WriteOperation op;
79-
auto ans = info.client->uart_->Write(
80-
{buffer.addr_, data.size_ + LibXR::Topic::PACK_BASE_SIZE}, op,
81-
in_isr);
82-
UNUSED(ans);
175+
info.client->OnTopic(in_isr, info, timestamp, data);
83176
};
84177

85178
auto msg_cb = LibXR::Topic::Callback::Create(
86-
func, CallbackInfo{this, ans->data_.crc32, i});
179+
func, CallbackInfo{this, ans->data_.crc32});
87180

88181
LibXR::Topic topic(ans);
89182

90183
topic.RegisterCallback(msg_cb);
91-
92-
i++;
93184
}
94185

95-
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
96-
97186
app.Register(*this);
98187
}
99188

100189
void OnMonitor() override {}
101190

102191
private:
192+
void OnTopic(bool in_isr, CallbackInfo info,
193+
LibXR::MicrosecondTimestamp timestamp,
194+
LibXR::ConstRawData& data) {
195+
const size_t packet_size = data.size_ + LibXR::Topic::PACK_BASE_SIZE;
196+
ASSERT(packet_size <= tx_buffer_.size_);
197+
198+
if (!pending_packets_->Push(info.topic_crc32, timestamp, data,
199+
packet_size)) {
200+
dropped_packets_.fetch_add(1U, std::memory_order_relaxed);
201+
}
202+
TxService(in_isr);
203+
}
204+
205+
void TxService(bool in_isr) {
206+
tx_pend_.store(1U, std::memory_order_release);
207+
208+
uint32_t expected = 0U;
209+
if (!tx_lock_.compare_exchange_strong(expected, 1U,
210+
std::memory_order_acquire,
211+
std::memory_order_relaxed)) {
212+
return;
213+
}
214+
215+
while (true) {
216+
tx_pend_.store(0U, std::memory_order_release);
217+
bool blocked = false;
218+
219+
while (true) {
220+
size_t packet_size = retry_packet_size_;
221+
if (!retry_pending_) {
222+
if (!pending_packets_->PopTo(tx_buffer_, packet_size)) {
223+
break;
224+
}
225+
}
226+
227+
auto ans = uart_->Write(
228+
LibXR::ConstRawData{tx_buffer_.addr_, packet_size}, tx_op_, in_isr);
229+
if (static_cast<int8_t>(ans) < 0) {
230+
retry_packet_size_ = packet_size;
231+
retry_pending_ = true;
232+
blocked = true;
233+
break;
234+
}
235+
236+
retry_pending_ = false;
237+
retry_packet_size_ = 0;
238+
}
239+
240+
tx_lock_.store(0U, std::memory_order_release);
241+
if (blocked || tx_pend_.load(std::memory_order_acquire) == 0U) {
242+
return;
243+
}
244+
245+
expected = 0U;
246+
if (!tx_lock_.compare_exchange_strong(expected, 1U,
247+
std::memory_order_acquire,
248+
std::memory_order_relaxed)) {
249+
return;
250+
}
251+
}
252+
}
253+
254+
void OnWriteDone(bool in_isr, LibXR::ErrorCode status) {
255+
UNUSED(status);
256+
TxService(in_isr);
257+
}
258+
103259
LibXR::UART* uart_;
104-
LibXR::RawData* topics_pack_buffer_;
260+
LibXR::RawData tx_buffer_;
261+
PacketQueue* pending_packets_ = nullptr;
262+
size_t retry_packet_size_ = 0;
263+
bool retry_pending_ = false;
264+
std::atomic<uint32_t> tx_lock_{0};
265+
std::atomic<uint32_t> tx_pend_{0};
266+
std::atomic<uint32_t> dropped_packets_{0};
267+
LibXR::Callback<LibXR::ErrorCode> tx_callback_;
268+
LibXR::WriteOperation tx_op_;
105269
};

0 commit comments

Comments
 (0)