Skip to content

Commit 3b54f6b

Browse files
committed
add tests to verify message is correctly set
1 parent ac168b8 commit 3b54f6b

20 files changed

Lines changed: 304 additions & 158 deletions

include/pulsar/Result.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <cstdint>
2525
#include <iosfwd>
2626
#include <string>
27+
#include <utility>
2728

2829
namespace pulsar {
2930

@@ -104,7 +105,13 @@ PULSAR_PUBLIC const char* strResult(Result result);
104105
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);
105106

106107
struct Error {
107-
Result result;
108+
Error() = default;
109+
Error(Result result) : result(result) {}
110+
Error(Result result, std::string message) : result(result), message(std::move(message)) {}
111+
112+
operator Result() const { return result; }
113+
114+
Result result = ResultOk;
108115
std::string message;
109116
};
110117

lib/BinaryProtoLookupService.cc

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
3737
-> LookupResultFuture {
3838
LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic
3939
<< ", redirect count: " << redirectCount);
40-
auto promise = std::make_shared<Promise<Result, LookupResult>>();
40+
auto promise = std::make_shared<Promise<Error, LookupResult>>();
4141
if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) {
4242
LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is "
4343
<< maxLookupRedirects_);
@@ -62,7 +62,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
6262
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
6363
cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise);
6464
lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount](
65-
Result result, const LookupDataResultPtr& data) {
65+
Error result, const LookupDataResultPtr& data) {
6666
if (result != ResultOk || !data) {
6767
LOG_ERROR("Lookup failed for " << topic << ", result " << result);
6868
promise->setFailed(result);
@@ -74,7 +74,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
7474
if (data->isRedirect()) {
7575
LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
7676
findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1)
77-
.addListener([promise](Result result, const LookupResult& value) {
77+
.addListener([promise](Error result, const LookupResult& value) {
7878
if (result == ResultOk) {
7979
promise->setValue(value);
8080
} else {
@@ -100,7 +100,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
100100
* @param topicName topic to get number of partitions.
101101
*
102102
*/
103-
Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
103+
Future<Error, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(
104104
const TopicNamePtr& topicName) {
105105
LookupDataResultPromisePtr promise = std::make_shared<LookupDataResultPromise>();
106106
if (!topicName) {
@@ -135,7 +135,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
135135
std::placeholders::_2, clientCnx, promise));
136136
}
137137

138-
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result,
138+
void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Error result,
139139
const LookupDataResultPtr& data,
140140
const ClientConnectionWeakPtr& clientCnx,
141141
const LookupDataResultPromisePtr& promise) {
@@ -154,9 +154,9 @@ uint64_t BinaryProtoLookupService::newRequestId() {
154154
return ++requestIdGenerator_;
155155
}
156156

157-
Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
157+
Future<Error, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
158158
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
159-
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Result, NamespaceTopicsPtr>>();
159+
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Error, NamespaceTopicsPtr>>();
160160
if (!nsName) {
161161
promise->setFailed(ResultInvalidTopicName);
162162
return promise->getFuture();
@@ -168,9 +168,9 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
168168
return promise->getFuture();
169169
}
170170

171-
Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
172-
const std::string& version) {
173-
GetSchemaPromisePtr promise = std::make_shared<Promise<Result, SchemaInfo>>();
171+
Future<Error, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName,
172+
const std::string& version) {
173+
GetSchemaPromisePtr promise = std::make_shared<Promise<Error, SchemaInfo>>();
174174

175175
if (!topicName) {
176176
promise->setFailed(ResultInvalidTopicName);
@@ -197,7 +197,7 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName
197197
<< " version: " << version);
198198

199199
conn->newGetSchema(topicName, version, requestId)
200-
.addListener([promise](Result result, const SchemaInfo& schemaInfo) {
200+
.addListener([promise](Error result, const SchemaInfo& schemaInfo) {
201201
if (result != ResultOk) {
202202
promise->setFailed(result);
203203
return;
@@ -228,11 +228,10 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
228228
std::placeholders::_1, std::placeholders::_2, promise));
229229
}
230230

231-
void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result,
232-
const NamespaceTopicsPtr& topicsPtr,
231+
void BinaryProtoLookupService::getTopicsOfNamespaceListener(Error result, const NamespaceTopicsPtr& topicsPtr,
233232
const NamespaceTopicsPromisePtr& promise) {
234233
if (result != ResultOk) {
235-
promise->setFailed(ResultLookupError);
234+
promise->setFailed(Error{ResultLookupError, result.message});
236235
return;
237236
}
238237

lib/BinaryProtoLookupService.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
3434
class ConnectionPool;
3535
class LookupDataResult;
3636
class ServiceNameResolver;
37-
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
38-
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
37+
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Error, NamespaceTopicsPtr>>;
38+
using GetSchemaPromisePtr = std::shared_ptr<Promise<Error, SchemaInfo>>;
3939

4040
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4141
public:
@@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4848

4949
LookupResultFuture getBroker(const TopicName& topicName) override;
5050

51-
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
51+
Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
5252

53-
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
53+
Future<Error, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
5454
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
5555

56-
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
56+
Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;
5757

5858
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
5959

@@ -75,7 +75,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
7575
const ClientConnectionWeakPtr& clientCnx,
7676
const LookupDataResultPromisePtr& promise);
7777

78-
void handlePartitionMetadataLookup(const std::string& topicName, Result result,
78+
void handlePartitionMetadataLookup(const std::string& topicName, Error result,
7979
const LookupDataResultPtr& data,
8080
const ClientConnectionWeakPtr& clientCnx,
8181
const LookupDataResultPromisePtr& promise);
@@ -87,7 +87,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
8787
void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result,
8888
const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise);
8989

90-
void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& topicsPtr,
90+
void getTopicsOfNamespaceListener(Error result, const NamespaceTopicsPtr& topicsPtr,
9191
const NamespaceTopicsPromisePtr& promise);
9292

9393
uint64_t newRequestId();

lib/Client.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ std::variant<Producer, Error> Client::createProducerV2(const std::string& topic)
7070
std::variant<Producer, Error> Client::createProducerV2(const std::string& topic,
7171
const ProducerConfiguration& conf) {
7272
Promise<bool, std::variant<Producer, Error> > promise;
73-
createProducerAsyncV2(topic, conf, [promise](const auto& result) { promise.setValue(result); });
73+
createProducerAsyncV2(topic, conf, [promise](std::variant<Producer, Error>&& result) {
74+
promise.setValue(std::move(result));
75+
});
7476
Future<bool, std::variant<Producer, Error> > future = promise.getFuture();
7577

7678
std::variant<Producer, Error> result;
@@ -114,8 +116,9 @@ std::variant<Consumer, Error> Client::subscribeV2(const std::string& topic,
114116
const std::string& subscriptionName,
115117
const ConsumerConfiguration& conf) {
116118
Promise<bool, std::variant<Consumer, Error> > promise;
117-
subscribeAsyncV2(topic, subscriptionName, conf,
118-
[promise](const auto& result) { promise.setValue(result); });
119+
subscribeAsyncV2(topic, subscriptionName, conf, [promise](std::variant<Consumer, Error>&& result) {
120+
promise.setValue(std::move(result));
121+
});
119122
Future<bool, std::variant<Consumer, Error> > future = promise.getFuture();
120123

121124
std::variant<Consumer, Error> result;
@@ -168,8 +171,9 @@ std::variant<Consumer, Error> Client::subscribeV2(const std::vector<std::string>
168171
const std::string& subscriptionName,
169172
const ConsumerConfiguration& conf) {
170173
Promise<bool, std::variant<Consumer, Error> > promise;
171-
subscribeAsyncV2(topics, subscriptionName, conf,
172-
[promise](const auto& result) { promise.setValue(result); });
174+
subscribeAsyncV2(topics, subscriptionName, conf, [promise](std::variant<Consumer, Error>&& result) {
175+
promise.setValue(std::move(result));
176+
});
173177
Future<bool, std::variant<Consumer, Error> > future = promise.getFuture();
174178

175179
std::variant<Consumer, Error> result;
@@ -233,8 +237,9 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess
233237
std::variant<Reader, Error> Client::createReaderV2(const std::string& topic, const MessageId& startMessageId,
234238
const ReaderConfiguration& conf) {
235239
Promise<bool, std::variant<Reader, Error> > promise;
236-
createReaderAsyncV2(topic, startMessageId, conf,
237-
[promise](const auto& result) { promise.setValue(result); });
240+
createReaderAsyncV2(topic, startMessageId, conf, [promise](std::variant<Reader, Error>&& result) {
241+
promise.setValue(std::move(result));
242+
});
238243
Future<bool, std::variant<Reader, Error> > future = promise.getFuture();
239244

240245
std::variant<Reader, Error> result;

lib/ClientConnection.cc

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co
10481048
self->numOfPendingLookupRequest_--;
10491049
}
10501050
});
1051-
request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) {
1051+
request->getFuture().addListener([promise](Error result, const LookupDataResultPtr& lookupDataResult) {
10521052
if (result == ResultOk) {
10531053
promise->setValue(lookupDataResult);
10541054
} else {
@@ -1413,13 +1413,13 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
14131413
return request->getFuture();
14141414
}
14151415

1416-
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
1416+
Future<Error, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
14171417
const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
14181418
Lock lock(mutex_);
14191419
if (isClosed()) {
14201420
lock.unlock();
14211421
LOG_ERROR(cnxString() << "Client is not connected to the broker");
1422-
Promise<Result, NamespaceTopicsPtr> promise;
1422+
Promise<Error, NamespaceTopicsPtr> promise;
14231423
promise.setFailed(ResultNotConnected);
14241424
return promise.getFuture();
14251425
}
@@ -1437,14 +1437,14 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
14371437
return request->getFuture();
14381438
}
14391439

1440-
Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName,
1441-
const std::string& version, uint64_t requestId) {
1440+
Future<Error, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName,
1441+
const std::string& version, uint64_t requestId) {
14421442
Lock lock(mutex_);
14431443

14441444
if (isClosed()) {
14451445
lock.unlock();
14461446
LOG_ERROR(cnxString() << "Client is not connected to the broker");
1447-
Promise<Result, SchemaInfo> promise;
1447+
Promise<Error, SchemaInfo> promise;
14481448
promise.setFailed(ResultNotConnected);
14491449
return promise.getFuture();
14501450
}
@@ -1556,7 +1556,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
15561556
<< " msg: " << partitionMetadataResponse.message());
15571557
checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message());
15581558
request->fail(
1559-
getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()));
1559+
Error{getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()),
1560+
partitionMetadataResponse.message()});
15601561
} else {
15611562
LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: "
15621563
<< partitionMetadataResponse.request_id() << " with empty response: ");
@@ -1628,7 +1629,8 @@ void ClientConnection::handleLookupTopicRespose(
16281629
<< " error: " << lookupTopicResponse.error()
16291630
<< " msg: " << lookupTopicResponse.message());
16301631
checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message());
1631-
request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
1632+
request->fail(Error{getResult(lookupTopicResponse.error(), lookupTopicResponse.message()),
1633+
lookupTopicResponse.message()});
16321634
} else {
16331635
LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id()
16341636
<< " with empty response: ");
@@ -1699,6 +1701,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
16991701

17001702
void ClientConnection::handleError(const proto::CommandError& error) {
17011703
Result result = getResult(error.error(), error.message());
1704+
Error errorResult{result, error.has_message() ? error.message() : ""};
17021705
LOG_WARN(cnxString() << "Received error response from server: " << result
17031706
<< (error.has_message() ? (" (" + error.message() + ")") : "")
17041707
<< " -- req_id: " << error.request_id());
@@ -1716,27 +1719,51 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17161719
data.errorMessage = error.message();
17171720
}
17181721
request->fail(result, data);
1719-
} else {
1720-
auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
1721-
if (it != pendingGetLastMessageIdRequests_.end()) {
1722-
auto request = std::move(it->second);
1723-
pendingGetLastMessageIdRequests_.erase(it);
1724-
lock.unlock();
1722+
return;
1723+
}
17251724

1726-
request->fail(result);
1727-
} else {
1728-
auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id());
1729-
if (it != pendingGetNamespaceTopicsRequests_.end()) {
1730-
auto request = std::move(it->second);
1731-
pendingGetNamespaceTopicsRequests_.erase(it);
1732-
lock.unlock();
1725+
auto lookupIt = pendingLookupRequests_.find(error.request_id());
1726+
if (lookupIt != pendingLookupRequests_.end()) {
1727+
auto request = std::move(lookupIt->second);
1728+
pendingLookupRequests_.erase(lookupIt);
1729+
numOfPendingLookupRequest_--;
1730+
lock.unlock();
17331731

1734-
request->fail(result);
1735-
} else {
1736-
lock.unlock();
1737-
}
1738-
}
1732+
request->fail(errorResult);
1733+
return;
1734+
}
1735+
1736+
auto lastMessageIdIt = pendingGetLastMessageIdRequests_.find(error.request_id());
1737+
if (lastMessageIdIt != pendingGetLastMessageIdRequests_.end()) {
1738+
auto request = std::move(lastMessageIdIt->second);
1739+
pendingGetLastMessageIdRequests_.erase(lastMessageIdIt);
1740+
lock.unlock();
1741+
1742+
request->fail(result);
1743+
return;
1744+
}
1745+
1746+
auto topicsIt = pendingGetNamespaceTopicsRequests_.find(error.request_id());
1747+
if (topicsIt != pendingGetNamespaceTopicsRequests_.end()) {
1748+
auto request = std::move(topicsIt->second);
1749+
pendingGetNamespaceTopicsRequests_.erase(topicsIt);
1750+
lock.unlock();
1751+
1752+
request->fail(errorResult);
1753+
return;
17391754
}
1755+
1756+
auto schemaIt = pendingGetSchemaRequests_.find(error.request_id());
1757+
if (schemaIt != pendingGetSchemaRequests_.end()) {
1758+
auto request = std::move(schemaIt->second);
1759+
pendingGetSchemaRequests_.erase(schemaIt);
1760+
lock.unlock();
1761+
1762+
request->fail(errorResult);
1763+
return;
1764+
}
1765+
1766+
lock.unlock();
17401767
}
17411768

17421769
std::string ClientConnection::getMigratedBrokerServiceUrl(
@@ -1959,7 +1986,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp
19591986
: "")
19601987
<< " -- req_id: " << response.request_id());
19611988
}
1962-
request->fail(result);
1989+
request->fail(Error{result, response.has_error_message() ? response.error_message() : ""});
19631990
return;
19641991
}
19651992

0 commit comments

Comments
 (0)