Skip to content

Commit 810f0d3

Browse files
committed
feat: cyclic synchronizer rotation with block/unblock state
1 parent df918a1 commit 810f0d3

7 files changed

Lines changed: 647 additions & 36 deletions

File tree

libs/server-sdk/src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ target_sources(${LIBNAME}
6262
data_systems/fdv2/streaming_synchronizer.cpp
6363
data_systems/fdv2/conditions.hpp
6464
data_systems/fdv2/conditions.cpp
65+
data_systems/fdv2/source_manager.hpp
66+
data_systems/fdv2/source_manager.cpp
6567
data_systems/fdv2/fdv2_data_system.hpp
6668
data_systems/fdv2/fdv2_data_system.cpp
6769
data_systems/background_sync/sources/streaming/streaming_data_source.hpp

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ FDv2DataSystem::FDv2DataSystem(
3737
: logger_(logger),
3838
ioc_(std::move(ioc)),
3939
initializer_factories_(std::move(initializer_factories)),
40-
synchronizer_factories_(std::move(synchronizer_factories)),
4140
fallback_condition_factory_(std::move(fallback_condition_factory)),
4241
recovery_condition_factory_(std::move(recovery_condition_factory)),
4342
status_manager_(status_manager),
@@ -47,7 +46,7 @@ FDv2DataSystem::FDv2DataSystem(
4746
closed_(false),
4847
selector_(),
4948
initializer_index_(0),
50-
synchronizer_index_(0),
49+
source_manager_(std::move(synchronizer_factories)),
5150
active_initializer_(nullptr),
5251
active_synchronizer_(nullptr),
5352
active_conditions_(nullptr) {}
@@ -101,7 +100,8 @@ void FDv2DataSystem::Initialize() {
101100
assert(!already_called && "Initialize() must be called at most once");
102101

103102
LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": starting";
104-
if (initializer_factories_.empty() && synchronizer_factories_.empty()) {
103+
if (initializer_factories_.empty() &&
104+
source_manager_.SynchronizerCount() == 0) {
105105
// Offline mode: empty store is the canonical state.
106106
status_manager_->SetState(DataSourceStatus::DataSourceState::kValid);
107107
return;
@@ -196,30 +196,27 @@ void FDv2DataSystem::OnInitializerResult(
196196

197197
void FDv2DataSystem::StartSynchronizers() {
198198
bool exhausted = false;
199-
bool cycled_synchronizers = false;
199+
// True if at least one synchronizer factory was configured at construction.
200+
bool any_synchronizers_configured = false;
200201
{
201202
std::lock_guard<std::mutex> lock(mutex_);
202203
if (closed_) {
203204
return;
204205
}
205-
if (synchronizer_index_ >= synchronizer_factories_.size()) {
206-
exhausted = true;
207-
cycled_synchronizers = synchronizer_index_ > 0;
206+
active_synchronizer_ = source_manager_.NextSynchronizer();
207+
if (active_synchronizer_) {
208+
active_conditions_ = BuildActiveConditions();
208209
} else {
209-
auto& factory = synchronizer_factories_[synchronizer_index_];
210-
active_synchronizer_ = factory->Build();
211-
active_conditions_ =
212-
BuildConditionsForSynchronizer(synchronizer_index_);
213-
++synchronizer_index_;
210+
exhausted = true;
211+
any_synchronizers_configured =
212+
source_manager_.SynchronizerCount() > 0;
214213
}
215214
}
216215

217216
if (exhausted) {
218-
// kOff when we can't continue updating; init-only with data stays
219-
// kValid.
220-
if (cycled_synchronizers || !store_.Initialized()) {
217+
if (any_synchronizers_configured || !store_.Initialized()) {
221218
std::string const message =
222-
cycled_synchronizers
219+
any_synchronizers_configured
223220
? "all data source acquisition methods have been exhausted"
224221
: "all initializers exhausted and no synchronizers "
225222
"configured";
@@ -274,7 +271,7 @@ void FDv2DataSystem::OnConditionFired(
274271
if (type == Type::kRecovery) {
275272
LD_LOG(logger_, LogLevel::kInfo)
276273
<< Identity() << ": recovery condition met";
277-
synchronizer_index_ = 0;
274+
source_manager_.ResetSourceIndex();
278275
} else {
279276
LD_LOG(logger_, LogLevel::kInfo)
280277
<< Identity() << ": fallback condition met";
@@ -283,16 +280,18 @@ void FDv2DataSystem::OnConditionFired(
283280
StartSynchronizers();
284281
}
285282

286-
std::unique_ptr<Conditions> FDv2DataSystem::BuildConditionsForSynchronizer(
287-
std::size_t synchronizer_position) const {
283+
std::unique_ptr<Conditions> FDv2DataSystem::BuildActiveConditions() const {
288284
std::vector<std::unique_ptr<data_interfaces::IFDv2Condition>> conditions;
289-
if (synchronizer_factories_.size() <= 1) {
285+
// With only one synchronizer available there's nothing to fall back to
286+
// or recover from, so leave the conditions empty.
287+
if (source_manager_.AvailableSynchronizerCount() == 1) {
290288
return std::make_unique<Conditions>(std::move(conditions));
291289
}
292290
if (fallback_condition_factory_) {
293291
conditions.push_back(fallback_condition_factory_->Build());
294292
}
295-
if (synchronizer_position > 0 && recovery_condition_factory_) {
293+
// The prime synchronizer has nothing more-preferred to recover to.
294+
if (!source_manager_.IsPrimeSynchronizer() && recovery_condition_factory_) {
296295
conditions.push_back(recovery_condition_factory_->Build());
297296
}
298297
return std::make_unique<Conditions>(std::move(conditions));
@@ -356,6 +355,7 @@ void FDv2DataSystem::OnSynchronizerResult(
356355
return;
357356
}
358357
if (advance) {
358+
source_manager_.BlockCurrentSynchronizer();
359359
active_synchronizer_.reset();
360360
active_conditions_.reset();
361361
}

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "../../data_interfaces/source/ifdv2_synchronizer_factory.hpp"
99
#include "../../data_interfaces/system/idata_system.hpp"
1010
#include "conditions.hpp"
11+
#include "source_manager.hpp"
1112

1213
#include <launchdarkly/data_model/selector.hpp>
1314
#include <launchdarkly/logging/logger.hpp>
@@ -104,16 +105,18 @@ namespace launchdarkly::server_side::data_systems {
104105
* |
105106
* | (N exhausted, or basis received)
106107
* v
107-
* +-------------------+ synchronizer #M's Next returns:
108+
* +-------------------+ active synchronizer's Next returns:
108109
* | Synchronizer | ChangeSet -> apply, loop
109110
* | phase | Interrupted -> loop (source self-retries)
110-
* | M = 0, 1, 2, ... | Timeout -> loop
111-
* | | Goodbye -> loop (source self-restarts)
112-
* | | TerminalError -> M += 1
113-
* | | Shutdown -> [Closed]
111+
* | (cyclic; | Goodbye -> loop (source self-restarts)
112+
* | blocked sources | TerminalError -> block, advance
113+
* | are skipped) | Shutdown -> [Closed]
114114
* +-------------------+
115+
* ^ | fallback condition -> advance (with wrap)
116+
* | | recovery condition -> reset to first available
117+
* +---+
115118
* |
116-
* | (M exhausted)
119+
* | (all synchronizers blocked)
117120
* v
118121
* [Done; final status preserved]
119122
*
@@ -248,11 +251,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
248251
void OnSynchronizerResult(data_interfaces::FDv2SourceResult result);
249252
void OnConditionFired(data_interfaces::IFDv2Condition::Type type);
250253

251-
// Builds the conditions to apply to a synchronizer at the given chain
252-
// position. Reads only const-after-construction state, so no
253-
// synchronization is required.
254-
std::unique_ptr<Conditions> BuildConditionsForSynchronizer(
255-
std::size_t synchronizer_position) const;
254+
// Builds the conditions to apply to the currently active synchronizer.
255+
// Must be called with mutex_ held; reads source_manager_ state.
256+
std::unique_ptr<Conditions> BuildActiveConditions() const;
256257

257258
// Applies a typed FDv2 changeset to the in-memory store and updates the
258259
// tracked selector if the changeset's selector is non-empty.
@@ -266,9 +267,6 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
266267
boost::asio::any_io_executor const ioc_;
267268
std::vector<std::unique_ptr<data_interfaces::IFDv2InitializerFactory>> const
268269
initializer_factories_;
269-
std::vector<
270-
std::unique_ptr<data_interfaces::IFDv2SynchronizerFactory>> const
271-
synchronizer_factories_;
272270
std::unique_ptr<data_interfaces::IFDv2ConditionFactory> const
273271
fallback_condition_factory_;
274272
std::unique_ptr<data_interfaces::IFDv2ConditionFactory> const
@@ -290,7 +288,7 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
290288
bool closed_;
291289
data_model::Selector selector_;
292290
std::size_t initializer_index_;
293-
std::size_t synchronizer_index_;
291+
SourceManager source_manager_;
294292
std::unique_ptr<data_interfaces::IFDv2Initializer> active_initializer_;
295293
std::unique_ptr<data_interfaces::IFDv2Synchronizer> active_synchronizer_;
296294
std::unique_ptr<Conditions> active_conditions_;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#include "source_manager.hpp"
2+
3+
#include <utility>
4+
5+
namespace launchdarkly::server_side::data_systems {
6+
7+
using data_interfaces::IFDv2Synchronizer;
8+
using data_interfaces::IFDv2SynchronizerFactory;
9+
10+
SourceManager::SourceManager(
11+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories) {
12+
synchronizers_.reserve(factories.size());
13+
for (auto& factory : factories) {
14+
synchronizers_.push_back(
15+
SynchronizerFactoryWithState{std::move(factory), State::kAvailable,
16+
/*is_fdv1_fallback=*/false});
17+
}
18+
}
19+
20+
std::unique_ptr<IFDv2Synchronizer> SourceManager::NextSynchronizer() {
21+
if (synchronizers_.empty()) {
22+
current_factory_index_ = -1;
23+
return nullptr;
24+
}
25+
for (std::size_t visited = 0; visited < synchronizers_.size(); ++visited) {
26+
synchronizer_index_ =
27+
(synchronizer_index_ + 1) % static_cast<int>(synchronizers_.size());
28+
if (synchronizers_[synchronizer_index_].state == State::kAvailable) {
29+
current_factory_index_ = synchronizer_index_;
30+
return synchronizers_[synchronizer_index_].factory->Build();
31+
}
32+
}
33+
current_factory_index_ = -1;
34+
return nullptr;
35+
}
36+
37+
void SourceManager::BlockCurrentSynchronizer() {
38+
if (current_factory_index_ >= 0) {
39+
synchronizers_[current_factory_index_].state = State::kBlocked;
40+
}
41+
}
42+
43+
void SourceManager::ResetSourceIndex() {
44+
synchronizer_index_ = -1;
45+
}
46+
47+
bool SourceManager::IsPrimeSynchronizer() const {
48+
for (std::size_t i = 0; i < synchronizers_.size(); ++i) {
49+
if (synchronizers_[i].state == State::kAvailable) {
50+
return synchronizer_index_ == static_cast<int>(i);
51+
}
52+
}
53+
return false;
54+
}
55+
56+
std::size_t SourceManager::AvailableSynchronizerCount() const {
57+
std::size_t count = 0;
58+
for (auto const& s : synchronizers_) {
59+
if (s.state == State::kAvailable) {
60+
++count;
61+
}
62+
}
63+
return count;
64+
}
65+
66+
std::size_t SourceManager::SynchronizerCount() const {
67+
return synchronizers_.size();
68+
}
69+
70+
bool SourceManager::IsCurrentSynchronizerFDv1Fallback() const {
71+
return current_factory_index_ >= 0 &&
72+
synchronizers_[current_factory_index_].is_fdv1_fallback;
73+
}
74+
75+
} // namespace launchdarkly::server_side::data_systems
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
#pragma once
2+
3+
#include "../../data_interfaces/source/ifdv2_synchronizer.hpp"
4+
#include "../../data_interfaces/source/ifdv2_synchronizer_factory.hpp"
5+
6+
#include <cstddef>
7+
#include <memory>
8+
#include <vector>
9+
10+
namespace launchdarkly::server_side::data_systems {
11+
12+
/**
13+
* Manages a list of synchronizer factories together with per-factory state
14+
* (Available / Blocked) and the index of the currently active factory.
15+
*
16+
* Iteration is cyclic: NextSynchronizer advances past the end and wraps to
17+
* the beginning, skipping any factory in the Blocked state. A factory enters
18+
* the Blocked state when BlockCurrentSynchronizer is called (typically on a
19+
* terminal error). Once blocked, a factory is not eligible to be built again
20+
* until unblocked.
21+
*
22+
* ResetSourceIndex causes the next call to start iteration at index 0 — used
23+
* by recovery, which wants to fall back to the most-preferred Available
24+
* synchronizer.
25+
*
26+
* Each factory also carries an is_fdv1_fallback flag, currently always
27+
* false. TODO: populate when the FDv1 fallback directive is implemented.
28+
*
29+
* Not thread-safe. The caller is responsible for serializing all calls.
30+
*/
31+
class SourceManager {
32+
public:
33+
explicit SourceManager(
34+
std::vector<std::unique_ptr<data_interfaces::IFDv2SynchronizerFactory>>
35+
factories);
36+
37+
/**
38+
* Advances to the next Available synchronizer factory (wrapping past the
39+
* end), builds a synchronizer instance from it, and records that factory
40+
* as the current one for subsequent queries. Returns nullptr if no
41+
* Available factory exists.
42+
*/
43+
std::unique_ptr<data_interfaces::IFDv2Synchronizer> NextSynchronizer();
44+
45+
/**
46+
* Marks the currently tracked factory as Blocked. No-op if no factory is
47+
* currently tracked.
48+
*/
49+
void BlockCurrentSynchronizer();
50+
51+
/**
52+
* Resets the iteration cursor so that the next call to NextSynchronizer
53+
* begins searching from index 0.
54+
*/
55+
void ResetSourceIndex();
56+
57+
/**
58+
* Returns true if the currently tracked factory is the first Available
59+
* factory in the list. Returns false if no factory is currently tracked.
60+
*/
61+
[[nodiscard]] bool IsPrimeSynchronizer() const;
62+
63+
/**
64+
* Returns the count of factories not in the Blocked state.
65+
*/
66+
[[nodiscard]] std::size_t AvailableSynchronizerCount() const;
67+
68+
/**
69+
* Returns the total number of factories configured at construction
70+
* (including any currently in the Blocked state). Constant for the
71+
* lifetime of the SourceManager.
72+
*/
73+
[[nodiscard]] std::size_t SynchronizerCount() const;
74+
75+
/**
76+
* Returns true if the currently tracked factory was configured as the
77+
* FDv1 fallback synchronizer. Always false until the FDv1 fallback
78+
* directive is implemented.
79+
*/
80+
[[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const;
81+
82+
SourceManager(SourceManager const&) = delete;
83+
SourceManager(SourceManager&&) = delete;
84+
SourceManager& operator=(SourceManager const&) = delete;
85+
SourceManager& operator=(SourceManager&&) = delete;
86+
~SourceManager() = default;
87+
88+
private:
89+
enum class State { kAvailable, kBlocked };
90+
91+
struct SynchronizerFactoryWithState {
92+
std::unique_ptr<data_interfaces::IFDv2SynchronizerFactory> factory;
93+
State state = State::kAvailable;
94+
bool is_fdv1_fallback = false;
95+
};
96+
97+
std::vector<SynchronizerFactoryWithState> synchronizers_;
98+
// Iteration cursor; -1 means "start at 0 on next call".
99+
int synchronizer_index_ = -1;
100+
// Index of the most recently returned factory; -1 if none.
101+
int current_factory_index_ = -1;
102+
};
103+
104+
} // namespace launchdarkly::server_side::data_systems

0 commit comments

Comments
 (0)