Skip to content

Commit 10852a5

Browse files
committed
Update DelegateMQ library
1 parent 201c0fc commit 10852a5

File tree

11 files changed

+116
-51
lines changed

11 files changed

+116
-51
lines changed

DelegateMQ/delegate/DelegateAsync.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ class DelegateFreeAsync<RetType(Args...)> : public DelegateFree<RetType(Args...)
267267
}
268268
else {
269269
// Create a clone instance of this delegate
270-
auto delegate = std::shared_ptr<ClassType>(Clone());
270+
auto delegate = xmake_shared<ClassType>(*this);
271271
if (!delegate)
272272
BAD_ALLOC();
273273

@@ -580,7 +580,7 @@ class DelegateMemberAsync<TClass, RetType(Args...)> : public DelegateMember<TCla
580580
}
581581
else {
582582
// Create a clone instance of this delegate
583-
auto delegate = std::shared_ptr<ClassType>(Clone());
583+
auto delegate = xmake_shared<ClassType>(*this);
584584
if (!delegate)
585585
BAD_ALLOC();
586586

@@ -822,7 +822,7 @@ class DelegateMemberAsyncSp<TClass, RetType(Args...)> : public DelegateMemberSp<
822822
}
823823
else {
824824
// Create a clone instance of this delegate
825-
auto delegate = std::shared_ptr<ClassType>(Clone());
825+
auto delegate = xmake_shared<ClassType>(*this);
826826
if (!delegate)
827827
BAD_ALLOC();
828828

@@ -1076,7 +1076,7 @@ class DelegateFunctionAsync<RetType(Args...)> : public DelegateFunction<RetType(
10761076
}
10771077
else {
10781078
// Create a clone instance of this delegate
1079-
auto delegate = std::shared_ptr<ClassType>(Clone());
1079+
auto delegate = xmake_shared<ClassType>(*this);
10801080
if (!delegate)
10811081
BAD_ALLOC();
10821082

DelegateMQ/delegate/DelegateAsyncWait.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ class DelegateFreeAsyncWait<RetType(Args...)> : public DelegateFree<RetType(Args
325325
return BaseType::operator()(std::forward<Args>(args)...);
326326
} else {
327327
// Create a clone instance of this delegate
328-
auto delegate = std::shared_ptr<ClassType>(Clone());
328+
auto delegate = xmake_shared<ClassType>(*this);
329329
if (!delegate)
330330
BAD_ALLOC();
331331

@@ -761,7 +761,7 @@ class DelegateMemberAsyncWait<TClass, RetType(Args...)> : public DelegateMember<
761761
return BaseType::operator()(std::forward<Args>(args)...);
762762
} else {
763763
// Create a clone instance of this delegate
764-
auto delegate = std::shared_ptr<ClassType>(Clone());
764+
auto delegate = xmake_shared<ClassType>(*this);
765765
if (!delegate)
766766
BAD_ALLOC();
767767

@@ -1114,7 +1114,7 @@ class DelegateMemberAsyncWaitSp<TClass, RetType(Args...)> : public DelegateMembe
11141114
return BaseType::operator()(std::forward<Args>(args)...);
11151115
} else {
11161116
// Create a clone instance of this delegate
1117-
auto delegate = std::shared_ptr<ClassType>(Clone());
1117+
auto delegate = xmake_shared<ClassType>(*this);
11181118
if (!delegate)
11191119
BAD_ALLOC();
11201120

@@ -1469,7 +1469,7 @@ class DelegateFunctionAsyncWait<RetType(Args...)> : public DelegateFunction<RetT
14691469
return BaseType::operator()(std::forward<Args>(args)...);
14701470
} else {
14711471
// Create a clone instance of this delegate
1472-
auto delegate = std::shared_ptr<ClassType>(Clone());
1472+
auto delegate = xmake_shared<ClassType>(*this);
14731473
if (!delegate)
14741474
BAD_ALLOC();
14751475

DelegateMQ/delegate/DelegateOpt.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ namespace dmq
139139
#include "predef/allocator/xlist.h"
140140
#include "predef/allocator/xsstream.h"
141141
#include "predef/allocator/stl_allocator.h"
142+
#include "predef/allocator/xnew.h"
142143
#else
143144
#include <string>
144145
#include <list>
@@ -170,6 +171,17 @@ namespace dmq
170171
{
171172
return std::make_shared<T>(std::forward<Args>(args)...);
172173
}
174+
175+
// Fallback xnew/xdelete — use standard new/delete when fixed-block allocator is disabled
176+
template<typename T, typename... Args>
177+
inline T* xnew(Args&&... args) {
178+
return new(std::nothrow) T(std::forward<Args>(args)...);
179+
}
180+
181+
template<typename T>
182+
inline void xdelete(T* p) {
183+
delete p;
184+
}
173185
#endif
174186

175187
// @TODO: Select the desired logging (see Predef.cmake).

DelegateMQ/delegate/make_tuple_heap.h

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ class heap_arg_deleter : public heap_arg_deleter_base
6868
{
6969
public:
7070
heap_arg_deleter(T& arg) : m_arg(arg) { }
71-
virtual ~heap_arg_deleter() {
72-
delete &m_arg;
71+
virtual ~heap_arg_deleter() {
72+
xdelete(&m_arg);
7373
}
7474
private:
7575
T& m_arg;
@@ -81,8 +81,8 @@ class heap_arg_deleter<T*> : public heap_arg_deleter_base
8181
{
8282
public:
8383
heap_arg_deleter(T* arg) : m_arg(arg) { }
84-
virtual ~heap_arg_deleter() {
85-
delete m_arg;
84+
virtual ~heap_arg_deleter() {
85+
xdelete(m_arg);
8686
}
8787
private:
8888
T* m_arg;
@@ -93,13 +93,17 @@ template<typename T>
9393
class heap_arg_deleter<T**> : public heap_arg_deleter_base
9494
{
9595
public:
96-
heap_arg_deleter(T** arg) : m_arg(arg) {}
96+
// Capture the original inner pointer at construction so the destructor always
97+
// frees the xnew'd copy, regardless of whether the target function overwrites
98+
// *arg (e.g. an outgoing-argument pattern like (*s) = new T).
99+
heap_arg_deleter(T** arg) : m_arg(arg), m_inner(*arg) {}
97100
virtual ~heap_arg_deleter() {
98-
delete *m_arg;
99-
delete m_arg;
101+
xdelete(m_inner); // free the original xnew'd inner copy (may be nullptr)
102+
xdelete(m_arg); // free the outer pointer storage
100103
}
101104
private:
102105
T** m_arg;
106+
T* m_inner; // original value of *arg at dispatch time
103107
};
104108

105109
/// @brief Append a pointer to pointer argument to the tuple
@@ -111,28 +115,28 @@ auto tuple_append(xlist<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, const
111115
// Check if arg is nullptr or *arg is nullptr
112116
if (arg != nullptr && *arg != nullptr) {
113117
// Allocate memory for heap_arg and copy the value
114-
heap_arg = new(std::nothrow) Arg * ();
118+
heap_arg = xnew<Arg*>();
115119
if (!heap_arg) {
116120
BAD_ALLOC();
117121
}
118-
119-
*heap_arg = new(std::nothrow) Arg(**arg);
122+
123+
*heap_arg = xnew<Arg>(**arg);
120124
if (!*heap_arg) {
121-
delete heap_arg;
125+
xdelete(heap_arg);
122126
BAD_ALLOC();
123127
}
124128
}
125129
else {
126130
// If arg is nullptr or *arg is nullptr, create heap_arg as nullptr
127-
heap_arg = new(std::nothrow) Arg * (nullptr);
131+
heap_arg = xnew<Arg*>(nullptr);
128132
if (!heap_arg) {
129133
BAD_ALLOC();
130134
}
131135
}
132-
std::shared_ptr<heap_arg_deleter_base> deleter(new(std::nothrow) heap_arg_deleter<Arg**>(heap_arg));
136+
auto deleter = xmake_shared<heap_arg_deleter<Arg**>>(heap_arg);
133137
if (!deleter) {
134-
delete* heap_arg;
135-
delete heap_arg;
138+
xdelete(*heap_arg);
139+
xdelete(heap_arg);
136140
BAD_ALLOC();
137141
}
138142

@@ -157,14 +161,14 @@ auto tuple_append(xlist<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, const
157161
{
158162
Arg* heap_arg = nullptr;
159163
if (arg != nullptr) {
160-
heap_arg = new(std::nothrow) Arg(*arg); // Only create a new Arg if arg is not nullptr
164+
heap_arg = xnew<Arg>(*arg);
161165
if (!heap_arg) {
162166
BAD_ALLOC();
163167
}
164168
}
165-
std::shared_ptr<heap_arg_deleter_base> deleter(new(std::nothrow) heap_arg_deleter<Arg*>(heap_arg));
169+
auto deleter = xmake_shared<heap_arg_deleter<Arg*>>(heap_arg);
166170
if (!deleter) {
167-
delete heap_arg;
171+
xdelete(heap_arg);
168172
BAD_ALLOC();
169173
}
170174

@@ -187,13 +191,13 @@ auto tuple_append(xlist<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, const
187191
template <typename Arg, typename... TupleElem>
188192
auto tuple_append(xlist<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, const std::tuple<TupleElem...> &tup, Arg& arg)
189193
{
190-
Arg* heap_arg = new(std::nothrow) Arg(arg);
194+
Arg* heap_arg = xnew<Arg>(arg);
191195
if (!heap_arg) {
192196
BAD_ALLOC();
193197
}
194-
std::shared_ptr<heap_arg_deleter_base> deleter(new(std::nothrow) heap_arg_deleter<Arg*>(heap_arg));
198+
auto deleter = xmake_shared<heap_arg_deleter<Arg*>>(heap_arg);
195199
if (!deleter) {
196-
delete heap_arg;
200+
xdelete(heap_arg);
197201
BAD_ALLOC();
198202
}
199203

DelegateMQ/predef/allocator/xnew.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#ifndef _XNEW_H
2+
#define _XNEW_H
3+
4+
// @see https://github.com/endurodave/DelegateMQ
5+
// David Lafreniere, 2026.
6+
7+
/// @file xnew.h
8+
/// @brief Fixed-block pool equivalents of operator new / delete.
9+
///
10+
/// @details
11+
/// xnew<T>(args...) allocates from the xallocator fixed-block pool and
12+
/// constructs T in-place via placement new. xdelete<T>(p) calls the
13+
/// destructor then returns the memory to the pool via xfree().
14+
///
15+
/// These are the pool-aware counterparts of new(std::nothrow) / delete
16+
/// for types that do not carry the XALLOCATOR macro (e.g. user argument
17+
/// types marshalled through make_tuple_heap).
18+
19+
#include "xallocator.h"
20+
21+
/// @brief Allocate and construct T in the fixed-block pool.
22+
/// @return Pointer to the constructed object, or nullptr if xmalloc fails.
23+
template<typename T, typename... Args>
24+
inline T* xnew(Args&&... args)
25+
{
26+
void* mem = xmalloc(sizeof(T));
27+
if (!mem) return nullptr;
28+
return ::new(mem) T(std::forward<Args>(args)...);
29+
}
30+
31+
/// @brief Destroy T and return its memory to the fixed-block pool.
32+
template<typename T>
33+
inline void xdelete(T* p)
34+
{
35+
if (p)
36+
{
37+
p->~T();
38+
xfree(const_cast<void*>(static_cast<const void*>(p)));
39+
}
40+
}
41+
42+
#endif // _XNEW_H

DelegateMQ/predef/os/stdlib/Thread.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ using namespace dmq;
1616
// Thread
1717
//----------------------------------------------------------------------------
1818
Thread::Thread(const std::string& threadName, size_t maxQueueSize)
19-
: m_thread(nullptr)
19+
: m_thread(std::nullopt)
2020
, m_exit(false)
2121
, THREAD_NAME(threadName)
2222
, MAX_QUEUE_SIZE(maxQueueSize)
@@ -38,17 +38,17 @@ bool Thread::CreateThread(std::optional<dmq::Duration> watchdogTimeout)
3838
{
3939
if (!m_thread)
4040
{
41-
m_threadStartPromise = std::promise<void>();
42-
m_threadStartFuture = m_threadStartPromise.get_future();
41+
m_threadStartPromise.emplace();
42+
m_threadStartFuture.emplace(m_threadStartPromise->get_future());
4343
m_exit = false;
4444

45-
m_thread = std::unique_ptr<std::thread>(new thread(&Thread::Process, this));
45+
m_thread.emplace(&Thread::Process, this);
4646

4747
auto handle = m_thread->native_handle();
4848
SetThreadName(handle, THREAD_NAME);
4949

5050
// Wait for the thread to enter the Process method
51-
m_threadStartFuture.get();
51+
m_threadStartFuture->get();
5252

5353
m_lastAliveTime.store(Timer::GetNow());
5454

@@ -59,12 +59,12 @@ bool Thread::CreateThread(std::optional<dmq::Duration> watchdogTimeout)
5959
m_watchdogTimeout = watchdogTimeout.value();
6060

6161
// Timer to ensure the Thread instance runs periodically.
62-
m_threadTimer = std::make_unique<Timer>();
62+
m_threadTimer = std::unique_ptr<Timer>(new Timer());
6363
m_threadTimerConn = m_threadTimer->OnExpired.Connect(MakeDelegate(this, &Thread::ThreadCheck, *this));
6464
m_threadTimer->Start(m_watchdogTimeout.load() / 4);
6565

6666
// Timer to check that this Thread instance runs.
67-
m_watchdogTimer = std::make_unique<Timer>();
67+
m_watchdogTimer = std::unique_ptr<Timer>(new Timer());
6868
m_watchdogTimerConn = m_watchdogTimer->OnExpired.Connect(MakeDelegate(this, &Thread::WatchdogCheck));
6969
m_watchdogTimer->Start(m_watchdogTimeout.load() / 2);
7070
}
@@ -79,7 +79,7 @@ bool Thread::CreateThread(std::optional<dmq::Duration> watchdogTimeout)
7979
//----------------------------------------------------------------------------
8080
std::thread::id Thread::GetThreadId()
8181
{
82-
if (m_thread == nullptr)
82+
if (!m_thread.has_value())
8383
throw std::invalid_argument("Thread pointer is null");
8484

8585
return m_thread->get_id();
@@ -147,7 +147,7 @@ void Thread::ExitThread()
147147
}
148148

149149
// Create a new ThreadMsg
150-
std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));
150+
auto threadMsg = xmake_shared<ThreadMsg>(MSG_EXIT_THREAD, nullptr);
151151

152152
{
153153
lock_guard<mutex> lock(m_mutex);
@@ -182,7 +182,7 @@ void Thread::ExitThread()
182182

183183
{
184184
lock_guard<mutex> lock(m_mutex);
185-
m_thread = nullptr;
185+
m_thread.reset();
186186
while (!m_queue.empty())
187187
m_queue.pop();
188188

@@ -202,11 +202,11 @@ void Thread::DispatchDelegate(std::shared_ptr<dmq::DelegateMsg> msg)
202202
if (m_exit.load())
203203
return;
204204

205-
if (m_thread == nullptr)
205+
if (!m_thread.has_value())
206206
throw std::invalid_argument("Thread pointer is null");
207207

208208
// If using XALLOCATOR explicit operator new required. See xallocator.h.
209-
std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_DISPATCH_DELEGATE, msg));
209+
auto threadMsg = xmake_shared<ThreadMsg>(MSG_DISPATCH_DELEGATE, msg);
210210

211211
std::unique_lock<std::mutex> lk(m_mutex);
212212

@@ -264,7 +264,7 @@ void Thread::ThreadCheck()
264264
void Thread::Process()
265265
{
266266
// Signal that the thread has started processing to notify CreateThread
267-
m_threadStartPromise.set_value();
267+
m_threadStartPromise->set_value();
268268

269269
LOG_INFO("Thread::Process Start {}", THREAD_NAME);
270270

DelegateMQ/predef/os/stdlib/Thread.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,16 @@ class Thread : public dmq::IThread
107107
/// other user delegate events to be handled.
108108
void ThreadCheck();
109109

110-
std::unique_ptr<std::thread> m_thread;
110+
std::optional<std::thread> m_thread;
111+
#ifdef DMQ_ALLOCATOR
112+
std::priority_queue<std::shared_ptr<ThreadMsg>,
113+
std::vector<std::shared_ptr<ThreadMsg>, stl_allocator<std::shared_ptr<ThreadMsg>>>,
114+
ThreadMsgComparator> m_queue;
115+
#else
111116
std::priority_queue<std::shared_ptr<ThreadMsg>,
112117
std::vector<std::shared_ptr<ThreadMsg>>,
113118
ThreadMsgComparator> m_queue;
119+
#endif
114120
std::mutex m_mutex;
115121
std::condition_variable m_cv;
116122

@@ -122,9 +128,9 @@ class Thread : public dmq::IThread
122128
// Max queue size for back pressure (0 = unlimited)
123129
const size_t MAX_QUEUE_SIZE;
124130

125-
// Promise and future to synchronize thread start
126-
std::promise<void> m_threadStartPromise;
127-
std::future<void> m_threadStartFuture;
131+
// Promise and future to synchronize thread start (constructed lazily in CreateThread)
132+
std::optional<std::promise<void>> m_threadStartPromise;
133+
std::optional<std::future<void>> m_threadStartFuture;
128134

129135
std::atomic<bool> m_exit;
130136

DelegateMQ/predef/os/win32/Thread.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ bool Thread::CreateThread(std::optional<dmq::Duration> watchdogTimeout)
6060
m_watchdogTimeout = watchdogTimeout.value();
6161

6262
// Timer to ensure the Thread instance runs periodically.
63-
m_threadTimer = std::make_unique<Timer>();
63+
m_threadTimer = std::unique_ptr<Timer>(new Timer());
6464
m_threadTimerConn = m_threadTimer->OnExpired.Connect(MakeDelegate(this, &Thread::ThreadCheck, *this));
6565
m_threadTimer->Start(m_watchdogTimeout.load() / 4);
6666

6767
// Timer to check that this Thread instance runs.
68-
m_watchdogTimer = std::make_unique<Timer>();
68+
m_watchdogTimer = std::unique_ptr<Timer>(new Timer());
6969
m_watchdogTimerConn = m_watchdogTimer->OnExpired.Connect(MakeDelegate(this, &Thread::WatchdogCheck));
7070
m_watchdogTimer->Start(m_watchdogTimeout.load() / 2);
7171
}

0 commit comments

Comments
 (0)