Skip to content

Commit 3de0f25

Browse files
committed
Update DelegateMQ library
1 parent dbb1d8c commit 3de0f25

File tree

12 files changed

+511
-45
lines changed

12 files changed

+511
-45
lines changed

DelegateMQ/delegate/Delegate.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,38 @@ namespace trait
5454
std::is_pointer_v<RawT> &&
5555
std::is_pointer_v<std::remove_pointer_t<RawT>>;
5656
};
57+
58+
// Helper trait to detect std::function specializations.
59+
template <typename T>
60+
struct is_std_function : std::false_type {};
61+
62+
template <typename Sig>
63+
struct is_std_function<std::function<Sig>> : std::true_type {};
64+
65+
// Helper trait to check if a type is a callable (lambda, functor)
66+
// but NOT a function pointer or std::function (which have their own MakeDelegate overloads).
67+
template <typename T, typename = void>
68+
struct is_callable : std::false_type {};
69+
70+
template <typename T>
71+
struct is_callable<T, std::void_t<decltype(&std::decay_t<T>::operator())>> :
72+
std::integral_constant<bool,
73+
!std::is_pointer_v<std::decay_t<T>> &&
74+
!is_std_function<std::decay_t<T>>::value> {};
75+
76+
// Helper to deduce function signature from a member function pointer (like operator())
77+
template <typename T>
78+
struct function_traits;
79+
80+
template <typename ClassType, typename RetType, typename... Args>
81+
struct function_traits<RetType(ClassType::*)(Args...) const> {
82+
using function_type = RetType(Args...);
83+
};
84+
85+
template <typename ClassType, typename RetType, typename... Args>
86+
struct function_traits<RetType(ClassType::*)(Args...)> {
87+
using function_type = RetType(Args...);
88+
};
5789
}
5890

5991
/// @brief Non-template base class for all delegates.
@@ -927,6 +959,17 @@ auto MakeDelegate(std::function<RetType(Args...)> func) {
927959
return DelegateFunction<RetType(Args...)>(func);
928960
}
929961

962+
/// @brief Creates a delegate that binds to a raw lambda or functor.
963+
/// @tparam F The lambda or functor type.
964+
/// @param[in] func The lambda or functor to bind.
965+
/// @return A `DelegateFunction` object bound to the specified lambda or functor.
966+
template <typename F, typename = std::enable_if_t<trait::is_callable<F>::value>>
967+
auto MakeDelegate(F&& func) {
968+
// Deduce the signature from the operator()
969+
using Sig = typename trait::function_traits<decltype(&std::remove_reference_t<F>::operator())>::function_type;
970+
return DelegateFunction<Sig>(std::forward<F>(func));
971+
}
972+
930973
}
931974

932975
#endif

DelegateMQ/delegate/DelegateAsync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,17 @@ auto MakeDelegate(std::function<RetType(Args...)> func, IThread& thread) {
12461246
return DelegateFunctionAsync<RetType(Args...)>(func, thread);
12471247
}
12481248

1249+
/// @brief Creates an asynchronous delegate that binds to a raw lambda or functor.
1250+
/// @tparam F The lambda or functor type.
1251+
/// @param[in] func The lambda or functor to bind.
1252+
/// @param[in] thread The `IThread` on which the function will be invoked asynchronously.
1253+
/// @return A `DelegateFunctionAsync` object bound to the specified lambda or functor and thread.
1254+
template <typename F, typename = std::enable_if_t<trait::is_callable<F>::value>>
1255+
auto MakeDelegate(F&& func, IThread& thread) {
1256+
using Sig = typename trait::function_traits<decltype(&std::remove_reference_t<F>::operator())>::function_type;
1257+
return DelegateFunctionAsync<Sig>(std::forward<F>(func), thread);
1258+
}
1259+
12491260
}
12501261

12511262
#endif

DelegateMQ/delegate/DelegateAsyncWait.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1729,6 +1729,18 @@ auto MakeDelegate(std::function<RetType(Args...)> func, IThread& thread, Duratio
17291729
return DelegateFunctionAsyncWait<RetType(Args...)>(func, thread, timeout);
17301730
}
17311731

1732+
/// @brief Creates an asynchronous delegate that binds to a raw lambda or functor with a wait and timeout.
1733+
/// @tparam F The lambda or functor type.
1734+
/// @param[in] func The lambda or functor to bind.
1735+
/// @param[in] thread The `IThread` on which the function will be invoked asynchronously.
1736+
/// @param[in] timeout The duration to wait for the function to complete before returning.
1737+
/// @return A `DelegateFunctionAsyncWait` object bound to the specified lambda or functor, thread, and timeout.
1738+
template <typename F, typename = std::enable_if_t<trait::is_callable<F>::value>>
1739+
auto MakeDelegate(F&& func, IThread& thread, Duration timeout) {
1740+
using Sig = typename trait::function_traits<decltype(&std::remove_reference_t<F>::operator())>::function_type;
1741+
return DelegateFunctionAsyncWait<Sig>(std::forward<F>(func), thread, timeout);
1742+
}
1743+
17321744
}
17331745

17341746
#endif

DelegateMQ/delegate/DelegateRemote.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1583,6 +1583,17 @@ auto MakeDelegate(std::function<RetType(Args...)> func, DelegateRemoteId id) {
15831583
return DelegateFunctionRemote<RetType(Args...)>(func, id);
15841584
}
15851585

1586+
/// @brief Creates an asynchronous delegate that binds to a raw lambda or functor.
1587+
/// @tparam F The lambda or functor type.
1588+
/// @param[in] func The lambda or functor to bind.
1589+
/// @param[in] id The delegate remote identifier.
1590+
/// @return A `DelegateFunctionRemote` object bound to the specified lambda or functor and id.
1591+
template <typename F, typename = std::enable_if_t<trait::is_callable<F>::value>>
1592+
auto MakeDelegate(F&& func, DelegateRemoteId id) {
1593+
using Sig = typename trait::function_traits<decltype(&std::remove_reference_t<F>::operator())>::function_type;
1594+
return DelegateFunctionRemote<Sig>(std::forward<F>(func), id);
1595+
}
1596+
15861597
}
15871598

15881599
#endif

DelegateMQ/predef/databus/DataBus.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66
#include "DataBusQos.h"
77
#include "SpyPacket.h"
88
#include "predef/util/Fault.h"
9-
10-
#ifdef _WIN32
11-
#include "predef/util/WinsockConnect.h"
12-
#endif
9+
#include "predef/util/NetworkConnect.h"
1310

1411
#include <string>
1512
#include <unordered_map>
@@ -317,3 +314,4 @@ class DataBus {
317314
} // namespace dmq
318315

319316
#endif // DMQ_DATABUS_H
317+

DelegateMQ/predef/databus/Participant.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ class Participant {
6565
}
6666
}
6767

68-
// Register a local handler for a remote topic.
69-
// When this participant receives data with 'remoteId', it will call 'func'.
68+
// Register a local handler for a remote topic using a `std::function`.
7069
template <typename T>
7170
void RegisterHandler(dmq::DelegateRemoteId remoteId, dmq::ISerializer<void(T)>& serializer, std::function<void(T)> func) {
7271
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
@@ -78,6 +77,18 @@ class Participant {
7877
m_channels[remoteId] = { channel, channel->GetEndpoint() };
7978
}
8079

80+
// Register a local handler for a remote topic using a raw lambda or functor.
81+
template <typename T, typename F, typename = std::enable_if_t<trait::is_callable<F>::value>>
82+
void RegisterHandler(dmq::DelegateRemoteId remoteId, dmq::ISerializer<void(T)>& serializer, F&& func) {
83+
std::lock_guard<dmq::RecursiveMutex> lock(m_mutex);
84+
auto channel = std::make_shared<dmq::RemoteChannel<void(T)>>(*m_transport, serializer);
85+
86+
// Use Bind() to register the callback for incoming calls.
87+
channel->Bind(std::forward<F>(func), remoteId);
88+
89+
m_channels[remoteId] = { channel, channel->GetEndpoint() };
90+
}
91+
8192
private:
8293
struct ChannelInvoker {
8394
std::shared_ptr<void> channel;

DelegateMQ/predef/dispatcher/RemoteChannel.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ class RemoteChannel<RetType(Args...)>
143143
m_delegate.SetStream(&m_stream);
144144
}
145145

146+
/// @brief Bind a raw lambda or functor as the receive-side handler.
147+
template <typename F, typename = std::enable_if_t<trait::is_callable<F>::value>>
148+
void Bind(F&& func, DelegateRemoteId id) {
149+
m_delegate.Bind(std::forward<F>(func), id);
150+
m_delegate.SetDispatcher(&m_dispatcher);
151+
m_delegate.SetSerializer(m_serializer);
152+
m_delegate.SetStream(&m_stream);
153+
}
154+
146155
/// @brief Invoke the channel (fire-and-forget send).
147156
/// @pre Bind() must have been called first.
148157
void operator()(Args... args) { m_delegate(args...); }
@@ -309,6 +318,17 @@ auto MakeDelegate(std::function<RetType(Args...)> func, DelegateRemoteId id, Rem
309318
return d;
310319
}
311320

321+
/// @brief Creates a remote delegate bound to a raw lambda or functor via a RemoteChannel.
322+
template <typename F, typename Sig, typename = std::enable_if_t<trait::is_callable<F>::value>>
323+
auto MakeDelegate(F&& func, DelegateRemoteId id, RemoteChannel<Sig>& channel)
324+
{
325+
DelegateFunctionRemote<Sig> d(std::forward<F>(func), id);
326+
d.SetDispatcher(channel.GetDispatcher());
327+
d.SetSerializer(channel.GetSerializer());
328+
d.SetStream(&channel.GetStream());
329+
return d;
330+
}
331+
312332
} // namespace dmq
313333

314334
#endif // REMOTE_CHANNEL_H
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#ifndef LINUX_MULTICAST_TRANSPORT_H
2+
#define LINUX_MULTICAST_TRANSPORT_H
3+
4+
#include "delegate/DelegateOpt.h"
5+
#include "predef/transport/ITransport.h"
6+
#include "predef/transport/DmqHeader.h"
7+
8+
#include <iostream>
9+
#include <sstream>
10+
#include <cstring>
11+
#include <cstdlib>
12+
#include <unistd.h>
13+
#include <arpa/inet.h>
14+
#include <sys/socket.h>
15+
#include <net/if.h>
16+
#include <fcntl.h>
17+
#include <errno.h>
18+
19+
/// @brief Linux Multicast UDP transport implementation for DelegateMQ.
20+
class MulticastTransport : public ITransport
21+
{
22+
public:
23+
enum class Type { PUB, SUB };
24+
25+
MulticastTransport() = default;
26+
~MulticastTransport() { Close(); }
27+
28+
int Create(Type type, const char* groupAddr, uint16_t port, const char* localInterface = "0.0.0.0")
29+
{
30+
m_type = type;
31+
m_socket = socket(AF_INET, SOCK_DGRAM, 0);
32+
if (m_socket < 0) return -1;
33+
34+
int reuse = 1;
35+
setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
36+
37+
// ALWAYS enable loopback for testing convenience
38+
int loop = 1;
39+
setsockopt(m_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
40+
41+
memset(&m_addr, 0, sizeof(m_addr));
42+
m_addr.sin_family = AF_INET;
43+
m_addr.sin_port = htons(port);
44+
45+
if (type == Type::PUB) {
46+
m_addr.sin_addr.s_addr = inet_addr(groupAddr);
47+
48+
struct in_addr interface_addr;
49+
interface_addr.s_addr = inet_addr(localInterface);
50+
setsockopt(m_socket, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr));
51+
52+
int ttl = 3;
53+
setsockopt(m_socket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
54+
}
55+
else {
56+
m_addr.sin_addr.s_addr = INADDR_ANY;
57+
if (bind(m_socket, (struct sockaddr*)&m_addr, sizeof(m_addr)) < 0) return -1;
58+
59+
struct ip_mreq mreq;
60+
mreq.imr_multiaddr.s_addr = inet_addr(groupAddr);
61+
mreq.imr_interface.s_addr = inet_addr(localInterface);
62+
if (setsockopt(m_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
63+
return -1;
64+
}
65+
66+
struct timeval timeout;
67+
timeout.tv_sec = 1;
68+
timeout.tv_usec = 0;
69+
setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
70+
}
71+
return 0;
72+
}
73+
74+
void Close() {
75+
if (m_socket != -1) {
76+
close(m_socket);
77+
m_socket = -1;
78+
}
79+
}
80+
81+
virtual int Send(xostringstream& os, const DmqHeader& header) override {
82+
if (m_type != Type::PUB) return -1;
83+
std::string payload = os.str();
84+
DmqHeader headerCopy = header;
85+
headerCopy.SetLength(static_cast<uint16_t>(payload.length()));
86+
87+
xostringstream ss(std::ios::in | std::ios::out | std::ios::binary);
88+
uint16_t marker = htons(headerCopy.GetMarker());
89+
uint16_t id = htons(headerCopy.GetId());
90+
uint16_t seqNum = htons(headerCopy.GetSeqNum());
91+
uint16_t length = htons(headerCopy.GetLength());
92+
93+
ss.write((const char*)&marker, 2);
94+
ss.write((const char*)&id, 2);
95+
ss.write((const char*)&seqNum, 2);
96+
ss.write((const char*)&length, 2);
97+
ss.write(payload.data(), payload.size());
98+
99+
std::string data = ss.str();
100+
sendto(m_socket, data.c_str(), data.size(), 0, (struct sockaddr*)&m_addr, sizeof(m_addr));
101+
return 0;
102+
}
103+
104+
virtual int Receive(xstringstream& is, DmqHeader& header) override {
105+
if (m_type != Type::SUB) return -1;
106+
int size = recvfrom(m_socket, m_buffer, sizeof(m_buffer), 0, NULL, NULL);
107+
108+
if (size <= (int)DmqHeader::HEADER_SIZE) return -1;
109+
110+
xstringstream headerStream(std::ios::in | std::ios::out | std::ios::binary);
111+
headerStream.write(m_buffer, DmqHeader::HEADER_SIZE);
112+
headerStream.seekg(0);
113+
114+
uint16_t val = 0;
115+
headerStream.read((char*)&val, 2); header.SetMarker(ntohs(val));
116+
headerStream.read((char*)&val, 2); header.SetId(ntohs(val));
117+
headerStream.read((char*)&val, 2); header.SetSeqNum(ntohs(val));
118+
headerStream.read((char*)&val, 2); header.SetLength(ntohs(val));
119+
120+
if (header.GetMarker() != DmqHeader::MARKER) {
121+
// std::cerr << "[Multicast] Bad Marker: " << std::hex << header.GetMarker() << std::dec << std::endl;
122+
return -1;
123+
}
124+
125+
is.write(m_buffer + DmqHeader::HEADER_SIZE, size - DmqHeader::HEADER_SIZE);
126+
return 0;
127+
}
128+
129+
private:
130+
int m_socket = -1;
131+
sockaddr_in m_addr{};
132+
Type m_type = Type::PUB;
133+
static const int BUFFER_SIZE = 4096;
134+
char m_buffer[BUFFER_SIZE] = { 0 };
135+
};
136+
137+
#endif

0 commit comments

Comments
 (0)