Skip to content

Commit f5666d8

Browse files
committed
Merge branch 'main' into bewaremypower/service-url-provider
2 parents 5285efd + e80f65e commit f5666d8

16 files changed

Lines changed: 568 additions & 347 deletions

.github/workflows/ci-pr-validation.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,15 @@ jobs:
260260
Pop-Location
261261
}
262262
263+
- name: Ensure vcpkg has full history(windows)
264+
if: runner.os == 'Windows'
265+
shell: pwsh
266+
run: |
267+
$isShallow = (git -C "${{ env.VCPKG_ROOT }}" rev-parse --is-shallow-repository).Trim()
268+
if ($isShallow -eq "true") {
269+
git -C "${{ env.VCPKG_ROOT }}" fetch --unshallow
270+
}
271+
263272
- name: remove system vcpkg(windows)
264273
if: runner.os == 'Windows'
265274
run: rm -rf "$VCPKG_INSTALLATION_ROOT"

lib/ClientConnection.cc

Lines changed: 247 additions & 288 deletions
Large diffs are not rendered by default.

lib/ClientConnection.h

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include <any>
2727
#include <atomic>
2828
#include <cstdint>
29+
#include <future>
30+
#include <optional>
2931
#ifdef USE_ASIO
3032
#include <asio/bind_executor.hpp>
3133
#include <asio/io_context.hpp>
@@ -42,8 +44,10 @@
4244
#include <deque>
4345
#include <functional>
4446
#include <memory>
47+
#include <mutex>
4548
#include <string>
4649
#include <unordered_map>
50+
#include <utility>
4751
#include <vector>
4852

4953
#include "AsioTimer.h"
@@ -157,14 +161,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
157161
* Close the connection.
158162
*
159163
* @param result all pending futures will complete with this result
160-
* @param detach remove it from the pool if it's true. When false, the connection remains
161-
* associated with the pool but is logically closed; this is currently used when the
162-
* pool itself is being closed or when switching clusters.
163164
* @param switchCluster whether the close is triggered by cluster switching
164-
*
165-
* `detach` should only be false when the connection pool is closed.
166165
*/
167-
void close(Result result = ResultConnectError, bool detach = true, bool switchCluster = false);
166+
const std::future<void>& close(Result result = ResultConnectError, bool switchCluster = false);
168167

169168
bool isClosed() const;
170169

@@ -197,7 +196,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
197196

198197
const std::string& brokerAddress() const;
199198

200-
const std::string& cnxString() const;
199+
auto cnxString() const { return *std::atomic_load(&cnxStringPtr_); }
201200

202201
int getServerProtocolVersion() const;
203202

@@ -223,28 +222,48 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
223222
mockingRequests_.store(true, std::memory_order_release);
224223
}
225224

226-
void handleKeepAliveTimeout();
225+
void handleKeepAliveTimeout(const ASIO_ERROR& ec);
227226

228227
private:
229228
struct PendingRequestData {
230229
Promise<Result, ResponseData> promise;
231230
DeadlineTimerPtr timer;
232231
std::shared_ptr<std::atomic_bool> hasGotResponse{std::make_shared<std::atomic_bool>(false)};
232+
233+
void fail(Result result) {
234+
cancelTimer(*timer);
235+
promise.setFailed(result);
236+
}
233237
};
234238

235239
struct LookupRequestData {
236240
LookupDataResultPromisePtr promise;
237241
DeadlineTimerPtr timer;
242+
243+
void fail(Result result) {
244+
cancelTimer(*timer);
245+
promise->setFailed(result);
246+
}
238247
};
239248

240249
struct LastMessageIdRequestData {
241250
GetLastMessageIdResponsePromisePtr promise;
242251
DeadlineTimerPtr timer;
252+
253+
void fail(Result result) {
254+
cancelTimer(*timer);
255+
promise->setFailed(result);
256+
}
243257
};
244258

245259
struct GetSchemaRequest {
246260
Promise<Result, SchemaInfo> promise;
247261
DeadlineTimerPtr timer;
262+
263+
void fail(Result result) {
264+
cancelTimer(*timer);
265+
promise.setFailed(result);
266+
}
248267
};
249268

250269
/*
@@ -301,26 +320,26 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
301320
}
302321

303322
template <typename ConstBufferSequence, typename WriteHandler>
304-
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
323+
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler&& handler) {
305324
if (isClosed()) {
306325
return;
307326
}
308327
if (tlsSocket_) {
309-
ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler));
328+
ASIO::async_write(*tlsSocket_, buffers, std::forward<WriteHandler>(handler));
310329
} else {
311-
ASIO::async_write(*socket_, buffers, handler);
330+
ASIO::async_write(*socket_, buffers, std::forward<WriteHandler>(handler));
312331
}
313332
}
314333

315334
template <typename MutableBufferSequence, typename ReadHandler>
316-
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler handler) {
335+
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler&& handler) {
317336
if (isClosed()) {
318337
return;
319338
}
320339
if (tlsSocket_) {
321-
tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, handler));
340+
tlsSocket_->async_read_some(buffers, std::forward<ReadHandler>(handler));
322341
} else {
323-
socket_->async_receive(buffers, handler);
342+
socket_->async_receive(buffers, std::forward<ReadHandler>(handler));
324343
}
325344
}
326345

@@ -341,7 +360,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
341360
*/
342361
SocketPtr socket_;
343362
TlsSocketPtr tlsSocket_;
344-
ASIO::strand<ASIO::io_context::executor_type> strand_;
345363

346364
const std::string logicalAddress_;
347365
/*
@@ -354,7 +372,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
354372
ClientConfiguration::ProxyProtocol proxyProtocol_;
355373

356374
// Represents both endpoints of the TCP connection, e.g. [client:1234 -> server:6650]
357-
std::string cnxString_;
375+
std::shared_ptr<std::string> cnxStringPtr_;
358376

359377
/*
360378
* indicates if async connection establishment failed
@@ -364,7 +382,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
364382
SharedBuffer incomingBuffer_;
365383

366384
Promise<Result, ClientConnectionWeakPtr> connectPromise_;
367-
std::shared_ptr<PeriodicTask> connectTimeoutTask_;
385+
const std::chrono::milliseconds connectTimeout_;
386+
const DeadlineTimerPtr connectTimer_;
368387

369388
typedef std::map<long, PendingRequestData> PendingRequestsMap;
370389
PendingRequestsMap pendingRequests_;
@@ -423,6 +442,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
423442
const std::string clientVersion_;
424443
ConnectionPool& pool_;
425444
const size_t poolIndex_;
445+
std::optional<std::future<void>> closeFuture_;
426446

427447
friend class PulsarFriend;
428448
friend class ConsumerTest;

lib/Commands.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
902902
batchPayload.write(payload.data(), payload.readableBytes());
903903
}
904904

905-
return messages.back().impl_->metadata.sequence_id();
905+
// Use the first message's sequence_id so that ackReceived can compute
906+
// lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly.
907+
return messages.front().impl_->metadata.sequence_id();
906908
}
907909

908910
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,

lib/ConnectionPool.cc

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,43 @@ bool ConnectionPool::close() {
5555
return false;
5656
}
5757

58+
std::vector<ClientConnectionPtr> connectionsToClose;
59+
// ClientConnection::close() will remove the connection from the pool, which is not allowed when iterating
60+
// over a map, so we store the connections to close in a vector first and don't iterate the pool when
61+
// closing the connections.
5862
std::unique_lock<std::recursive_mutex> lock(mutex_);
63+
connectionsToClose.reserve(pool_.size());
64+
for (auto&& kv : pool_) {
65+
connectionsToClose.emplace_back(kv.second);
66+
}
67+
pool_.clear();
68+
lock.unlock();
5969

60-
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
61-
auto& cnx = cnxIt->second;
70+
for (auto&& cnx : connectionsToClose) {
6271
if (cnx) {
63-
// The 2nd argument is false because removing a value during the iteration will cause segfault
64-
cnx->close(ResultDisconnected, false);
72+
// Close with a fatal error so that the client does not retry
73+
auto& future = cnx->close(ResultAlreadyClosed);
74+
using namespace std::chrono_literals;
75+
if (auto status = future.wait_for(5s); status != std::future_status::ready) {
76+
LOG_WARN("Connection close timed out for " << cnx.get()->cnxString());
77+
}
78+
if (cnx.use_count() > 1) {
79+
// There are some asynchronous operations that hold the reference on the connection, we should
80+
// wait for them to finish. Otherwise, `io_context::stop()` will be called in
81+
// `ClientImpl::shutdown()` when closing the `ExecutorServiceProvider`. Then
82+
// `io_context::run()` will return and the `io_context` object will be destroyed. In this
83+
// case, if there is any pending handler, it will crash.
84+
for (int i = 0; i < 500 && cnx.use_count() > 1; i++) {
85+
std::this_thread::sleep_for(10ms);
86+
}
87+
if (cnx.use_count() > 1) {
88+
LOG_WARN("Connection still has " << (cnx.use_count() - 1)
89+
<< " references after waiting for 5 seconds for "
90+
<< cnx.get()->cnxString());
91+
}
92+
}
6593
}
6694
}
67-
pool_.clear();
6895
return true;
6996
}
7097

@@ -73,7 +100,7 @@ void ConnectionPool::closeAllConnectionsForNewCluster() {
73100
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
74101
auto& cnx = cnxIt->second;
75102
if (cnx) {
76-
cnx->close(ResultDisconnected, false, true);
103+
cnx->close(ResultDisconnected, true);
77104
}
78105
}
79106
pool_.clear();

lib/ExecutorService.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,6 @@ void ExecutorService::close(long timeoutMs) {
125125
}
126126
}
127127

128-
void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, std::move(task)); }
129-
130128
/////////////////////
131129

132130
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)

lib/ExecutorService.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323

2424
#include <atomic>
2525
#ifdef USE_ASIO
26+
#include <asio/dispatch.hpp>
2627
#include <asio/io_context.hpp>
2728
#include <asio/ip/tcp.hpp>
29+
#include <asio/post.hpp>
2830
#include <asio/ssl.hpp>
2931
#else
32+
#include <boost/asio/dispatch.hpp>
3033
#include <boost/asio/io_context.hpp>
3134
#include <boost/asio/ip/tcp.hpp>
35+
#include <boost/asio/post.hpp>
3236
#include <boost/asio/ssl.hpp>
3337
#endif
3438
#include <chrono>
@@ -37,6 +41,7 @@
3741
#include <memory>
3842
#include <mutex>
3943
#include <thread>
44+
#include <utility>
4045

4146
#include "AsioTimer.h"
4247

@@ -62,7 +67,19 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
6267
TcpResolverPtr createTcpResolver();
6368
// throws std::runtime_error if failed
6469
DeadlineTimerPtr createDeadlineTimer();
65-
void postWork(std::function<void(void)> task);
70+
71+
// Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop
72+
// queue and executed later.
73+
template <typename T>
74+
void postWork(T &&task) {
75+
ASIO::post(io_context_, std::forward<T>(task));
76+
}
77+
78+
// Unlike `postWork`, if the caller is already running in the event loop, the task is executed immediately
79+
template <typename T>
80+
void dispatch(T &&task) {
81+
ASIO::dispatch(io_context_, std::forward<T>(task));
82+
}
6683

6784
// See TimeoutProcessor for the semantics of the parameter.
6885
void close(long timeoutMs = 3000);

lib/PeriodicTask.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
5353

5454
void stop() noexcept;
5555

56-
void setCallback(CallbackType callback) noexcept { callback_ = callback; }
56+
void setCallback(CallbackType&& callback) noexcept { callback_ = std::move(callback); }
5757

5858
State getState() const noexcept { return state_; }
5959
int getPeriodMs() const noexcept { return periodMs_; }

lib/ProducerImpl.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -933,19 +933,24 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
933933
return false;
934934
}
935935

936-
uint64_t expectedSequenceId = op.sendArgs->sequenceId;
937-
if (sequenceId > expectedSequenceId) {
938-
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
939-
<< " expecting: " << expectedSequenceId << " queue size=" //
940-
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
936+
const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId;
937+
const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1;
938+
// Broker may ack with either the first or the last sequence id of the batch.
939+
if (sequenceId > expectedLastSequenceId) {
940+
LOG_WARN(getName() << "Got ack for msg " << sequenceId
941+
<< " expecting last: " << expectedLastSequenceId
942+
<< " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_);
941943
return false;
942-
} else if (sequenceId < expectedSequenceId) {
944+
}
945+
if (sequenceId < expectedFirstSequenceId) {
943946
// Ignoring the ack since it's referring to a message that has already timed out.
944-
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
945-
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
946-
<< " producer: " << producerId_);
947+
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId
948+
<< " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_);
947949
return true;
948950
}
951+
// sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op.
952+
const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId);
953+
lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId;
949954

950955
// Message was persisted correctly
951956
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
@@ -960,7 +965,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
960965
}
961966

962967
releaseSemaphoreForSendOp(op);
963-
lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
964968

965969
std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
966970
pendingMessagesQueue_.pop_front();

0 commit comments

Comments
 (0)