Skip to content

Commit 9afe72a

Browse files
authored
Merge branch 'apache:main' into fix/issue-528
2 parents f28c32f + b47e63d commit 9afe72a

43 files changed

Lines changed: 1268 additions & 569 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci-pr-validation.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,15 @@ jobs:
260260
Pop-Location
261261
}
262262
263+
- name: Ensure vcpkg has full history(windows)
264+
if: runner.os == 'Windows'
265+
shell: pwsh
266+
run: |
267+
$isShallow = (git -C "${{ env.VCPKG_ROOT }}" rev-parse --is-shallow-repository).Trim()
268+
if ($isShallow -eq "true") {
269+
git -C "${{ env.VCPKG_ROOT }}" fetch --unshallow
270+
}
271+
263272
- name: remove system vcpkg(windows)
264273
if: runner.os == 'Windows'
265274
run: rm -rf "$VCPKG_INSTALLATION_ROOT"

include/pulsar/Client.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@
2929
#include <pulsar/Reader.h>
3030
#include <pulsar/Result.h>
3131
#include <pulsar/Schema.h>
32+
#include <pulsar/ServiceInfo.h>
33+
#include <pulsar/ServiceInfoProvider.h>
3234
#include <pulsar/TableView.h>
3335
#include <pulsar/defines.h>
3436

37+
#include <memory>
3538
#include <string>
3639

3740
namespace pulsar {
@@ -68,6 +71,20 @@ class PULSAR_PUBLIC Client {
6871
*/
6972
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
7073

74+
/**
75+
* Create a Pulsar client object using the specified ServiceInfoProvider.
76+
*
77+
* The ServiceInfoProvider is responsible for providing the service information (such as service URL)
78+
* dynamically. For example, if it detects a primary Pulsar service is down, it can switch to a secondary
79+
* service and update the client with the new service information.
80+
*
81+
* The Client instance takes ownership of the given ServiceInfoProvider. The provider will be destroyed
82+
* as part of the client's shutdown lifecycle, for example when `Client::close()` or
83+
* `Client::closeAsync()` is called, ensuring that its lifetime is properly managed.
84+
*/
85+
static Client create(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
86+
const ClientConfiguration& clientConfiguration);
87+
7188
/**
7289
* Create a producer with default configuration
7390
*
@@ -414,6 +431,13 @@ class PULSAR_PUBLIC Client {
414431
void getSchemaInfoAsync(const std::string& topic, int64_t version,
415432
std::function<void(Result, const SchemaInfo&)> callback);
416433

434+
/**
435+
* Get the current service information of the client.
436+
*
437+
* @return the current service information
438+
*/
439+
ServiceInfo getServiceInfo() const;
440+
417441
private:
418442
Client(const std::shared_ptr<ClientImpl>&);
419443

include/pulsar/ClientConfiguration.h

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,12 @@ class PULSAR_PUBLIC ClientConfiguration {
7070
/**
7171
* Set the authentication method to be used with the broker
7272
*
73+
* You can get the configured authentication data in `ServiceInfo` returned by `Client::getServiceInfo`.
74+
*
7375
* @param authentication the authentication data to use
7476
*/
7577
ClientConfiguration& setAuth(const AuthenticationPtr& authentication);
7678

77-
/**
78-
* @return the authentication data
79-
*/
80-
Authentication& getAuth() const;
81-
8279
/**
8380
* Set timeout on client operations (subscribe, create producer, close, unsubscribe)
8481
* Default is 30 seconds.
@@ -202,20 +199,6 @@ class PULSAR_PUBLIC ClientConfiguration {
202199
*/
203200
ClientConfiguration& setLogger(LoggerFactory* loggerFactory);
204201

205-
/**
206-
* Configure whether to use the TLS encryption on the connections.
207-
*
208-
* The default value is false.
209-
*
210-
* @param useTls
211-
*/
212-
ClientConfiguration& setUseTls(bool useTls);
213-
214-
/**
215-
* @return whether the TLS encryption is used on the connections
216-
*/
217-
bool isUseTls() const;
218-
219202
/**
220203
* Set the path to the TLS private key file.
221204
*
@@ -243,15 +226,13 @@ class PULSAR_PUBLIC ClientConfiguration {
243226
/**
244227
* Set the path to the trusted TLS certificate file.
245228
*
229+
* You can get the configured trusted TLS certificate file path in `ServiceInfo` returned by
230+
* `Client::getServiceInfo`.
231+
*
246232
* @param tlsTrustCertsFilePath
247233
*/
248234
ClientConfiguration& setTlsTrustCertsFilePath(const std::string& tlsTrustCertsFilePath);
249235

250-
/**
251-
* @return the path to the trusted TLS certificate file
252-
*/
253-
const std::string& getTlsTrustCertsFilePath() const;
254-
255236
/**
256237
* Configure whether the Pulsar client accepts untrusted TLS certificates from brokers.
257238
*

include/pulsar/ServiceInfo.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef PULSAR_SERVICE_INFO_H_
20+
#define PULSAR_SERVICE_INFO_H_
21+
22+
#include <pulsar/Authentication.h>
23+
24+
#include <optional>
25+
#include <string>
26+
27+
namespace pulsar {
28+
29+
/**
30+
* ServiceInfo encapsulates the information of a Pulsar service, which is used by the client to connect to the
31+
* service. It includes the service URL, authentication information, and TLS configuration.
32+
*/
33+
class PULSAR_PUBLIC ServiceInfo final {
34+
public:
35+
ServiceInfo(std::string serviceUrl, AuthenticationPtr authentication = AuthFactory::Disabled(),
36+
std::optional<std::string> tlsTrustCertsFilePath = std::nullopt);
37+
38+
auto& serviceUrl() const noexcept { return serviceUrl_; }
39+
auto useTls() const noexcept { return useTls_; }
40+
auto& authentication() const noexcept { return authentication_; }
41+
auto& tlsTrustCertsFilePath() const noexcept { return tlsTrustCertsFilePath_; }
42+
43+
bool operator==(const ServiceInfo& other) const noexcept {
44+
return serviceUrl_ == other.serviceUrl_ && useTls_ == other.useTls_ &&
45+
authentication_ == other.authentication_ &&
46+
tlsTrustCertsFilePath_ == other.tlsTrustCertsFilePath_;
47+
}
48+
49+
private:
50+
std::string serviceUrl_;
51+
bool useTls_;
52+
AuthenticationPtr authentication_;
53+
std::optional<std::string> tlsTrustCertsFilePath_;
54+
};
55+
56+
} // namespace pulsar
57+
#endif
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef PULSAR_SERVICE_INFO_PROVIDER_H_
20+
#define PULSAR_SERVICE_INFO_PROVIDER_H_
21+
22+
#include <pulsar/ServiceInfo.h>
23+
24+
#include <functional>
25+
26+
namespace pulsar {
27+
28+
class PULSAR_PUBLIC ServiceInfoProvider {
29+
public:
30+
/**
31+
* The destructor will be called when `Client::close()` is invoked, and the provider should stop any
32+
* ongoing work and release the resources in the destructor.
33+
*/
34+
virtual ~ServiceInfoProvider() = default;
35+
36+
/**
37+
* Get the initial `ServiceInfo` connection for the client.
38+
* This method is called **only once** internally in `Client::create()` to get the initial `ServiceInfo`
39+
* for the client to connect to the Pulsar service, typically before {@link initialize} is invoked.
40+
* Since it's only called once, it's legal to return a moved `ServiceInfo` object to avoid unnecessary
41+
* copying.
42+
*/
43+
virtual ServiceInfo initialServiceInfo() = 0;
44+
45+
/**
46+
* Initialize the ServiceInfoProvider.
47+
*
48+
* After the client has obtained the initial `ServiceInfo` via {@link initialServiceInfo}, this method is
49+
* called to allow the provider to start any background work (for example, service discovery or watching
50+
* configuration changes) and to report subsequent updates to the service information.
51+
*
52+
* @param onServiceInfoUpdate the callback to deliver updated `ServiceInfo` values to the client after
53+
* the initial connection has been established
54+
*
55+
* Implementations may choose not to invoke `onServiceInfoUpdate` if the `ServiceInfo` never changes.
56+
*/
57+
virtual void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) = 0;
58+
};
59+
60+
}; // namespace pulsar
61+
62+
#endif

include/pulsar/c/client_configuration.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,9 @@ PULSAR_PUBLIC void pulsar_client_configuration_set_logger(pulsar_client_configur
147147
PULSAR_PUBLIC void pulsar_client_configuration_set_logger_t(pulsar_client_configuration_t *conf,
148148
pulsar_logger_t logger);
149149

150-
PULSAR_PUBLIC void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls);
151-
152-
PULSAR_PUBLIC int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf);
153-
154150
PULSAR_PUBLIC void pulsar_client_configuration_set_tls_trust_certs_file_path(
155151
pulsar_client_configuration_t *conf, const char *tlsTrustCertsFilePath);
156152

157-
PULSAR_PUBLIC const char *pulsar_client_configuration_get_tls_trust_certs_file_path(
158-
pulsar_client_configuration_t *conf);
159-
160153
PULSAR_PUBLIC void pulsar_client_configuration_set_tls_allow_insecure_connection(
161154
pulsar_client_configuration_t *conf, int allowInsecure);
162155

lib/AtomicSharedPtr.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <memory>
22+
namespace pulsar {
23+
24+
// C++17 does not have std::atomic<std::shared_ptr<T>>, so we have to manually implement it.
25+
template <typename T>
26+
class AtomicSharedPtr {
27+
public:
28+
using Pointer = std::shared_ptr<const T>;
29+
30+
AtomicSharedPtr() = default;
31+
explicit AtomicSharedPtr(T value) : ptr_(std::make_shared<const T>(std::move(value))) {}
32+
33+
auto load() const { return std::atomic_load(&ptr_); }
34+
35+
void store(Pointer&& newPtr) { std::atomic_store(&ptr_, std::move(newPtr)); }
36+
37+
private:
38+
std::shared_ptr<const T> ptr_;
39+
};
40+
41+
} // namespace pulsar

lib/BinaryProtoLookupService.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/Authentication.h>
2323
#include <pulsar/ClientConfiguration.h>
2424
#include <pulsar/Schema.h>
25+
#include <pulsar/ServiceInfo.h>
2526

2627
#include <mutex>
2728

@@ -38,9 +39,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
3839

3940
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4041
public:
41-
BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
42+
BinaryProtoLookupService(const ServiceInfo& serviceInfo, ConnectionPool& pool,
4243
const ClientConfiguration& clientConfiguration)
43-
: serviceNameResolver_(serviceUrl),
44+
: serviceNameResolver_(serviceInfo.serviceUrl()),
4445
cnxPool_(pool),
4546
listenerName_(clientConfiguration.getListenerName()),
4647
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}

lib/Client.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* under the License.
1818
*/
1919
#include <pulsar/Client.h>
20+
#include <pulsar/ServiceInfoProvider.h>
2021

2122
#include <iostream>
2223
#include <memory>
@@ -33,13 +34,17 @@ DECLARE_LOG_OBJECT()
3334

3435
namespace pulsar {
3536

36-
Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) {}
37+
Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) { impl_->initialize(); }
3738

38-
Client::Client(const std::string& serviceUrl)
39-
: impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
39+
Client::Client(const std::string& serviceUrl) : Client(serviceUrl, ClientConfiguration()) {}
4040

4141
Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
42-
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
42+
: Client(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
43+
44+
Client Client::create(std::unique_ptr<ServiceInfoProvider> serviceInfoProvider,
45+
const ClientConfiguration& clientConfiguration) {
46+
return Client(std::make_shared<ClientImpl>(std::move(serviceInfoProvider), clientConfiguration));
47+
}
4348

4449
Result Client::createProducer(const std::string& topic, Producer& producer) {
4550
return createProducer(topic, ProducerConfiguration(), producer);
@@ -193,8 +198,10 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers();
193198

194199
void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
195200
std::function<void(Result, const SchemaInfo&)> callback) {
196-
impl_->getLookup()
197-
->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
201+
impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "")
198202
.addListener(std::move(callback));
199203
}
204+
205+
ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }
206+
200207
} // namespace pulsar

lib/ClientConfiguration.cc

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authe
5757
return *this;
5858
}
5959

60-
Authentication& ClientConfiguration::getAuth() const { return *impl_->authenticationPtr; }
61-
6260
const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; }
6361

6462
ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) {
@@ -94,13 +92,6 @@ ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads)
9492

9593
int ClientConfiguration::getMessageListenerThreads() const { return impl_->messageListenerThreads; }
9694

97-
ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) {
98-
impl_->useTls = useTls;
99-
return *this;
100-
}
101-
102-
bool ClientConfiguration::isUseTls() const { return impl_->useTls; }
103-
10495
ClientConfiguration& ClientConfiguration::setValidateHostName(bool validateHostName) {
10596
impl_->validateHostName = validateHostName;
10697
return *this;
@@ -131,10 +122,6 @@ ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::st
131122
return *this;
132123
}
133124

134-
const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const {
135-
return impl_->tlsTrustCertsFilePath;
136-
}
137-
138125
ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) {
139126
impl_->tlsAllowInsecureConnection = allowInsecure;
140127
return *this;

0 commit comments

Comments
 (0)