Skip to content

Commit 8fa9ec8

Browse files
committed
Update DelegateMQ library
1 parent 3d4c6a1 commit 8fa9ec8

31 files changed

+1556
-743
lines changed

DelegateMQ/DelegateMQ.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,20 @@
179179
#elif defined(DMQ_TRANSPORT_ARM_LWIP_UDP)
180180
#include "predef/dispatcher/Dispatcher.h"
181181
#include "predef/transport/arm-lwip-udp/ArmLwipUdpTransport.h"
182+
#elif defined(DMQ_TRANSPORT_THREADX_UDP)
183+
#include "predef/dispatcher/Dispatcher.h"
184+
#include "predef/transport/threadx-udp/NetXUdpTransport.h"
185+
#elif defined(DMQ_TRANSPORT_ZEPHYR_UDP)
186+
#include "predef/dispatcher/Dispatcher.h"
187+
#include "predef/transport/zephyr-udp/ZephyrUdpTransport.h"
182188
#elif defined(DMQ_TRANSPORT_NONE)
183189
// Create a custom application-specific transport
184190
#else
185191
#error "Transport implementation not found."
186192
#endif
187193

188-
#if defined(DMQ_TRANSPORT_ZEROMQ) || defined(DMQ_TRANSPORT_WIN32_UDP) || defined(DMQ_TRANSPORT_LINUX_UDP)
194+
#if defined(DMQ_TRANSPORT_ZEROMQ) || defined(DMQ_TRANSPORT_WIN32_UDP) || defined(DMQ_TRANSPORT_LINUX_UDP) || \
195+
defined(DMQ_TRANSPORT_THREADX_UDP) || defined(DMQ_TRANSPORT_ZEPHYR_UDP)
189196
#include "predef/util/NetworkEngine.h"
190197
#endif
191198

@@ -198,4 +205,4 @@
198205
#include "predef/util/TransportMonitor.h"
199206
#endif
200207

201-
#endif
208+
#endif

DelegateMQ/External.cmake

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ if(DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_ARM_LWIP_UDP")
117117
set_and_check(LWIP_INCLUDE_DIR_1 "${LWIP_ROOT}/src/include")
118118
set_and_check(LWIP_INCLUDE_DIR_2 "${LWIP_ROOT}/src/include/ipv4")
119119

120-
# Often required for lwipopts.h or arch/cc.h.
120+
# Often required for lwipopts.h or arch/cc.h.
121121
# NOTE: You may need to customize this depending on where your project keeps 'lwipopts.h'
122122
# For now, we point to the generic 'contrib' ports if present, or just the root.
123123
set(LWIP_INCLUDE_DIR_3 "${LWIP_ROOT}/contrib/ports/unix/port/include")
@@ -135,6 +135,25 @@ if(DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_ARM_LWIP_UDP")
135135
"${LWIP_ROOT}/src/netif/*.c"
136136
)
137137
endif()
138+
139+
# ---------------------------------------------------------------------------
140+
# NetX / NetX Duo library (For ThreadX)
141+
# ---------------------------------------------------------------------------
142+
if(DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_THREADX_UDP")
143+
# Adjust this path to your NetX / NetX Duo root
144+
set(NETX_ROOT "${DMQ_ROOT_DIR}/../../../netxduo")
145+
146+
# Verify root
147+
set_and_check(NETX_INCLUDE_DIR "${NETX_ROOT}/common/inc")
148+
149+
# Collect common sources
150+
file(GLOB NETX_SOURCES
151+
"${NETX_ROOT}/common/src/*.c"
152+
"${NETX_ROOT}/common/inc/*.h"
153+
)
154+
155+
# Note: Architecture-specific ports usually handled by your main ThreadX build system
156+
endif()
138157

139158
# ---------------------------------------------------------------------------
140159
# MessagePack
@@ -180,6 +199,7 @@ if(DMQ_THREAD STREQUAL "DMQ_THREAD_FREERTOS")
180199
"${FREERTOS_ROOT_DIR}/FreeRTOS/Source/*.c"
181200
"${FREERTOS_ROOT_DIR}/FreeRTOS/Source/include/*.h"
182201
)
202+
183203
list(APPEND FREERTOS_SOURCES
184204
"${FREERTOS_ROOT_DIR}/FreeRTOS-Plus/Source/FreeRTOS-Plus-Trace/trcKernelPort.c"
185205
"${FREERTOS_ROOT_DIR}/FreeRTOS-Plus/Source/FreeRTOS-Plus-Trace/trcSnapshotRecorder.c"
@@ -212,11 +232,10 @@ endif()
212232
# Zephyr
213233
# ---------------------------------------------------------------------------
214234
if(DMQ_THREAD STREQUAL "DMQ_THREAD_ZEPHYR")
215-
# Zephyr is a build system, not just a library.
235+
# Zephyr is a build system, not just a library.
216236
# We typically do NOT manually glob source files here.
217237
# The application's main CMakeLists.txt must call `find_package(Zephyr)`
218238
# which sets up the include paths and kernel linking automatically.
219-
220239
# Optional: Just verify the root directory exists if you want to be safe
221240
set_and_check(ZEPHYR_ROOT_DIR "${DMQ_ROOT_DIR}/../../../zephyr")
222241

DelegateMQ/Predef.cmake

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ elseif (DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_SERIAL_PORT")
8989
elseif (DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_ARM_LWIP_UDP")
9090
add_compile_definitions(DMQ_TRANSPORT_ARM_LWIP_UDP)
9191
file(GLOB TRANSPORT_SOURCES "${DMQ_ROOT_DIR}/predef/transport/arm-lwip-udp/*.h")
92+
elseif (DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_THREADX_UDP")
93+
add_compile_definitions(DMQ_TRANSPORT_THREADX_UDP)
94+
file(GLOB TRANSPORT_SOURCES "${DMQ_ROOT_DIR}/predef/transport/threadx-udp/*.h")
95+
elseif (DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_ZEPHYR_UDP")
96+
add_compile_definitions(DMQ_TRANSPORT_ZEPHYR_UDP)
97+
file(GLOB TRANSPORT_SOURCES "${DMQ_ROOT_DIR}/predef/transport/zephyr-udp/*.h")
9298
else()
9399
message(FATAL_ERROR "Must set DMQ_TRANSPORT option.")
94100
endif()
@@ -144,4 +150,4 @@ list(APPEND DMQ_PREDEF_SOURCES ${UTIL_SOURCES})
144150
file(GLOB DMQ_LIB_SOURCES
145151
"${DMQ_ROOT_DIR}/*.h"
146152
"${DMQ_ROOT_DIR}/delegate/*.h"
147-
)
153+
)

DelegateMQ/delegate/DelegateOpt.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
#elif defined(DMQ_THREAD_THREADX)
1414
#include "predef/util/ThreadXClock.h"
1515
#include "predef/util/ThreadXMutex.h"
16+
#elif defined(DMQ_THREAD_ZEPHYR)
17+
#include "predef/util/ZephyrClock.h"
18+
#include "predef/util/ZephyrMutex.h"
1619
#elif defined(DMQ_THREAD_CMSIS_RTOS2)
1720
#include "predef/util/CmsisRtos2Clock.h"
1821
#include "predef/util/CmsisRtos2Mutex.h"
@@ -146,4 +149,4 @@ namespace dmq
146149
#define LOG_ERROR(...) do {} while(0)
147150
#endif
148151

149-
#endif
152+
#endif // _DELEGATE_OPT_H

DelegateMQ/predef/os/freertos/Thread.cpp

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ Thread::Thread(const std::string& threadName) : THREAD_NAME(threadName)
2222
Thread::~Thread()
2323
{
2424
ExitThread();
25+
// Safe to delete semaphore now that the thread is definitely gone
26+
if (m_exitSem) {
27+
vSemaphoreDelete(m_exitSem);
28+
m_exitSem = nullptr;
29+
}
2530
}
2631

2732
//----------------------------------------------------------------------------
@@ -31,12 +36,16 @@ bool Thread::CreateThread()
3136
{
3237
if (!m_thread)
3338
{
34-
// 1. Create the Queue first
39+
// 1. Create Exit Synchronization Semaphore
40+
m_exitSem = xSemaphoreCreateBinary();
41+
ASSERT_TRUE(m_exitSem != nullptr);
42+
43+
// 2. Create the Queue
3544
// Holds pointers to ThreadMsg objects (heap allocated)
3645
m_queue = xQueueCreate(20, sizeof(ThreadMsg*));
3746
ASSERT_TRUE(m_queue != nullptr);
3847

39-
// 2. Create the Task
48+
// 3. Create the Task
4049
BaseType_t xReturn = xTaskCreate(
4150
(TaskFunction_t)&Thread::Process,
4251
THREAD_NAME.c_str(),
@@ -58,12 +67,26 @@ void Thread::ExitThread()
5867
if (m_queue) {
5968
// Send exit message
6069
ThreadMsg* msg = new ThreadMsg(MSG_EXIT_THREAD);
61-
if (xQueueSend(m_queue, &msg, 100) != pdPASS) {
70+
// Wait 100ms to send
71+
if (xQueueSend(m_queue, &msg, pdMS_TO_TICKS(100)) != pdPASS) {
6272
delete msg; // Failed to send, clean up
6373
}
64-
// Note: We don't join/wait here because FreeRTOS tasks
65-
// delete themselves asynchronously.
66-
m_thread = nullptr; // Detach
74+
75+
// Wait for the thread to actually finish to avoid Use-After-Free.
76+
// We only wait if we are NOT the thread itself (prevent deadlock).
77+
if (xTaskGetCurrentTaskHandle() != m_thread && m_exitSem) {
78+
xSemaphoreTake(m_exitSem, portMAX_DELAY);
79+
}
80+
81+
// Now safe to clean up resources
82+
// Delete Queue
83+
if (m_queue) {
84+
vQueueDelete(m_queue);
85+
m_queue = nullptr;
86+
}
87+
88+
// Note: m_thread handle is invalid after the task deletes itself
89+
m_thread = nullptr;
6790
}
6891
}
6992

@@ -99,7 +122,7 @@ void Thread::DispatchDelegate(std::shared_ptr<dmq::DelegateMsg> msg)
99122
{
100123
// 3. Handle failure: Delete to prevent memory leak
101124
delete threadMsg;
102-
printf("Error: Thread '%s' queue full! Delegate dropped.\n", THREAD_NAME.c_str());
125+
// printf("Error: Thread '%s' queue full! Delegate dropped.\n", THREAD_NAME.c_str());
103126
}
104127
}
105128

@@ -110,6 +133,7 @@ void Thread::Process(void* instance)
110133
{
111134
Thread* thread = static_cast<Thread*>(instance);
112135
ASSERT_TRUE(thread != nullptr);
136+
113137
thread->Run();
114138

115139
// Self-delete when Run() returns (ExitThread called)
@@ -146,6 +170,10 @@ void Thread::Run()
146170
case MSG_EXIT_THREAD:
147171
{
148172
delete msg;
173+
// Signal ExitThread() that we are done
174+
if (m_exitSem) {
175+
xSemaphoreGive(m_exitSem);
176+
}
149177
return; // Breaks loop, Process() calls vTaskDelete
150178
}
151179

DelegateMQ/predef/os/freertos/Thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "FreeRTOS.h"
2727
#include "task.h"
2828
#include "queue.h"
29+
#include "semphr.h"
2930
#include <string>
3031
#include <memory>
3132

@@ -71,6 +72,8 @@ class Thread : public dmq::IThread
7172

7273
TaskHandle_t m_thread = nullptr;
7374
QueueHandle_t m_queue = nullptr;
75+
SemaphoreHandle_t m_exitSem = nullptr; // Synchronization for safe destruction
76+
7477
const std::string THREAD_NAME;
7578
};
7679

DelegateMQ/predef/serialize/bitsery/Serializer.h

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,50 @@
99
/// to a remote. Bitsery provides fast, compact binary serialization.
1010

1111
#include "delegate/ISerializer.h"
12+
13+
// Core Bitsery
1214
#include <bitsery/bitsery.h>
1315
#include <bitsery/adapter/stream.h>
14-
#include <bitsery/ext/std_tuple.h>
16+
17+
// Common Traits (Include these so standard types work out of the box)
18+
#include <bitsery/traits/string.h>
19+
#include <bitsery/traits/vector.h>
20+
#include <bitsery/traits/list.h>
21+
1522
#include <sstream>
1623
#include <iostream>
1724
#include <type_traits>
1825

19-
// Type trait to check if a type is const
20-
template <typename T>
21-
using is_const_type = std::is_const<std::remove_reference_t<T>>;
22-
2326
template <class R>
2427
struct Serializer; // Not defined
2528

2629
template<class RetType, class... Args>
2730
class Serializer<RetType(Args...)> : public dmq::ISerializer<RetType(Args...)>
2831
{
2932
public:
33+
// Bitsery Adapters for std::ostream / std::istream
3034
using OutputAdapter = bitsery::OutputStreamAdapter;
3135
using InputAdapter = bitsery::InputStreamAdapter;
3236

33-
virtual std::ostream& Write(std::ostream& os, Args... args) override {
37+
// Write: Changed 'Args... args' to 'const Args&... args' for efficiency
38+
virtual std::ostream& Write(std::ostream& os, const Args&... args) override {
3439
try {
35-
os.seekp(0);
36-
bitsery::Serializer<OutputAdapter> writer{ os };
40+
// Reset stream position.
41+
os.seekp(0, std::ios::beg);
3742

38-
// Serialize each argument using fold expression
43+
// Clear stringstreams explicitly to avoid appending new data to old data.
44+
// DelegateMQ often reuses the stream object.
45+
if (auto* ss = dynamic_cast<std::ostringstream*>(&os)) {
46+
ss->str("");
47+
}
48+
49+
// Construct the adapter properly passing the stream
50+
bitsery::Serializer<OutputAdapter> writer{ OutputAdapter{os} };
51+
52+
// Serialize each argument using C++17 fold expression
3953
(writer.object(args), ...);
54+
55+
// Ensure buffer is flushed to the stream
4056
writer.adapter().flush();
4157
}
4258
catch (const std::exception& e) {
@@ -48,11 +64,16 @@ class Serializer<RetType(Args...)> : public dmq::ISerializer<RetType(Args...)>
4864

4965
virtual std::istream& Read(std::istream& is, Args&... args) override {
5066
try {
51-
bitsery::Deserializer<InputAdapter> reader{ is };
67+
// Construct the adapter properly passing the stream
68+
bitsery::Deserializer<InputAdapter> reader{ InputAdapter{is} };
5269

5370
// Deserialize each argument using fold expression
5471
(reader.object(args), ...);
5572

73+
// Optional: Check for deserialization errors
74+
if (reader.adapter().error() != bitsery::ReaderError::NoError) {
75+
throw std::runtime_error("Bitsery reported a read error");
76+
}
5677
}
5778
catch (const std::exception& e) {
5879
std::cerr << "Bitsery deserialize error: " << e.what() << std::endl;
@@ -62,4 +83,4 @@ class Serializer<RetType(Args...)> : public dmq::ISerializer<RetType(Args...)>
6283
}
6384
};
6485

65-
#endif
86+
#endif // SERIALIZER_H

0 commit comments

Comments
 (0)