File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -410,7 +410,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
410410 // However, `close` could be called when a mutex is held, so we have to use a recursive mutex to avoid
411411 // deadlock.
412412 mutable std::recursive_mutex mutex_;
413- using Lock = std::lock_guard <std::recursive_mutex>;
413+ using Lock = std::unique_lock <std::recursive_mutex>;
414414
415415 // Pending buffers to write on the socket
416416 std::deque<std::any> pendingWriteBuffers_;
Original file line number Diff line number Diff line change @@ -63,14 +63,15 @@ bool ConnectionPool::close() {
6363 if (cnx) {
6464 // The 2nd argument is false because removing a value during the iteration will cause segfault
6565 cnx->close (ResultDisconnected, false );
66- for (int i = 0 ; i < 5000 && cnx->pendingOperations () > 0 ; i++) {
66+ for (int i = 0 ; i < clientConfiguration_.getConnectionTimeout () && cnx->pendingOperations () > 0 ;
67+ i++) {
6768 using namespace std ::chrono_literals;
6869 std::this_thread::sleep_for (1ms);
6970 }
7071 if (cnx->pendingOperations () > 0 ) {
71- LOG_WARN (
72- " Connection still has pending operations after waiting for 5 seconds, pending count: "
73- << cnx->pendingOperations ());
72+ LOG_WARN (" Connection still has pending operations after waiting for "
73+ << clientConfiguration_. getConnectionTimeout ()
74+ << " ms, pending count: " << cnx->pendingOperations ());
7475 }
7576 }
7677 }
You can’t perform that action at this time.
0 commit comments