Skip to content

Commit 5b4a914

Browse files
committed
fix thread safety when calling isClosed and add explanations
1 parent fca9000 commit 5b4a914

2 files changed

Lines changed: 22 additions & 4 deletions

File tree

lib/ClientConnection.cc

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,10 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
417417
Lock lock(mutex_);
418418
if (!err) {
419419
if (isClosed()) {
420-
ASIO_ERROR err;
421-
socket_->close(err);
420+
if (socket_) {
421+
ASIO_ERROR err;
422+
socket_->close(err);
423+
}
422424
connectPromise_.setFailed(ResultAlreadyClosed);
423425
LOG_INFO(cnxString_ << "Connection already closed");
424426
return;
@@ -674,9 +676,11 @@ void ClientConnection::readNextCommand() {
674676
}
675677

676678
void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t minReadSize) {
679+
Lock lock{mutex_};
677680
if (isClosed()) {
678681
return;
679682
}
683+
lock.unlock();
680684
// Update buffer write idx with new data
681685
incomingBuffer_.bytesWritten(bytesTransferred);
682686

@@ -1145,9 +1149,11 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
11451149
}
11461150

11471151
void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) {
1152+
Lock lock{mutex_};
11481153
if (isClosed()) {
11491154
return;
11501155
}
1156+
lock.unlock();
11511157
if (err) {
11521158
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
11531159
close(ResultDisconnected);
@@ -1269,9 +1275,11 @@ void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
12691275
}
12701276

12711277
void ClientConnection::handleKeepAliveTimeout() {
1278+
Lock lock{mutex_};
12721279
if (isClosed()) {
12731280
return;
12741281
}
1282+
lock.unlock();
12751283

12761284
if (havePendingPingRequest_) {
12771285
LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
@@ -1325,8 +1333,9 @@ void ClientConnection::close(Result result, bool detach) {
13251333
} else {
13261334
// There is an ongoing connect operation, the socket will be closed after the operation succeeds
13271335
// in `handleTcpConnected`
1328-
LOG_WARN(cnxString_
1329-
<< "Socket is still connecting, it will be closed once the connection attempt finishes");
1336+
LOG_DEBUG(
1337+
cnxString_
1338+
<< "Socket is still connecting, it will be closed once the connection attempt finishes");
13301339
}
13311340
}
13321341
state_ = Disconnected;

lib/ClientConnection.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,15 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
441441
const std::string clientVersion_;
442442
ConnectionPool& pool_;
443443
const size_t poolIndex_;
444+
445+
// This field is used to check if there is any pending operation when closing the connection pool. It's
446+
// because the event loop thread, which runs `io_context::run()`, will be stopped immediately by
447+
// `io_context::stop()` after closing the pool. If the pending operation's callback handler is not called
448+
// before the event loop is stopped, it could crash because the destructor of `io_context` will destroy
449+
// internal task queues so that the asynchronous operation's callback handler could refer and access a
450+
// destroyed object.
451+
// This counter is added to track the pending operation count so that the pool can wait for a certain time
452+
// until all pending operations are completed when closing this connection.
444453
std::atomic_int pendingOperations_{0};
445454

446455
friend class PulsarFriend;

0 commit comments

Comments
 (0)