Skip to content

Commit 17b8e79

Browse files
committed
Update DelegateMQ library
1 parent 9cae1c5 commit 17b8e79

File tree

17 files changed

+67
-30
lines changed

17 files changed

+67
-30
lines changed

DelegateMQ/delegate/DelegateOpt.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace dmq
1111
using Duration = std::chrono::duration<uint32_t, std::milli>;
1212
}
1313

14+
// @TODO: Select the desired software fault handling (see Predef.cmake).
1415
#ifdef DMQ_ASSERTS
1516
#include <cassert>
1617
// Use assert error handling. Change assert to a different error
@@ -22,6 +23,7 @@ namespace dmq
2223
#define BAD_ALLOC() throw std::bad_alloc()
2324
#endif
2425

26+
// @TODO: Select the desired heap allocation (see Predef.cmake).
2527
// If DMQ_ASSERTS defined above, consider defining DMQ_ALLOCATOR to prevent
2628
// std::list usage within delegate library from throwing a std::bad_alloc
2729
// exception. The std_allocator calls assert if out of memory.
@@ -46,6 +48,7 @@ namespace dmq
4648
#define XALLOCATOR
4749
#endif
4850

51+
// @TODO: Select the desired logging (see Predef.cmake).
4952
#ifdef DMQ_LOG
5053
#include <spdlog/spdlog.h>
5154
#define LOG_INFO(...) spdlog::info(__VA_ARGS__)

DelegateMQ/delegate/IDispatcher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ typedef uint16_t DelegateRemoteId;
1313
const uint16_t INVALID_REMOTE_ID = -1;
1414
const uint16_t ACK_REMOTE_ID = 0;
1515

16+
/// @TODO Implement the IDispatcher interface if necessary.
1617
/// @brief Delegate interface class to dispatch serialized function argument data
1718
/// to a remote destination. Implemented by the application if using remote delegates.
1819
///

DelegateMQ/delegate/ISerializer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace dmq {
1111
template <class R>
1212
struct ISerializer; // Not defined
1313

14+
/// @TODO Implement the ISerializer interface if necessary.
1415
/// @brief Delegate serializer interface for serializing and deserializing
1516
/// remote delegate arguments. Implemented by application code if remote
1617
/// delegates are used.

DelegateMQ/delegate/IThread.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace dmq {
77

8-
/// @file
8+
/// @TODO Implement the IThread interface if necessary.
99
/// @brief A base class for a delegate enabled execution thread. Implemented by
1010
/// application code if asynchronous delegates are used.
1111
///

DelegateMQ/predef/os/freertos/Thread.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ void Thread::Process(void* instance)
9999
{
100100
case MSG_DISPATCH_DELEGATE:
101101
{
102+
// @TODO: Update error handling below if necessary.
103+
102104
// Get pointer to DelegateMsg data from queue msg data
103105
auto delegateMsg = msg->GetData();
104106
ASSERT_TRUE(delegateMsg);

DelegateMQ/predef/os/freertos/Thread.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
/// @TODO: Implement a priority queue for the thread messages like
1717
/// std::priority_queue<ThreadMsg*, std::vector<ThreadMsg*>, ThreadMsgComparator>
1818
/// as implemented in stdlib\Thread.h if necessary.
19-
/// Optionally implement a watchdog mechanism as shown in stdlib\Thread.h.
19+
/// @TODO: Optionally implement a watchdog mechanism as shown in stdlib\Thread.h.
2020

2121
class ThreadMsg;
2222

DelegateMQ/predef/os/stdlib/Thread.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ bool Thread::CreateThread(std::optional<dmq::Duration> watchdogTimeout)
3434
{
3535
if (!m_thread)
3636
{
37+
m_threadStartPromise = std::promise<void>();
3738
m_threadStartFuture = m_threadStartPromise.get_future();
39+
m_exit = false;
3840

3941
m_thread = std::unique_ptr<std::thread>(new thread(&Thread::Process, this));
4042

@@ -140,11 +142,24 @@ void Thread::ExitThread()
140142
m_exit.store(true);
141143
m_thread->join();
142144

143-
// Clear the queue if anything added while waiting for join
145+
// Prevent deadlock if ExitThread is called from within the thread itself
146+
if (m_thread->joinable())
147+
{
148+
if (std::this_thread::get_id() != m_thread->get_id())
149+
{
150+
m_thread->join();
151+
}
152+
else
153+
{
154+
// We are killing ourselves. Detach so the thread object cleans up naturally.
155+
m_thread->detach();
156+
}
157+
}
158+
144159
{
145160
lock_guard<mutex> lock(m_mutex);
146161
m_thread = nullptr;
147-
while (!m_queue.empty())
162+
while (!m_queue.empty())
148163
m_queue.pop();
149164
}
150165

@@ -189,7 +204,7 @@ void Thread::WatchdogCheck()
189204
{
190205
LOG_ERROR("Watchdog detected unresponsive thread: {}", THREAD_NAME);
191206

192-
// @TODO You can optionally trigger recovery, restart, or further actions here
207+
// @TODO Optionally trigger recovery, restart, or further actions here
193208
// For example, throw or notify external system
194209
}
195210
}
@@ -235,6 +250,8 @@ void Thread::Process()
235250
{
236251
case MSG_DISPATCH_DELEGATE:
237252
{
253+
// @TODO: Update error handling below if necessary.
254+
238255
// Get pointer to DelegateMsg data from queue msg data
239256
auto delegateMsg = msg->GetData();
240257
ASSERT_TRUE(delegateMsg);

DelegateMQ/predef/transport/ITransport.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "DmqHeader.h"
55
#include "../../delegate/DelegateOpt.h"
66

7+
/// @TODO Implement the ITransport interface if necessary.
78
/// @brief DelegateMQ transport interface.
89
class ITransport
910
{

DelegateMQ/predef/transport/ITransportMonitor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//#include "DmqHeader.h"
55
//#include <sstream>
66

7+
/// @TODO Implement the ITransportMonitor interface if necessary.
78
/// @brief DelegateMQ transport monitor interface.
89
class ITransportMonitor
910
{

DelegateMQ/predef/transport/nng/NngTransport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ class NngTransport : public ITransport
235235

236236
if (header.GetMarker() != DmqHeader::MARKER) {
237237
std::cerr << "Invalid sync marker!" << std::endl;
238-
return -1; // TODO: Optionally handle this case more gracefully
238+
return -1; // @TODO: Optionally handle this case more gracefully
239239
}
240240

241241
// Read the DelegateRemoteId (2 bytes) into the `id` variable

0 commit comments

Comments
 (0)