Skip to content

Commit 87675de

Browse files
committed
refactor: forward shared topics from callbacks
1 parent cc1ed9d commit 87675de

2 files changed

Lines changed: 49 additions & 45 deletions

File tree

README.md

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,26 @@ SharedTopicClient is a client module for multi-topic data sharing and transparen
1313
## 构造参数 / Constructor Arguments
1414

1515
- uart_name: 串口设备名 / UART device name (e.g., "uart_cdc")
16-
- task_stack_depth: 任务堆栈大小 / Task stack depth (e.g., 512)
17-
- buffer_size: 发送缓冲区字节数 / TX buffer size (e.g., 256)
18-
- topic_names: 需要订阅并转发的 Topic 名称列表 / List of topic names to subscribe and forward (e.g., ["topic1", "topic2"])
16+
- buffer_size: 单个 Topic 串口包最大字节数 / Maximum bytes of one forwarded Topic packet (e.g., 256)
17+
- topic_configs: 需要订阅并转发的 Topic 配置列表。每项可以只写 topic 名,也可以写
18+
`[topic, domain]`。/ Topic configs to subscribe and forward. Each item may be a
19+
topic name or `[topic, domain]`.
20+
21+
## 运行方式
22+
23+
`SharedTopicClient` 不创建发送线程。模块注册 Topic callback;每次 Topic 发布时,
24+
callback 内直接完成打包并把当前包写入 UART `write_port`。UART 写入仍走 libxr
25+
非阻塞写队列,包内容在 `Write()` 返回前已经复制到 UART 队列。
26+
27+
## Timestamp
28+
29+
`SharedTopicClient` 转发 Topic 时会保留 libxr message envelope timestamp:
30+
31+
1. 本地 Topic callback 收到 `(timestamp, payload)`
32+
2. `Topic::PackData(topic_crc, buffer, timestamp, payload)` 写入串口包。
33+
3. 对端 `SharedTopic` 解析后用同一个 timestamp 发布到对端 domain。
34+
35+
因此同步类 topic 不需要在 payload 里重复携带时间戳;payload 只保留业务字段即可。
1936

2037
## 依赖 / Depends
2138

SharedTopicClient.hpp

Lines changed: 29 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
module_description: SharedTopicClient 是一个多 Topic 数据共享与串口转发客户端模块。它用于通过 UART 将多个 Topic 的数据统一打包、发送,实现消息流的串口透明同步转发,适用于分布式系统的多主题数据同步或边缘数据采集。 / SharedTopicClient is a client module for multi-topic data sharing and transparent UART forwarding. It subscribes to multiple Topics, packs their updates, and transmits them via UART, enabling efficient and reliable message synchronization over serial connections—ideal for distributed systems or edge data acquisition.
66
constructor_args:
77
- uart_name: "uart_cdc"
8-
- task_stack_depth: 2048
98
- buffer_size: 256
109
- topic_configs:
1110
- "topic1"
@@ -16,16 +15,20 @@ depends: []
1615
=== END MANIFEST === */
1716
// clang-format on
1817

18+
#include <cstddef>
19+
#include <cstdint>
20+
1921
#include "app_framework.hpp"
22+
#include "message.hpp"
2023
#include "uart.hpp"
2124

2225
class SharedTopicClient : public LibXR::Application {
2326
public:
24-
typedef struct {
27+
struct CallbackInfo {
2528
SharedTopicClient* client;
2629
uint32_t topic_crc32;
2730
uint32_t index;
28-
} CallbackInfo;
31+
};
2932

3033
struct TopicConfig {
3134
const char* name;
@@ -39,16 +42,16 @@ class SharedTopicClient : public LibXR::Application {
3942

4043
SharedTopicClient(LibXR::HardwareContainer& hw,
4144
LibXR::ApplicationManager& app, const char* uart_name,
42-
uint32_t task_stack_depth, uint32_t buffer_size,
45+
uint32_t buffer_size,
4346
std::initializer_list<TopicConfig> topic_configs)
44-
: uart_(hw.template Find<LibXR::UART>(uart_name)),
45-
tx_buffer_(new uint8_t[buffer_size], buffer_size),
46-
tx_queue_(buffer_size) {
47+
: uart_(hw.template Find<LibXR::UART>(uart_name)) {
4748
ASSERT(uart_ != nullptr);
49+
ASSERT(uart_->write_port_ != nullptr);
50+
ASSERT(uart_->write_port_->queue_data_ != nullptr);
4851

4952
topics_pack_buffer_ = new LibXR::RawData[topic_configs.size()];
50-
5153
uint32_t i = 0;
54+
size_t max_packet_size = 0;
5255

5356
for (auto config : topic_configs) {
5457
auto domain = LibXR::Topic::Domain(config.domain);
@@ -57,21 +60,26 @@ class SharedTopicClient : public LibXR::Application {
5760
XR_LOG_ERROR("Topic not found: %s/%s", config.domain, config.name);
5861
ASSERT(false);
5962
}
63+
const size_t packet_size =
64+
ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE;
65+
ASSERT(packet_size <= buffer_size);
66+
max_packet_size = LibXR::max(max_packet_size, packet_size);
6067
topics_pack_buffer_[i] = LibXR::RawData(
61-
new uint8_t[ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE],
62-
ans->data_.max_length + LibXR::Topic::PACK_BASE_SIZE);
68+
new uint8_t[packet_size], packet_size);
69+
70+
void (*func)(bool, CallbackInfo, LibXR::MicrosecondTimestamp,
71+
LibXR::ConstRawData&) =
72+
[](bool in_isr, CallbackInfo info,
73+
LibXR::MicrosecondTimestamp timestamp, LibXR::ConstRawData& data) {
74+
auto& buffer = info.client->topics_pack_buffer_[info.index];
75+
ASSERT(data.size_ + LibXR::Topic::PACK_BASE_SIZE <= buffer.size_);
76+
LibXR::Topic::PackData(info.topic_crc32, buffer, timestamp, data);
6377

64-
void (*func)(bool, CallbackInfo, LibXR::RawData&) =
65-
[](bool in_isr, CallbackInfo info, LibXR::RawData& data) {
6678
LibXR::WriteOperation op;
67-
LibXR::Topic::PackData(info.topic_crc32,
68-
info.client->topics_pack_buffer_[info.index],
69-
data);
70-
info.client->tx_queue_.PushBatch(
71-
static_cast<uint8_t*>(
72-
info.client->topics_pack_buffer_[info.index].addr_),
73-
info.client->topics_pack_buffer_[info.index].size_);
74-
info.client->tx_sem_.PostFromCallback(in_isr);
79+
auto ans = info.client->uart_->Write(
80+
{buffer.addr_, data.size_ + LibXR::Topic::PACK_BASE_SIZE}, op,
81+
in_isr);
82+
UNUSED(ans);
7583
};
7684

7785
auto msg_cb = LibXR::Topic::Callback::Create(
@@ -84,35 +92,14 @@ class SharedTopicClient : public LibXR::Application {
8492
i++;
8593
}
8694

87-
tx_thread_.Create(this, TxThreadFun, "SharedTopicClientTxThread",
88-
task_stack_depth, LibXR::Thread::Priority::REALTIME);
95+
ASSERT(max_packet_size <= uart_->write_port_->queue_data_->MaxSize());
8996

9097
app.Register(*this);
9198
}
9299

93-
static void TxThreadFun(SharedTopicClient* client) {
94-
LibXR::Semaphore write_op_sem;
95-
LibXR::WriteOperation op(write_op_sem);
96-
LibXR::WriteOperation op_none;
97-
while (true) {
98-
client->tx_sem_.Wait();
99-
auto size =
100-
LibXR::min(client->tx_queue_.Size(), client->tx_buffer_.size_);
101-
if (size > 0 && client->tx_queue_.PopBatch(
102-
static_cast<uint8_t*>(client->tx_buffer_.addr_),
103-
size) == LibXR::ErrorCode::OK) {
104-
client->uart_->Write(
105-
{static_cast<uint8_t*>(client->tx_buffer_.addr_), size}, op_none);
106-
}
107-
}
108-
}
109100
void OnMonitor() override {}
110101

111102
private:
112103
LibXR::UART* uart_;
113-
LibXR::RawData tx_buffer_;
114-
LibXR::LockFreeQueue<uint8_t> tx_queue_;
115104
LibXR::RawData* topics_pack_buffer_;
116-
LibXR::Semaphore tx_sem_;
117-
LibXR::Thread tx_thread_;
118105
};

0 commit comments

Comments
 (0)