Skip to content

Commit aaf5b0a

Browse files
committed
Update delegate library
1 parent ea80c16 commit aaf5b0a

13 files changed

Lines changed: 296 additions & 117 deletions

File tree

CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ cmake_minimum_required(VERSION 3.10)
1313
project(IntegrationTestFramework VERSION 1.0 LANGUAGES CXX)
1414

1515
# Set build options
16-
set (DMQ_EXTERNAL_LIB "OFF")
1716
set (DMQ_ALLOCATOR "OFF")
1817
set (DMQ_UTIL "ON")
1918
set (DMQ_THREAD "DMQ_THREAD_STDLIB")

DelegateMQ/DelegateMQ.cmake

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,53 @@
1-
set (DMQ_ROOT_DIR "${CMAKE_CURRENT_LIST_DIR}")
2-
3-
if(EXISTS "${DMQ_ROOT_DIR}/Common.cmake")
4-
include ("${DMQ_ROOT_DIR}/Common.cmake")
5-
else()
6-
message(FATAL_ERROR "Common.cmake not found.")
7-
endif()
8-
9-
if(EXISTS "${DMQ_ROOT_DIR}/Predef.cmake")
10-
include ("${DMQ_ROOT_DIR}/Predef.cmake")
11-
else()
12-
message(FATAL_ERROR "Predef.cmake not found.")
13-
endif()
14-
15-
if (DMQ_EXTERNAL_LIB)
16-
if(EXISTS "${DMQ_ROOT_DIR}/External.cmake")
17-
include ("${DMQ_ROOT_DIR}/External.cmake")
18-
else()
19-
message(FATAL_ERROR "External.cmake not found.")
1+
# DelegateMQ cmake module
2+
#
3+
# This module sets the following variables in your project:
4+
#
5+
# DMQ_INCLUDE_DIR - the directory containing DelegateMQ headers.
6+
# DMQ_LIB_SOURCES - the core DelegateMQ delegate library files.
7+
# DMQ_PREDEF_SOURCES - the predefined supporting source code files
8+
# based on the DMQ build options.
9+
#
10+
# Set DMQ build options:
11+
#
12+
# # Set DMQ build options. Update as necessary.
13+
# set(DMQ_ALLOCATOR "OFF")
14+
# set(DMQ_UTIL "ON")
15+
# set(DMQ_THREAD "DMQ_THREAD_STDLIB")
16+
# set(DMQ_SERIALIZE "DMQ_SERIALIZE_NONE")
17+
# set(DMQ_TRANSPORT "DMQ_TRANSPORT_NONE")
18+
# include("${CMAKE_SOURCE_DIR}/../../../src/delegate-mq/DelegateMQ.cmake")
19+
#
20+
# Use variables to build:
21+
#
22+
# # Collect DelegateMQ predef source files
23+
# list(APPEND SOURCES ${DMQ_PREDEF_SOURCES})
24+
#
25+
# # Add include directory
26+
# include_directories(${DMQ_INCLUDE_DIR})
27+
28+
macro(check _file)
29+
if(NOT EXISTS "${_file}")
30+
message(FATAL_ERROR "File or directory ${_file} referenced by variable ${_var} does not exist!")
2031
endif()
21-
endif()
32+
endmacro()
33+
34+
macro(set_and_check _var _file)
35+
set(${_var} "${_file}")
36+
check("${_file}")
37+
endmacro()
38+
39+
set_and_check(DMQ_ROOT_DIR "${CMAKE_CURRENT_LIST_DIR}")
40+
set_and_check(DMQ_INCLUDE_DIR "${DMQ_ROOT_DIR}")
41+
42+
check("${DMQ_ROOT_DIR}/Macros.cmake")
43+
include ("${DMQ_ROOT_DIR}/Macros.cmake")
44+
45+
check("${DMQ_ROOT_DIR}/Common.cmake")
46+
include ("${DMQ_ROOT_DIR}/Common.cmake")
47+
48+
check("${DMQ_ROOT_DIR}/Predef.cmake")
49+
include ("${DMQ_ROOT_DIR}/Predef.cmake")
2250

23-
if(EXISTS "${DMQ_ROOT_DIR}/Macros.cmake")
24-
include ("${DMQ_ROOT_DIR}/Macros.cmake")
25-
else()
26-
message(FATAL_ERROR "Macros.cmake not found.")
27-
endif()
51+
check("${DMQ_ROOT_DIR}/External.cmake")
52+
include ("${DMQ_ROOT_DIR}/External.cmake")
2853

DelegateMQ/DelegateMQ.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102

103103
#include "predef/util/Fault.h"
104104
#include "predef/util/Timer.h"
105+
#include "predef/util/TransportMonitor.h"
105106
#include "predef/util/AsyncInvoke.h"
106107

107108
#endif

DelegateMQ/External.cmake

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,42 @@
11
# External library definitions to support remote delegates. Update the options
22
# below based on the target build platform.
33

4-
# Modify the options below for your target platform external libraries.
5-
6-
if(CMAKE_SYSTEM_NAME STREQUAL "Windows")
7-
message(STATUS "Building on Windows")
8-
9-
# TODO: Update to installed library version
10-
set(ZMQ_LIB_NAME "libzmq-mt-4_3_5.lib")
11-
12-
# Set path to the vcpkg directory for support libraries (zmq.h)
13-
set(VCPKG_ROOT_DIR "${DMQ_ROOT_DIR}/../../../vcpkg/installed/x64-windows")
14-
elseif(CMAKE_SYSTEM_NAME STREQUAL "Linux")
15-
message(STATUS "Building on Linux")
16-
set(ZMQ_LIB_NAME "libzmq.a")
17-
set(VCPKG_ROOT_DIR "${DMQ_ROOT_DIR}/../../../vcpkg/installed/x64-linux")
18-
19-
else()
20-
message(FATAL_ERROR "Select directories based on build platform.")
21-
endif()
22-
23-
if(NOT EXISTS "${VCPKG_ROOT_DIR}")
24-
message(FATAL_ERROR "${VCPKG_ROOT_DIR} Directory does not exist. Update VCPKG_ROOT_DIR to the correct directory.")
25-
endif()
26-
27-
# Set ZeroMQ library file name and directory
4+
# ZeroMQ library package
285
# https://github.com/zeromq
29-
set(ZMQ_LIB_DIR "${VCPKG_ROOT_DIR}/lib")
30-
if (NOT EXISTS "${ZMQ_LIB_DIR}/${ZMQ_LIB_NAME}")
31-
message(FATAL_ERROR "Error: ${ZMQ_LIB_NAME} not found in ${ZMQ_LIB_DIR}. Please ensure the library is available.")
32-
else()
33-
message(STATUS "Found ${ZMQ_LIB_NAME} in ${ZMQ_LIB_DIR}")
6+
if(DMQ_TRANSPORT STREQUAL "DMQ_TRANSPORT_ZEROMQ")
7+
# vcpkg package manager
8+
# https://github.com/microsoft/vcpkg
9+
set_and_check(VCPKG_ROOT_DIR "${DMQ_ROOT_DIR}/../../../vcpkg")
10+
11+
if(CMAKE_SYSTEM_NAME STREQUAL "Windows")
12+
set_and_check(ZeroMQ_DIR "${VCPKG_ROOT_DIR}/installed/x64-windows/share/zeromq")
13+
elseif(CMAKE_SYSTEM_NAME STREQUAL "Linux")
14+
set_and_check(ZeroMQ_DIR "${VCPKG_ROOT_DIR}/installed/x64-linux-dynamic/share/zeromq")
15+
endif()
16+
find_package(ZeroMQ CONFIG REQUIRED)
17+
if (ZeroMQ_FOUND)
18+
message(STATUS "ZeroMQ found: ${ZeroMQ_VERSION}")
19+
else()
20+
message(FATAL_ERROR "ZeroMQ not found!")
21+
endif()
3422
endif()
35-
36-
# Set path to the MessagePack C++ library (msgpack.hpp)
23+
24+
# MessagePack C++ library (msgpack.hpp)
3725
# https://github.com/msgpack/msgpack-c/tree/cpp_master
38-
set(MSGPACK_INCLUDE_DIR "${DMQ_ROOT_DIR}/../../../msgpack-c/include")
39-
if(NOT EXISTS "${MSGPACK_INCLUDE_DIR}")
40-
message(FATAL_ERROR "${MSGPACK_INCLUDE_DIR} Directory does not exist. Update MSGPACK_INCLUDE_DIR to the correct directory.")
26+
if(DMQ_SERIALIZE STREQUAL "DMQ_SERIALIZE_MSGPACK")
27+
set_and_check(MSGPACK_INCLUDE_DIR "${DMQ_ROOT_DIR}/../../../msgpack-c/include")
4128
endif()
4229

43-
# Set path to the RapidJSON C++ library
30+
# RapidJSON C++ library
4431
# https://github.com/Tencent/rapidjson
45-
set(RAPIDJSON_INCLUDE_DIR "${DMQ_ROOT_DIR}/../../../rapidjson/include")
46-
if(NOT EXISTS "${RAPIDJSON_INCLUDE_DIR}")
47-
message(FATAL_ERROR "${RAPIDJSON_INCLUDE_DIR} Directory does not exist. Update RAPIDJSON_INCLUDE_DIR to the correct directory.")
32+
if(DMQ_SERIALIZE STREQUAL "DMQ_SERIALIZE_RAPIDJSON")
33+
set_and_check(RAPIDJSON_INCLUDE_DIR "${DMQ_ROOT_DIR}/../../../rapidjson/include")
4834
endif()
4935

50-
# Set path to the FreeRTOS library
36+
# FreeRTOS library
5137
# https://github.com/FreeRTOS/FreeRTOS
52-
set(FREERTOS_ROOT_DIR "${DMQ_ROOT_DIR}/../../../FreeRTOSv202212.00")
53-
if(NOT EXISTS "${FREERTOS_ROOT_DIR}")
54-
message(FATAL_ERROR "${FREERTOS_ROOT_DIR} Directory does not exist. Update FREERTOS_ROOT_DIR to the correct directory.")
55-
else()
38+
if(DMQ_THREAD STREQUAL "DMQ_THREAD_FREERTOS")
39+
set_and_check(FREERTOS_ROOT_DIR "${DMQ_ROOT_DIR}/../../../FreeRTOSv202212.00")
5640
# Collect FreeRTOS source files
5741
file(GLOB FREERTOS_SOURCES
5842
"${FREERTOS_ROOT_DIR}/FreeRTOS/Source/*.c"

DelegateMQ/Macros.cmake

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
# Function to copy .dll files from vcpkg bin directory to the build output directory
2-
function(copy_vcpkg_dlls VCPKG_ROOT_DIR DELEGATE_APP)
3-
# Get .dll and .pdb files in the vcpkg bin directory
4-
file(GLOB ZMQ_BIN_FILES "${VCPKG_ROOT_DIR}/bin/libzmq*.dll" "${VCPKG_ROOT_DIR}/bin/libzmq*.pdb")
5-
2+
function(copy_files _src_files _dest_dir)
63
# Copy each DLL file to the build output directory
7-
foreach(DLL_FILE ${ZMQ_BIN_FILES})
4+
foreach(FILE ${_src_files})
85
add_custom_command(TARGET ${DELEGATE_APP} POST_BUILD
96
COMMAND ${CMAKE_COMMAND} -E copy_if_different
10-
${DLL_FILE} # Copy the DLL file
11-
"${CMAKE_BINARY_DIR}/Debug" # Destination directory
12-
COMMENT "Copying ${DLL_FILE} to build output"
7+
${FILE} # Copy files
8+
${_dest_dir} # Destination directory
9+
COMMENT "Copying ${FILE} to build output"
1310
)
1411
endforeach()
1512
endfunction()
1613

1714

1815

16+
17+

DelegateMQ/delegate/IDispatcher.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88

99
namespace dmq {
1010

11-
// Remote identifier shared between sender and receiver remotes
11+
// Remote identifier shared between sender and receiver remotes.
1212
typedef uint16_t DelegateRemoteId;
1313
const uint16_t INVALID_REMOTE_ID = -1;
14+
const uint16_t ACK_REMOTE_ID = 0;
1415

1516
/// @brief Delegate interface class to dispatch serialized function argument data
1617
/// to a remote destination. Implemented by the application if using remote delegates.

DelegateMQ/predef/dispatcher/Dispatcher.h

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,14 @@ class Dispatcher : public dmq::IDispatcher
3939
// Send argument data to the transport
4040
virtual int Dispatch(std::ostream& os, dmq::DelegateRemoteId id)
4141
{
42-
std::stringstream ss(std::ios::in | std::ios::out | std::ios::binary);
43-
44-
DmqHeader header(id, DmqHeader::GetNextSeqNum());
45-
46-
// Write each header value using the getters from DmqHeader
47-
auto marker = header.GetMarker();
48-
ss.write(reinterpret_cast<const char*>(&marker), sizeof(marker));
49-
50-
auto id_value = header.GetId();
51-
ss.write(reinterpret_cast<const char*>(&id_value), sizeof(id_value));
52-
53-
auto seqNum = header.GetSeqNum();
54-
ss.write(reinterpret_cast<const char*>(&seqNum), sizeof(seqNum));
55-
56-
// Insert delegate arguments from the stream (os)
57-
ss << os.rdbuf();
42+
std::ostringstream* ss = dynamic_cast<std::ostringstream*>(&os);
43+
if (!ss)
44+
return -1;
5845

5946
if (m_transport)
6047
{
61-
int err = m_transport->Send(ss);
48+
DmqHeader header(id, DmqHeader::GetNextSeqNum());
49+
int err = m_transport->Send(*ss, header);
6250
return err;
6351
}
6452
return -1;

DelegateMQ/predef/transport/ITransport.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ class ITransport
1010
public:
1111
/// Send data to a remote
1212
/// @param[in] os Output stream to send.
13+
/// @param[in] header The header to send.
1314
/// @return 0 if success.
14-
virtual int Send(std::stringstream& os) = 0;
15+
virtual int Send(std::ostringstream& os, const DmqHeader& header) = 0;
1516

1617
/// Receive data from a remote
1718
/// @param[out] header Incoming delegate message header.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#ifndef ITRANSPORT_MONITOR_H
2+
#define ITRANSPORT_MONITOR_H
3+
4+
//#include "DmqHeader.h"
5+
//#include <sstream>
6+
7+
/// @brief DelegateMQ transport monitor interface.
8+
class ITransportMonitor
9+
{
10+
public:
11+
/// Add a sequence number
12+
/// param[in] seqNum - the message sequence number
13+
/// param[in] remoteId - the remote ID
14+
virtual void Add(uint16_t seqNum, dmq::DelegateRemoteId remoteId) = 0;
15+
16+
/// Remove a sequence number
17+
/// param[in] seqNum - the message sequence number
18+
virtual void Remove(uint16_t seqNum) = 0;
19+
};
20+
21+
#endif

DelegateMQ/predef/transport/win32-pipe/Win32PipeTransport.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,24 +74,38 @@ class Win32PipeTransport : public ITransport
7474
m_hPipe = INVALID_HANDLE_VALUE;
7575
}
7676

77-
virtual int Send(std::stringstream& os) override
77+
virtual int Send(std::ostringstream& os, const DmqHeader& header) override
7878
{
79-
size_t length = os.str().length();
80-
if (os.bad() || os.fail() || length <= 0)
79+
if (os.bad() || os.fail())
8180
return -1;
8281

83-
size_t len = (size_t)os.tellp();
84-
char* sendBuf = (char*)malloc(len);
82+
std::ostringstream ss(std::ios::in | std::ios::out | std::ios::binary);
83+
84+
// Write each header value using the getters from DmqHeader
85+
auto marker = header.GetMarker();
86+
ss.write(reinterpret_cast<const char*>(&marker), sizeof(marker));
87+
88+
auto id = header.GetId();
89+
ss.write(reinterpret_cast<const char*>(&id), sizeof(id));
90+
91+
auto seqNum = header.GetSeqNum();
92+
ss.write(reinterpret_cast<const char*>(&seqNum), sizeof(seqNum));
93+
94+
// Insert delegate arguments from the stream (os)
95+
ss << os.rdbuf();
96+
97+
size_t length = ss.str().length();
98+
char* sendBuf = (char*)malloc(length);
8599

86100
// Copy char buffer into heap allocated memory
87-
os.rdbuf()->sgetn(sendBuf, len);
101+
ss.rdbuf()->sgetn(sendBuf, length);
88102

89103
// Send message through named pipe
90104
DWORD sentLen = 0;
91-
BOOL success = WriteFile(m_hPipe, sendBuf, (DWORD)len, &sentLen, NULL);
105+
BOOL success = WriteFile(m_hPipe, sendBuf, (DWORD)length, &sentLen, NULL);
92106
free(sendBuf);
93107

94-
if (!success || sentLen != len)
108+
if (!success || sentLen != length)
95109
return -1;
96110
return 0;
97111
}

0 commit comments

Comments
 (0)