Skip to content

Commit 489779f

Browse files
committed
refactor: use shared packet slots
1 parent 87675de commit 489779f

2 files changed

Lines changed: 116 additions & 27 deletions

File tree

README.md

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,28 @@ SharedTopicClient is a client module for multi-topic data sharing and transparen
1313
## 构造参数 / Constructor Arguments
1414

1515
- uart_name: 串口设备名 / UART device name (e.g., "uart_cdc")
16-
- buffer_size: 单个 Topic 串口包最大字节数 / Maximum bytes of one forwarded Topic packet (e.g., 256)
16+
- slot_count: 共享待发槽位数量。每个槽位的字节数由订阅 Topic 中最大的打包后长度自动计算。
17+
/ Number of shared pending slots. Each slot size is derived from the largest packed subscribed Topic.
1718
- topic_configs: 需要订阅并转发的 Topic 配置列表。每项可以只写 topic 名,也可以写
1819
`[topic, domain]`。/ Topic configs to subscribe and forward. Each item may be a
1920
topic name or `[topic, domain]`.
2021

2122
## 运行方式
2223

2324
`SharedTopicClient` 不创建发送线程。模块注册 Topic callback;每次 Topic 发布时,
24-
callback 内直接完成打包并把当前包写入 UART `write_port`。UART 写入仍走 libxr
25-
非阻塞写队列,包内容在 `Write()` 返回前已经复制到 UART 队列。
25+
callback 先从空槽位队列申请一个 packet 槽,完成打包后把 `{槽位, 长度}` 放入待发
26+
pool,然后尝试交给 UART `WritePort`
27+
28+
- 所有 Topic 共用同一组固定 packet 槽位。
29+
- 空槽位用 `LockFreeQueue<uint32_t>` 管理;TX 消费完成后把槽位还回队列。
30+
- 待发 packet 用 `LockFreePool` 管理;message callback 打包完成后放入 pool。
31+
- packet 总数、空槽队列深度、待发 pool 深度相同。
32+
- 空槽申请失败时丢弃当前新 packet;这是全局背压,不是同 Topic 覆盖。
33+
34+
`TxService()` 每次只尝试交付一个待发 packet,不单独维护发送锁;并发提交与写队列容量
35+
由 libxr `WritePort` 负责。如果 `WritePort` 暂时忙或写队列已满,当前待发 packet
36+
会放回待发 pool,等待写完成回调或下一次 Topic callback 再推进。这样不会为转发链路
37+
额外引入发送线程,也不会重复实现 `WritePort` 已经具备的互斥语义。
2638

2739
## Timestamp
2840

SharedTopicClient.hpp

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module_description: SharedTopicClient 是一个多 Topic 数据共享与串口转发客户端模块。它用于通过 UART 将多个 Topic 的数据统一打包、发送,实现消息流的串口透明同步转发,适用于分布式系统的多主题数据同步或边缘数据采集。 / SharedTopicClient is a client module for multi-topic data sharing and transparent UART forwarding. It subscribes to multiple Topics, packs their updates, and transmits them via UART, enabling efficient and reliable message synchronization over serial connections—ideal for distributed systems or edge data acquisition.
66
constructor_args:
77
- uart_name: "uart_cdc"
8-
- buffer_size: 256
8+
- slot_count: 16
99
- topic_configs:
1010
- "topic1"
1111
- ["topic2", "libxr_def_domain"]
@@ -19,17 +19,28 @@ depends: []
1919
#include <cstdint>
2020

2121
#include "app_framework.hpp"
22+
#include "lockfree_pool.hpp"
23+
#include "lockfree_queue.hpp"
2224
#include "message.hpp"
2325
#include "uart.hpp"
2426

2527
class SharedTopicClient : public LibXR::Application {
26-
public:
28+
private:
2729
struct CallbackInfo {
2830
SharedTopicClient* client;
2931
uint32_t topic_crc32;
30-
uint32_t index;
3132
};
3233

34+
struct PacketSlot {
35+
LibXR::RawData buffer;
36+
};
37+
38+
struct ReadyPacket {
39+
uint32_t slot_index = 0;
40+
size_t packet_size = 0;
41+
};
42+
43+
public:
3344
struct TopicConfig {
3445
const char* name;
3546
const char* domain = "libxr_def_domain";
@@ -42,15 +53,16 @@ class SharedTopicClient : public LibXR::Application {
4253

4354
SharedTopicClient(LibXR::HardwareContainer& hw,
4455
LibXR::ApplicationManager& app, const char* uart_name,
45-
uint32_t buffer_size,
56+
uint32_t slot_count,
4657
std::initializer_list<TopicConfig> topic_configs)
4758
: uart_(hw.template Find<LibXR::UART>(uart_name)) {
4859
ASSERT(uart_ != nullptr);
4960
ASSERT(uart_->write_port_ != nullptr);
5061
ASSERT(uart_->write_port_->queue_data_ != nullptr);
62+
ASSERT(uart_->write_port_->Writable());
63+
ASSERT(topic_configs.size() > 0);
64+
ASSERT(slot_count > 0);
5165

52-
topics_pack_buffer_ = new LibXR::RawData[topic_configs.size()];
53-
uint32_t i = 0;
5466
size_t max_packet_size = 0;
5567

5668
for (auto config : topic_configs) {
@@ -62,44 +74,109 @@ class SharedTopicClient : public LibXR::Application {
6274
}
6375
const size_t packet_size =
6476
ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE;
65-
ASSERT(packet_size <= buffer_size);
6677
max_packet_size = LibXR::max(max_packet_size, packet_size);
67-
topics_pack_buffer_[i] = LibXR::RawData(
68-
new uint8_t[packet_size], packet_size);
78+
}
6979

80+
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
81+
82+
packets_ = new PacketSlot[slot_count];
83+
free_slots_ = new LibXR::LockFreeQueue<uint32_t>(slot_count);
84+
ready_packets_ = new LibXR::LockFreePool<ReadyPacket>(slot_count);
85+
for (uint32_t i = 0; i < slot_count; i++) {
86+
packets_[i].buffer =
87+
LibXR::RawData(new uint8_t[max_packet_size], max_packet_size);
88+
auto ans = free_slots_->Push(i);
89+
UNUSED(ans);
90+
ASSERT(ans == LibXR::ErrorCode::OK);
91+
}
92+
93+
tx_callback_ = LibXR::Callback<LibXR::ErrorCode>::CreateGuarded(
94+
[](bool in_isr, SharedTopicClient* self, LibXR::ErrorCode status) {
95+
self->OnWriteDone(in_isr, status);
96+
},
97+
this);
98+
tx_op_ = LibXR::WriteOperation(tx_callback_);
99+
100+
for (auto config : topic_configs) {
101+
auto domain = LibXR::Topic::Domain(config.domain);
102+
auto ans = LibXR::Topic::Find(config.name, &domain);
103+
ASSERT(ans != nullptr);
70104
void (*func)(bool, CallbackInfo, LibXR::MicrosecondTimestamp,
71105
LibXR::ConstRawData&) =
72106
[](bool in_isr, CallbackInfo info,
73107
LibXR::MicrosecondTimestamp timestamp, LibXR::ConstRawData& data) {
74-
auto& buffer = info.client->topics_pack_buffer_[info.index];
75-
ASSERT(data.size_ + LibXR::Topic::PACK_BASE_SIZE <= buffer.size_);
76-
LibXR::Topic::PackData(info.topic_crc32, buffer, timestamp, data);
77-
78-
LibXR::WriteOperation op;
79-
auto ans = info.client->uart_->Write(
80-
{buffer.addr_, data.size_ + LibXR::Topic::PACK_BASE_SIZE}, op,
81-
in_isr);
82-
UNUSED(ans);
108+
info.client->OnTopic(in_isr, info, timestamp, data);
83109
};
84110

85111
auto msg_cb = LibXR::Topic::Callback::Create(
86-
func, CallbackInfo{this, ans->data_.crc32, i});
112+
func, CallbackInfo{this, ans->data_.crc32});
87113

88114
LibXR::Topic topic(ans);
89115

90116
topic.RegisterCallback(msg_cb);
91-
92-
i++;
93117
}
94118

95-
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
96-
97119
app.Register(*this);
98120
}
99121

100122
void OnMonitor() override {}
101123

102124
private:
125+
void OnTopic(bool in_isr, CallbackInfo info,
126+
LibXR::MicrosecondTimestamp timestamp,
127+
LibXR::ConstRawData& data) {
128+
const size_t packet_size = data.size_ + LibXR::Topic::PACK_BASE_SIZE;
129+
uint32_t slot_index = 0;
130+
131+
if (free_slots_->Pop(slot_index) != LibXR::ErrorCode::OK) {
132+
return;
133+
}
134+
135+
auto& slot = packets_[slot_index];
136+
ASSERT(packet_size <= slot.buffer.size_);
137+
LibXR::Topic::PackData(info.topic_crc32, slot.buffer, timestamp, data);
138+
139+
auto ans = ready_packets_->Put(ReadyPacket{slot_index, packet_size});
140+
UNUSED(ans);
141+
ASSERT(ans == LibXR::ErrorCode::OK);
142+
TxService(in_isr);
143+
}
144+
145+
void TxService(bool in_isr) {
146+
ReadyPacket packet;
147+
if (ready_packets_->Get(packet) != LibXR::ErrorCode::OK) {
148+
return;
149+
}
150+
151+
auto& slot = packets_[packet.slot_index];
152+
auto ans = uart_->Write(
153+
LibXR::ConstRawData{slot.buffer.addr_, packet.packet_size}, tx_op_,
154+
in_isr);
155+
if (static_cast<int8_t>(ans) < 0) {
156+
ans = ready_packets_->Put(packet);
157+
UNUSED(ans);
158+
ASSERT(ans == LibXR::ErrorCode::OK);
159+
return;
160+
}
161+
162+
ReturnFreeSlot(packet.slot_index);
163+
}
164+
165+
void OnWriteDone(bool in_isr, LibXR::ErrorCode status) {
166+
UNUSED(status);
167+
TxService(in_isr);
168+
}
169+
170+
void ReturnFreeSlot(uint32_t slot_index) {
171+
auto ans = free_slots_->Push(slot_index);
172+
UNUSED(ans);
173+
ASSERT(ans == LibXR::ErrorCode::OK);
174+
}
175+
103176
LibXR::UART* uart_;
104-
LibXR::RawData* topics_pack_buffer_;
177+
PacketSlot* packets_ = nullptr;
178+
LibXR::LockFreeQueue<uint32_t>* free_slots_ = nullptr;
179+
LibXR::LockFreePool<ReadyPacket>* ready_packets_ = nullptr;
180+
LibXR::Callback<LibXR::ErrorCode> tx_callback_;
181+
LibXR::WriteOperation tx_op_;
105182
};

0 commit comments

Comments
 (0)