Skip to content

Commit d6368b8

Browse files
authored
feat: add FDv1AdapterSynchronizer wrapping IDataSynchronizer as IFDv2Synchronizer (#540)
## Summary Adds an adapter that wraps any FDv1 `IDataSynchronizer` (e.g. the existing `StreamingDataSource`) and presents it as an `IFDv2Synchronizer`. FDv1 `Init`/`Upsert` callbacks delivered through an internal `IDestination` are translated into `FDv2SourceResult::ChangeSet` results (`kFull` for `Init`, `kPartial` for `Upsert`), with empty selectors and `fdv1_fallback=false`. The orchestrator picks this up after `SwitchToFDv1Fallback()` once a factory whose `IsFDv1Fallback()` returns true is configured. Wiring the adapter into the public config builders is a separate concern, deferred until the Configuration API work. ## Test plan - [x] 8 unit tests cover: lazy start, no-restart on second Next, Init→kFull, Upsert→kPartial (both overloads), pending-Next-resolved-as-Shutdown-on-close, ShutdownAsync called once on close (and not at all if never started), Next-after-Close - [x] Full server suite green (486 tests) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches core flag data sync and fallback behavior with async lifecycle and threading; risk is mitigated by isolation behind IFDv2Synchronizer and broad unit tests, but mis-mapping status or shutdown could affect live updates until config is wired. > > **Overview** > Adds **`FDv1AdapterSynchronizer`**, which wraps an FDv1 `IDataSynchronizer` (e.g. streaming) and exposes it as **`IFDv2Synchronizer`** for FDv2 orchestration and FDv1 fallback paths. > > An internal **`ConvertingDestination`** turns FDv1 `Init`/`Upsert` into **`FDv2SourceResult::ChangeSet`** (`kFull` / `kPartial`, empty selectors, **`fdv1_fallback=false`**). **`Next()`** lazily starts the wrapped source once, queues multiple results in FIFO order, and races close vs. pending work via **`WhenAny`**. **`DataSourceStatusManager`** errors map to **`Interrupted`** or **`TerminalError`**. > > The new sources are registered in **`CMakeLists.txt`**. **12 gtest cases** cover lifecycle, conversion, queuing, status mapping, and shutdown. Public config wiring is left for a follow-up. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 796d25c. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 277d87c commit d6368b8

4 files changed

Lines changed: 632 additions & 0 deletions

File tree

libs/server-sdk/src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ target_sources(${LIBNAME}
7676
data_systems/fdv2/source_manager.cpp
7777
data_systems/fdv2/fdv2_data_system.hpp
7878
data_systems/fdv2/fdv2_data_system.cpp
79+
data_systems/fdv2/fdv1_adapter_synchronizer.hpp
80+
data_systems/fdv2/fdv1_adapter_synchronizer.cpp
7981
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
8082
data_systems/background_sync/sources/streaming/streaming_data_source.cpp
8183
data_systems/background_sync/sources/streaming/event_handler.hpp
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#include "fdv1_adapter_synchronizer.hpp"
2+
3+
#include <utility>
4+
5+
namespace launchdarkly::server_side::data_systems {
6+
7+
using data_interfaces::FDv2SourceResult;
8+
using DataSourceState = DataSourceStatus::DataSourceState;
9+
10+
// ----- State -----
11+
12+
FDv1AdapterSynchronizer::State::State(
13+
async::Future<std::monostate> closed_future)
14+
: closed_future_(std::move(closed_future)) {}
15+
16+
async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::State::GetNext() {
17+
std::lock_guard lock(mutex_);
18+
if (!result_queue_.empty()) {
19+
auto result = std::move(result_queue_.front());
20+
result_queue_.pop_front();
21+
return async::MakeFuture(std::move(result));
22+
}
23+
return pending_promise_.emplace().GetFuture();
24+
}
25+
26+
void FDv1AdapterSynchronizer::State::ResolvePendingAsShutdown() {
27+
std::optional<async::Promise<FDv2SourceResult>> promise;
28+
{
29+
std::lock_guard lock(mutex_);
30+
if (pending_promise_) {
31+
promise = std::move(pending_promise_);
32+
pending_promise_.reset();
33+
}
34+
}
35+
if (promise) {
36+
promise->Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}});
37+
}
38+
}
39+
40+
void FDv1AdapterSynchronizer::State::Notify(FDv2SourceResult result) {
41+
std::optional<async::Promise<FDv2SourceResult>> promise;
42+
{
43+
std::lock_guard lock(mutex_);
44+
if (closed_future_.IsFinished()) {
45+
return;
46+
}
47+
if (pending_promise_) {
48+
promise = std::move(pending_promise_);
49+
pending_promise_.reset();
50+
} else {
51+
result_queue_.push_back(std::move(result));
52+
return;
53+
}
54+
}
55+
// Resolve outside the lock — Promise::Resolve may invoke inline
56+
// continuations that could call back into Notify or GetNext.
57+
promise->Resolve(std::move(result));
58+
}
59+
60+
// ----- ConvertingDestination -----
61+
62+
FDv1AdapterSynchronizer::ConvertingDestination::ConvertingDestination(
63+
std::weak_ptr<State> state)
64+
: state_(std::move(state)) {}
65+
66+
void FDv1AdapterSynchronizer::ConvertingDestination::Init(
67+
data_model::SDKDataSet data_set) {
68+
auto state = state_.lock();
69+
if (!state) {
70+
return;
71+
}
72+
data_interfaces::ChangeSetData changes;
73+
changes.reserve(data_set.flags.size() + data_set.segments.size());
74+
for (auto& [key, flag] : data_set.flags) {
75+
changes.push_back({key, std::move(flag)});
76+
}
77+
for (auto& [key, segment] : data_set.segments) {
78+
changes.push_back({key, std::move(segment)});
79+
}
80+
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
81+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
82+
data_model::ChangeSetType::kFull, std::move(changes),
83+
data_model::Selector{}}}});
84+
}
85+
86+
void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
87+
std::string const& key,
88+
data_model::FlagDescriptor flag) {
89+
auto state = state_.lock();
90+
if (!state) {
91+
return;
92+
}
93+
data_interfaces::ChangeSetData changes;
94+
changes.push_back({key, std::move(flag)});
95+
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
96+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
97+
data_model::ChangeSetType::kPartial, std::move(changes),
98+
data_model::Selector{}}}});
99+
}
100+
101+
void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
102+
std::string const& key,
103+
data_model::SegmentDescriptor segment) {
104+
auto state = state_.lock();
105+
if (!state) {
106+
return;
107+
}
108+
data_interfaces::ChangeSetData changes;
109+
changes.push_back({key, std::move(segment)});
110+
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
111+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
112+
data_model::ChangeSetType::kPartial, std::move(changes),
113+
data_model::Selector{}}}});
114+
}
115+
116+
std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity()
117+
const {
118+
static std::string const identity = "FDv1 adapter destination";
119+
return identity;
120+
}
121+
122+
// ----- FDv1AdapterSynchronizer -----
123+
124+
FDv1AdapterSynchronizer::FDv1AdapterSynchronizer(SourceBuilder source_builder)
125+
: state_(std::make_shared<State>(close_promise_.GetFuture())),
126+
destination_(std::make_unique<ConvertingDestination>(state_)),
127+
status_manager_(
128+
std::make_unique<data_components::DataSourceStatusManager>()),
129+
status_subscription_(status_manager_->OnDataSourceStatusChange(
130+
[state = state_](DataSourceStatus status) {
131+
auto error = status.LastError();
132+
if (!error) {
133+
return;
134+
}
135+
switch (status.State()) {
136+
case DataSourceState::kInterrupted:
137+
case DataSourceState::kInitializing:
138+
// Recoverable error.
139+
state->Notify(FDv2SourceResult{
140+
FDv2SourceResult::Interrupted{*error}});
141+
break;
142+
case DataSourceState::kOff:
143+
// Terminal error.
144+
state->Notify(FDv2SourceResult{
145+
FDv2SourceResult::TerminalError{*error}});
146+
break;
147+
case DataSourceState::kValid:
148+
// Recovery; no FDv2 result.
149+
break;
150+
}
151+
})),
152+
fdv1_source_(source_builder(*status_manager_)) {}
153+
154+
FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() {
155+
Close();
156+
}
157+
158+
async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::Next(
159+
data_model::Selector /*selector*/) {
160+
auto closed = close_promise_.GetFuture();
161+
if (closed.IsFinished()) {
162+
return async::MakeFuture(
163+
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
164+
}
165+
{
166+
std::lock_guard lock(lifecycle_mutex_);
167+
if (!started_) {
168+
started_ = true;
169+
fdv1_source_->StartAsync(destination_.get(),
170+
/*bootstrap_data=*/nullptr);
171+
}
172+
}
173+
auto result_future = state_->GetNext();
174+
if (result_future.IsFinished()) {
175+
return result_future;
176+
}
177+
return async::WhenAny(closed, result_future)
178+
.Then(
179+
[state = state_, result_future](std::size_t const& idx) mutable
180+
-> async::Future<FDv2SourceResult> {
181+
if (idx == 0) {
182+
state->ResolvePendingAsShutdown();
183+
return async::MakeFuture(
184+
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
185+
}
186+
return result_future;
187+
},
188+
async::kInlineExecutor);
189+
}
190+
191+
void FDv1AdapterSynchronizer::Close() {
192+
if (!close_promise_.Resolve(std::monostate{})) {
193+
return;
194+
}
195+
std::lock_guard lock(lifecycle_mutex_);
196+
bool const was_started = started_;
197+
started_ = true;
198+
if (was_started) {
199+
fdv1_source_->ShutdownAsync([] {});
200+
}
201+
}
202+
203+
std::string const& FDv1AdapterSynchronizer::Identity() const {
204+
static std::string const identity = "FDv1 fallback adapter";
205+
return identity;
206+
}
207+
208+
} // namespace launchdarkly::server_side::data_systems
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#pragma once
2+
3+
#include "../../data_components/status_notifications/data_source_status_manager.hpp"
4+
#include "../../data_interfaces/destination/idestination.hpp"
5+
#include "../../data_interfaces/source/idata_synchronizer.hpp"
6+
#include "../../data_interfaces/source/ifdv2_synchronizer.hpp"
7+
8+
#include <launchdarkly/async/promise.hpp>
9+
#include <launchdarkly/connection.hpp>
10+
11+
#include <deque>
12+
#include <functional>
13+
#include <memory>
14+
#include <mutex>
15+
#include <optional>
16+
#include <string>
17+
#include <variant>
18+
19+
namespace launchdarkly::server_side::data_systems {
20+
21+
/**
22+
* Adapts an FDv1 IDataSynchronizer to the IFDv2Synchronizer interface.
23+
*
24+
* FDv1 Init/Upsert callbacks delivered through an internal IDestination are
25+
* translated into FDv2SourceResult::ChangeSet results, with empty selectors
26+
* and fdv1_fallback = false (the directive does not re-fire from FDv1 data).
27+
*
28+
* Threading: Next() and Close() may be called from any thread; only one
29+
* Next() may be outstanding at a time. Member declaration order ensures
30+
* the wrapped FDv1 source destructs before destination_ and state_, so any
31+
* in-flight FDv1 callbacks land on live objects during teardown. This
32+
* relies on the wrapped IDataSynchronizer blocking on its in-flight work
33+
* in its destructor.
34+
*/
35+
class FDv1AdapterSynchronizer final
36+
: public data_interfaces::IFDv2Synchronizer {
37+
public:
38+
using SourceBuilder =
39+
std::function<std::shared_ptr<data_interfaces::IDataSynchronizer>(
40+
data_components::DataSourceStatusManager&)>;
41+
42+
/**
43+
* @param source_builder Called once during construction with the
44+
* adapter's status manager. Returns the wrapped
45+
* FDv1 source, which must be constructed against
46+
* the provided manager as its status sink.
47+
*/
48+
explicit FDv1AdapterSynchronizer(SourceBuilder source_builder);
49+
50+
~FDv1AdapterSynchronizer() override;
51+
52+
async::Future<data_interfaces::FDv2SourceResult> Next(
53+
data_model::Selector selector) override;
54+
void Close() override;
55+
[[nodiscard]] std::string const& Identity() const override;
56+
57+
private:
58+
/**
59+
* Holds the result queue and pending Next() promise; shared with the
60+
* FDv1 source's IDestination via the inner ConvertingDestination.
61+
* All methods are thread-safe.
62+
*/
63+
class State {
64+
public:
65+
explicit State(async::Future<std::monostate> closed_future);
66+
67+
async::Future<data_interfaces::FDv2SourceResult> GetNext();
68+
69+
// Resolves any pending Next() promise with Shutdown and clears it.
70+
// Called on the close path so the abandoned promise doesn't leave
71+
// potential continuations dangling.
72+
void ResolvePendingAsShutdown();
73+
74+
void Notify(data_interfaces::FDv2SourceResult result);
75+
76+
private:
77+
// Finished once the owning FDv1AdapterSynchronizer's close_promise_
78+
// is resolved. Read in Notify to drop late results.
79+
async::Future<std::monostate> const closed_future_;
80+
81+
mutable std::mutex mutex_;
82+
// Protected by mutex_.
83+
std::optional<async::Promise<data_interfaces::FDv2SourceResult>>
84+
pending_promise_;
85+
std::deque<data_interfaces::FDv2SourceResult> result_queue_;
86+
};
87+
88+
/**
89+
* Translates FDv1 IDestination callbacks into FDv2 results queued on
90+
* State. Thread-safe (delegates to State).
91+
*/
92+
class ConvertingDestination final : public data_interfaces::IDestination {
93+
public:
94+
explicit ConvertingDestination(std::weak_ptr<State> state);
95+
void Init(data_model::SDKDataSet data_set) override;
96+
void Upsert(std::string const& key,
97+
data_model::FlagDescriptor flag) override;
98+
void Upsert(std::string const& key,
99+
data_model::SegmentDescriptor segment) override;
100+
[[nodiscard]] std::string const& Identity() const override;
101+
102+
private:
103+
std::weak_ptr<State> state_;
104+
};
105+
106+
// Thread-safe primitive. Declared before state_ so state_'s constructor
107+
// can take a future from it.
108+
async::Promise<std::monostate> close_promise_;
109+
110+
// shared_ptr so async callbacks that may fire after this is destroyed
111+
// can hold their own reference.
112+
std::shared_ptr<State> const state_;
113+
std::unique_ptr<ConvertingDestination> const destination_;
114+
115+
std::unique_ptr<data_components::DataSourceStatusManager> const
116+
status_manager_;
117+
std::unique_ptr<IConnection> const status_subscription_;
118+
119+
std::shared_ptr<data_interfaces::IDataSynchronizer> const fdv1_source_;
120+
121+
// Serializes StartAsync and ShutdownAsync on fdv1_source_ across
122+
// concurrent Next() and Close() calls.
123+
std::mutex lifecycle_mutex_;
124+
// Protected by lifecycle_mutex_. Set when Next() calls StartAsync, or
125+
// when Close() runs without a prior start (to gate any later Next()
126+
// from calling StartAsync after Close).
127+
bool started_ = false;
128+
};
129+
130+
} // namespace launchdarkly::server_side::data_systems

0 commit comments

Comments
 (0)