Skip to content

Commit 19d82fe

Browse files
committed
Fix close() returns ResultAlreadyClosed after unsubscribe
Fixes #88 ### Motivation When `close` is called if the consumer has already called `unsubscribe` or `close`, it should not fail. See https://github.com/apache/pulsar/blob/428c18c8d0c3d135189920740192982e11ffb2bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1034 ### Modifications Use the same close logic with Java client. Add `testCloseAgainBeforeCloseDone` and `testCloseAfterUnsubscribe` to verify the new behaviors of `Consumer::close`.
1 parent 6f115e7 commit 19d82fe

2 files changed

Lines changed: 29 additions & 2 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,8 +1250,10 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
12501250
}
12511251
};
12521252

1253-
if (state_ != Ready) {
1254-
callback(ResultAlreadyClosed);
1253+
auto state = state_.load();
1254+
if (state == Closing || state == Closed) {
1255+
shutdown();
1256+
callback(ResultOk);
12551257
return;
12561258
}
12571259

tests/ConsumerTest.cc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,4 +1401,29 @@ TEST(ConsumerTest, testNoListenerThreadBlocking) {
14011401
client.close();
14021402
}
14031403

1404+
TEST(ConsumerTest, testCloseAfterUnsubscribe) {
1405+
Client client{lookupUrl};
1406+
Consumer consumer;
1407+
ASSERT_EQ(ResultOk, client.subscribe("test-close-after-unsubscribe", "sub", consumer));
1408+
ASSERT_EQ(ResultOk, consumer.unsubscribe());
1409+
ASSERT_EQ(ResultOk, consumer.close());
1410+
}
1411+
1412+
TEST(ConsumerTest, testCloseAgainBeforeCloseDone) {
1413+
Client client{lookupUrl};
1414+
Consumer consumer;
1415+
ASSERT_EQ(ResultOk, client.subscribe("test-close-again-before-close-done", "sub", consumer));
1416+
auto done = std::make_shared<std::atomic_bool>(false);
1417+
auto result = std::make_shared<std::atomic<Result>>(ResultOk);
1418+
consumer.closeAsync([done, result](Result innerResult) {
1419+
result->store(innerResult);
1420+
done->store(true);
1421+
});
1422+
ASSERT_EQ(ResultOk, consumer.close());
1423+
ASSERT_FALSE(*done);
1424+
waitUntil(std::chrono::seconds(3), [done] { return done->load(); });
1425+
ASSERT_EQ(ResultOk, *result);
1426+
ASSERT_TRUE(*done);
1427+
}
1428+
14041429
} // namespace pulsar

0 commit comments

Comments
 (0)