Skip to content

Commit eafe1c7

Browse files
committed
Reimplement libboostasio
Provide better implementation for Boost.ASIO library support. New implementation have the following advantages over existing one: * all OS supported * TLS/SSL is supported natively on all OS using Boost.ASIO * Boost.ASIO multithreading support is safe and correct (see examples/libboostasio_multithreading.cpp)
1 parent 32faf95 commit eafe1c7

6 files changed

Lines changed: 727 additions & 539 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC
131131
$<INSTALL_INTERFACE:include/>
132132
)
133133

134-
if(AMQP-CPP_LINUX_TCP)
134+
if(AMQP-CPP_LINUX_TCP OR AMQP-CPP_BUILD_EXAMPLES)
135135
target_link_libraries(${PROJECT_NAME} ${CMAKE_DL_LIBS})
136136
# Find OpenSSL and provide include dirs
137137
find_package(OpenSSL REQUIRED)

examples/CMakeLists.txt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,24 @@ add_executable(amqpcpp_boost_example libboostasio.cpp)
88

99
add_dependencies(amqpcpp_boost_example amqpcpp)
1010

11-
target_link_libraries(amqpcpp_boost_example amqpcpp boost_system pthread dl ssl)
11+
target_link_libraries(amqpcpp_boost_example amqpcpp boost_system pthread dl)
1212

13+
14+
add_executable(amqpcpp_boost_ssl_example libboostasio_ssl.cpp)
15+
16+
add_dependencies(amqpcpp_boost_ssl_example amqpcpp)
17+
18+
target_link_libraries(amqpcpp_boost_ssl_example amqpcpp boost_system pthread dl ssl crypto)
19+
20+
21+
add_executable(amqpcpp_boost_multithreading_example libboostasio_multithreading.cpp)
22+
23+
add_dependencies(amqpcpp_boost_multithreading_example amqpcpp)
24+
25+
target_link_libraries(amqpcpp_boost_multithreading_example amqpcpp boost_system pthread dl)
26+
27+
28+
if(AMQP-CPP_LINUX_TCP)
1329
###################################
1430
# Libev
1531
###################################
@@ -30,3 +46,4 @@ add_executable(amqpcpp_libuv_example libuv.cpp)
3046
add_dependencies(amqpcpp_libuv_example amqpcpp)
3147

3248
target_link_libraries(amqpcpp_libuv_example amqpcpp uv pthread dl ssl)
49+
endif(AMQP-CPP_LINUX_TCP)

examples/libboostasio.cpp

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/**
22
* LibBoostAsio.cpp
3-
*
3+
*
44
* Test program to check AMQP functionality based on Boost's asio io_service.
5-
*
5+
*
66
* @author Gavin Smith <gavin.smith@coralbay.tv>
77
*
88
* Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
@@ -11,10 +11,10 @@
1111
/**
1212
* Dependencies
1313
*/
14-
#include <boost/asio/io_service.hpp>
15-
#include <boost/asio/strand.hpp>
16-
#include <boost/asio/deadline_timer.hpp>
17-
14+
#include <boost/asio/io_context.hpp>
15+
#include <boost/asio/ip/tcp.hpp>
16+
#include <boost/asio/connect.hpp>
17+
#include <boost/asio/signal_set.hpp>
1818

1919
#include <amqpcpp.h>
2020
#include <amqpcpp/libboostasio.h>
@@ -25,32 +25,41 @@
2525
*/
2626
int main()
2727
{
28+
boost::asio::io_context io_context;
29+
30+
boost::asio::signal_set signal_set{io_context, SIGINT, SIGTERM};
31+
32+
signal_set.async_wait([&io_context] (const boost::system::error_code& error, int signal_number) {
33+
std::cerr << "Got signal " << signal_number << ", terminating..." << std::endl;
34+
35+
io_context.stop();
36+
});
2837

29-
// access to the boost asio handler
30-
// note: we suggest use of 2 threads - normally one is fin (we are simply demonstrating thread safety).
31-
boost::asio::io_service service(4);
38+
const AMQP::Address address("amqp://guest:guest@localhost/");
39+
40+
boost::asio::ip::tcp::resolver resolver(io_context);
41+
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(address.hostname(), address.secure() ? "amqps" : "amqp");
42+
43+
boost::asio::ip::tcp::socket socket(io_context);
44+
boost::asio::connect(socket, endpoints);
3245

33-
// handler for libev
34-
AMQP::LibBoostAsioHandler handler(service);
35-
3646
// make a connection
37-
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/"));
38-
47+
AMQP::LibBoostAsioConnection connection(std::move(socket), address.login(), address.vhost());
48+
3949
// we need a channel too
40-
AMQP::TcpChannel channel(&connection);
41-
50+
AMQP::LibBoostAsioChannel channel(&connection);
51+
4252
// create a temporary queue
4353
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
44-
54+
4555
// report the name of the temporary queue
4656
std::cout << "declared queue " << name << std::endl;
47-
57+
4858
// now we can close the connection
4959
connection.close();
5060
});
51-
61+
5262
// run the handler
53-
// a t the moment, one will need SIGINT to stop. In time, should add signal handling through boost API.
54-
return service.run();
63+
return io_context.run();
5564
}
5665

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* LibBoostAsio.cpp
3+
*
4+
* Test program to check AMQP functionality based on Boost's asio io_service.
5+
*
6+
* @author Gavin Smith <gavin.smith@coralbay.tv>
7+
*
8+
* Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
9+
*/
10+
11+
/**
12+
* Dependencies
13+
*/
14+
15+
#include <array>
16+
#include <chrono>
17+
#include <thread>
18+
19+
#include <boost/asio/io_context.hpp>
20+
#include <boost/asio/ip/tcp.hpp>
21+
#include <boost/asio/connect.hpp>
22+
#include <boost/asio/signal_set.hpp>
23+
24+
#include <amqpcpp.h>
25+
#include <amqpcpp/libboostasio.h>
26+
27+
28+
/**
29+
* Main program
30+
* @return int
31+
*/
32+
int main()
33+
{
34+
using namespace std::chrono_literals;
35+
36+
boost::asio::io_context io_context;
37+
38+
boost::asio::signal_set signal_set{io_context, SIGINT, SIGTERM};
39+
40+
signal_set.async_wait([&io_context] (const boost::system::error_code& error, int signal_number) {
41+
std::cerr << "Got signal " << signal_number << ", terminating..." << std::endl;
42+
43+
io_context.stop();
44+
});
45+
46+
boost::asio::strand<boost::asio::io_context::executor_type> strand(io_context.get_executor());
47+
48+
const AMQP::Address address("amqp://guest:guest@localhost/");
49+
50+
boost::asio::ip::tcp::resolver resolver(io_context);
51+
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(address.hostname(), address.secure() ? "amqps" : "amqp");
52+
53+
boost::asio::ip::tcp::socket socket(strand);
54+
boost::asio::connect(socket, endpoints);
55+
56+
// make a connection
57+
AMQP::LibBoostAsioConnection connection(std::move(socket), address.login(), address.vhost());
58+
59+
std::array<AMQP::LibBoostAsioChannel, 4> channels {{
60+
AMQP::LibBoostAsioChannel{&connection},
61+
AMQP::LibBoostAsioChannel{&connection},
62+
AMQP::LibBoostAsioChannel{&connection},
63+
AMQP::LibBoostAsioChannel{&connection}
64+
}};
65+
66+
// create a temporary queue
67+
channels[0].declareQueue(AMQP::exclusive).onSuccess([&io_context, &strand, &connection, &channels](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
68+
// report the name of the temporary queue
69+
std::cout << "declared queue " << name << std::endl;
70+
71+
// fill the queue
72+
for (std::size_t i = 0; i < 100; ++i) {
73+
channels[0].publish("", name, std::to_string(i).c_str());
74+
}
75+
76+
for (auto& channel: channels) {
77+
channel.setQos(1).onSuccess([&io_context, &strand, &channel, name] () {
78+
channel.consume(name).onReceived([&io_context, &strand, &channel] (const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
79+
std::cout << "delivery tag " << deliveryTag << " by " << channel.id() << " body " << std::string(message.body(), message.bodySize()) << std::endl;
80+
81+
boost::asio::post(io_context, [&channel, &strand, deliveryTag] () {
82+
std::this_thread::sleep_for(1s);
83+
84+
boost::asio::post(strand, [&channel, deliveryTag] () {
85+
channel.ack(deliveryTag);
86+
87+
std::cout << "ack " << deliveryTag << " by " << channel.id() << std::endl;
88+
});
89+
});
90+
});
91+
});
92+
}
93+
});
94+
95+
const auto tf = [&io_context] () {
96+
io_context.run();
97+
};
98+
std::array<std::thread, 4> pool {{
99+
std::thread{tf},
100+
std::thread{tf},
101+
std::thread{tf},
102+
std::thread{tf}
103+
}};
104+
105+
for (auto& thread: pool) {
106+
thread.join();
107+
}
108+
109+
return 0;
110+
}
111+

examples/libboostasio_ssl.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* LibBoostAsio.cpp
3+
*
4+
* Test program to check AMQP functionality based on Boost's asio io_service.
5+
*
6+
* @author Gavin Smith <gavin.smith@coralbay.tv>
7+
*
8+
* Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp
9+
*/
10+
11+
/**
12+
* Dependencies
13+
*/
14+
#include <boost/asio/io_context.hpp>
15+
#include <boost/asio/ip/tcp.hpp>
16+
#include <boost/asio/connect.hpp>
17+
#include <boost/asio/signal_set.hpp>
18+
#include <boost/asio/ssl.hpp>
19+
20+
#include <amqpcpp.h>
21+
#include <amqpcpp/libboostasio.h>
22+
23+
/**
24+
* Main program
25+
* @return int
26+
*/
27+
int main()
28+
{
29+
boost::asio::io_context io_context;
30+
31+
boost::asio::signal_set signal_set{io_context, SIGINT, SIGTERM};
32+
33+
signal_set.async_wait([&io_context] (const boost::system::error_code& error, int signal_number) {
34+
std::cerr << "Got signal " << signal_number << ", terminating..." << std::endl;
35+
36+
io_context.stop();
37+
});
38+
39+
const AMQP::Address address("amqps://guest:guest@localhost/");
40+
41+
boost::asio::ip::tcp::resolver resolver(io_context);
42+
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(address.hostname(), address.secure() ? "amqps" : "amqp");
43+
44+
boost::asio::ssl::context ssl_context(boost::asio::ssl::context::sslv23);
45+
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> ssl_socket(io_context, ssl_context);
46+
boost::asio::connect(ssl_socket.lowest_layer(), endpoints);
47+
48+
ssl_socket.set_verify_mode(boost::asio::ssl::verify_none);
49+
ssl_socket.handshake(ssl_socket.client);
50+
51+
// make a connection
52+
AMQP::LibBoostAsioConnection connection(std::move(ssl_socket), address.login(), address.vhost());
53+
54+
// we need a channel too
55+
AMQP::LibBoostAsioChannel channel(&connection);
56+
57+
// create a temporary queue
58+
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
59+
60+
// report the name of the temporary queue
61+
std::cout << "declared queue " << name << std::endl;
62+
63+
// now we can close the connection
64+
connection.close();
65+
});
66+
67+
// run the handler
68+
// at the moment, one will need SIGINT to stop. In time, should add signal handling through boost API.
69+
return io_context.run();
70+
}
71+

0 commit comments

Comments
 (0)