Skip to content

Commit e281dac

Browse files
committed
Update DelegateMQ library
1 parent 3da0dc3 commit e281dac

90 files changed

Lines changed: 1804 additions & 566 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

DelegateMQ/DelegateMQ.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@
8888
defined(DMQ_THREAD_THREADX) || \
8989
defined(DMQ_THREAD_ZEPHYR) || \
9090
defined(DMQ_THREAD_CMSIS_RTOS2) || \
91-
defined(DMQ_THREAD_QT)
91+
defined(DMQ_THREAD_QT) || \
92+
defined(DMQ_THREAD_NONE)
9293
#include "delegate/MulticastDelegateSafe.h"
9394
#include "delegate/UnicastDelegateSafe.h"
9495
#endif
@@ -253,4 +254,5 @@
253254
#include "extras/databus/DeadlineSubscription.h"
254255
#endif
255256

257+
256258
#endif

DelegateMQ/delegate/DelegateOpt.h

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -116,22 +116,22 @@ namespace dmq
116116

117117
#elif defined(DMQ_THREAD_FREERTOS)
118118
// Use the custom FreeRTOS wrapper
119-
using Clock = dmq::FreeRTOSClock;
119+
using Clock = dmq::os::FreeRTOSClock;
120120

121121
#elif defined(DMQ_THREAD_THREADX)
122122
// Use the custom ThreadX wrapper
123-
using Clock = dmq::ThreadXClock;
123+
using Clock = dmq::os::ThreadXClock;
124124

125125
#elif defined(DMQ_THREAD_ZEPHYR)
126126
// Use the custom Zephyr wrapper
127-
using Clock = dmq::ZephyrClock;
127+
using Clock = dmq::os::ZephyrClock;
128128

129129
#elif defined(DMQ_THREAD_CMSIS_RTOS2)
130-
using Clock = dmq::CmsisRtos2Clock;
130+
using Clock = dmq::os::CmsisRtos2Clock;
131131

132132
#else
133133
// Assuming implemented the 'g_ticks' variable
134-
using Clock = dmq::BareMetalClock;
134+
using Clock = dmq::os::BareMetalClock;
135135
#endif
136136

137137
// --- GENERIC TYPES ---
@@ -151,31 +151,31 @@ namespace dmq
151151

152152
#elif defined(DMQ_THREAD_FREERTOS)
153153
// Use the custom FreeRTOS wrapper
154-
using Mutex = dmq::FreeRTOSMutex;
155-
using RecursiveMutex = dmq::FreeRTOSRecursiveMutex;
156-
using ConditionVariable = dmq::FreeRTOSConditionVariable;
154+
using Mutex = dmq::os::FreeRTOSMutex;
155+
using RecursiveMutex = dmq::os::FreeRTOSRecursiveMutex;
156+
using ConditionVariable = dmq::os::FreeRTOSConditionVariable;
157157
template<typename T> using LockGuard = PortableLockGuard<T>;
158158
template<typename T> using UniqueLock = std::unique_lock<T>;
159159
#define DMQ_HAS_CV
160160

161161
#elif defined(DMQ_THREAD_THREADX)
162162
// Use the custom ThreadX wrapper
163-
using Mutex = dmq::ThreadXMutex;
164-
using RecursiveMutex = dmq::ThreadXRecursiveMutex;
165-
using ConditionVariable = dmq::ThreadXConditionVariable;
163+
using Mutex = dmq::os::ThreadXMutex;
164+
using RecursiveMutex = dmq::os::ThreadXRecursiveMutex;
165+
using ConditionVariable = dmq::os::ThreadXConditionVariable;
166166
template<typename T> using LockGuard = PortableLockGuard<T>;
167167
template<typename T> using UniqueLock = std::unique_lock<T>;
168168
#define DMQ_HAS_CV
169169

170170
#elif defined(DMQ_THREAD_ZEPHYR)
171171
// Use the custom Zephyr wrapper
172-
using Mutex = dmq::ZephyrMutex;
173-
using RecursiveMutex = dmq::ZephyrRecursiveMutex;
172+
using Mutex = dmq::os::ZephyrMutex;
173+
using RecursiveMutex = dmq::os::ZephyrRecursiveMutex;
174174
template<typename T> using LockGuard = PortableLockGuard<T>;
175175

176176
#elif defined(DMQ_THREAD_CMSIS_RTOS2)
177-
using Mutex = dmq::CmsisRtos2Mutex;
178-
using RecursiveMutex = dmq::CmsisRtos2RecursiveMutex;
177+
using Mutex = dmq::os::CmsisRtos2Mutex;
178+
using RecursiveMutex = dmq::os::CmsisRtos2RecursiveMutex;
179179
template<typename T> using LockGuard = PortableLockGuard<T>;
180180

181181
#else
@@ -237,35 +237,37 @@ namespace dmq
237237
#undef XALLOCATOR
238238
#define XALLOCATOR
239239

240-
// Use default std::allocator for dynamic storage allocation
241-
template <typename T, typename Alloc = std::allocator<T>>
242-
class xlist : public std::list<T, Alloc> {
243-
public:
244-
using std::list<T, Alloc>::list; // Inherit constructors
245-
using std::list<T, Alloc>::operator=;
246-
};
247-
248-
typedef std::basic_ostringstream<char, std::char_traits<char>> xostringstream;
249-
typedef std::basic_stringstream<char, std::char_traits<char>> xstringstream;
250-
251-
typedef std::string xstring;
252-
253-
// Fallback xmake_shared — uses std::make_shared when fixed-block allocator is disabled
254-
template <typename T, typename... Args>
255-
inline std::shared_ptr<T> xmake_shared(Args&&... args)
256-
{
257-
return std::make_shared<T>(std::forward<Args>(args)...);
258-
}
259-
260-
// Fallback xnew/xdelete — use standard new/delete when fixed-block allocator is disabled
261-
template<typename T, typename... Args>
262-
inline T* xnew(Args&&... args) {
263-
return new(std::nothrow) T(std::forward<Args>(args)...);
264-
}
265-
266-
template<typename T>
267-
inline void xdelete(T* p) {
268-
delete p;
240+
namespace dmq {
241+
// Use default std::allocator for dynamic storage allocation
242+
template <typename T, typename Alloc = std::allocator<T>>
243+
class xlist : public std::list<T, Alloc> {
244+
public:
245+
using std::list<T, Alloc>::list; // Inherit constructors
246+
using std::list<T, Alloc>::operator=;
247+
};
248+
249+
typedef std::basic_ostringstream<char, std::char_traits<char>> xostringstream;
250+
typedef std::basic_stringstream<char, std::char_traits<char>> xstringstream;
251+
252+
typedef std::string xstring;
253+
254+
// Fallback xmake_shared — uses std::make_shared when fixed-block allocator is disabled
255+
template <typename T, typename... Args>
256+
inline std::shared_ptr<T> xmake_shared(Args&&... args)
257+
{
258+
return std::make_shared<T>(std::forward<Args>(args)...);
259+
}
260+
261+
// Fallback xnew/xdelete — use standard new/delete when fixed-block allocator is disabled
262+
template<typename T, typename... Args>
263+
inline T* xnew(Args&&... args) {
264+
return new(std::nothrow) T(std::forward<Args>(args)...);
265+
}
266+
267+
template<typename T>
268+
inline void xdelete(T* p) {
269+
delete p;
270+
}
269271
}
270272
#endif
271273

DelegateMQ/extras/allocator/xlist.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
#include "stl_allocator.h"
55
#include <list>
66

7-
// xlist uses a fix-block memory allocator
8-
template <typename T, typename Alloc = stl_allocator<T>>
9-
using xlist = std::list<T, Alloc>;
7+
namespace dmq {
8+
// xlist uses a fix-block memory allocator
9+
template <typename T, typename Alloc = stl_allocator<T>>
10+
using xlist = std::list<T, Alloc>;
11+
}
1012

1113
#if 0 // Deprecated
1214
template<class _Ty,

DelegateMQ/extras/allocator/xnew.h

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,26 @@
1818

1919
#include "xallocator.h"
2020

21-
/// @brief Allocate and construct T in the fixed-block pool.
22-
/// @return Pointer to the constructed object, or nullptr if xmalloc fails.
23-
template<typename T, typename... Args>
24-
inline T* xnew(Args&&... args)
25-
{
26-
void* mem = xmalloc(sizeof(T));
27-
if (!mem) return nullptr;
28-
return ::new(mem) T(std::forward<Args>(args)...);
29-
}
21+
namespace dmq {
22+
/// @brief Allocate and construct T in the fixed-block pool.
23+
/// @return Pointer to the constructed object, or nullptr if xmalloc fails.
24+
template<typename T, typename... Args>
25+
inline T* xnew(Args&&... args)
26+
{
27+
void* mem = xmalloc(sizeof(T));
28+
if (!mem) return nullptr;
29+
return ::new(mem) T(std::forward<Args>(args)...);
30+
}
3031

31-
/// @brief Destroy T and return its memory to the fixed-block pool.
32-
template<typename T>
33-
inline void xdelete(T* p)
34-
{
35-
if (p)
32+
/// @brief Destroy T and return its memory to the fixed-block pool.
33+
template<typename T>
34+
inline void xdelete(T* p)
3635
{
37-
p->~T();
38-
xfree(const_cast<void*>(static_cast<const void*>(p)));
36+
if (p)
37+
{
38+
p->~T();
39+
xfree(const_cast<void*>(static_cast<const void*>(p)));
40+
}
3941
}
4042
}
4143

DelegateMQ/extras/allocator/xsstream.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
#include "stl_allocator.h"
55
#include <sstream>
66

7-
typedef std::basic_stringstream<char, std::char_traits<char>, stl_allocator<char> > xstringstream;
8-
typedef std::basic_ostringstream<char, std::char_traits<char>, stl_allocator<char> > xostringstream;
7+
namespace dmq {
8+
typedef std::basic_stringstream<char, std::char_traits<char>, stl_allocator<char> > xstringstream;
9+
typedef std::basic_ostringstream<char, std::char_traits<char>, stl_allocator<char> > xostringstream;
10+
}
911
// xwstringstream/xwostringstream are unused by DelegateMQ. Uncomment if wide string support is needed by the application.
1012
// typedef std::basic_stringstream<wchar_t, std::char_traits<wchar_t>, stl_allocator<wchar_t> > xwstringstream;
1113
// typedef std::basic_ostringstream<wchar_t, std::char_traits<wchar_t>, stl_allocator<wchar_t> > xwostringstream;

DelegateMQ/extras/allocator/xstring.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
#include "stl_allocator.h"
55
#include <string>
66

7-
typedef std::basic_string<char, std::char_traits<char>, stl_allocator<char> > xstring;
7+
namespace dmq {
8+
typedef std::basic_string<char, std::char_traits<char>, stl_allocator<char> > xstring;
9+
}
810
// xwstring is unused by DelegateMQ. Uncomment if wide string support is needed by the application.
911
// typedef std::basic_string<wchar_t, std::char_traits<wchar_t>, stl_allocator<wchar_t> > xwstring;
1012

DelegateMQ/extras/databus/DataBus.h

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
#ifndef DMQ_DATABUS_H
22
#define DMQ_DATABUS_H
33

4-
#include "DelegateMQ.h"
4+
#include "delegate/Signal.h"
5+
#include "delegate/DelegateRemote.h"
6+
#include "delegate/DelegateAsync.h"
7+
#include "delegate/IThread.h"
8+
#include "delegate/DelegateOpt.h"
59
#include "Participant.h"
610
#include "DataBusQos.h"
711
#include "SpyPacket.h"
@@ -17,7 +21,7 @@
1721
#include <typeindex>
1822
#include <atomic>
1923

20-
namespace dmq {
24+
namespace dmq::databus {
2125

2226
// The DataBus is a central registry for topic-based communication.
2327
// It allows components to publish and subscribe to data topics identified by strings.
@@ -81,7 +85,13 @@ class DataBus {
8185
GetInstance().InternalRegisterStringifier<T>(topic, std::move(func));
8286
}
8387

88+
// Enable/Disable Last Value Cache (LVC) for a topic.
89+
static void LastValueCache(const std::string& topic, bool enabled) {
90+
GetInstance().InternalLastValueCache(topic, enabled);
91+
}
92+
8493
// Subscribe to all bus traffic (topic and stringified value).
94+
// Monitor all traffic on the DataBus.
8595
// NOTE: priority is only applied when thread != nullptr; passing a non-default
8696
// priority without a thread is a programming error and triggers FaultHandler.
8797
static dmq::ScopedConnection Monitor(std::function<void(const SpyPacket&)> func, dmq::IThread* thread = nullptr, dmq::Priority priority = dmq::Priority::NORMAL) {
@@ -99,12 +109,22 @@ class DataBus {
99109
return instance.m_monitorSignal.Connect(dmq::MakeDelegate(std::move(func)));
100110
}
101111

112+
/// Fired when a message is published but has no local or remote subscribers.
113+
static dmq::ScopedConnection SubscribeUnhandled(std::function<void(const std::string& topic)> func) {
114+
return GetInstance().m_unhandledSignal.Connect(dmq::MakeDelegate(std::move(func)));
115+
}
116+
102117
// Reset the DataBus (mostly for testing).
103118
static void ResetForTesting() {
104119
GetInstance().InternalReset();
105120
}
106121

107122
private:
123+
void InternalLastValueCache(const std::string& topic, bool enabled) {
124+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
125+
m_topicQos[topic].lastValueCache = enabled;
126+
}
127+
108128
DataBus() = default;
109129
~DataBus() = default;
110130

@@ -225,7 +245,7 @@ class DataBus {
225245
// Must be first — before any writes — so a mismatch never corrupts LVC.
226246
auto itType = m_typeIndices.find(topic);
227247
if (itType != m_typeIndices.end() && itType->second != std::type_index(typeid(T))) {
228-
::FaultHandler(__FILE__, (unsigned short)__LINE__);
248+
::dmq::util::FaultHandler(__FILE__, (unsigned short)__LINE__);
229249
return;
230250
}
231251

@@ -271,16 +291,24 @@ class DataBus {
271291
}
272292

273293
// 7. Local distribution
294+
bool handled = false;
274295
if (signal) {
275296
(*signal)(data);
297+
handled = true;
276298
}
277299

278300
// 8. Remote distribution using the snapshot
279301
if (serializer) {
280302
for (auto& participant : participantsSnapshot) {
281303
participant->Send<T>(topic, data, *serializer);
304+
handled = true;
282305
}
283306
}
307+
308+
// 9. Notify if no one received the message
309+
if (!handled) {
310+
m_unhandledSignal(topic);
311+
}
284312
}
285313

286314
void InternalAddParticipant(std::shared_ptr<Participant> participant) {
@@ -295,7 +323,7 @@ class DataBus {
295323
auto itType = m_typeIndices.find(topic);
296324
if (itType != m_typeIndices.end()) {
297325
if (itType->second != std::type_index(typeid(T))) {
298-
::FaultHandler(__FILE__, (unsigned short)__LINE__);
326+
::dmq::util::FaultHandler(__FILE__, (unsigned short)__LINE__);
299327
return;
300328
}
301329
} else {
@@ -314,7 +342,7 @@ class DataBus {
314342
auto itType = m_typeIndices.find(topic);
315343
if (itType != m_typeIndices.end()) {
316344
if (itType->second != std::type_index(typeid(T))) {
317-
::FaultHandler(__FILE__, (unsigned short)__LINE__);
345+
::dmq::util::FaultHandler(__FILE__, (unsigned short)__LINE__);
318346
return;
319347
}
320348
} else {
@@ -347,7 +375,7 @@ class DataBus {
347375
if (itType != m_typeIndices.end()) {
348376
if (itType->second != std::type_index(typeid(T))) {
349377
// Runtime Type Safety: Catch same topic string used with different types
350-
::FaultHandler(__FILE__, (unsigned short)__LINE__);
378+
::dmq::util::FaultHandler(__FILE__, (unsigned short)__LINE__);
351379
return nullptr;
352380
}
353381
} else {
@@ -378,9 +406,11 @@ class DataBus {
378406
std::unordered_map<std::string, QoS> m_topicQos;
379407
std::unordered_map<std::string, std::shared_ptr<void>> m_stringifiers;
380408
dmq::Signal<void(const SpyPacket&)> m_monitorSignal;
409+
dmq::Signal<void(const std::string& topic)> m_unhandledSignal;
381410
};
382411

383-
} // namespace dmq
412+
} // namespace dmq::databus
413+
384414

385415
#endif // DMQ_DATABUS_H
386416

DelegateMQ/extras/databus/DataBusQos.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#ifndef DMQ_DATABUSQOS_H
22
#define DMQ_DATABUSQOS_H
33

4-
#include "DelegateMQ.h"
4+
#include "delegate/DelegateOpt.h"
55
#include <optional>
66

7-
namespace dmq {
7+
namespace dmq::databus {
88

99
// Quality of Service settings for a topic subscription.
1010
struct QoS {
@@ -22,6 +22,7 @@ struct QoS {
2222
std::optional<dmq::Duration> minSeparation;
2323
};
2424

25-
} // namespace dmq
25+
} // namespace dmq::databus
26+
2627

2728
#endif // DMQ_DATABUSQOS_H

0 commit comments

Comments
 (0)