Skip to content

Commit 4b6bf12

Browse files
committed
Update DelegateMQ library
1 parent f58020a commit 4b6bf12

File tree

19 files changed

+49
-39
lines changed

19 files changed

+49
-39
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ list(APPEND SOURCES ${DMQ_PREDEF_SOURCES})
4040

4141
# Organize delegate source files within IDE (Visual Studio)
4242
source_group("Delegate Files" FILES ${DMQ_LIB_SOURCES})
43+
source_group("Predef Files" FILES ${DMQ_PREDEF_SOURCES})
4344

4445
# Platform-specific linker flags
4546
if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")

DelegateMQ/DelegateMQ.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,14 @@
163163
#elif defined(DMQ_TRANSPORT_WIN32_UDP)
164164
#include "predef/dispatcher/Dispatcher.h"
165165
#include "predef/transport/win32-udp/Win32UdpTransport.h"
166+
#include "predef/transport/win32-udp/MulticastTransport.h"
166167
#elif defined(DMQ_TRANSPORT_WIN32_TCP)
167168
#include "predef/dispatcher/Dispatcher.h"
168169
#include "predef/transport/win32-tcp/Win32TcpTransport.h"
169170
#elif defined(DMQ_TRANSPORT_LINUX_UDP)
170171
#include "predef/dispatcher/Dispatcher.h"
171172
#include "predef/transport/linux-udp/LinuxUdpTransport.h"
173+
#include "predef/transport/linux-udp/MulticastTransport.h"
172174
#elif defined(DMQ_TRANSPORT_LINUX_TCP)
173175
#include "predef/dispatcher/Dispatcher.h"
174176
#include "predef/transport/linux-tcp/LinuxTcpTransport.h"

DelegateMQ/predef/databus/DataBus.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ class DataBus {
6262
GetInstance().InternalRegisterSerializer<T>(topic, serializer);
6363
}
6464

65+
// Register an incoming remote topic and automatically republish received data to the local bus.
66+
// Replaces the boilerplate RegisterHandler lambda pattern:
67+
// participant.RegisterHandler<T>(remoteId, serializer, [topic](T msg) {
68+
// DataBus::Publish<T>(topic, std::move(msg));
69+
// });
70+
template <typename T>
71+
static void AddIncomingTopic(const std::string& topic, dmq::DelegateRemoteId remoteId, Participant& participant, dmq::ISerializer<void(T)>& serializer) {
72+
participant.RegisterHandler<T>(remoteId, serializer, [topic](T msg) {
73+
DataBus::Publish<T>(topic, std::move(msg));
74+
});
75+
}
76+
6577
// Register a stringifier for a topic to enable spying/logging.
6678
template <typename T>
6779
static void RegisterStringifier(const std::string& topic, std::function<std::string(const T&)> func) {

DelegateMQ/predef/transport/arm-lwip-netconn-udp/ArmLwipNetconnUdpTransport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class NetconnUdpTransport : public ITransport
119119
}
120120

121121
DmqHeader headerCopy = header;
122-
std::string payload = os.str();
122+
auto payload = os.str();
123123
if (payload.length() > UINT16_MAX) {
124124
return -1;
125125
}

DelegateMQ/predef/transport/arm-lwip-udp/ArmLwipUdpTransport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class UdpTransport : public ITransport
145145
DmqHeader headerCopy = header;
146146

147147
// Calculate payload size and set it
148-
std::string payload = os.str();
148+
auto payload = os.str();
149149
if (payload.length() > UINT16_MAX) {
150150
return -1;
151151
}
@@ -167,7 +167,7 @@ class UdpTransport : public ITransport
167167
// Append Payload
168168
ss.write(payload.data(), payload.size());
169169

170-
std::string data = ss.str();
170+
auto data = ss.str();
171171

172172
// Always track the message (unless it is an ACK)
173173
if (header.GetId() != dmq::ACK_REMOTE_ID && m_transportMonitor)

DelegateMQ/predef/transport/linux-tcp/LinuxTcpTransport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class TcpTransport : public ITransport
118118
{
119119
if (m_connFd < 0) return -1;
120120

121-
std::string payload = os.str();
121+
auto payload = os.str();
122122
DmqHeader headerCopy = header;
123123
headerCopy.SetLength(static_cast<uint16_t>(payload.length()));
124124

@@ -136,7 +136,7 @@ class TcpTransport : public ITransport
136136
ss.write((char*)&length, 2);
137137
ss.write(payload.data(), payload.size());
138138

139-
std::string packet = ss.str();
139+
auto packet = ss.str();
140140

141141
// Always track the message (unless it is an ACK)
142142
// Use Host Byte Order for ID check

DelegateMQ/predef/transport/linux-udp/LinuxUdpTransport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ class UdpTransport : public ITransport
151151
DmqHeader headerCopy = header;
152152

153153
// Calculate payload size and set it
154-
std::string payload = os.str();
154+
auto payload = os.str();
155155
if (payload.length() > UINT16_MAX) {
156156
std::cerr << "Error: Payload too large." << std::endl;
157157
return -1;
@@ -174,7 +174,7 @@ class UdpTransport : public ITransport
174174
// Append Payload
175175
ss.write(payload.data(), payload.size());
176176

177-
std::string data = ss.str();
177+
auto data = ss.str();
178178

179179
// Always track the message (unless it is an ACK)
180180
// Use Host Byte Order for ID check

DelegateMQ/predef/transport/linux-udp/MulticastTransport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class MulticastTransport : public ITransport
8080

8181
virtual int Send(xostringstream& os, const DmqHeader& header) override {
8282
if (m_type != Type::PUB) return -1;
83-
std::string payload = os.str();
83+
auto payload = os.str();
8484
DmqHeader headerCopy = header;
8585
headerCopy.SetLength(static_cast<uint16_t>(payload.length()));
8686

@@ -96,7 +96,7 @@ class MulticastTransport : public ITransport
9696
ss.write((const char*)&length, 2);
9797
ss.write(payload.data(), payload.size());
9898

99-
std::string data = ss.str();
99+
auto data = ss.str();
100100
sendto(m_socket, data.c_str(), data.size(), 0, (struct sockaddr*)&m_addr, sizeof(m_addr));
101101
return 0;
102102
}

DelegateMQ/predef/transport/mqtt/MqttTransport.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class MqttTransport : public ITransport
147147

148148
// Create a local copy to modify the length
149149
DmqHeader headerCopy = header;
150-
std::string payload = os.str();
150+
auto payload = os.str();
151151
if (payload.length() > UINT16_MAX) {
152152
std::cerr << "Payload too large." << std::endl;
153153
return -1;
@@ -170,7 +170,7 @@ class MqttTransport : public ITransport
170170
// Append Payload
171171
ss.write(payload.data(), payload.size());
172172

173-
std::string fullPacket = ss.str();
173+
auto fullPacket = ss.str();
174174

175175
MQTTClient_message pubmsg = MQTTClient_message_initializer;
176176
MQTTClient_deliveryToken token;

DelegateMQ/predef/transport/netx-udp/NetXUdpTransport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class NetXUdpTransport : public ITransport
150150
return -1;
151151
}
152152

153-
std::string payload = os.str();
153+
auto payload = os.str();
154154
if (payload.length() > UINT16_MAX) {
155155
tx_mutex_put(&m_mutex);
156156
return -1;

0 commit comments

Comments
 (0)