Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
/// <reference types="node" />

export interface ClientConfig {
serviceUrl: string;
export type ClientConfig = ClientConfigBase & (
{ serviceUrl: string; serviceUrlProvider?: never } |
{ serviceUrl?: never; serviceUrlProvider: AutoClusterFailoverConfig }
);

export interface ClientConfigBase {
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
operationTimeoutSeconds?: number;
ioThreads?: number;
Expand All @@ -37,6 +41,20 @@ export interface ClientConfig {
connectionTimeoutMs?: number;
}

export type ServiceInfo = string | {
serviceUrl: string;
authentication?: AuthenticationTls | AuthenticationAthenz | AuthenticationToken | AuthenticationOauth2 | AuthenticationBasic;
tlsTrustCertsFilePath?: string;
};

export interface AutoClusterFailoverConfig {
primary: ServiceInfo;
secondary: ServiceInfo[];
checkIntervalMs?: number;
failoverThreshold?: number;
switchBackThreshold?: number;
}

export class Client {
static setLogHandler(logHandler: (level: LogLevel, file: string, line: number, message: string) => void): void;
constructor(config: ClientConfig);
Expand Down
249 changes: 241 additions & 8 deletions src/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,28 @@
#include "Producer.h"
#include "Reader.h"
#include "ThreadSafeDeferred.h"
#include <pulsar/AutoClusterFailover.h>
#include <pulsar/Client.h>
#include <pulsar/ServiceInfo.h>
#include <pulsar/c/authentication.h>
#include <pulsar/c/client.h>
#include <pulsar/c/client_configuration.h>
#include <pulsar/c/result.h>
#include "pulsar/ClientConfiguration.h"
#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>

static const std::string CFG_SERVICE_URL = "serviceUrl";
static const std::string CFG_SERVICE_URL_PROVIDER = "serviceUrlProvider";
static const std::string CFG_PRIMARY = "primary";
static const std::string CFG_SECONDARY = "secondary";
static const std::string CFG_CHECK_INTERVAL_MS = "checkIntervalMs";
static const std::string CFG_FAILOVER_THRESHOLD = "failoverThreshold";
static const std::string CFG_SWITCH_BACK_THRESHOLD = "switchBackThreshold";
static const std::string CFG_AUTH = "authentication";
static const std::string CFG_AUTH_PROP = "binding";
static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds";
Expand All @@ -52,6 +68,194 @@ struct _pulsar_client_configuration {
pulsar::ClientConfiguration conf;
};

struct _pulsar_client {
std::unique_ptr<pulsar::Client> client;
};

struct _pulsar_authentication {
pulsar::AuthenticationPtr auth;
};

static bool IsPresent(const Napi::Value &value) { return !value.IsUndefined() && !value.IsNull(); }

static std::optional<pulsar::AuthenticationPtr> BuildAuthenticationPtr(
const Napi::Object &authObject, std::vector<Napi::ObjectReference> &authRefs) {
Napi::Env env = authObject.Env();

if (!authObject.Has(CFG_AUTH_PROP) || !authObject.Get(CFG_AUTH_PROP).IsObject()) {
Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

Napi::Object binding = authObject.Get(CFG_AUTH_PROP).As<Napi::Object>();
authRefs.emplace_back(Napi::Persistent(binding));
Authentication *auth = Authentication::Unwrap(authRefs.back().Value());

if (auth == nullptr || auth->GetCAuthentication() == nullptr) {
Napi::Error::New(env, "Authentication must be a Pulsar authentication object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

return auth->GetCAuthentication()->auth;
}

static std::optional<pulsar::ServiceInfo> BuildServiceInfo(const Napi::Value &value,
const std::string &fieldName,
std::vector<Napi::ObjectReference> &authRefs,
const pulsar::AuthenticationPtr &defaultAuth,
const std::optional<std::string> &defaultTls) {
Napi::Env env = value.Env();

if (value.IsString()) {
std::string serviceUrl = value.ToString().Utf8Value();
if (serviceUrl.empty()) {
Napi::Error::New(env, fieldName + " service URL must be a non-empty string")
.ThrowAsJavaScriptException();
return std::nullopt;
}
return pulsar::ServiceInfo(serviceUrl, defaultAuth, defaultTls);
}

if (!value.IsObject()) {
Napi::Error::New(env, fieldName + " must be a service URL string or service info object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

Napi::Object serviceInfo = value.As<Napi::Object>();
if (!serviceInfo.Has(CFG_SERVICE_URL) || !serviceInfo.Get(CFG_SERVICE_URL).IsString() ||
serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
Napi::Error::New(env, fieldName + ".serviceUrl is required and must be a non-empty string")
.ThrowAsJavaScriptException();
return std::nullopt;
}

std::string serviceUrl = serviceInfo.Get(CFG_SERVICE_URL).ToString().Utf8Value();
pulsar::AuthenticationPtr authentication = defaultAuth;
std::optional<std::string> tlsTrustCertsFilePath = defaultTls;

if (serviceInfo.Has(CFG_AUTH) && IsPresent(serviceInfo.Get(CFG_AUTH))) {
if (!serviceInfo.Get(CFG_AUTH).IsObject()) {
Napi::Error::New(env, fieldName + ".authentication must be a Pulsar authentication object")
.ThrowAsJavaScriptException();
return std::nullopt;
}

auto auth = BuildAuthenticationPtr(serviceInfo.Get(CFG_AUTH).As<Napi::Object>(), authRefs);
if (!auth.has_value()) {
return std::nullopt;
}
authentication = auth.value();
}

if (serviceInfo.Has(CFG_TLS_TRUST_CERT) && IsPresent(serviceInfo.Get(CFG_TLS_TRUST_CERT))) {
if (!serviceInfo.Get(CFG_TLS_TRUST_CERT).IsString()) {
Napi::Error::New(env, fieldName + ".tlsTrustCertsFilePath must be a string")
.ThrowAsJavaScriptException();
return std::nullopt;
}
tlsTrustCertsFilePath = serviceInfo.Get(CFG_TLS_TRUST_CERT).ToString().Utf8Value();
}

return pulsar::ServiceInfo(serviceUrl, authentication, tlsTrustCertsFilePath);
}

static bool SetPositiveUint32(const Napi::Object &config, const std::string &fieldName, uint32_t &target) {
if (!config.Has(fieldName) || !IsPresent(config.Get(fieldName))) {
return true;
}

Napi::Env env = config.Env();
if (!config.Get(fieldName).IsNumber()) {
Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
.ThrowAsJavaScriptException();
return false;
}

int64_t value = config.Get(fieldName).ToNumber().Int64Value();
if (value <= 0 || value > UINT32_MAX) {
Napi::Error::New(env, "serviceUrlProvider." + fieldName + " must be a positive number")
.ThrowAsJavaScriptException();
return false;
}

target = static_cast<uint32_t>(value);
return true;
}

static std::unique_ptr<pulsar::ServiceInfoProvider> BuildServiceInfoProvider(
const Napi::Object &clientConfig, std::vector<Napi::ObjectReference> &authRefs,
const pulsar::AuthenticationPtr &defaultAuth, const std::optional<std::string> &defaultTls) {
Napi::Value providerValue = clientConfig.Get(CFG_SERVICE_URL_PROVIDER);
Napi::Env env = clientConfig.Env();

if (!providerValue.IsObject()) {
Napi::Error::New(env, "serviceUrlProvider must be an object").ThrowAsJavaScriptException();
return nullptr;
}

Napi::Object providerConfig = providerValue.As<Napi::Object>();
if (!providerConfig.Has(CFG_PRIMARY) || !IsPresent(providerConfig.Get(CFG_PRIMARY))) {
Napi::Error::New(env, "serviceUrlProvider.primary is required").ThrowAsJavaScriptException();
return nullptr;
}

auto primary = BuildServiceInfo(providerConfig.Get(CFG_PRIMARY), "serviceUrlProvider.primary", authRefs,
defaultAuth, defaultTls);
if (!primary.has_value()) {
return nullptr;
}

if (!providerConfig.Has(CFG_SECONDARY) || !providerConfig.Get(CFG_SECONDARY).IsArray()) {
Napi::Error::New(env, "serviceUrlProvider.secondary is required and must be an array")
.ThrowAsJavaScriptException();
return nullptr;
}

Napi::Array secondaryConfig = providerConfig.Get(CFG_SECONDARY).As<Napi::Array>();
if (secondaryConfig.Length() == 0) {
Napi::Error::New(env, "serviceUrlProvider.secondary must contain at least one service")
.ThrowAsJavaScriptException();
return nullptr;
}

std::vector<pulsar::ServiceInfo> secondary;
secondary.reserve(secondaryConfig.Length());
for (uint32_t i = 0; i < secondaryConfig.Length(); i++) {
auto serviceInfo =
BuildServiceInfo(secondaryConfig.Get(i), "serviceUrlProvider.secondary[" + std::to_string(i) + "]",
authRefs, defaultAuth, defaultTls);
if (!serviceInfo.has_value()) {
return nullptr;
}
secondary.emplace_back(std::move(serviceInfo.value()));
}

pulsar::AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary.value()),
std::move(secondary));

uint32_t checkIntervalMs = static_cast<uint32_t>(autoClusterFailoverConfig.checkInterval.count());
if (!SetPositiveUint32(providerConfig, CFG_CHECK_INTERVAL_MS, checkIntervalMs)) {
return nullptr;
}
autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);

if (!SetPositiveUint32(providerConfig, CFG_FAILOVER_THRESHOLD,
autoClusterFailoverConfig.failoverThreshold)) {
return nullptr;
}

if (!SetPositiveUint32(providerConfig, CFG_SWITCH_BACK_THRESHOLD,
autoClusterFailoverConfig.switchBackThreshold)) {
return nullptr;
}

return std::unique_ptr<pulsar::ServiceInfoProvider>(
new pulsar::AutoClusterFailover(std::move(autoClusterFailoverConfig)));
}

void Client::SetLogHandler(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
Expand Down Expand Up @@ -103,16 +307,28 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
Napi::HandleScope scope(env);
Napi::Object clientConfig = info[0].As<Napi::Object>();

if (!clientConfig.Has(CFG_SERVICE_URL) || !clientConfig.Get(CFG_SERVICE_URL).IsString() ||
clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty()) {
Napi::Error::New(env, "Service URL is required and must be specified as a string")
bool hasServiceUrlProvider =
clientConfig.Has(CFG_SERVICE_URL_PROVIDER) && IsPresent(clientConfig.Get(CFG_SERVICE_URL_PROVIDER));
bool hasServiceUrl = clientConfig.Has(CFG_SERVICE_URL) && IsPresent(clientConfig.Get(CFG_SERVICE_URL));
if (hasServiceUrlProvider && hasServiceUrl) {
Napi::Error::New(env, "Only one of serviceUrl or serviceUrlProvider can be configured")
.ThrowAsJavaScriptException();
return;
Comment thread
shibd marked this conversation as resolved.
}

if (!hasServiceUrlProvider && (!hasServiceUrl || !clientConfig.Get(CFG_SERVICE_URL).IsString() ||
clientConfig.Get(CFG_SERVICE_URL).ToString().Utf8Value().empty())) {
Napi::Error::New(env,
"Service URL is required and must be specified as a string unless serviceUrlProvider "
"is configured")
.ThrowAsJavaScriptException();
return;
}
Comment thread
shibd marked this conversation as resolved.
Outdated
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();

this->cClientConfig = std::shared_ptr<pulsar_client_configuration_t>(pulsar_client_configuration_create(),
pulsar_client_configuration_free);
pulsar::AuthenticationPtr defaultAuthentication = pulsar::AuthFactory::Disabled();
std::optional<std::string> defaultTlsTrustCertsFilePath = std::nullopt;

// The logger can only be set once per process, so we will take control of it
if (clientConfig.Has(CFG_LOG_LEVEL) && clientConfig.Get(CFG_LOG_LEVEL).IsNumber()) {
Expand Down Expand Up @@ -145,9 +361,10 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As<Napi::Object>());
Authentication *auth = Authentication::Unwrap(this->authRef_.Value());
this->authRefs_.emplace_back(Napi::Persistent(obj.Get(CFG_AUTH_PROP).As<Napi::Object>()));
Authentication *auth = Authentication::Unwrap(this->authRefs_.back().Value());
Comment thread
shibd marked this conversation as resolved.
Outdated
pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication());
defaultAuthentication = auth->GetCAuthentication()->auth;
}
}

Expand Down Expand Up @@ -190,6 +407,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
Napi::String tlsTrustCertsFilePath = clientConfig.Get(CFG_TLS_TRUST_CERT).ToString();
pulsar_client_configuration_set_tls_trust_certs_file_path(cClientConfig.get(),
tlsTrustCertsFilePath.Utf8Value().c_str());
defaultTlsTrustCertsFilePath = tlsTrustCertsFilePath.Utf8Value();
}

if (clientConfig.Has(CFG_TLS_CERT_FILE) && clientConfig.Get(CFG_TLS_CERT_FILE).IsString()) {
Expand Down Expand Up @@ -227,8 +445,23 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
}

try {
this->cClient = std::shared_ptr<pulsar_client_t>(
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
if (hasServiceUrlProvider) {
std::unique_ptr<pulsar::ServiceInfoProvider> serviceInfoProvider = BuildServiceInfoProvider(
clientConfig, this->authRefs_, defaultAuthentication, defaultTlsTrustCertsFilePath);
if (serviceInfoProvider == nullptr) {
return;
}

pulsar_client_t *rawClient = new pulsar_client_t;
rawClient->client.reset(new pulsar::Client(
pulsar::Client::create(std::move(serviceInfoProvider), cClientConfig.get()->conf)));
this->cClient =
std::shared_ptr<pulsar_client_t>(rawClient, [](pulsar_client_t *client) { delete client; });
} else {
Comment thread
shibd marked this conversation as resolved.
Outdated
Napi::String serviceUrl = clientConfig.Get(CFG_SERVICE_URL).ToString();
this->cClient = std::shared_ptr<pulsar_client_t>(
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
}
} catch (const std::exception &e) {
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
}
Expand Down
3 changes: 2 additions & 1 deletion src/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <napi.h>
#include <pulsar/c/client.h>
#include <vector>

struct LogMessage {
pulsar_logger_level_t level;
Expand Down Expand Up @@ -54,7 +55,7 @@ class Client : public Napi::ObjectWrap<Client> {
std::shared_ptr<pulsar_client_t> cClient;
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO;
Napi::ObjectReference authRef_;
std::vector<Napi::ObjectReference> authRefs_;

Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);
Expand Down
4 changes: 3 additions & 1 deletion tests/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ const baseUrl = 'http://localhost:8080';
describe('Client', () => {
describe('CreateFailedByUrlSetIncorrect', () => {
test('No Set Url', async () => {
const expectedError = 'Service URL is required and must be specified as a string '
+ 'unless serviceUrlProvider is configured';
await expect(() => new Pulsar.Client({
operationTimeoutSeconds: 30,
})).toThrow('Service URL is required and must be specified as a string');
})).toThrow(expectedError);
});

test('Set empty url', async () => {
Expand Down
Loading
Loading