Skip to content

Commit 3da0dc3

Browse files
committed
Updated DelegateMQ library
1 parent 5b70b23 commit 3da0dc3

File tree

20 files changed

+830
-310
lines changed

20 files changed

+830
-310
lines changed

DelegateMQ/DelegateMQ.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@
250250
#if defined(DMQ_DATABUS)
251251
#include "extras/databus/DataBus.h"
252252
#include "extras/databus/Participant.h"
253+
#include "extras/databus/DeadlineSubscription.h"
253254
#endif
254255

255256
#endif

DelegateMQ/delegate/DelegateOpt.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ namespace dmq
225225
#include "extras/allocator/xsstream.h"
226226
#include "extras/allocator/stl_allocator.h"
227227
#include "extras/allocator/xnew.h"
228+
#include "extras/allocator/xmake_shared.h"
228229
#else
229230
#include <string>
230231
#include <list>
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Allocator Suite
2+
3+
A robust, fixed-block memory allocator suite for C++ applications, designed for performance and deterministic behavior in resource-constrained or real-time systems.
4+
5+
## Features
6+
7+
- **Fixed-Block Allocation**: Prevents heap fragmentation by using pre-allocated pools of equal-sized blocks.
8+
- **C++ Class Integration**: `DECLARE_ALLOCATOR` and `IMPLEMENT_ALLOCATOR` macros provide easy integration with custom classes.
9+
- **XALLOCATOR Macro**: Overloads `operator new` and `operator delete` for seamless use.
10+
- **STL Compatibility**: `stl_allocator<T>` allows STL containers (`std::vector`, `std::list`, `std::map`, etc.) to use the fixed-block pool.
11+
- **Smart Pointer Support**: `xmake_shared<T>` creates `std::shared_ptr` with both the object and control block allocated from the pool.
12+
- **C API**: `xmalloc`, `xfree`, and `xrealloc` functions for C-style memory management.
13+
14+
## Components
15+
16+
- **Allocator**: The core engine for fixed-block management.
17+
- **AllocatorPool**: A template-based pool for specific types.
18+
- **stl_allocator**: STL-compliant allocator wrapper.
19+
- **xmake_shared**: Helper for pool-allocated shared pointers.
20+
- **xnew / xdelete**: Helper templates for placement new/destroy in the pool.
21+
22+
## Basic Usage
23+
24+
### Using XALLOCATOR in a class
25+
26+
```cpp
27+
class MyClass {
28+
XALLOCATOR
29+
public:
30+
int data;
31+
};
32+
33+
// Now 'new MyClass' uses the fixed-block allocator
34+
MyClass* obj = new MyClass();
35+
delete obj;
36+
```
37+
38+
### Using STL containers
39+
40+
```cpp
41+
#include "xlist.h"
42+
43+
// xlist is a typedef for std::list using stl_allocator
44+
xlist<int> myList;
45+
myList.push_back(10);
46+
```
47+
48+
### Using xmake_shared
49+
50+
```cpp
51+
auto sp = xmake_shared<MyClass>(args...);
52+
```
53+
54+
## Memory Alignment
55+
56+
The `xallocator` implementation ensures that all allocated blocks are correctly aligned for the target architecture:
57+
58+
- **Power-of-Two Rounding**: All requested block sizes are rounded up to the next power of two (8, 16, 32, 64, etc.). This inherently satisfies the natural alignment requirements of all standard C++ types (`int`, `double`, `long long`, etc.) and modern SIMD/AVX vector instructions.
59+
- **Header Alignment**: A `BLOCK_HEADER_SIZE` (16 bytes on 64-bit, 8 bytes on 32-bit) is added to each block to store metadata. Both the raw block and the user's data pointer remain aligned on 8 or 16-byte boundaries.
60+
- **Efficiency vs. Safety**: This strategy prioritizes "automatic" alignment and safety over maximum memory density. While it may result in some internal fragmentation, it ensures that memory is always safe for any data type without manual alignment configuration.
61+
62+
## Integration with DelegateMQ
63+
64+
To enable the allocator suite within DelegateMQ, define `DMQ_ALLOCATOR` in your build configuration. This will cause DelegateMQ's internal containers and remote delegate marshalling to use the fixed-block pools instead of the standard heap.

DelegateMQ/extras/allocator/stl_allocator.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,4 @@ inline bool operator==(const stl_allocator<T>&, const stl_allocator<U>&) { retur
6969
template <typename T, typename U>
7070
inline bool operator!=(const stl_allocator<T>&, const stl_allocator<U>&) { return false; }
7171

72-
// Create a shared_ptr with both the object and control block allocated from the fixed-block pool
73-
template <typename T, typename... Args>
74-
inline std::shared_ptr<T> xmake_shared(Args&&... args)
75-
{
76-
return std::allocate_shared<T>(stl_allocator<T>(), std::forward<Args>(args)...);
77-
}
78-
7972
#endif
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#ifndef _XMAKE_SHARED_H
2+
#define _XMAKE_SHARED_H
3+
4+
#include "stl_allocator.h"
5+
#include <memory>
6+
#include <utility>
7+
8+
/// @brief Create a shared_ptr with both the object and control block allocated
9+
/// from the fixed-block pool.
10+
template <typename T, typename... Args>
11+
inline std::shared_ptr<T> xmake_shared(Args&&... args)
12+
{
13+
return std::allocate_shared<T>(stl_allocator<T>(), std::forward<Args>(args)...);
14+
}
15+
16+
#endif // _XMAKE_SHARED_H

DelegateMQ/extras/databus/DataBus.h

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <vector>
1616
#include <functional>
1717
#include <typeindex>
18+
#include <atomic>
1819

1920
namespace dmq {
2021

@@ -121,16 +122,34 @@ class DataBus {
121122
template <typename T, typename F>
122123
dmq::ScopedConnection InternalSubscribe(const std::string& topic, F&& func, dmq::IThread* thread, QoS qos) {
123124
SignalPtr<T> signal;
124-
125+
125126
// Performance Note: Forced std::function construction for internal type management.
126127
std::function<void(T)> typedFunc = std::forward<F>(func);
128+
129+
// Wrap with min separation rate limiter if requested. Each subscriber gets its
130+
// own independent last-delivery timestamp, so different subscribers on the same
131+
// topic can have different (or no) rate limits without affecting each other.
132+
if (qos.minSeparation.has_value()) {
133+
auto minSepRep = qos.minSeparation.value().count();
134+
auto lastDeliveryRep = std::make_shared<std::atomic<int64_t>>(0);
135+
auto inner = std::move(typedFunc);
136+
typedFunc = [inner = std::move(inner), minSepRep, lastDeliveryRep](T data) {
137+
auto nowRep = static_cast<int64_t>(dmq::Clock::now().time_since_epoch().count());
138+
auto lastRep = lastDeliveryRep->load(std::memory_order_relaxed);
139+
if (nowRep - lastRep >= minSepRep) {
140+
lastDeliveryRep->store(nowRep, std::memory_order_relaxed);
141+
inner(data);
142+
}
143+
};
144+
}
145+
127146
T* cachedValPtr = nullptr;
128147
T cachedVal;
129148
dmq::ScopedConnection conn;
130149

131150
{
132151
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
133-
152+
134153
// 1. Enable LVC if requested (persists for topic lifetime until ResetForTesting)
135154
if (qos.lastValueCache) {
136155
m_topicQos[topic].lastValueCache = true;
@@ -154,17 +173,25 @@ class DataBus {
154173
if (qos.lastValueCache) {
155174
auto it = m_lastValues.find(topic);
156175
if (it != m_lastValues.end()) {
157-
cachedVal = *std::static_pointer_cast<T>(it->second);
158-
cachedValPtr = &cachedVal;
176+
// Check lifespan: skip delivery if the cached value is too old
177+
bool expired = false;
178+
if (qos.lifespan.has_value()) {
179+
auto age = dmq::Clock::now() - it->second.timestamp;
180+
expired = (age > qos.lifespan.value());
181+
}
182+
if (!expired) {
183+
cachedVal = *std::static_pointer_cast<T>(it->second.value);
184+
cachedValPtr = &cachedVal;
185+
}
159186
}
160187
}
161188
}
162189

163-
// 5. Dispatch LVC outside the lock to prevent deadlocks.
190+
// 5. Dispatch LVC outside the lock to prevent deadlocks.
164191
// IMPORTANT: Because this happens after releasing the lock, a high-frequency
165192
// publisher on another thread could have already sent a new value to the
166193
// connected signal. The subscriber might receive the fresh value FIRST,
167-
// followed by this stale LVC value. Users of LVC should ensure their
194+
// followed by this stale LVC value. Users of LVC should ensure their
168195
// logic handles potential out-of-order state arrival.
169196
if (cachedValPtr) {
170197
if (thread) {
@@ -207,7 +234,7 @@ class DataBus {
207234
// by any subscriber, it remains active for that topic until ResetForTesting().
208235
auto itQos = m_topicQos.find(topic);
209236
if (itQos != m_topicQos.end() && itQos->second.lastValueCache) {
210-
m_lastValues[topic] = std::make_shared<T>(data);
237+
m_lastValues[topic] = LvcEntry{ std::make_shared<T>(data), now };
211238
}
212239

213240
// 3. Prepare monitor data
@@ -337,12 +364,17 @@ class DataBus {
337364
return signal;
338365
}
339366

367+
struct LvcEntry {
368+
std::shared_ptr<void> value;
369+
dmq::TimePoint timestamp;
370+
};
371+
340372
dmq::RecursiveMutex m_mutex;
341373
std::unordered_map<std::string, std::shared_ptr<void>> m_signals;
342374
std::unordered_map<std::string, std::type_index> m_typeIndices;
343375
std::vector<std::shared_ptr<Participant>> m_participants;
344376
std::unordered_map<std::string, std::shared_ptr<void>> m_serializers;
345-
std::unordered_map<std::string, std::shared_ptr<void>> m_lastValues;
377+
std::unordered_map<std::string, LvcEntry> m_lastValues;
346378
std::unordered_map<std::string, QoS> m_topicQos;
347379
std::unordered_map<std::string, std::shared_ptr<void>> m_stringifiers;
348380
dmq::Signal<void(const SpyPacket&)> m_monitorSignal;

DelegateMQ/extras/databus/DataBusQos.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
#ifndef DMQ_DATABUSQOS_H
22
#define DMQ_DATABUSQOS_H
33

4+
#include "DelegateMQ.h"
5+
#include <optional>
6+
47
namespace dmq {
58

6-
// Quality of Service settings for a topic.
9+
// Quality of Service settings for a topic subscription.
710
struct QoS {
8-
bool lastValueCache = false; // If true, new subscribers receive the last published value.
11+
// If true, new subscribers receive the last published value immediately on subscribe.
12+
bool lastValueCache = false;
13+
14+
// Maximum age of a cached LVC value. If the cached value is older than this duration
15+
// when a new subscriber connects, it is considered stale and not delivered.
16+
// Only meaningful when lastValueCache = true.
17+
std::optional<dmq::Duration> lifespan;
18+
19+
// Minimum time between deliveries to this subscriber. Publishes that arrive faster
20+
// than this interval are silently dropped for this subscriber only. Other subscribers
21+
// with a different (or no) minSeparation are unaffected.
22+
std::optional<dmq::Duration> minSeparation;
923
};
1024

1125
} // namespace dmq
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#ifndef DMQ_DEADLINE_SUBSCRIPTION_H
2+
#define DMQ_DEADLINE_SUBSCRIPTION_H
3+
4+
/// @file DeadlineSubscription.h
5+
/// @see https://github.com/DelegateMQ/DelegateMQ
6+
/// David Lafreniere, 2025.
7+
///
8+
/// @brief RAII helper that combines a DataBus subscription with a deadline timer.
9+
///
10+
/// @details
11+
/// `DeadlineSubscription<T>` monitors a DataBus topic and fires a user callback
12+
/// if no message arrives within a configurable deadline window. It is built
13+
/// entirely from existing DelegateMQ primitives — `DataBus::Subscribe`, `Timer`,
14+
/// and `ScopedConnection` — and adds no library-internal mechanism.
15+
///
16+
/// **How it works:**
17+
/// A `Timer` is started at construction time. Every incoming message resets the
18+
/// timer. If the timer fires before the next message arrives, the `onMissed`
19+
/// callback is invoked. Both the data handler and the deadline callback are
20+
/// dispatched to the same optional worker thread.
21+
///
22+
/// **Lifetime:**
23+
/// The object is non-copyable and non-movable. All resources — the DataBus
24+
/// connection, the timer expiry connection, and the timer itself — are released
25+
/// automatically when the object is destroyed. Destruction is always clean:
26+
/// members are destroyed in reverse declaration order, so the DataBus connection
27+
/// disconnects before the timer is torn down.
28+
///
29+
/// **`Timer::ProcessTimers()` requirement:**
30+
/// The deadline timer fires only when `Timer::ProcessTimers()` is called. On
31+
/// platforms with a running `Thread`, this is typically driven by the thread's
32+
/// internal timer. On bare-metal targets, call `ProcessTimers()` from the main
33+
/// super-loop or a SysTick handler. If `ProcessTimers()` is not called, the
34+
/// deadline callback silently never fires.
35+
///
36+
/// **`onMissed` callback context:**
37+
/// - With a `thread` argument: the callback is dispatched asynchronously to
38+
/// that thread, matching the delivery context of the data handler.
39+
/// - Without a `thread` argument: the callback fires synchronously on whatever
40+
/// thread calls `Timer::ProcessTimers()`. On bare-metal this may be an ISR —
41+
/// keep the callback short and non-blocking.
42+
///
43+
/// **Usage:**
44+
/// @code
45+
/// dmq::DeadlineSubscription<SensorData> m_watch{
46+
/// "sensor/temp",
47+
/// std::chrono::milliseconds(500),
48+
/// [](const SensorData& d) { /* handle data */ },
49+
/// []() { /* deadline missed — sensor silent */ },
50+
/// &m_workerThread
51+
/// };
52+
/// @endcode
53+
54+
#include "DelegateMQ.h"
55+
#include "extras/databus/DataBus.h"
56+
#include "extras/util/Timer.h"
57+
#include <functional>
58+
#include <string>
59+
60+
namespace dmq {
61+
62+
template <typename T>
63+
class DeadlineSubscription {
64+
public:
65+
/// Construct a deadline-monitored DataBus subscription.
66+
///
67+
/// @param topic DataBus topic to subscribe to.
68+
/// @param deadline Maximum allowed interval between deliveries. Must be > 0.
69+
/// @param handler Called on each data delivery.
70+
/// @param onMissed Called when no delivery arrives within the deadline window.
71+
/// @param thread Optional worker thread for both callbacks. If nullptr,
72+
/// handler fires on the publisher's thread and onMissed fires
73+
/// on the Timer::ProcessTimers() thread.
74+
DeadlineSubscription(
75+
const std::string& topic,
76+
dmq::Duration deadline,
77+
std::function<void(const T&)> handler,
78+
std::function<void()> onMissed,
79+
dmq::IThread* thread = nullptr)
80+
: m_deadline(deadline)
81+
{
82+
// Connect onMissed to the timer expiry signal, dispatching to thread if provided
83+
if (thread) {
84+
m_timerConn = m_timer.OnExpired.Connect(
85+
MakeDelegate(std::move(onMissed), *thread));
86+
} else {
87+
m_timerConn = m_timer.OnExpired.Connect(
88+
MakeDelegate(std::move(onMissed)));
89+
}
90+
91+
// Arm the timer immediately. It fires if no delivery arrives within deadline.
92+
m_timer.Start(m_deadline, false);
93+
94+
// Subscribe and reset the timer on every delivery
95+
m_conn = DataBus::Subscribe<T>(topic,
96+
[this, h = std::move(handler)](const T& data) {
97+
m_timer.Start(m_deadline, false); // reset deadline window
98+
h(data);
99+
}, thread);
100+
}
101+
102+
~DeadlineSubscription() = default;
103+
104+
DeadlineSubscription(const DeadlineSubscription&) = delete;
105+
DeadlineSubscription& operator=(const DeadlineSubscription&) = delete;
106+
DeadlineSubscription(DeadlineSubscription&&) = delete;
107+
DeadlineSubscription& operator=(DeadlineSubscription&&) = delete;
108+
109+
private:
110+
// Declaration order controls destruction order (reverse).
111+
// m_conn disconnects first (no more timer resets via the data lambda),
112+
// then m_timerConn disconnects (onMissed removed from timer signal),
113+
// then m_timer destructs (removed from global timer list).
114+
dmq::Duration m_deadline;
115+
Timer m_timer;
116+
dmq::ScopedConnection m_timerConn;
117+
dmq::ScopedConnection m_conn;
118+
};
119+
120+
} // namespace dmq
121+
122+
#endif // DMQ_DEADLINE_SUBSCRIPTION_H

0 commit comments

Comments
 (0)