|
17 | 17 | * under the License. |
18 | 18 | */ |
19 | 19 | #include <gtest/gtest.h> |
| 20 | +#include <pulsar/Authentication.h> |
20 | 21 | #include <pulsar/Client.h> |
| 22 | +#include <pulsar/InitialPosition.h> |
21 | 23 | #include <pulsar/Version.h> |
22 | 24 |
|
23 | 25 | #include <algorithm> |
24 | 26 | #include <chrono> |
25 | 27 | #include <future> |
26 | | -#include <sstream> |
| 28 | +#include <optional> |
| 29 | +#include <tuple> |
27 | 30 |
|
28 | 31 | #include "MockClientImpl.h" |
29 | 32 | #include "PulsarAdminHelper.h" |
|
32 | 35 | #include "lib/ClientConnection.h" |
33 | 36 | #include "lib/LogUtils.h" |
34 | 37 | #include "lib/checksum/ChecksumProvider.h" |
35 | | -#include "lib/stats/ProducerStatsImpl.h" |
36 | 38 |
|
37 | 39 | DECLARE_LOG_OBJECT() |
38 | 40 |
|
@@ -508,48 +510,65 @@ TEST(ClientTest, testNoRetry) { |
508 | 510 | } |
509 | 511 |
|
510 | 512 | TEST(ClientTest, testUpdateConnectionInfo) { |
511 | | - const std::string cluster1Url = "pulsar://localhost:6650"; |
512 | | - const std::string cluster2Url = "pulsar://localhost:6652"; |
513 | | - const std::string topic1 = "testUpdateConnectionInfo-cluster1-" + std::to_string(time(nullptr)); |
514 | | - const std::string topic2 = "testUpdateConnectionInfo-cluster2-" + std::to_string(time(nullptr)); |
515 | | - |
516 | | - Client client(cluster1Url); |
517 | | - |
518 | | - // Produce and consume on cluster 1 |
519 | | - Producer producer1; |
520 | | - ASSERT_EQ(ResultOk, client.createProducer(topic1, producer1)); |
| 513 | + extern std::string getToken(); // from AuthToken.cc |
| 514 | + |
| 515 | + // Access "private/auth" namespace in cluster 1 |
| 516 | + auto info1 = |
| 517 | + std::make_tuple("pulsar://localhost:6650", AuthToken::createWithToken(getToken()), std::nullopt); |
| 518 | + // Access "private/auth" namespace in cluster 2 |
| 519 | + auto info2 = |
| 520 | + std::make_tuple("pulsar+ssl://localhost:6653", |
| 521 | + AuthTls::create(TEST_CONF_DIR "/client-cert.pem", TEST_CONF_DIR "/client-key.pem"), |
| 522 | + TEST_CONF_DIR "/hn-verification/cacert.pem"); |
| 523 | + // Access "public/default" namespace in cluster 1, which doesn't require authentication |
| 524 | + auto info3 = std::make_tuple("pulsar://localhost:6650", std::nullopt, std::nullopt); |
| 525 | + |
| 526 | + Client client{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; |
| 527 | + const auto topicRequiredAuth = "private/auth/testUpdateConnectionInfo-" + std::to_string(time(nullptr)); |
| 528 | + Producer producer; |
| 529 | + ASSERT_EQ(ResultOk, client.createProducer(topicRequiredAuth, producer)); |
521 | 530 | MessageId msgId; |
522 | | - ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("msg-on-cluster1").build(), msgId)); |
523 | | - producer1.close(); |
| 531 | + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build(), msgId)); |
524 | 532 |
|
525 | | - // Verify there are connections in the pool |
526 | | - auto connections = PulsarFriend::getConnections(client); |
527 | | - ASSERT_FALSE(connections.empty()); |
| 533 | + // Switch to cluster 2 (started by ./build-support/start-mim-test-service-inside-container.sh) |
| 534 | + ASSERT_FALSE(PulsarFriend::getConnections(client).empty()); |
| 535 | + client.updateConnectionInfo(std::get<0>(info2), std::get<1>(info2), std::get<2>(info2)); |
| 536 | + ASSERT_TRUE(PulsarFriend::getConnections(client).empty()); |
528 | 537 |
|
529 | | - // Switch to cluster 2 |
530 | | - client.updateConnectionInfo(cluster2Url, std::nullopt, std::nullopt); |
| 538 | + // Now the same will access the same topic in cluster 2 |
| 539 | + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build(), msgId)); |
| 540 | + ASSERT_EQ(ResultOk, producer.close()); |
531 | 541 |
|
532 | | - // Previous connections should have been closed |
533 | | - for (const auto &cnx : connections) { |
534 | | - ASSERT_TRUE(cnx->isClosed()); |
535 | | - } |
| 542 | + // Switch back to cluster 1 without any authentication, the previous authentication info configured for |
| 543 | + // cluster 2 will be cleared. |
| 544 | + client.updateConnectionInfo(std::get<0>(info3), std::get<1>(info3), std::get<2>(info3)); |
536 | 545 |
|
537 | | - // Produce and consume on cluster 2 using the same client |
538 | | - Producer producer2; |
539 | | - ASSERT_EQ(ResultOk, client.createProducer(topic2, producer2)); |
540 | | - ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("msg-on-cluster2").build(), msgId)); |
| 546 | + const auto topicNoAuth = "testUpdateConnectionInfo-" + std::to_string(time(nullptr)); |
| 547 | + ASSERT_EQ(ResultOk, client.createProducer(topicNoAuth, producer)); |
| 548 | + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build(), msgId)); |
541 | 549 |
|
542 | | - Consumer consumer2; |
543 | | - ASSERT_EQ(ResultOk, client.subscribe(topic2, "sub", consumer2)); |
544 | | - Message msg; |
545 | | - ASSERT_EQ(ResultOk, consumer2.receive(msg, 5000)); |
546 | | - ASSERT_EQ("msg-on-cluster2", msg.getDataAsString()); |
547 | | - |
548 | | - // Verify connection pool now has connections to cluster 2 |
549 | | - auto newConnections = PulsarFriend::getConnections(client); |
550 | | - ASSERT_FALSE(newConnections.empty()); |
551 | | - |
552 | | - consumer2.close(); |
553 | | - producer2.close(); |
554 | 550 | client.close(); |
| 551 | + |
| 552 | + // Verify messages sent to cluster 1 and cluster 2 can be consumed successfully with correct |
| 553 | + // authentication info. |
| 554 | + auto verify = [](Client &client, const std::string &topic, const std::string &value) { |
| 555 | + Reader reader; |
| 556 | + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); |
| 557 | + Message msg; |
| 558 | + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); |
| 559 | + ASSERT_EQ(value, msg.getDataAsString()); |
| 560 | + }; |
| 561 | + Client client1{std::get<0>(info1), ClientConfiguration().setAuth(std::get<1>(info1))}; |
| 562 | + verify(client1, topicRequiredAuth, "msg-0"); |
| 563 | + client1.close(); |
| 564 | + |
| 565 | + Client client2{ |
| 566 | + std::get<0>(info2), |
| 567 | + ClientConfiguration().setAuth(std::get<1>(info2)).setTlsTrustCertsFilePath(std::get<2>(info2))}; |
| 568 | + verify(client2, topicRequiredAuth, "msg-1"); |
| 569 | + client2.close(); |
| 570 | + |
| 571 | + Client client3{std::get<0>(info3)}; |
| 572 | + verify(client3, topicNoAuth, "msg-2"); |
| 573 | + client3.close(); |
555 | 574 | } |
0 commit comments