Skip to content

Commit b1157e1

Browse files
committed
feat: translate FDv1 status changes to FDv2 results in FDv1AdapterSynchronizer
1 parent 7f1e999 commit b1157e1

3 files changed

Lines changed: 155 additions & 23 deletions

File tree

libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace launchdarkly::server_side::data_systems {
66

77
using data_interfaces::FDv2SourceResult;
8+
using DataSourceState = DataSourceStatus::DataSourceState;
89

910
// ----- State -----
1011

@@ -132,9 +133,32 @@ std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity()
132133
// ----- FDv1AdapterSynchronizer -----
133134

134135
FDv1AdapterSynchronizer::FDv1AdapterSynchronizer(
135-
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source)
136+
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source,
137+
data_components::DataSourceStatusManager* status_manager)
136138
: state_(std::make_shared<State>()),
137139
destination_(std::make_unique<ConvertingDestination>(state_)),
140+
status_manager_(status_manager),
141+
status_subscription_(status_manager_->OnDataSourceStatusChange(
142+
[state = state_](DataSourceStatus status) {
143+
auto error = status.LastError();
144+
if (!error) {
145+
return;
146+
}
147+
switch (status.State()) {
148+
case DataSourceState::kInterrupted:
149+
state->Notify(FDv2SourceResult{
150+
FDv2SourceResult::Interrupted{*error}});
151+
break;
152+
case DataSourceState::kOff:
153+
state->Notify(FDv2SourceResult{
154+
FDv2SourceResult::TerminalError{*error}});
155+
break;
156+
case DataSourceState::kInitializing:
157+
case DataSourceState::kValid:
158+
// No FDv2 result for these states.
159+
break;
160+
}
161+
})),
138162
fdv1_source_(std::move(fdv1_source)) {}
139163

140164
FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() {

libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
#pragma once
22

3+
#include "../../data_components/status_notifications/data_source_status_manager.hpp"
34
#include "../../data_interfaces/destination/idestination.hpp"
45
#include "../../data_interfaces/source/idata_synchronizer.hpp"
56
#include "../../data_interfaces/source/ifdv2_synchronizer.hpp"
67

78
#include <launchdarkly/async/promise.hpp>
9+
#include <launchdarkly/connection.hpp>
810

911
#include <deque>
1012
#include <memory>
@@ -23,15 +25,26 @@ namespace launchdarkly::server_side::data_systems {
2325
* and fdv1_fallback = false (the directive does not re-fire from FDv1 data).
2426
*
2527
* Threading: Next() and Close() may be called from any thread; only one
26-
* Next() may be outstanding at a time. The adapter blocks in its destructor
27-
* waiting for the FDv1 source's ShutdownAsync completion, so no callbacks
28-
* are in flight when the wrapped source is destroyed.
28+
* Next() may be outstanding at a time. Member declaration order ensures
29+
* the wrapped FDv1 source destructs before destination_ and state_, so any
30+
* in-flight FDv1 callbacks land on live objects during teardown. This
31+
* relies on the wrapped IDataSynchronizer blocking on its in-flight work
32+
* in its destructor.
2933
*/
3034
class FDv1AdapterSynchronizer final
3135
: public data_interfaces::IFDv2Synchronizer {
3236
public:
33-
explicit FDv1AdapterSynchronizer(
34-
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source);
37+
/**
38+
* @param fdv1_source The wrapped source. Must have been constructed
39+
* with status_manager as its status sink so that
40+
* state changes flow back into this adapter.
41+
* @param status_manager Non-owning. The caller retains ownership and
42+
* must keep it alive for the lifetime of the
43+
* wrapped source and this adapter.
44+
*/
45+
FDv1AdapterSynchronizer(
46+
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source,
47+
data_components::DataSourceStatusManager* status_manager);
3548

3649
~FDv1AdapterSynchronizer() override;
3750

@@ -95,9 +108,17 @@ class FDv1AdapterSynchronizer final
95108
std::weak_ptr<State> state_;
96109
};
97110

98-
// const after construction.
111+
// shared_ptr so async callbacks that may fire after this is destroyed
112+
// can hold their own reference.
99113
std::shared_ptr<State> const state_;
100114
std::unique_ptr<ConvertingDestination> const destination_;
115+
116+
// Non-owning. The caller must keep this alive for the lifetime of
117+
// the wrapped source, which holds a reference to it for status
118+
// reporting.
119+
data_components::DataSourceStatusManager* const status_manager_;
120+
std::unique_ptr<IConnection> const status_subscription_;
121+
101122
std::unique_ptr<data_interfaces::IDataSynchronizer> const fdv1_source_;
102123

103124
// Thread-safe primitive.

libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp

Lines changed: 103 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <utility>
99

1010
using namespace launchdarkly;
11+
using namespace launchdarkly::server_side;
12+
using namespace launchdarkly::server_side::data_components;
1113
using namespace launchdarkly::server_side::data_interfaces;
1214
using namespace launchdarkly::server_side::data_systems;
1315
using namespace std::chrono_literals;
@@ -18,6 +20,8 @@ namespace {
1820
// the IDestination it was given so the test can drive Init/Upsert.
1921
class MockFDv1Source final : public IDataSynchronizer {
2022
public:
23+
explicit MockFDv1Source(DataSourceStatusManager& /*status_manager*/) {}
24+
2125
void StartAsync(IDestination* destination,
2226
data_model::SDKDataSet const* bootstrap) override {
2327
++start_count;
@@ -46,9 +50,10 @@ class MockFDv1Source final : public IDataSynchronizer {
4650
} // namespace
4751

4852
TEST(FDv1AdapterSynchronizerTest, FirstNextStartsFDv1Source) {
49-
auto source = std::make_unique<MockFDv1Source>();
53+
DataSourceStatusManager status_manager;
54+
auto source = std::make_unique<MockFDv1Source>(status_manager);
5055
auto* source_ptr = source.get();
51-
FDv1AdapterSynchronizer adapter(std::move(source));
56+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
5257

5358
auto future = adapter.Next(data_model::Selector{});
5459

@@ -58,9 +63,10 @@ TEST(FDv1AdapterSynchronizerTest, FirstNextStartsFDv1Source) {
5863
}
5964

6065
TEST(FDv1AdapterSynchronizerTest, SecondNextDoesNotRestartSource) {
61-
auto source = std::make_unique<MockFDv1Source>();
66+
DataSourceStatusManager status_manager;
67+
auto source = std::make_unique<MockFDv1Source>(status_manager);
6268
auto* source_ptr = source.get();
63-
FDv1AdapterSynchronizer adapter(std::move(source));
69+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
6470

6571
auto first = adapter.Next(data_model::Selector{});
6672
source_ptr->destination_->Init(data_model::SDKDataSet{});
@@ -72,9 +78,10 @@ TEST(FDv1AdapterSynchronizerTest, SecondNextDoesNotRestartSource) {
7278
}
7379

7480
TEST(FDv1AdapterSynchronizerTest, FDv1InitProducesFullChangeSet) {
75-
auto source = std::make_unique<MockFDv1Source>();
81+
DataSourceStatusManager status_manager;
82+
auto source = std::make_unique<MockFDv1Source>(status_manager);
7683
auto* source_ptr = source.get();
77-
FDv1AdapterSynchronizer adapter(std::move(source));
84+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
7885

7986
auto future = adapter.Next(data_model::Selector{});
8087

@@ -97,9 +104,10 @@ TEST(FDv1AdapterSynchronizerTest, FDv1InitProducesFullChangeSet) {
97104
}
98105

99106
TEST(FDv1AdapterSynchronizerTest, FDv1UpsertProducesPartialChangeSet) {
100-
auto source = std::make_unique<MockFDv1Source>();
107+
DataSourceStatusManager status_manager;
108+
auto source = std::make_unique<MockFDv1Source>(status_manager);
101109
auto* source_ptr = source.get();
102-
FDv1AdapterSynchronizer adapter(std::move(source));
110+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
103111

104112
// Flag upsert.
105113
auto flag_future = adapter.Next(data_model::Selector{});
@@ -138,8 +146,9 @@ TEST(FDv1AdapterSynchronizerTest, FDv1UpsertProducesPartialChangeSet) {
138146
}
139147

140148
TEST(FDv1AdapterSynchronizerTest, ClosePendingNextReturnsShutdown) {
141-
auto source = std::make_unique<MockFDv1Source>();
142-
FDv1AdapterSynchronizer adapter(std::move(source));
149+
DataSourceStatusManager status_manager;
150+
auto source = std::make_unique<MockFDv1Source>(status_manager);
151+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
143152

144153
auto future = adapter.Next(data_model::Selector{});
145154
EXPECT_FALSE(future.IsFinished());
@@ -153,9 +162,10 @@ TEST(FDv1AdapterSynchronizerTest, ClosePendingNextReturnsShutdown) {
153162
}
154163

155164
TEST(FDv1AdapterSynchronizerTest, CloseShutsDownStartedFDv1Source) {
156-
auto source = std::make_unique<MockFDv1Source>();
165+
DataSourceStatusManager status_manager;
166+
auto source = std::make_unique<MockFDv1Source>(status_manager);
157167
auto* source_ptr = source.get();
158-
FDv1AdapterSynchronizer adapter(std::move(source));
168+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
159169

160170
adapter.Next(data_model::Selector{});
161171
adapter.Close();
@@ -164,9 +174,10 @@ TEST(FDv1AdapterSynchronizerTest, CloseShutsDownStartedFDv1Source) {
164174
}
165175

166176
TEST(FDv1AdapterSynchronizerTest, CloseWithoutStartDoesNotShutDownFDv1Source) {
167-
auto source = std::make_unique<MockFDv1Source>();
177+
DataSourceStatusManager status_manager;
178+
auto source = std::make_unique<MockFDv1Source>(status_manager);
168179
auto* source_ptr = source.get();
169-
FDv1AdapterSynchronizer adapter(std::move(source));
180+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
170181

171182
// No Next() call — FDv1 source was never started.
172183
adapter.Close();
@@ -175,9 +186,85 @@ TEST(FDv1AdapterSynchronizerTest, CloseWithoutStartDoesNotShutDownFDv1Source) {
175186
EXPECT_EQ(0, source_ptr->shutdown_count);
176187
}
177188

189+
TEST(FDv1AdapterSynchronizerTest, QueuedResultsDrainInFifoOrder) {
190+
DataSourceStatusManager status_manager;
191+
auto source = std::make_unique<MockFDv1Source>(status_manager);
192+
auto* source_ptr = source.get();
193+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
194+
195+
// Start the source by satisfying one Next() with an Init.
196+
auto first = adapter.Next(data_model::Selector{});
197+
source_ptr->destination_->Init(data_model::SDKDataSet{});
198+
first.WaitForResult(1s);
199+
200+
// Two upserts queue with no Next() in flight.
201+
data_model::Flag flag_a;
202+
flag_a.key = "a";
203+
data_model::Flag flag_b;
204+
flag_b.key = "b";
205+
source_ptr->destination_->Upsert("a", data_model::FlagDescriptor(flag_a));
206+
source_ptr->destination_->Upsert("b", data_model::FlagDescriptor(flag_b));
207+
208+
// Drain in FIFO order.
209+
auto r1 = adapter.Next(data_model::Selector{}).WaitForResult(1s);
210+
auto r2 = adapter.Next(data_model::Selector{}).WaitForResult(1s);
211+
ASSERT_TRUE(r1.has_value());
212+
ASSERT_TRUE(r2.has_value());
213+
auto* cs1 = std::get_if<FDv2SourceResult::ChangeSet>(&r1->value);
214+
auto* cs2 = std::get_if<FDv2SourceResult::ChangeSet>(&r2->value);
215+
ASSERT_NE(cs1, nullptr);
216+
ASSERT_NE(cs2, nullptr);
217+
ASSERT_EQ(1u, cs1->change_set.data.size());
218+
ASSERT_EQ(1u, cs2->change_set.data.size());
219+
EXPECT_EQ("a", cs1->change_set.data[0].key);
220+
EXPECT_EQ("b", cs2->change_set.data[0].key);
221+
}
222+
223+
TEST(FDv1AdapterSynchronizerTest, InterruptedStatusProducesInterruptedResult) {
224+
DataSourceStatusManager status_manager;
225+
auto source = std::make_unique<MockFDv1Source>(status_manager);
226+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
227+
228+
// kInterrupted from kInitializing stays kInitializing; drive past first.
229+
status_manager.SetState(DataSourceStatus::DataSourceState::kValid);
230+
231+
auto future = adapter.Next(data_model::Selector{});
232+
status_manager.SetState(
233+
DataSourceStatus::DataSourceState::kInterrupted,
234+
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, "boom");
235+
236+
auto result = future.WaitForResult(1s);
237+
ASSERT_TRUE(result.has_value());
238+
auto* interrupted =
239+
std::get_if<FDv2SourceResult::Interrupted>(&result->value);
240+
ASSERT_NE(interrupted, nullptr);
241+
EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
242+
interrupted->error.Kind());
243+
}
244+
245+
TEST(FDv1AdapterSynchronizerTest, OffStatusProducesTerminalErrorResult) {
246+
DataSourceStatusManager status_manager;
247+
auto source = std::make_unique<MockFDv1Source>(status_manager);
248+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
249+
250+
auto future = adapter.Next(data_model::Selector{});
251+
status_manager.SetState(
252+
DataSourceStatus::DataSourceState::kOff,
253+
DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse, "401");
254+
255+
auto result = future.WaitForResult(1s);
256+
ASSERT_TRUE(result.has_value());
257+
auto* terminal =
258+
std::get_if<FDv2SourceResult::TerminalError>(&result->value);
259+
ASSERT_NE(terminal, nullptr);
260+
EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse,
261+
terminal->error.Kind());
262+
}
263+
178264
TEST(FDv1AdapterSynchronizerTest, NextAfterCloseReturnsShutdown) {
179-
auto source = std::make_unique<MockFDv1Source>();
180-
FDv1AdapterSynchronizer adapter(std::move(source));
265+
DataSourceStatusManager status_manager;
266+
auto source = std::make_unique<MockFDv1Source>(status_manager);
267+
FDv1AdapterSynchronizer adapter(std::move(source), &status_manager);
181268

182269
adapter.Close();
183270
auto future = adapter.Next(data_model::Selector{});

0 commit comments

Comments
 (0)