Skip to content

Commit c97312d

Browse files
committed
simplify tests and fix thread safety when closing socket
1 parent 505417d commit c97312d

1 file changed

Lines changed: 30 additions & 41 deletions

File tree

tests/ClientTest.cc

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
#include <pulsar/Version.h>
2222

2323
#include <algorithm>
24+
#include <atomic>
2425
#include <chrono>
25-
#include <condition_variable>
2626
#include <future>
27-
#include <mutex>
2827
#include <sstream>
2928
#include <thread>
29+
#include <utility>
3030

3131
#include "MockClientImpl.h"
3232
#include "PulsarAdminHelper.h"
@@ -52,70 +52,59 @@ class SilentTcpServer {
5252
public:
5353
SilentTcpServer()
5454
: acceptor_(ioContext_, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), 0)),
55-
acceptedFuture_(acceptedPromise_.get_future()) {}
55+
acceptedFuture_(acceptedPromise_.get_future()),
56+
port_(acceptor_.local_endpoint().port()),
57+
workGuard_(ASIO::make_work_guard(ioContext_)) {}
5658

5759
~SilentTcpServer() { stop(); }
5860

59-
int getPort() const { return acceptor_.local_endpoint().port(); }
61+
int getPort() const noexcept { return port_; }
6062

6163
void start() {
6264
serverThread_ = std::thread([this] {
6365
socket_.reset(new ASIO::ip::tcp::socket(ioContext_));
66+
acceptor_.async_accept(
67+
*socket_, [this](const ASIO_ERROR &acceptError) { acceptedPromise_.set_value(acceptError); });
6468

65-
ASIO_ERROR acceptError;
66-
acceptor_.accept(*socket_, acceptError);
67-
acceptedPromise_.set_value(acceptError);
68-
69-
std::unique_lock<std::mutex> lock(mutex_);
70-
cond_.wait(lock, [this] { return stopped_; });
71-
lock.unlock();
72-
73-
if (socket_) {
74-
ASIO_ERROR closeError;
75-
socket_->close(closeError);
76-
}
77-
78-
ASIO_ERROR closeError;
79-
acceptor_.close(closeError);
69+
ioContext_.run();
8070
});
8171
}
8272

8373
bool waitUntilAccepted(std::chrono::milliseconds timeout) const {
8474
return acceptedFuture_.wait_for(timeout) == std::future_status::ready;
8575
}
8676

87-
ASIO_ERROR acceptedError() const { return acceptedFuture_.get(); }
77+
auto acceptedError() const { return acceptedFuture_.get(); }
8878

8979
void stop() {
90-
{
91-
std::lock_guard<std::mutex> lock(mutex_);
92-
if (stopped_) {
93-
return;
94-
}
95-
stopped_ = true;
96-
}
97-
98-
ASIO_ERROR closeError;
99-
acceptor_.close(closeError);
100-
if (socket_) {
101-
socket_->close(closeError);
102-
}
103-
104-
cond_.notify_all();
105-
if (serverThread_.joinable()) {
106-
serverThread_.join();
80+
bool expected = false;
81+
if (!stopped_.compare_exchange_strong(expected, true) || !serverThread_.joinable()) {
82+
return;
10783
}
84+
ASIO::post(ioContext_, [this] {
85+
ASIO_ERROR closeError;
86+
if (socket_ && socket_->is_open()) {
87+
socket_->close(closeError);
88+
}
89+
if (acceptor_.is_open()) {
90+
acceptor_.close(closeError);
91+
}
92+
workGuard_.reset();
93+
});
94+
serverThread_.join();
10895
}
10996

11097
private:
98+
using WorkGuard = decltype(ASIO::make_work_guard(std::declval<ASIO::io_context &>()));
99+
111100
ASIO::io_context ioContext_;
112101
ASIO::ip::tcp::acceptor acceptor_;
113-
std::shared_ptr<ASIO::ip::tcp::socket> socket_;
102+
std::unique_ptr<ASIO::ip::tcp::socket> socket_;
114103
std::promise<ASIO_ERROR> acceptedPromise_;
115104
std::shared_future<ASIO_ERROR> acceptedFuture_;
116-
std::mutex mutex_;
117-
std::condition_variable cond_;
118-
bool stopped_{false};
105+
const int port_;
106+
WorkGuard workGuard_;
107+
std::atomic_bool stopped_{false};
119108
std::thread serverThread_;
120109
};
121110

0 commit comments

Comments
 (0)