Skip to content

Commit 8c3274b

Browse files
committed
refactor: parse shared topics in rx thread
1 parent 22085b2 commit 8c3274b

2 files changed

Lines changed: 36 additions & 39 deletions

File tree

README.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ SharedTopic is a UART-based multi-topic data sharing and server module.
2121

2222
## 运行方式
2323

24-
`SharedTopic` 不创建接收线程。模块在 UART `read_port` 上挂 0 字节 read 作为可读事件;
25-
一旦 UART 收到数据,回调会读取当前已有字节并调用 `Topic::Server::ParseDataFromCallback()`
26-
解析出的 Topic 会继续沿 libxr callback/ISR 语义发布,串口包里的 envelope timestamp 会被保留。
24+
`SharedTopic` 创建一个接收线程。线程用 UART `read_port` 的阻塞读等待可读事件,
25+
被唤醒后一次性搬走当前已经到达的字节,再整块调用 `Topic::Server::ParseData()`
26+
解析出的 Topic 走普通 `Topic::Publish()` 语义,串口包里的 envelope timestamp 会被保留。
2727

28-
读取 UART 时会先按 `buffer_size` 成块搬到本地缓存,再逐字节喂给 `Topic::Server`
29-
这样可以避免 parser 内部固定队列在“半包残留 + 新批次较大”时按整批写入语义丢数据。
30-
`buffer_size` 仍必须不小于需要接收的最大单个 Topic 串口包。
28+
业务 Topic 不在 UART/USB 回调链里发布,避免接收回调里继续触发同步命令等业务逻辑。
29+
`buffer_size` 必须不小于需要接收的最大单个 Topic 串口包。
3130

3231
## 依赖 / Depends
3332

SharedTopic.hpp

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ depends: []
2525
#include "libxr_type.hpp"
2626
#include "logger.hpp"
2727
#include "message.hpp"
28+
#include "semaphore.hpp"
2829
#include "thread.hpp"
2930
#include "uart.hpp"
3031

@@ -52,6 +53,7 @@ class SharedTopic : public LibXR::Application {
5253
LibXR::RamFS::CreateFile(cmd_name_, CommandFunc, this))) {
5354
ASSERT(uart_ != nullptr);
5455
ASSERT(uart_->read_port_ != nullptr);
56+
ASSERT(uart_->read_port_->Readable());
5557

5658
for (auto config : topic_configs) {
5759
auto domain = LibXR::Topic::Domain(config.domain);
@@ -65,13 +67,8 @@ class SharedTopic : public LibXR::Application {
6567

6668
hw.template FindOrExit<LibXR::RamFS>({"ramfs"})->Add(cmd_file_);
6769

68-
rx_callback_ = LibXR::Callback<LibXR::ErrorCode>::CreateGuarded(
69-
[](bool in_isr, SharedTopic* self, LibXR::ErrorCode status) {
70-
self->OnRxReady(in_isr, status);
71-
},
72-
this);
73-
rx_ready_op_ = LibXR::ReadOperation(rx_callback_);
74-
StartRxWait(false);
70+
rx_thread_.Create<SharedTopic*>(this, RxThread, "shared_topic", 2048,
71+
LibXR::Thread::Priority::MEDIUM);
7572

7673
app.Register(*this);
7774
}
@@ -81,7 +78,8 @@ class SharedTopic : public LibXR::Application {
8178
static int CommandFunc(SharedTopic* self, int argc, char** argv) {
8279
if (argc == 1) {
8380
LibXR::STDIO::Printf<"Usage:\r\n">();
84-
LibXR::STDIO::Printf<" monitor [time_ms] [interval_ms] - test received speed\r\n">();
81+
LibXR::STDIO::Printf<
82+
" monitor [time_ms] [interval_ms] - test received speed\r\n">();
8583
return 0;
8684
} else if (argc == 4) {
8785
if (strcmp(argv[1], "monitor") == 0) {
@@ -90,8 +88,9 @@ class SharedTopic : public LibXR::Application {
9088
auto start = self->rx_count_;
9189
while (time > 0) {
9290
LibXR::Thread::Sleep(delay);
93-
LibXR::STDIO::Printf<"%f Mbps\r\n">(static_cast<float>(self->rx_count_ - start) * 8.0 /
94-
1024.0 / 1024.0 / delay * 1000.0);
91+
LibXR::STDIO::Printf<"%f Mbps\r\n">(
92+
static_cast<float>(self->rx_count_ - start) * 8.0 / 1024.0 /
93+
1024.0 / delay * 1000.0);
9594
time -= delay;
9695
start = self->rx_count_;
9796
}
@@ -105,29 +104,29 @@ class SharedTopic : public LibXR::Application {
105104
}
106105

107106
private:
108-
void OnRxReady(bool in_isr, LibXR::ErrorCode status) {
109-
if (status == LibXR::ErrorCode::OK) {
110-
auto size = LibXR::min(uart_->read_port_->Size(), rx_buffer_.size_);
111-
if (size > 0) {
112-
auto ans =
113-
uart_->Read(LibXR::RawData{rx_buffer_.addr_, size}, rx_data_op_, in_isr);
114-
if (ans == LibXR::ErrorCode::OK) {
115-
auto* data = static_cast<uint8_t*>(rx_buffer_.addr_);
116-
for (size_t i = 0; i < size; i++) {
117-
server_.ParseDataFromCallback(LibXR::ConstRawData{data + i, 1},
118-
in_isr);
119-
}
120-
rx_count_ += size;
121-
}
107+
static void RxThread(SharedTopic* self) { self->RunRxLoop(); }
108+
109+
void RunRxLoop() {
110+
LibXR::ReadOperation wait_op(rx_sem_);
111+
LibXR::ReadOperation read_op;
112+
113+
while (true) {
114+
auto ans = uart_->Read({nullptr, 0}, wait_op);
115+
if (ans != LibXR::ErrorCode::OK) {
116+
continue;
122117
}
123-
}
124118

125-
StartRxWait(in_isr);
126-
}
119+
while (uart_->read_port_->Size() > 0) {
120+
auto size = LibXR::min(uart_->read_port_->Size(), rx_buffer_.size_);
121+
ans = uart_->Read(LibXR::RawData{rx_buffer_.addr_, size}, read_op);
122+
if (ans != LibXR::ErrorCode::OK) {
123+
break;
124+
}
127125

128-
void StartRxWait(bool in_isr) {
129-
auto ans = uart_->Read({nullptr, 0}, rx_ready_op_, in_isr);
130-
ASSERT(ans == LibXR::ErrorCode::OK);
126+
server_.ParseData(LibXR::ConstRawData{rx_buffer_.addr_, size});
127+
rx_count_ += size;
128+
}
129+
}
131130
}
132131

133132
LibXR::UART* uart_;
@@ -142,7 +141,6 @@ class SharedTopic : public LibXR::Application {
142141

143142
LibXR::RamFS::File cmd_file_;
144143

145-
LibXR::Callback<LibXR::ErrorCode> rx_callback_;
146-
LibXR::ReadOperation rx_ready_op_;
147-
LibXR::ReadOperation rx_data_op_;
144+
LibXR::Semaphore rx_sem_;
145+
LibXR::Thread rx_thread_;
148146
};

0 commit comments

Comments
 (0)