Skip to content

Commit ddd6632

Browse files
committed
Update DelegateMQ library
1 parent 10852a5 commit ddd6632

File tree

9 files changed

+463
-1
lines changed

9 files changed

+463
-1
lines changed

DelegateMQ/DelegateMQ.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,4 +233,9 @@
233233
#include "predef/util/NetworkEngine.h"
234234
#endif
235235

236+
#if defined(DMQ_DATABUS)
237+
#include "predef/databus/DataBus.h"
238+
#include "predef/databus/Participant.h"
239+
#endif
240+
236241
#endif

DelegateMQ/Predef.cmake

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ list(APPEND DMQ_PREDEF_SOURCES ${OS_SOURCES})
164164
list(APPEND DMQ_PREDEF_SOURCES ${ALLOCATOR_SOURCES})
165165
list(APPEND DMQ_PREDEF_SOURCES ${UTIL_SOURCES})
166166

167+
if (DMQ_DATABUS STREQUAL "ON")
168+
add_compile_definitions(DMQ_DATABUS)
169+
file(GLOB DATABUS_SOURCES "${DMQ_ROOT_DIR}/predef/databus/*.h")
170+
list(APPEND DMQ_PREDEF_SOURCES ${DATABUS_SOURCES})
171+
endif()
172+
167173
# Collect all DelegateMQ files
168174
file(GLOB DMQ_LIB_SOURCES
169175
"${DMQ_ROOT_DIR}/*.h"

DelegateMQ/delegate/DelegateRemote.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,10 @@ class DelegateFreeRemote<RetType(Args...)> : public DelegateFree<RetType(Args...
459459
// @return The remote identifier.
460460
DelegateRemoteId GetRemoteId() noexcept { return m_id; }
461461

462+
///@brief Set the remote identifier.
463+
// @param[in] id The remote identifier.
464+
void SetRemoteId(DelegateRemoteId id) noexcept { m_id = id; }
465+
462466
/// @brief Set the dispatcher instance used to send to remote
463467
/// @param[in] dispatcher A dispatcher instance
464468
void SetDispatcher(IDispatcher* dispatcher) {
@@ -952,6 +956,10 @@ class DelegateMemberRemote<TClass, RetType(Args...)> : public DelegateMember<TCl
952956
// @return The remote identifier.
953957
DelegateRemoteId GetRemoteId() noexcept { return m_id; }
954958

959+
///@brief Set the remote identifier.
960+
// @param[in] id The remote identifier.
961+
void SetRemoteId(DelegateRemoteId id) noexcept { m_id = id; }
962+
955963
/// @brief Set the dispatcher instance used to send to remote
956964
/// @param[in] dispatcher A dispatcher instance
957965
void SetDispatcher(IDispatcher* dispatcher) {
@@ -1385,6 +1393,10 @@ class DelegateFunctionRemote<RetType(Args...)> : public DelegateFunction<RetType
13851393
// @return The remote identifier.
13861394
DelegateRemoteId GetRemoteId() noexcept { return m_id; }
13871395

1396+
///@brief Set the remote identifier.
1397+
// @param[in] id The remote identifier.
1398+
void SetRemoteId(DelegateRemoteId id) noexcept { m_id = id; }
1399+
13881400
/// @brief Set the dispatcher instance used to send to remote
13891401
/// @param[in] dispatcher A dispatcher instance
13901402
void SetDispatcher(IDispatcher* dispatcher) {

DelegateMQ/delegate/make_tuple_heap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ auto tuple_append(xlist<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, const
231231
template<typename... Ts>
232232
auto make_tuple_heap(xlist<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, std::tuple<Ts...> tup)
233233
{
234+
(void)heapArgs;
234235
return tup;
235236
}
236237

Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
#ifndef DMQ_DATABUS_H
2+
#define DMQ_DATABUS_H
3+
4+
#include "DelegateMQ.h"
5+
#include "Participant.h"
6+
#include "DataBusQos.h"
7+
#include "predef/util/Fault.h"
8+
9+
#ifdef _WIN32
10+
#include "predef/util/WinsockConnect.h"
11+
#endif
12+
13+
#include <string>
14+
#include <unordered_map>
15+
#include <memory>
16+
#include <mutex>
17+
#include <vector>
18+
#include <functional>
19+
#include <typeindex>
20+
21+
namespace dmq {
22+
23+
// The DataBus is a central registry for topic-based communication.
24+
// It allows components to publish and subscribe to data topics identified by strings.
25+
//
26+
// LIFETIME NOTE: This class assumes that any external ISerializer or ITransport objects
27+
// passed to it (e.g., via AddParticipant or RegisterSerializer) will outlive
28+
// the DataBus instance or its Reset() calls.
29+
class DataBus {
30+
public:
31+
// Subscribe to a topic with optional QoS and thread dispatching.
32+
// NOTE: This triggers an immediate LVC callback if enabled. There is a small window
33+
// between LVC delivery and actual Signal connection where a publish might be missed.
34+
template <typename T, typename F>
35+
static dmq::ScopedConnection Subscribe(const std::string& topic, F&& func, dmq::IThread* thread = nullptr, QoS qos = {}) {
36+
return GetInstance().InternalSubscribe<T>(topic, std::forward<F>(func), thread, qos);
37+
}
38+
39+
// Subscribe to a topic with a filter.
40+
template <typename T, typename F, typename P>
41+
static dmq::ScopedConnection SubscribeFilter(const std::string& topic, F&& func, P&& predicate, dmq::IThread* thread = nullptr, QoS qos = {}) {
42+
auto filterFunc = [f = std::forward<F>(func), p = std::forward<P>(predicate)](T data) {
43+
if (p(data)) {
44+
f(data);
45+
}
46+
};
47+
return GetInstance().InternalSubscribe<T>(topic, std::move(filterFunc), thread, qos);
48+
}
49+
50+
// Publish data to a topic.
51+
template <typename T>
52+
static void Publish(const std::string& topic, T data) {
53+
GetInstance().InternalPublish<T>(topic, std::move(data));
54+
}
55+
56+
// Add a remote participant to the bus.
57+
static void AddParticipant(std::shared_ptr<Participant> participant) {
58+
GetInstance().InternalAddParticipant(participant);
59+
}
60+
61+
// Register a serializer for a topic (required for remote distribution).
62+
template <typename T>
63+
static void RegisterSerializer(const std::string& topic, dmq::ISerializer<void(T)>& serializer) {
64+
GetInstance().InternalRegisterSerializer<T>(topic, serializer);
65+
}
66+
67+
// Register a stringifier for a topic to enable spying/logging.
68+
template <typename T>
69+
static void RegisterStringifier(const std::string& topic, std::function<std::string(const T&)> func) {
70+
GetInstance().InternalRegisterStringifier<T>(topic, std::move(func));
71+
}
72+
73+
// Subscribe to all bus traffic (topic and stringified value).
74+
static dmq::ScopedConnection Monitor(std::function<void(const std::string&, const std::string&)> func) {
75+
DataBus& instance = GetInstance();
76+
std::lock_guard<dmq::RecursiveMutex> lock(instance.m_mutex);
77+
return instance.m_monitorSignal.Connect(dmq::MakeDelegate(std::move(func)));
78+
}
79+
80+
// Reset the DataBus (mostly for testing).
81+
static void ResetForTesting() {
82+
GetInstance().InternalReset();
83+
}
84+
85+
private:
86+
DataBus() = default;
87+
~DataBus() = default;
88+
89+
DataBus(const DataBus&) = delete;
90+
DataBus& operator=(const DataBus&) = delete;
91+
92+
static DataBus& GetInstance() {
93+
static DataBus instance;
94+
return instance;
95+
}
96+
97+
template <typename T>
98+
using SignalPtr = std::shared_ptr<dmq::Signal<void(T)>>;
99+
100+
template <typename T, typename F>
101+
dmq::ScopedConnection InternalSubscribe(const std::string& topic, F&& func, dmq::IThread* thread, QoS qos) {
102+
SignalPtr<T> signal;
103+
104+
// Performance Note: Forced std::function construction for internal type management.
105+
std::function<void(T)> typedFunc = std::forward<F>(func);
106+
T* cachedValPtr = nullptr;
107+
T cachedVal;
108+
dmq::ScopedConnection conn;
109+
110+
{
111+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
112+
113+
// 1. Enable LVC if requested (persists for topic lifetime until ResetForTesting)
114+
if (qos.lastValueCache) {
115+
m_topicQos[topic].lastValueCache = true;
116+
}
117+
118+
// 2. Get or create signal with type safety check (std::type_index)
119+
signal = GetOrCreateSignal<T>(topic);
120+
if (!signal) {
121+
return {}; // Type mismatch or other failure
122+
}
123+
124+
// 3. Establish connection while holding the lock. This ensures no publishes
125+
// are missed, as any new publish will be blocked until this lock is released.
126+
if (thread) {
127+
conn = signal->Connect(dmq::MakeDelegate(typedFunc, *thread));
128+
} else {
129+
conn = signal->Connect(dmq::MakeDelegate(typedFunc));
130+
}
131+
132+
// 4. Prepare LVC delivery if enabled and available
133+
if (qos.lastValueCache) {
134+
auto it = m_lastValues.find(topic);
135+
if (it != m_lastValues.end()) {
136+
cachedVal = *std::static_pointer_cast<T>(it->second);
137+
cachedValPtr = &cachedVal;
138+
}
139+
}
140+
}
141+
142+
// 5. Dispatch LVC outside the lock to prevent deadlocks.
143+
// IMPORTANT: Because this happens after releasing the lock, a high-frequency
144+
// publisher on another thread could have already sent a new value to the
145+
// connected signal. The subscriber might receive the fresh value FIRST,
146+
// followed by this stale LVC value. Users of LVC should ensure their
147+
// logic handles potential out-of-order state arrival.
148+
if (cachedValPtr) {
149+
if (thread) {
150+
dmq::MakeDelegate(typedFunc, *thread).AsyncInvoke(*cachedValPtr);
151+
} else {
152+
typedFunc(*cachedValPtr);
153+
}
154+
}
155+
156+
return conn;
157+
}
158+
159+
template <typename T>
160+
void InternalPublish(const std::string& topic, T data) {
161+
SignalPtr<T> signal;
162+
std::shared_ptr<void> serializerPtr;
163+
dmq::ISerializer<void(T)>* serializer = nullptr;
164+
std::vector<std::shared_ptr<Participant>> participantsSnapshot;
165+
std::string strVal = "?";
166+
bool hasMonitor = false;
167+
168+
{
169+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
170+
171+
// 1. Update LVC ONLY if enabled for this topic to save memory.
172+
// NOTE: QoS lastValueCache is currently "sticky" per topic. Once enabled
173+
// by any subscriber, it remains active for that topic until ResetForTesting().
174+
auto itQos = m_topicQos.find(topic);
175+
if (itQos != m_topicQos.end() && itQos->second.lastValueCache) {
176+
m_lastValues[topic] = std::make_shared<T>(data);
177+
}
178+
179+
// 2. Prepare monitor data
180+
if (!m_monitorSignal.Empty()) {
181+
hasMonitor = true;
182+
auto itStr = m_stringifiers.find(topic);
183+
if (itStr != m_stringifiers.end()) {
184+
auto func = static_cast<std::function<std::string(const T&)>*>(itStr->second.get());
185+
strVal = (*func)(data);
186+
}
187+
}
188+
189+
// 3. Get signal and remote info. Only create Signal if there is local interest.
190+
auto itSig = m_signals.find(topic);
191+
if (itSig != m_signals.end()) {
192+
signal = std::static_pointer_cast<dmq::Signal<void(T)>>(itSig->second);
193+
}
194+
195+
auto itSer = m_serializers.find(topic);
196+
if (itSer != m_serializers.end()) {
197+
serializerPtr = itSer->second;
198+
serializer = static_cast<dmq::ISerializer<void(T)>*>(serializerPtr.get());
199+
}
200+
201+
// 4. Snapshot participants while locked to ensure atomicity between
202+
// local and remote dispatch sets.
203+
participantsSnapshot = m_participants;
204+
}
205+
206+
// 5. Dispatch Monitor outside lock to allow re-entry/prevent deadlocks
207+
if (hasMonitor) {
208+
m_monitorSignal(topic, strVal);
209+
}
210+
211+
// 6. Local distribution
212+
if (signal) {
213+
(*signal)(data);
214+
}
215+
216+
// 7. Remote distribution using the snapshot
217+
if (serializer) {
218+
for (auto& participant : participantsSnapshot) {
219+
participant->Send<T>(topic, data, *serializer);
220+
}
221+
}
222+
}
223+
224+
void InternalAddParticipant(std::shared_ptr<Participant> participant) {
225+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
226+
m_participants.push_back(participant);
227+
}
228+
229+
template <typename T>
230+
void InternalRegisterSerializer(const std::string& topic, dmq::ISerializer<void(T)>& serializer) {
231+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
232+
// Use shared_ptr with no-op deleter because serializer is owned by caller
233+
m_serializers[topic] = std::shared_ptr<void>(&serializer, [](void*) {});
234+
}
235+
236+
template <typename T>
237+
void InternalRegisterStringifier(const std::string& topic, std::function<std::string(const T&)> func) {
238+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
239+
// Use shared_ptr with custom deleter to fix memory leak
240+
m_stringifiers[topic] = std::shared_ptr<void>(
241+
new std::function<std::string(const T&)>(std::move(func)),
242+
[](void* ptr) { delete static_cast<std::function<std::string(const T&)>*>(ptr); }
243+
);
244+
}
245+
246+
void InternalReset() {
247+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
248+
m_signals.clear();
249+
m_participants.clear();
250+
m_serializers.clear();
251+
m_lastValues.clear();
252+
m_topicQos.clear();
253+
m_stringifiers.clear(); // shared_ptr correctly frees allocated functions
254+
m_typeIndices.clear();
255+
m_monitorSignal.Clear();
256+
}
257+
258+
template <typename T>
259+
SignalPtr<T> GetOrCreateSignal(const std::string& topic) {
260+
// Assume lock is held by caller
261+
auto itType = m_typeIndices.find(topic);
262+
if (itType != m_typeIndices.end()) {
263+
if (itType->second != std::type_index(typeid(T))) {
264+
// Runtime Type Safety: Catch same topic string used with different types
265+
::FaultHandler(__FILE__, (unsigned short)__LINE__);
266+
return nullptr;
267+
}
268+
} else {
269+
m_typeIndices.emplace(topic, std::type_index(typeid(T)));
270+
}
271+
272+
auto it = m_signals.find(topic);
273+
if (it != m_signals.end()) {
274+
return std::static_pointer_cast<dmq::Signal<void(T)>>(it->second);
275+
}
276+
277+
auto signal = std::make_shared<dmq::Signal<void(T)>>();
278+
m_signals[topic] = std::static_pointer_cast<void>(signal);
279+
return signal;
280+
}
281+
282+
dmq::RecursiveMutex m_mutex;
283+
std::unordered_map<std::string, std::shared_ptr<void>> m_signals;
284+
std::unordered_map<std::string, std::type_index> m_typeIndices;
285+
std::vector<std::shared_ptr<Participant>> m_participants;
286+
std::unordered_map<std::string, std::shared_ptr<void>> m_serializers;
287+
std::unordered_map<std::string, std::shared_ptr<void>> m_lastValues;
288+
std::unordered_map<std::string, QoS> m_topicQos;
289+
std::unordered_map<std::string, std::shared_ptr<void>> m_stringifiers;
290+
dmq::Signal<void(const std::string&, const std::string&)> m_monitorSignal;
291+
};
292+
293+
} // namespace dmq
294+
295+
#endif // DMQ_DATABUS_H
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#ifndef DMQ_DATABUSQOS_H
2+
#define DMQ_DATABUSQOS_H
3+
4+
namespace dmq {
5+
6+
// Quality of Service settings for a topic.
7+
struct QoS {
8+
bool lastValueCache = false; // If true, new subscribers receive the last published value.
9+
};
10+
11+
} // namespace dmq
12+
13+
#endif // DMQ_DATABUSQOS_H

0 commit comments

Comments
 (0)