Skip to content

Commit c6de067

Browse files
committed
Fix the reader being unable to consume after switching to a new cluster
1 parent 9114edf commit c6de067

8 files changed

Lines changed: 50 additions & 13 deletions

File tree

lib/ClientConnection.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1292,7 +1292,7 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec,
12921292
startConsumerStatsTimer(consumerStatsRequests);
12931293
}
12941294

1295-
void ClientConnection::close(Result result, bool detach) {
1295+
void ClientConnection::close(Result result, bool detach, bool switchCluster) {
12961296
Lock lock(mutex_);
12971297
if (isClosed()) {
12981298
return;
@@ -1368,6 +1368,9 @@ void ClientConnection::close(Result result, bool detach) {
13681368
for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
13691369
auto consumer = it->second.lock();
13701370
if (consumer) {
1371+
if (switchCluster) {
1372+
consumer->onClusterSwitching();
1373+
}
13711374
consumer->handleDisconnection(result, self);
13721375
}
13731376
}

lib/ClientConnection.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
157157
*
158158
* @param result all pending futures will complete with this result
159159
* @param detach remove it from the pool if it's true
160+
* @param switchCluster whether the close is triggered by cluster switching
160161
*
161162
* `detach` should only be false when the connection pool is closed.
162163
*/
163-
void close(Result result = ResultConnectError, bool detach = true);
164+
void close(Result result = ResultConnectError, bool detach = true, bool switchCluster = false);
164165

165166
bool isClosed() const;
166167

lib/ClientImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,7 @@ void ClientImpl::updateServiceInfo(ServiceInfo&& serviceInfo) {
896896
? std::nullopt
897897
: std::make_optional(clientConfiguration_.getTlsTrustCertsFilePath())};
898898

899-
pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_);
899+
pool_.resetForClusterSwitching(clientConfiguration_.getAuthPtr(), clientConfiguration_);
900900

901901
lookupServicePtr_->close();
902902
for (auto&& it : redirectedClusterLookupServicePtrs_) {

lib/ConnectionPool.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,16 @@ bool ConnectionPool::close() {
6767
return true;
6868
}
6969

70-
void ConnectionPool::resetConnections(const AuthenticationPtr& authentication,
71-
const ClientConfiguration& conf) {
70+
void ConnectionPool::resetForClusterSwitching(const AuthenticationPtr& authentication,
71+
const ClientConfiguration& conf) {
7272
std::unique_lock<std::recursive_mutex> lock(mutex_);
7373
authentication_ = authentication;
7474
clientConfiguration_ = conf;
7575

7676
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
7777
auto& cnx = cnxIt->second;
7878
if (cnx) {
79-
cnx->close(ResultDisconnected, false);
79+
cnx->close(ResultDisconnected, false, true);
8080
}
8181
}
8282
pool_.clear();

lib/ConnectionPool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class PULSAR_PUBLIC ConnectionPool {
5555
* Close all existing connections and update the authentication and configuration.
5656
* Unlike close(), the pool remains open for new connections.
5757
*/
58-
void resetConnections(const AuthenticationPtr& authentication, const ClientConfiguration& conf);
58+
void resetForClusterSwitching(const AuthenticationPtr& authentication, const ClientConfiguration& conf);
5959

6060
void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix,
6161
ClientConnection* value);

lib/ConsumerImpl.cc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic
125125
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this, conf)),
126126
ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
127127
readCompacted_(conf.isReadCompacted()),
128-
startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
128+
startMessageIdFromConfig_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
129+
startMessageId_(startMessageIdFromConfig_),
129130
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
130131
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
131132
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -1134,6 +1135,16 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
11341135
}
11351136
}
11361137

1138+
void ConsumerImpl::onClusterSwitching() {
1139+
{
1140+
LockGuard lock{mutex_};
1141+
incomingMessages_.clear();
1142+
startMessageId_ = startMessageIdFromConfig_;
1143+
lastDequedMessageId_ = MessageId::earliest();
1144+
}
1145+
ackGroupingTrackerPtr_->flushAndClean();
1146+
}
1147+
11371148
/**
11381149
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that
11391150
* was

lib/ConsumerImpl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase {
162162
void doImmediateAck(const MessageId& msgId, const ResultCallback& callback, CommandAck_AckType ackType);
163163
void doImmediateAck(const std::set<MessageId>& msgIds, const ResultCallback& callback);
164164

165+
void onClusterSwitching();
166+
165167
protected:
166168
// overridden methods from HandlerBase
167169
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override;
@@ -266,6 +268,11 @@ class ConsumerImpl : public ConsumerImplBase {
266268

267269
MessageId lastDequedMessageId_{MessageId::earliest()};
268270
MessageId lastMessageIdInBroker_{MessageId::earliest()};
271+
272+
// When the consumer switches to a new cluster, we should reset `startMessageId_` to the original value,
273+
// otherwise, the message id of the old cluster might be passed in the Subscribe request on the new
274+
// cluster.
275+
const optional<MessageId> startMessageIdFromConfig_;
269276
optional<MessageId> startMessageId_;
270277

271278
SeekStatus seekStatus_{SeekStatus::NOT_STARTED};

tests/ClientTest.cc

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -522,11 +522,26 @@ TEST(ClientTest, testUpdateServiceInfo) {
522522
ServiceInfo info3{"pulsar://localhost:6650", std::nullopt, std::nullopt};
523523

524524
Client client{info1.serviceUrl, ClientConfiguration().setAuth(*info1.authentication)};
525+
525526
const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr));
526527
Producer producer;
527528
ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer));
528-
MessageId msgId;
529-
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build(), msgId));
529+
530+
Reader reader;
531+
ASSERT_EQ(ResultOk, client.createReader(topicRequiredAuth, MessageId::earliest(), {}, reader));
532+
533+
auto sendAndReceive = [&](const std::string &value) {
534+
MessageId msgId;
535+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(value).build(), msgId));
536+
LOG_INFO("Sent " << value << " to " << msgId);
537+
538+
Message msg;
539+
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
540+
LOG_INFO("Read " << msg.getDataAsString() << " from " << msgId);
541+
ASSERT_EQ(value, msg.getDataAsString());
542+
};
543+
544+
sendAndReceive("msg-0");
530545

531546
// Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh)
532547
ASSERT_FALSE(PulsarFriend::getConnections(client).empty());
@@ -537,8 +552,7 @@ TEST(ClientTest, testUpdateServiceInfo) {
537552
ASSERT_EQ(info2.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath);
538553

539554
// Now the same producer and reader will access the same topic in cluster 2
540-
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId));
541-
ASSERT_EQ(ResultOk, producer.close());
555+
sendAndReceive("msg-1");
542556

543557
// Switch back to cluster 1 without any authentication, the previous authentication info configured for
544558
// cluster 2 will be cleared.
@@ -548,8 +562,9 @@ TEST(ClientTest, testUpdateServiceInfo) {
548562
ASSERT_EQ(info3.tlsTrustCertsFilePath, client.getServiceInfo().tlsTrustCertsFilePath);
549563

550564
const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr));
565+
producer.close();
551566
ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer));
552-
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build(), msgId));
567+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build()));
553568

554569
client.close();
555570

0 commit comments

Comments
 (0)