Skip to content

Commit 9f99e82

Browse files
committed
wait for pending operations gracefully
1 parent 6d9be17 commit 9f99e82

3 files changed

Lines changed: 75 additions & 25 deletions

File tree

lib/ClientConnection.cc

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,18 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep
411411
* at this point the connection is deemed valid to be used by clients of this class
412412
*/
413413
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
414+
if (connectTimeoutTask_) {
415+
connectTimeoutTask_->stop();
416+
}
417+
Lock lock(mutex_);
414418
if (!err) {
419+
if (isClosed()) {
420+
ASIO_ERROR err;
421+
socket_->close(err);
422+
connectPromise_.setFailed(ResultAlreadyClosed);
423+
LOG_INFO(cnxString_ << "Connection already closed");
424+
return;
425+
}
415426
std::stringstream cnxStringStream;
416427
try {
417428
cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
@@ -430,11 +441,6 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
430441
<< ", physical address:" << physicalAddress_);
431442
}
432443

433-
Lock lock(mutex_);
434-
if (isClosed()) {
435-
LOG_INFO(cnxString_ << "Connection already closed");
436-
return;
437-
}
438444
state_ = TcpConnected;
439445
lock.unlock();
440446

@@ -484,10 +490,12 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
484490
auto tlsSocket = tlsSocket_;
485491
// socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
486492
// fault might happen
487-
auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
493+
pendingOperations_++;
494+
auto callback = [this, weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
488495
auto self = weakSelf.lock();
489496
if (self) {
490-
self->handleHandshake(err);
497+
handleHandshake(err);
498+
pendingOperations_--;
491499
}
492500
};
493501
tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
@@ -496,6 +504,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
496504
handleHandshake(ASIO_SUCCESS);
497505
}
498506
} else {
507+
lock.unlock();
499508
LOG_ERROR(cnxString_ << "Failed to establish connection to " << endpoint << ": " << err.message());
500509
if (err == ASIO::error::operation_aborted) {
501510
close();
@@ -596,11 +605,13 @@ void ClientConnection::tcpConnectAsync() {
596605
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
597606

598607
auto weakSelf = weak_from_this();
608+
pendingOperations_++;
599609
resolver_->async_resolve(service_url.host(), std::to_string(service_url.port()),
600-
[weakSelf](auto err, const auto& results) {
610+
[this, weakSelf](auto err, const auto& results) {
601611
auto self = weakSelf.lock();
602612
if (self) {
603-
self->handleResolve(err, results);
613+
handleResolve(err, results);
614+
pendingOperations_--;
604615
}
605616
});
606617
}
@@ -642,12 +653,15 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result
642653
ptr->connectTimeoutTask_->stop();
643654
});
644655
connectTimeoutTask_->start();
645-
ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
646-
auto self = weakSelf.lock();
647-
if (self) {
648-
self->handleTcpConnected(err, endpoint);
649-
}
650-
});
656+
pendingOperations_++;
657+
ASIO::async_connect(*socket_, results,
658+
[this, weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
659+
auto self = weakSelf.lock();
660+
if (self) {
661+
handleTcpConnected(err, endpoint);
662+
pendingOperations_--;
663+
}
664+
});
651665
}
652666

653667
void ClientConnection::readNextCommand() {
@@ -1079,6 +1093,7 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
10791093

10801094
if (pendingWriteOperations_++ == 0) {
10811095
// Write immediately to socket
1096+
lock.unlock();
10821097
if (tlsSocket_) {
10831098
auto weakSelf = weak_from_this();
10841099
auto callback = [weakSelf, cmd]() {
@@ -1162,6 +1177,7 @@ void ClientConnection::sendPendingCommands() {
11621177
pendingWriteBuffers_.pop_front();
11631178

11641179
auto self = shared_from_this();
1180+
lock.unlock();
11651181
if (any.type() == typeid(SharedBuffer)) {
11661182
SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
11671183
asyncWrite(buffer.const_asio_buffer(),
@@ -1297,16 +1313,19 @@ void ClientConnection::close(Result result, bool detach) {
12971313
if (isClosed()) {
12981314
return;
12991315
}
1300-
state_ = Disconnected;
13011316

13021317
if (socket_) {
13031318
ASIO_ERROR err;
1304-
socket_->shutdown(ASIO::socket_base::shutdown_both, err);
1305-
socket_->close(err);
1306-
if (err) {
1307-
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
1308-
}
1319+
if (state_ != Pending) {
1320+
socket_->shutdown(ASIO::socket_base::shutdown_both, err);
1321+
socket_->close(err);
1322+
if (err) {
1323+
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
1324+
}
1325+
} // else: there is an ongoing connect operation, the socket will be closed after the operation
1326+
// succeeds in `handleTcpConnected`
13091327
}
1328+
state_ = Disconnected;
13101329
if (tlsSocket_) {
13111330
ASIO_ERROR err;
13121331
tlsSocket_->lowest_layer().close(err);

lib/ClientConnection.h

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
221221

222222
void handleKeepAliveTimeout();
223223

224+
auto pendingOperations() const noexcept { return pendingOperations_.load(std::memory_order_acquire); }
225+
224226
private:
225227
struct PendingRequestData {
226228
Promise<Result, ResponseData> promise;
@@ -298,25 +300,41 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
298300

299301
template <typename ConstBufferSequence, typename WriteHandler>
300302
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
303+
Lock lock{mutex_};
301304
if (isClosed()) {
302305
return;
303306
}
307+
lock.unlock();
308+
pendingOperations_++;
309+
auto wrappedHandler = customAllocWriteHandler(
310+
[this, handler = std::move(handler)](const ASIO_ERROR& err, size_t bytesTransferred) mutable {
311+
handler(err, bytesTransferred);
312+
pendingOperations_--;
313+
});
304314
if (tlsSocket_) {
305-
ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler));
315+
ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, wrappedHandler));
306316
} else {
307-
ASIO::async_write(*socket_, buffers, handler);
317+
ASIO::async_write(*socket_, buffers, wrappedHandler);
308318
}
309319
}
310320

311321
template <typename MutableBufferSequence, typename ReadHandler>
312322
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler handler) {
323+
Lock lock{mutex_};
313324
if (isClosed()) {
314325
return;
315326
}
327+
lock.unlock();
328+
pendingOperations_++;
329+
auto wrappedHandler = customAllocReadHandler(
330+
[this, handler = std::move(handler)](const ASIO_ERROR& err, size_t bytesTransferred) mutable {
331+
handler(err, bytesTransferred);
332+
pendingOperations_--;
333+
});
316334
if (tlsSocket_) {
317-
tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, handler));
335+
tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, wrappedHandler));
318336
} else {
319-
socket_->async_receive(buffers, handler);
337+
socket_->async_receive(buffers, wrappedHandler);
320338
}
321339
}
322340

@@ -419,6 +437,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
419437
const std::string clientVersion_;
420438
ConnectionPool& pool_;
421439
const size_t poolIndex_;
440+
std::atomic_int pendingOperations_{0};
422441

423442
friend class PulsarFriend;
424443
friend class ConsumerTest;

lib/ConnectionPool.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "ConnectionPool.h"
2020

21+
#include <thread>
22+
2123
#ifdef USE_ASIO
2224
#include <asio/ip/tcp.hpp>
2325
#include <asio/ssl.hpp>
@@ -61,8 +63,18 @@ bool ConnectionPool::close() {
6163
if (cnx) {
6264
// The 2nd argument is false because removing a value during the iteration will cause segfault
6365
cnx->close(ResultDisconnected, false);
66+
for (int i = 0; i < 5000 && cnx->pendingOperations() > 0; i++) {
67+
using namespace std::chrono_literals;
68+
std::this_thread::sleep_for(1ms);
69+
}
70+
if (cnx->pendingOperations() > 0) {
71+
LOG_WARN(
72+
"Connection still has pending operations after waiting for 5 seconds, pending count: "
73+
<< cnx->pendingOperations());
74+
}
6475
}
6576
}
77+
6678
pool_.clear();
6779
return true;
6880
}

0 commit comments

Comments
 (0)