Skip to content

Commit bf74051

Browse files
committed
Add the framework for AutoClusterFailover
1 parent c6de067 commit bf74051

5 files changed

Lines changed: 207 additions & 6 deletions

File tree

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
20+
#define PULSAR_AUTO_CLUSTER_FAILOVER_H_
21+
22+
#include <pulsar/ServiceInfo.h>
23+
24+
#include <chrono>
25+
26+
namespace pulsar {
27+
28+
class Client;
29+
class AutoClusterFailoverImpl;
30+
31+
class PULSAR_PUBLIC AutoClusterFailover final {
32+
public:
33+
/**
34+
* Leverage `Client`'s internals for service URL probe and cluster switching.
35+
*/
36+
void initialize(Client& client);
37+
38+
struct Config {
39+
ServiceInfo primary;
40+
std::vector<ServiceInfo> secondary;
41+
std::chrono::milliseconds checkInterval{30000}; // 30 seconds
42+
};
43+
44+
class Builder {
45+
public:
46+
Builder(ServiceInfo primary, std::vector<ServiceInfo> secondary) {
47+
config_.primary = std::move(primary);
48+
config_.secondary = std::move(secondary);
49+
}
50+
51+
Builder& withCheckInterval(std::chrono::milliseconds interval) {
52+
config_.checkInterval = interval;
53+
return *this;
54+
}
55+
56+
AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); }
57+
58+
private:
59+
Config config_;
60+
};
61+
62+
explicit AutoClusterFailover(Config&& config);
63+
~AutoClusterFailover();
64+
65+
private:
66+
std::shared_ptr<AutoClusterFailoverImpl> impl_;
67+
};
68+
69+
} // namespace pulsar
70+
71+
#endif

include/pulsar/Client.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <pulsar/Reader.h>
3030
#include <pulsar/Result.h>
3131
#include <pulsar/Schema.h>
32+
#include <pulsar/ServiceInfo.h>
3233
#include <pulsar/TableView.h>
3334
#include <pulsar/defines.h>
3435

@@ -43,12 +44,6 @@ typedef std::function<void(Result, TableView)> TableViewCallback;
4344
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
4445
typedef std::function<void(Result)> CloseCallback;
4546

46-
struct PULSAR_PUBLIC ServiceInfo {
47-
std::string serviceUrl;
48-
std::optional<AuthenticationPtr> authentication;
49-
std::optional<std::string> tlsTrustCertsFilePath;
50-
};
51-
5247
class ClientImpl;
5348
class PulsarFriend;
5449
class PulsarWrapper;
@@ -443,6 +438,7 @@ class PULSAR_PUBLIC Client {
443438

444439
friend class PulsarFriend;
445440
friend class PulsarWrapper;
441+
friend class AutoClusterFailover;
446442
std::shared_ptr<ClientImpl> impl_;
447443
};
448444
} // namespace pulsar

include/pulsar/ServiceInfo.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef PULSAR_SERVICE_INFO_H_
20+
#define PULSAR_SERVICE_INFO_H_
21+
22+
#include <pulsar/Authentication.h>
23+
24+
#include <optional>
25+
#include <string>
26+
27+
namespace pulsar {
28+
29+
struct PULSAR_PUBLIC ServiceInfo {
30+
std::string serviceUrl;
31+
std::optional<AuthenticationPtr> authentication;
32+
std::optional<std::string> tlsTrustCertsFilePath;
33+
34+
bool operator==(const ServiceInfo& other) const {
35+
return serviceUrl == other.serviceUrl && authentication == other.authentication &&
36+
tlsTrustCertsFilePath == other.tlsTrustCertsFilePath;
37+
}
38+
};
39+
40+
} // namespace pulsar
41+
42+
#endif

lib/AutoClusterFailover.cc

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <pulsar/AutoClusterFailover.h>
20+
#include <pulsar/Client.h>
21+
22+
#include <atomic>
23+
#include <memory>
24+
25+
#include "ClientImpl.h"
26+
#include "ExecutorService.h"
27+
#include "LogUtils.h"
28+
29+
DECLARE_LOG_OBJECT()
30+
31+
namespace pulsar {
32+
33+
class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterFailoverImpl> {
34+
public:
35+
explicit AutoClusterFailoverImpl(AutoClusterFailover::Config&& config) : config_(std::move(config)) {}
36+
37+
~AutoClusterFailoverImpl() { running_->store(false, std::memory_order_release); }
38+
39+
void initialize(const ClientImplPtr& client) {
40+
client_ = client;
41+
executor_ = client->getIOExecutorProvider()->get();
42+
scheduleProbeAndUpdateServiceUrl();
43+
}
44+
45+
void scheduleProbeAndUpdateServiceUrl() {
46+
auto timer = executor_->createDeadlineTimer();
47+
timer->expires_after(config_.checkInterval);
48+
timer->async_wait([this, weakSelf{weak_from_this()}, timer](auto error) {
49+
auto self = weakSelf.lock();
50+
if (!self) {
51+
LOG_INFO("AutoClusterFailoverImpl has been destroyed, exiting timer callback");
52+
return;
53+
}
54+
auto closed = !running_->load(std::memory_order_acquire);
55+
if (!error || closed) {
56+
LOG_INFO("AutoClusterFailover exited, timer error: " << error.message()
57+
<< ", closed: " << closed);
58+
return;
59+
}
60+
61+
probeAndUpdateServiceUrl();
62+
});
63+
}
64+
65+
void probeAndUpdateServiceUrl() {
66+
auto currentServiceInfo = client_->getServiceInfo();
67+
if (currentServiceInfo == config_.primary) {
68+
// TODO: probe whether primary is down
69+
} else {
70+
// TODO:
71+
// 1. probe whether current (one secondary) is down
72+
// 2. if not, check whether primary is up and switch back if it is
73+
}
74+
scheduleProbeAndUpdateServiceUrl();
75+
}
76+
77+
private:
78+
const AutoClusterFailover::Config config_;
79+
ClientImplPtr client_;
80+
ExecutorServicePtr executor_;
81+
std::shared_ptr<std::atomic_bool> running_{std::make_shared<std::atomic_bool>(true)};
82+
};
83+
84+
AutoClusterFailover::AutoClusterFailover(AutoClusterFailover::Config&& config)
85+
: impl_(std::make_shared<AutoClusterFailoverImpl>(std::move(config))) {}
86+
87+
AutoClusterFailover::~AutoClusterFailover() {}
88+
89+
void AutoClusterFailover::initialize(Client& client) { impl_->initialize(client.impl_); }
90+
91+
} // namespace pulsar

lib/ClientImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
236236
LookupServiceFactory lookupServiceFactory_;
237237

238238
friend class Client;
239+
friend class AutoClusterFailoverImpl;
239240
};
240241
} /* namespace pulsar */
241242

0 commit comments

Comments
 (0)