Skip to content

Commit 9abc9a8

Browse files
authored
Add auto cluster failover support (#487)
1 parent 13ba504 commit 9abc9a8

5 files changed

Lines changed: 544 additions & 12 deletions

File tree

index.d.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
*/
1919
/// <reference types="node" />
2020

21-
export interface ClientConfig {
22-
serviceUrl: string;
21+
export type ClientConfig = ClientConfigBase & (
22+
{ serviceUrl: string; serviceUrlProvider?: never } |
23+
{ serviceUrl?: never; serviceUrlProvider: AutoClusterFailoverConfig }
24+
);
25+
26+
export interface ClientConfigBase {
2327
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
2428
operationTimeoutSeconds?: number;
2529
ioThreads?: number;
@@ -37,6 +41,20 @@ export interface ClientConfig {
3741
connectionTimeoutMs?: number;
3842
}
3943

44+
export type ServiceInfo = string | {
45+
serviceUrl: string;
46+
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
47+
tlsTrustCertsFilePath?: string;
48+
};
49+
50+
export interface AutoClusterFailoverConfig {
51+
primary: ServiceInfo;
52+
secondary: ServiceInfo[];
53+
checkIntervalMs?: number;
54+
failoverThreshold?: number;
55+
switchBackThreshold?: number;
56+
}
57+
4058
export class Client {
4159
static setLogHandler(logHandler: (level: LogLevel, file: string, line: number, message: string) => void): void;
4260
constructor(config: ClientConfig);

src/Client.cc

Lines changed: 249 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,28 @@
2323
#include "Producer.h"
2424
#include "Reader.h"
2525
#include "ThreadSafeDeferred.h"
26+
#include <pulsar/AutoClusterFailover.h>
27+
#include <pulsar/Client.h>
28+
#include <pulsar/ServiceInfo.h>
29+
#include <pulsar/c/authentication.h>
2630
#include <pulsar/c/client.h>
2731
#include <pulsar/c/client_configuration.h>
2832
#include <pulsar/c/result.h>
2933
#include "pulsar/ClientConfiguration.h"
34+
#include <chrono>
35+
#include <cstdint>
36+
#include <memory>
37+
#include <optional>
38+
#include <string>
39+
#include <vector>
3040

3141
static const std::string CFG_SERVICE_URL = "serviceUrl";
42+
static const std::string CFG_SERVICE_URL_PROVIDER = "serviceUrlProvider";
43+
static const std::string CFG_PRIMARY = "primary";
44+
static const std::string CFG_SECONDARY = "secondary";
45+
static const std::string CFG_CHECK_INTERVAL_MS = "checkIntervalMs";
46+
static const std::string CFG_FAILOVER_THRESHOLD = "failoverThreshold";
47+
static const std::string CFG_SWITCH_BACK_THRESHOLD = "switchBackThreshold";
3248
static const std::string CFG_AUTH = "authentication";
3349
static const std::string CFG_AUTH_PROP = "binding";
3450
static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
@@ -52,6 +68,194 @@ struct _pulsar_client_configuration {
5268
pulsar::ClientConfiguration conf;
5369
};
5470

71+
struct _pulsar_client {
72+
std::unique_ptr<pulsar::Client> client;
73+
};
74+
75+
struct _pulsar_authentication {
76+
pulsar::AuthenticationPtr auth;
77+
};
78+
79+
static bool IsPresent(const Napi::Value &value) { return !value.IsUndefined() && !value.IsNull(); }
80+
81+
static std::optional<pulsar::AuthenticationPtr> BuildAuthenticationPtr(
82+
const Napi::Object &authObject, std::vector<Napi::ObjectReference> &authRefs) {
83+
Napi::Env env = authObject.Env();
84+
85+
if (!authObject.Has(CFG_AUTH_PROP) || !authObject.Get(CFG_AUTH_PROP).IsObject()) {
86+
Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
87+
.ThrowAsJavaScriptException();
88+
return std::nullopt;
89+
}
90+
91+
Napi::Object binding = authObject.Get(CFG_AUTH_PROP).As<Napi::Object>();
92+
authRefs.emplace_back(Napi::Persistent(binding));
93+
Authentication *auth = Authentication::Unwrap(authRefs.back().Value());
94+
95+
if (auth == nullptr || auth->GetCAuthentication() == nullptr) {
96+
Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
97+
.ThrowAsJavaScriptException();
98+
return std::nullopt;
99+
}
100+
101+
return auth->GetCAuthentication()->auth;
102+
}
103+
104+
static std::optional<pulsar::ServiceInfo> BuildServiceInfo(const Napi::Value &value,
105+
const std::string &fieldName,
106+
std::vector<Napi::ObjectReference> &authRefs,
107+
const pulsar::AuthenticationPtr &defaultAuth,
108+
const std::optional<std::string> &defaultTls) {
109+
Napi::Env env = value.Env();
110+
111+
if (value.IsString()) {
112+
std::string serviceUrl = value.ToString().Utf8Value();
113+
if (serviceUrl.empty()) {
114+
Napi::Error::New(env, fieldName + " service URL must be a non-empty string")
115+
.ThrowAsJavaScriptException();
116+
return std::nullopt;
117+
}
118+
return pulsar::ServiceInfo(serviceUrl, defaultAuth, defaultTls);
119+
}
120+
121+
if (!value.IsObject()) {
122+
Napi::Error::New(env, fieldName + " must be a service URL string or service info object")
123+
.ThrowAsJavaScriptException();
124+
return std::nullopt;
125+
}
126+
127+
Napi::Object serviceInfo = value.As<Napi::Object>();
128+
if (!serviceInfo.Has(CFG_SERVICE_URL) || !serviceInfo.Get(CFG_SERVICE_URL).IsString() ||
129+
serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
130+
Napi::Error::New(env, fieldName + ".serviceUrl is required and must be a non-empty string")
131+
.ThrowAsJavaScriptException();
132+
return std::nullopt;
133+
}
134+
135+
std::string serviceUrl = serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value();
136+
pulsar::AuthenticationPtr authentication = defaultAuth;
137+
std::optional<std::string> tlsTrustCertsFilePath = defaultTls;
138+
139+
if (serviceInfo.Has(CFG_AUTH) && IsPresent(serviceInfo.Get(CFG_AUTH))) {
140+
if (!serviceInfo.Get(CFG_AUTH).IsObject()) {
141+
Napi::Error::New(env, fieldName + ".authentication must be a Pulsar authentication object")
142+
.ThrowAsJavaScriptException();
143+
return std::nullopt;
144+
}
145+
146+
auto auth = BuildAuthenticationPtr(serviceInfo.Get(CFG_AUTH).As<Napi::Object>(), authRefs);
147+
if (!auth.has_value()) {
148+
return std::nullopt;
149+
}
150+
authentication = auth.value();
151+
}
152+
153+
if (serviceInfo.Has(CFG_TLS_TRUST_CERT) && IsPresent(serviceInfo.Get(CFG_TLS_TRUST_CERT))) {
154+
if (!serviceInfo.Get(CFG_TLS_TRUST_CERT).IsString()) {
155+
Napi::Error::New(env, fieldName + ".tlsTrustCertsFilePath must be a string")
156+
.ThrowAsJavaScriptException();
157+
return std::nullopt;
158+
}
159+
tlsTrustCertsFilePath = serviceInfo.Get(CFG_TLS_TRUST_CERT).ToString().Utf8Value();
160+
}
161+
162+
return pulsar::ServiceInfo(serviceUrl, authentication, tlsTrustCertsFilePath);
163+
}
164+
165+
static bool SetPositiveUint32(const Napi::Object &config, const std::string &fieldName, uint32_t &target) {
166+
if (!config.Has(fieldName) || !IsPresent(config.Get(fieldName))) {
167+
return true;
168+
}
169+
170+
Napi::Env env = config.Env();
171+
if (!config.Get(fieldName).IsNumber()) {
172+
Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
173+
.ThrowAsJavaScriptException();
174+
return false;
175+
}
176+
177+
int64_t value = config.Get(fieldName).ToNumber().Int64Value();
178+
if (value <= 0 || value > UINT32_MAX) {
179+
Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
180+
.ThrowAsJavaScriptException();
181+
return false;
182+
}
183+
184+
target = static_cast<uint32_t>(value);
185+
return true;
186+
}
187+
188+
static std::unique_ptr<pulsar::ServiceInfoProvider> BuildServiceInfoProvider(
189+
const Napi::Object &clientConfig, std::vector<Napi::ObjectReference> &authRefs,
190+
const pulsar::AuthenticationPtr &defaultAuth, const std::optional<std::string> &defaultTls) {
191+
Napi::Value providerValue = clientConfig.Get(CFG_SERVICE_URL_PROVIDER);
192+
Napi::Env env = clientConfig.Env();
193+
194+
if (!providerValue.IsObject()) {
195+
Napi::Error::New(env, "serviceUrlProvider must be an object").ThrowAsJavaScriptException();
196+
return nullptr;
197+
}
198+
199+
Napi::Object providerConfig = providerValue.As<Napi::Object>();
200+
if (!providerConfig.Has(CFG_PRIMARY) || !IsPresent(providerConfig.Get(CFG_PRIMARY))) {
201+
Napi::Error::New(env, "serviceUrlProvider.primary is required").ThrowAsJavaScriptException();
202+
return nullptr;
203+
}
204+
205+
auto primary = BuildServiceInfo(providerConfig.Get(CFG_PRIMARY), "serviceUrlProvider.primary", authRefs,
206+
defaultAuth, defaultTls);
207+
if (!primary.has_value()) {
208+
return nullptr;
209+
}
210+
211+
if (!providerConfig.Has(CFG_SECONDARY) || !providerConfig.Get(CFG_SECONDARY).IsArray()) {
212+
Napi::Error::New(env, "serviceUrlProvider.secondary is required and must be an array")
213+
.ThrowAsJavaScriptException();
214+
return nullptr;
215+
}
216+
217+
Napi::Array secondaryConfig = providerConfig.Get(CFG_SECONDARY).As<Napi::Array>();
218+
if (secondaryConfig.Length() == 0) {
219+
Napi::Error::New(env, "serviceUrlProvider.secondary must contain at least one service")
220+
.ThrowAsJavaScriptException();
221+
return nullptr;
222+
}
223+
224+
std::vector<pulsar::ServiceInfo> secondary;
225+
secondary.reserve(secondaryConfig.Length());
226+
for (uint32_t i = 0; i < secondaryConfig.Length(); i++) {
227+
auto serviceInfo =
228+
BuildServiceInfo(secondaryConfig.Get(i), "serviceUrlProvider.secondary[" + std::to_string(i) + "]",
229+
authRefs, defaultAuth, defaultTls);
230+
if (!serviceInfo.has_value()) {
231+
return nullptr;
232+
}
233+
secondary.emplace_back(std::move(serviceInfo.value()));
234+
}
235+
236+
pulsar::AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary.value()),
237+
std::move(secondary));
238+
239+
uint32_t checkIntervalMs = static_cast<uint32_t>(autoClusterFailoverConfig.checkInterval.count());
240+
if (!SetPositiveUint32(providerConfig, CFG_CHECK_INTERVAL_MS, checkIntervalMs)) {
241+
return nullptr;
242+
}
243+
autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);
244+
245+
if (!SetPositiveUint32(providerConfig, CFG_FAILOVER_THRESHOLD,
246+
autoClusterFailoverConfig.failoverThreshold)) {
247+
return nullptr;
248+
}
249+
250+
if (!SetPositiveUint32(providerConfig, CFG_SWITCH_BACK_THRESHOLD,
251+
autoClusterFailoverConfig.switchBackThreshold)) {
252+
return nullptr;
253+
}
254+
255+
return std::unique_ptr<pulsar::ServiceInfoProvider>(
256+
new pulsar::AutoClusterFailover(std::move(autoClusterFailoverConfig)));
257+
}
258+
55259
void Client::SetLogHandler(const Napi::CallbackInfo &info) {
56260
Napi::Env env = info.Env();
57261
Napi::HandleScope scope(env);
@@ -103,16 +307,34 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
103307
Napi::HandleScope scope(env);
104308
Napi::Object clientConfig = info[0].As<Napi::Object>();
105309

106-
if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString() ||
107-
clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
310+
bool hasServiceUrlProvider =
311+
clientConfig.Has(CFG_SERVICE_URL_PROVIDER) && IsPresent(clientConfig.Get(CFG_SERVICE_URL_PROVIDER));
312+
bool hasServiceUrl = clientConfig.Has(CFG_SERVICE_URL) && IsPresent(clientConfig.Get(CFG_SERVICE_URL));
313+
if (hasServiceUrlProvider && hasServiceUrl) {
314+
Napi::Error::New(env, "Only one of serviceUrl or serviceUrlProvider can be configured")
315+
.ThrowAsJavaScriptException();
316+
return;
317+
}
318+
319+
if (!hasServiceUrlProvider && !hasServiceUrl) {
320+
Napi::Error::New(env,
321+
"Service URL is required and must be specified as a string unless serviceUrlProvider "
322+
"is configured")
323+
.ThrowAsJavaScriptException();
324+
return;
325+
}
326+
327+
if (hasServiceUrl && (!clientConfig.Get(CFG_SERVICE_URL).IsString() ||
328+
clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty())) {
108329
Napi::Error::New(env, "Service URL is required and must be specified as a string")
109330
.ThrowAsJavaScriptException();
110331
return;
111332
}
112-
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
113333

114334
this->cClientConfig = std::shared_ptr<pulsar_client_configuration_t>(pulsar_client_configuration_create(),
115335
pulsar_client_configuration_free);
336+
pulsar::AuthenticationPtr defaultAuthentication = pulsar::AuthFactory::Disabled();
337+
std::optional<std::string> defaultTlsTrustCertsFilePath = std::nullopt;
116338

117339
// The logger can only be set once per process, so we will take control of it
118340
if (clientConfig.Has(CFG_LOG_LEVEL) && clientConfig.Get(CFG_LOG_LEVEL).IsNumber()) {
@@ -145,9 +367,12 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
145367
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
146368
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
147369
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
148-
this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As<Napi::Object>());
149-
Authentication *auth = Authentication::Unwrap(this->authRef_.Value());
150-
pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication());
370+
auto auth = BuildAuthenticationPtr(obj, this->authRefs_);
371+
if (!auth.has_value()) {
372+
return;
373+
}
374+
cClientConfig.get()->conf.setAuth(auth.value());
375+
defaultAuthentication = auth.value();
151376
}
152377
}
153378

@@ -190,6 +415,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
190415
Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
191416
pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig.get(),
192417
tlsTrustCertsFilePath.Utf8Value().c_str());
418+
defaultTlsTrustCertsFilePath = tlsTrustCertsFilePath.Utf8Value();
193419
}
194420

195421
if (clientConfig.Has(CFG_TLS_CERT_FILE) && clientConfig.Get(CFG_TLS_CERT_FILE).IsString()) {
@@ -227,8 +453,23 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
227453
}
228454

229455
try {
230-
this->cClient = std::shared_ptr<pulsar_client_t>(
231-
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
456+
if (hasServiceUrlProvider) {
457+
std::unique_ptr<pulsar::ServiceInfoProvider> serviceInfoProvider = BuildServiceInfoProvider(
458+
clientConfig, this->authRefs_, defaultAuthentication, defaultTlsTrustCertsFilePath);
459+
if (serviceInfoProvider == nullptr) {
460+
return;
461+
}
462+
463+
std::unique_ptr<pulsar_client_t> rawClient(new pulsar_client_t);
464+
rawClient->client.reset(new pulsar::Client(
465+
pulsar::Client::create(std::move(serviceInfoProvider), cClientConfig.get()->conf)));
466+
this->cClient = std::shared_ptr<pulsar_client_t>(rawClient.release(),
467+
[](pulsar_client_t *client) { delete client; });
468+
} else {
469+
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
470+
this->cClient = std::shared_ptr<pulsar_client_t>(
471+
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
472+
}
232473
} catch (const std::exception &e) {
233474
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
234475
}

src/Client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <napi.h>
2424
#include <pulsar/c/client.h>
25+
#include <vector>
2526

2627
struct LogMessage {
2728
pulsar_logger_level_t level;
@@ -54,7 +55,7 @@ class Client : public Napi::ObjectWrap<Client> {
5455
std::shared_ptr<pulsar_client_t> cClient;
5556
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
5657
pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO;
57-
Napi::ObjectReference authRef_;
58+
std::vector<Napi::ObjectReference> authRefs_;
5859

5960
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
6061
Napi::Value Subscribe(const Napi::CallbackInfo &info);

tests/client.test.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ const baseUrl = 'http://localhost:8080';
2626
describe('Client', () => {
2727
describe('CreateFailedByUrlSetIncorrect', () => {
2828
test('No Set Url', async () => {
29+
const expectedError = 'Service URL is required and must be specified as a string '
30+
+ 'unless serviceUrlProvider is configured';
2931
await expect(() => new Pulsar.Client({
3032
operationTimeoutSeconds: 30,
31-
})).toThrow('Service URL is required and must be specified as a string');
33+
})).toThrow(expectedError);
3234
});
3335

3436
test('Set empty url', async () => {
@@ -51,6 +53,17 @@ const baseUrl = 'http://localhost:8080';
5153
operationTimeoutSeconds: 30,
5254
})).toThrow('Service URL is required and must be specified as a string');
5355
});
56+
57+
test('Set both service url and service url provider', async () => {
58+
await expect(() => new Pulsar.Client({
59+
serviceUrl: 'pulsar://localhost:6650',
60+
serviceUrlProvider: {
61+
primary: 'pulsar://localhost:6650',
62+
secondary: ['pulsar://localhost:6651'],
63+
},
64+
operationTimeoutSeconds: 30,
65+
})).toThrow('Only one of serviceUrl or serviceUrlProvider can be configured');
66+
});
5467
});
5568
describe('test getPartitionsForTopic', () => {
5669
test('GetPartitions for empty topic', async () => {

0 commit comments

Comments
 (0)