Skip to content

Commit dbb1d8c

Browse files
committed
Update
1 parent ddd6632 commit dbb1d8c

File tree

2 files changed

+60
-5
lines changed

2 files changed

+60
-5
lines changed

DelegateMQ/predef/databus/DataBus.h

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "DelegateMQ.h"
55
#include "Participant.h"
66
#include "DataBusQos.h"
7+
#include "SpyPacket.h"
78
#include "predef/util/Fault.h"
89

910
#ifdef _WIN32
@@ -29,8 +30,8 @@ namespace dmq {
2930
class DataBus {
3031
public:
3132
// Subscribe to a topic with optional QoS and thread dispatching.
32-
// NOTE: This triggers an immediate LVC callback if enabled. There is a small window
33-
// between LVC delivery and actual Signal connection where a publish might be missed.
33+
// NOTE: Signal connection is established before LVC delivery to ensure
34+
// no messages are missed.
3435
template <typename T, typename F>
3536
static dmq::ScopedConnection Subscribe(const std::string& topic, F&& func, dmq::IThread* thread = nullptr, QoS qos = {}) {
3637
return GetInstance().InternalSubscribe<T>(topic, std::forward<F>(func), thread, qos);
@@ -71,9 +72,14 @@ class DataBus {
7172
}
7273

7374
// Subscribe to all bus traffic (topic and stringified value).
74-
static dmq::ScopedConnection Monitor(std::function<void(const std::string&, const std::string&)> func) {
75+
static dmq::ScopedConnection Monitor(std::function<void(const SpyPacket&)> func, dmq::IThread* thread = nullptr, dmq::Priority priority = dmq::Priority::NORMAL) {
7576
DataBus& instance = GetInstance();
7677
std::lock_guard<dmq::RecursiveMutex> lock(instance.m_mutex);
78+
if (thread) {
79+
auto del = dmq::MakeDelegate(std::move(func), *thread);
80+
del.SetPriority(priority);
81+
return instance.m_monitorSignal.Connect(del);
82+
}
7783
return instance.m_monitorSignal.Connect(dmq::MakeDelegate(std::move(func)));
7884
}
7985

@@ -158,6 +164,11 @@ class DataBus {
158164

159165
template <typename T>
160166
void InternalPublish(const std::string& topic, T data) {
167+
// Capture timestamp before lock acquisition for maximum accuracy and
168+
// monotonic ordering using dmq::Clock.
169+
auto now = dmq::Clock::now();
170+
uint64_t timestamp = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
171+
161172
SignalPtr<T> signal;
162173
std::shared_ptr<void> serializerPtr;
163174
dmq::ISerializer<void(T)>* serializer = nullptr;
@@ -205,7 +216,8 @@ class DataBus {
205216

206217
// 5. Dispatch Monitor outside lock to allow re-entry/prevent deadlocks
207218
if (hasMonitor) {
208-
m_monitorSignal(topic, strVal);
219+
SpyPacket packet{ topic, strVal, timestamp };
220+
m_monitorSignal(packet);
209221
}
210222

211223
// 6. Local distribution
@@ -236,6 +248,18 @@ class DataBus {
236248
template <typename T>
237249
void InternalRegisterStringifier(const std::string& topic, std::function<std::string(const T&)> func) {
238250
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
251+
252+
// Runtime Type Safety: Ensure topic is not registered with multiple types
253+
auto itType = m_typeIndices.find(topic);
254+
if (itType != m_typeIndices.end()) {
255+
if (itType->second != std::type_index(typeid(T))) {
256+
::FaultHandler(__FILE__, (unsigned short)__LINE__);
257+
return;
258+
}
259+
} else {
260+
m_typeIndices.emplace(topic, std::type_index(typeid(T)));
261+
}
262+
239263
// Use shared_ptr with custom deleter to fix memory leak
240264
m_stringifiers[topic] = std::shared_ptr<void>(
241265
new std::function<std::string(const T&)>(std::move(func)),
@@ -287,7 +311,7 @@ class DataBus {
287311
std::unordered_map<std::string, std::shared_ptr<void>> m_lastValues;
288312
std::unordered_map<std::string, QoS> m_topicQos;
289313
std::unordered_map<std::string, std::shared_ptr<void>> m_stringifiers;
290-
dmq::Signal<void(const std::string&, const std::string&)> m_monitorSignal;
314+
dmq::Signal<void(const SpyPacket&)> m_monitorSignal;
291315
};
292316

293317
} // namespace dmq
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#ifndef DMQ_SPY_PACKET_H
2+
#define DMQ_SPY_PACKET_H
3+
4+
#include <string>
5+
#include <cstdint>
6+
7+
namespace dmq {
8+
9+
/// @brief Standardized packet containing bus traffic metadata.
10+
/// @details This struct is passed to DataBus::Monitor subscribers and is
11+
/// designed to be serialized for transmission to external diagnostic tools.
12+
///
13+
/// The timestamp_us field uses dmq::Clock::now(), which typically provides
14+
/// monotonic time since boot.
15+
struct SpyPacket {
16+
std::string topic; ///< The name of the data topic.
17+
std::string value; ///< Stringified representation of the data (or "?" if no stringifier registered).
18+
uint64_t timestamp_us; ///< Microseconds (usually since boot) when the message was published.
19+
20+
/// @brief Bitsery serialization method.
21+
template <typename S>
22+
void serialize(S& s) {
23+
s.text1b(topic, 1024);
24+
s.text1b(value, 8192);
25+
s.value8b(timestamp_us);
26+
}
27+
};
28+
29+
} // namespace dmq
30+
31+
#endif // DMQ_SPY_PACKET_H

0 commit comments

Comments
 (0)