Skip to content

Commit 0a1126c

Browse files
committed
Update DelegateMQ library
1 parent e281dac commit 0a1126c

53 files changed

Lines changed: 3401 additions & 874 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/cmake_clang.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ jobs:
2424

2525
- name: Run IntegrationTestFrameworkApp
2626
run: ./build/IntegrationTestFrameworkApp # Run the built executable
27+
28+

.github/workflows/cmake_ubuntu.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ jobs:
2424

2525
- name: Run IntegrationTestFrameworkApp
2626
run: ./build/IntegrationTestFrameworkApp # Run the built executable
27+
28+

.github/workflows/cmake_windows.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@ on:
88
branches:
99
- main # Trigger on pull
1010

11+
1112
jobs:
1213
build:
13-
runs-on: windows-latest # Use Windows environment for the build
14+
runs-on: windows-2022 # Use Windows environment for the build
1415

1516
steps:
1617
- name: Checkout code
1718
uses: actions/checkout@v4 # Checkout the repository code
1819

19-
- name: Set up Visual Studio
20-
uses: microsoft/setup-msbuild@v2 # Set up Visual Studio environment (MSBuild)
21-
2220
- name: Configure CMake
23-
run: cmake -S . -B build -G "Visual Studio 17 2022" -DENABLE_IT=ON # Configure CMake for Visual Studio
21+
run: cmake -S . -B build -DENABLE_IT=ON # Configure CMake for Visual Studio
2422

2523
- name: Build
2624
run: cmake --build build --config Release # Build the project using CMake with Release configuration
2725

2826
- name: Run IntegrationTestFrameworkApp
2927
run: .\build\Release\IntegrationTestFrameworkApp.exe # Run the built executable (adjust path for MSBuild)
28+
29+

DelegateMQ/DelegateMQ.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,15 @@
233233
#endif
234234

235235
#include "extras/util/Fault.h"
236+
#include "extras/util/ClockHelper.h"
236237

237238
// Only include Timer and AsyncInvoke if threads exist
238239
#if !defined(DMQ_THREAD_NONE)
239240
#include "extras/util/Timer.h"
241+
#include "extras/util/TimerDelegate.h"
240242
#include "extras/util/AsyncInvoke.h"
241243
#include "extras/util/TransportMonitor.h"
244+
#include "extras/util/ThreadMonitor.h"
242245
#endif
243246

244247
// Only include NetworkEngine if a transport that uses it is active

DelegateMQ/delegate/DelegateAsync.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ class DelegateFreeAsync<RetType(Args...)> : public DelegateFree<RetType(Args...)
280280
if (thread) {
281281
// Dispatch message onto the callback destination thread. Invoke()
282282
// will be called by the destintation thread.
283-
thread->DispatchDelegate(msg);
283+
bool success = thread->DispatchDelegate(msg);
284+
(void)success;
284285
}
285286

286287
// Do not wait for destination thread return value from async function call
@@ -593,7 +594,8 @@ class DelegateMemberAsync<TClass, RetType(Args...)> : public DelegateMember<TCla
593594
if (thread) {
594595
// Dispatch message onto the callback destination thread. Invoke()
595596
// will be called by the destintation thread.
596-
thread->DispatchDelegate(msg);
597+
bool success = thread->DispatchDelegate(msg);
598+
(void)success;
597599
}
598600

599601
// Do not wait for destination thread return value from async function call
@@ -835,7 +837,8 @@ class DelegateMemberAsyncSp<TClass, RetType(Args...)> : public DelegateMemberSp<
835837
if (thread) {
836838
// Dispatch message onto the callback destination thread. Invoke()
837839
// will be called by the destintation thread.
838-
thread->DispatchDelegate(msg);
840+
bool success = thread->DispatchDelegate(msg);
841+
(void)success;
839842
}
840843

841844
// Do not wait for destination thread return value from async function call
@@ -1089,7 +1092,8 @@ class DelegateFunctionAsync<RetType(Args...)> : public DelegateFunction<RetType(
10891092
if (thread) {
10901093
// Dispatch message onto the callback destination thread. Invoke()
10911094
// will be called by the destintation thread.
1092-
thread->DispatchDelegate(msg);
1095+
bool success = thread->DispatchDelegate(msg);
1096+
(void)success;
10931097
}
10941098

10951099
// Do not wait for destination thread return value from async function call

DelegateMQ/delegate/DelegateAsyncWait.h

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -342,14 +342,14 @@ class DelegateFreeAsyncWait<RetType(Args...)> : public DelegateFree<RetType(Args
342342
if (thread) {
343343
// Dispatch message onto the callback destination thread. Invoke()
344344
// will be called by the destination thread.
345-
thread->DispatchDelegate(msg);
346-
347-
// Wait for destination thread to execute the delegate function and get return value
348-
if (msg->GetSema().Wait(m_timeout)) {
349-
// Wait succeeded. Now acquire lock to safely read the value.
350-
const dmq::LockGuard<Mutex> lock(msg->GetLock());
351-
m_success = true;
352-
m_retVal = delegate->m_retVal;
345+
if (thread->DispatchDelegate(msg)) {
346+
// Wait for destination thread to execute the delegate function and get return value
347+
if (msg->GetSema().Wait(m_timeout)) {
348+
// Wait succeeded. Now acquire lock to safely read the value.
349+
const dmq::LockGuard<Mutex> lock(msg->GetLock());
350+
m_success = true;
351+
m_retVal = delegate->m_retVal;
352+
}
353353
}
354354
}
355355

@@ -778,14 +778,14 @@ class DelegateMemberAsyncWait<TClass, RetType(Args...)> : public DelegateMember<
778778
if (thread) {
779779
// Dispatch message onto the callback destination thread. Invoke()
780780
// will be called by the destination thread.
781-
thread->DispatchDelegate(msg);
782-
783-
// Wait for destination thread to execute the delegate function and get return value
784-
if (msg->GetSema().Wait(m_timeout)) {
785-
// Wait succeeded. Now acquire lock to safely read the value.
786-
const dmq::LockGuard<Mutex> lock(msg->GetLock());
787-
m_success = true;
788-
m_retVal = delegate->m_retVal;
781+
if (thread->DispatchDelegate(msg)) {
782+
// Wait for destination thread to execute the delegate function and get return value
783+
if (msg->GetSema().Wait(m_timeout)) {
784+
// Wait succeeded. Now acquire lock to safely read the value.
785+
const dmq::LockGuard<Mutex> lock(msg->GetLock());
786+
m_success = true;
787+
m_retVal = delegate->m_retVal;
788+
}
789789
}
790790
}
791791

@@ -1131,14 +1131,14 @@ class DelegateMemberAsyncWaitSp<TClass, RetType(Args...)> : public DelegateMembe
11311131
if (thread) {
11321132
// Dispatch message onto the callback destination thread. Invoke()
11331133
// will be called by the destination thread.
1134-
thread->DispatchDelegate(msg);
1135-
1136-
// Wait for destination thread to execute the delegate function and get return value
1137-
if (msg->GetSema().Wait(m_timeout)) {
1138-
// Wait succeeded. Now acquire lock to safely read the value.
1139-
const dmq::LockGuard<Mutex> lock(msg->GetLock());
1140-
m_success = true;
1141-
m_retVal = delegate->m_retVal;
1134+
if (thread->DispatchDelegate(msg)) {
1135+
// Wait for destination thread to execute the delegate function and get return value
1136+
if (msg->GetSema().Wait(m_timeout)) {
1137+
// Wait succeeded. Now acquire lock to safely read the value.
1138+
const dmq::LockGuard<Mutex> lock(msg->GetLock());
1139+
m_success = true;
1140+
m_retVal = delegate->m_retVal;
1141+
}
11421142
}
11431143
}
11441144

@@ -1486,14 +1486,14 @@ class DelegateFunctionAsyncWait<RetType(Args...)> : public DelegateFunction<RetT
14861486
if (thread) {
14871487
// Dispatch message onto the callback destination thread. Invoke()
14881488
// will be called by the destination thread.
1489-
thread->DispatchDelegate(msg);
1490-
1491-
// Wait for destination thread to execute the delegate function and get return value
1492-
if (msg->GetSema().Wait(m_timeout)) {
1493-
// Wait succeeded. Now acquire lock to safely read the value.
1494-
const dmq::LockGuard<Mutex> lock(msg->GetLock());
1495-
m_success = true;
1496-
m_retVal = delegate->m_retVal;
1489+
if (thread->DispatchDelegate(msg)) {
1490+
// Wait for destination thread to execute the delegate function and get return value
1491+
if (msg->GetSema().Wait(m_timeout)) {
1492+
// Wait succeeded. Now acquire lock to safely read the value.
1493+
const dmq::LockGuard<Mutex> lock(msg->GetLock());
1494+
m_success = true;
1495+
m_retVal = delegate->m_retVal;
1496+
}
14971497
}
14981498
}
14991499

DelegateMQ/delegate/DelegateOpt.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,25 @@ namespace dmq
139139
using Duration = typename Clock::duration;
140140
using TimePoint = typename Clock::time_point;
141141

142+
/// @brief Default timeout for the TIMEOUT queue-full policy across all Thread ports.
143+
/// Override per-thread at construction or project-wide before including this header.
144+
inline constexpr std::chrono::seconds DEFAULT_DISPATCH_TIMEOUT{2};
145+
146+
// --- RESOURCE LIMITS & SBO CONFIGURATION ---
147+
148+
/// @brief Max timers processed in one tick without heap allocation.
149+
inline constexpr size_t MAX_TIMER_EXPIRED = 16;
150+
151+
/// @brief Signal Small-Buffer Optimization (SBO) count.
152+
/// Signals with <= this many subscribers are invoked heap-free.
153+
inline constexpr size_t SIGNAL_SBO_COUNT = 8;
154+
155+
/// @brief Default internal queue size for all dmq::os::Thread ports.
156+
inline constexpr size_t DEFAULT_QUEUE_SIZE = 20;
157+
158+
/// @brief Max number of threads that can be monitored by the watchdog.
159+
inline constexpr size_t MAX_WATCHDOG_THREADS = 16;
160+
142161
// --- MUTEX / LOCK SELECTION ---
143162
#if defined(DMQ_THREAD_STDLIB) || defined(DMQ_THREAD_WIN32) || defined(DMQ_THREAD_QT)
144163
// Windows / Linux / macOS / Qt

DelegateMQ/delegate/IThread.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class IThread
3535
///
3636
/// @param[in] msg A shared pointer to the delegate message. This pointer must remain valid
3737
/// until the target thread finishes execution.
38-
virtual void DispatchDelegate(std::shared_ptr<DelegateMsg> msg) = 0;
38+
/// @return true if the message was successfully enqueued, false otherwise.
39+
virtual bool DispatchDelegate(std::shared_ptr<DelegateMsg> msg) = 0;
3940

4041
/// @brief Returns true if the calling thread is this thread.
4142
/// Used to decide whether to marshal an event or execute inline.

DelegateMQ/delegate/MulticastDelegate.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,13 @@ class MulticastDelegate<RetType(Args...)>
233233
int& m_cnt;
234234
MulticastDelegate* m_container;
235235
};
236-
236+
protected:
237237
/// List of registered delegates
238238
xlist<std::shared_ptr<DelegateType>> m_delegates;
239239

240240
/// Count of active nested broadcasts
241241
int m_broadcastCount = 0;
242-
242+
243243
/// Flag for handling lazy delete
244244
bool m_cleanup = false;
245245
};

DelegateMQ/delegate/MulticastDelegateSafe.h

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,47 @@ class MulticastDelegateSafe<RetType(Args...)> : public MulticastDelegate<RetType
4747
/// A void return value is used since multiple targets invoked.
4848
/// @param[in] args The arguments used when invoking the target functions
4949
void operator()(Args... args) {
50-
const dmq::LockGuard<RecursiveMutex> lock(m_lock);
51-
BaseType::operator ()(args...);
50+
// To prevent deadlocks, the mutex must be released before invoking the
51+
// delegate. Circular lock dependencies can occur if the delegate target
52+
// function itself attempts to acquire a lock that is held by the
53+
// thread invoking the delegate.
54+
// Use a small-buffer optimization to avoid heap allocation in the common case.
55+
std::shared_ptr<DelegateType> small_buf[SIGNAL_SBO_COUNT];
56+
xlist<std::shared_ptr<DelegateType>> large_buf;
57+
size_t count = 0;
58+
59+
{
60+
const dmq::LockGuard<RecursiveMutex> lock(m_lock);
61+
count = this->m_delegates.size();
62+
if (count <= SIGNAL_SBO_COUNT) {
63+
size_t i = 0;
64+
for (auto& d : this->m_delegates) {
65+
small_buf[i++] = d;
66+
}
67+
} else {
68+
large_buf = this->m_delegates;
69+
}
70+
}
71+
72+
if (count <= SIGNAL_SBO_COUNT) {
73+
for (size_t i = 0; i < count; ++i) {
74+
if (small_buf[i])
75+
(*small_buf[i])(args...);
76+
small_buf[i].reset(); // Clear to release shared_ptr immediately
77+
}
78+
} else {
79+
for (auto& d : large_buf) {
80+
if (d)
81+
(*d)(args...);
82+
}
83+
}
5284
}
5385

5486
/// Invoke all bound target functions. A void return value is used
5587
/// since multiple targets invoked.
5688
/// @param[in] args The arguments used when invoking the target functions
5789
void Broadcast(Args... args) {
58-
const dmq::LockGuard<RecursiveMutex> lock(m_lock);
59-
BaseType::Broadcast(args...);
90+
operator()(args...);
6091
}
6192

6293
/// Insert a delegate into the container.

0 commit comments

Comments
 (0)