55module_description: SharedTopic 是一个基于 UART 的多 Topic 数据共享与解析服务端模块 / SharedTopic is a UART-based multi-topic data sharing and parsing server module
66constructor_args:
77 - uart_name: "usart1"
8- - task_stack_depth: 2048
98 - buffer_size: 256
109 - topic_configs:
1110 - "topic1"
@@ -17,6 +16,7 @@ depends: []
1716// clang-format on
1817
1918#include < cstdint>
19+ #include < cstdlib>
2020#include < cstring>
2121
2222#include " app_framework.hpp"
@@ -25,7 +25,6 @@ depends: []
2525#include " libxr_type.hpp"
2626#include " logger.hpp"
2727#include " message.hpp"
28- #include " semaphore.hpp"
2928#include " thread.hpp"
3029#include " uart.hpp"
3130
@@ -42,8 +41,7 @@ class SharedTopic : public LibXR::Application {
4241 };
4342
4443 SharedTopic (LibXR::HardwareContainer& hw, LibXR::ApplicationManager& app,
45- const char * uart_name, uint32_t task_stack_depth,
46- uint32_t buffer_size,
44+ const char * uart_name, uint32_t buffer_size,
4745 std::initializer_list<TopicConfig> topic_configs)
4846 : uart_(hw.template Find<LibXR::UART>(uart_name)),
4947 server_ (buffer_size),
@@ -52,6 +50,9 @@ class SharedTopic : public LibXR::Application {
5250 cmd_file_((strcpy(cmd_name_, " shared_topic:" ),
5351 strcpy(cmd_name_ + strlen(" shared_topic:" ), uart_name),
5452 LibXR::RamFS::CreateFile(cmd_name_, CommandFunc, this ))) {
53+ ASSERT (uart_ != nullptr );
54+ ASSERT (uart_->read_port_ != nullptr );
55+
5556 for (auto config : topic_configs) {
5657 auto domain = LibXR::Topic::Domain (config.domain );
5758 auto topic = LibXR::Topic::Find (config.name , &domain);
@@ -64,31 +65,17 @@ class SharedTopic : public LibXR::Application {
6465
6566 hw.template FindOrExit <LibXR::RamFS>({" ramfs" })->Add (cmd_file_);
6667
67- rx_thread_.Create (this , RxThreadFun, " SharedTopic::RxThread" ,
68- task_stack_depth, LibXR::Thread::Priority::REALTIME);
68+ rx_callback_ = LibXR::Callback<LibXR::ErrorCode>::CreateGuarded (
69+ [](bool in_isr, SharedTopic* self, LibXR::ErrorCode status) {
70+ self->OnRxReady (in_isr, status);
71+ },
72+ this );
73+ rx_ready_op_ = LibXR::ReadOperation (rx_callback_);
74+ StartRxWait (false );
6975
7076 app.Register (*this );
7177 }
7278
73- static void RxThreadFun (SharedTopic* self) {
74- LibXR::Semaphore sem;
75- LibXR::ReadOperation op (sem);
76- while (true ) {
77- self->uart_ ->Read ({nullptr , 0 }, op);
78- auto size = LibXR::max (
79- sizeof (LibXR::Topic::PackedDataHeader),
80- LibXR::min (self->uart_ ->read_port_ ->Size (), self->rx_buffer_ .size_ ));
81- auto ans =
82- self->uart_ ->Read (LibXR::RawData{self->rx_buffer_ .addr_ , size}, op);
83-
84- if (ans == LibXR::ErrorCode::OK) {
85- self->server_ .ParseData (LibXR::RawData{self->rx_buffer_ .addr_ , size});
86- }
87-
88- self->rx_count_ += size;
89- }
90- }
91-
9279 void OnMonitor () override {}
9380
9481 static int CommandFunc (SharedTopic* self, int argc, char ** argv) {
@@ -118,6 +105,28 @@ class SharedTopic : public LibXR::Application {
118105 }
119106
120107 private:
108+ void OnRxReady (bool in_isr, LibXR::ErrorCode status) {
109+ if (status == LibXR::ErrorCode::OK) {
110+ auto size = LibXR::min (uart_->read_port_ ->Size (), rx_buffer_.size_ );
111+ if (size > 0 ) {
112+ auto ans =
113+ uart_->Read (LibXR::RawData{rx_buffer_.addr_ , size}, rx_data_op_, in_isr);
114+ if (ans == LibXR::ErrorCode::OK) {
115+ server_.ParseDataFromCallback (
116+ LibXR::ConstRawData{rx_buffer_.addr_ , size}, in_isr);
117+ rx_count_ += size;
118+ }
119+ }
120+ }
121+
122+ StartRxWait (in_isr);
123+ }
124+
125+ void StartRxWait (bool in_isr) {
126+ auto ans = uart_->Read ({nullptr , 0 }, rx_ready_op_, in_isr);
127+ ASSERT (ans == LibXR::ErrorCode::OK);
128+ }
129+
121130 LibXR::UART* uart_;
122131
123132 LibXR::Topic::Server server_;
@@ -130,6 +139,7 @@ class SharedTopic : public LibXR::Application {
130139
131140 LibXR::RamFS::File cmd_file_;
132141
133- LibXR::Thread rx_thread_;
134- LibXR::Thread tx_thread_;
142+ LibXR::Callback<LibXR::ErrorCode> rx_callback_;
143+ LibXR::ReadOperation rx_ready_op_;
144+ LibXR::ReadOperation rx_data_op_;
135145};
0 commit comments