Skip to content

Commit 0208900

Browse files
committed
fix: resolve StalledSynchronizer promise on Close to break leak cycle
1 parent d6be8c2 commit 0208900

2 files changed

Lines changed: 88 additions & 53 deletions

File tree

libs/server-sdk/tests/conditions_test.cpp

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@ class RunningIoContext {
4747

4848
TEST(FallbackConditionTest, InterruptedArmsTimerWhichFiresAfterTimeout) {
4949
RunningIoContext ioc;
50-
FallbackCondition condition(ioc.GetExecutor(), 100ms);
50+
FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms);
5151
auto future = condition.Execute();
5252

5353
condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{
5454
FDv2SourceResult::ErrorInfo{
55-
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom",
56-
std::chrono::system_clock::now()},
55+
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError,
56+
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
5757
false,
5858
}});
5959

@@ -65,15 +65,15 @@ TEST(FallbackConditionTest, InterruptedArmsTimerWhichFiresAfterTimeout) {
6565

6666
TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) {
6767
RunningIoContext ioc;
68-
FallbackCondition condition(ioc.GetExecutor(), 100ms);
68+
FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms);
6969
auto future = condition.Execute();
7070

7171
// Arm the timer with Interrupted, then cancel via ChangeSet before it
7272
// fires.
7373
condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{
7474
FDv2SourceResult::ErrorInfo{
75-
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom",
76-
std::chrono::system_clock::now()},
75+
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError,
76+
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
7777
false,
7878
}});
7979
condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{
@@ -92,13 +92,13 @@ TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) {
9292

9393
TEST(FallbackConditionTest, CloseCancelsActiveTimer) {
9494
RunningIoContext ioc;
95-
FallbackCondition condition(ioc.GetExecutor(), 100ms);
95+
FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms);
9696
auto future = condition.Execute();
9797

9898
condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{
9999
FDv2SourceResult::ErrorInfo{
100-
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom",
101-
std::chrono::system_clock::now()},
100+
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError,
101+
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
102102
false,
103103
}});
104104
condition.Close();
@@ -113,7 +113,7 @@ TEST(FallbackConditionTest, CloseCancelsActiveTimer) {
113113

114114
TEST(RecoveryConditionTest, TimerArmedAtConstructionFiresAfterTimeout) {
115115
RunningIoContext ioc;
116-
RecoveryCondition condition(ioc.GetExecutor(), 100ms);
116+
RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms);
117117

118118
auto result = condition.Execute().WaitForResult(1s);
119119

@@ -123,15 +123,15 @@ TEST(RecoveryConditionTest, TimerArmedAtConstructionFiresAfterTimeout) {
123123

124124
TEST(RecoveryConditionTest, InformDoesNotAffectTimer) {
125125
RunningIoContext ioc;
126-
RecoveryCondition condition(ioc.GetExecutor(), 100ms);
126+
RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms);
127127
auto future = condition.Execute();
128128

129129
// Recovery is purely time-based; results from the synchronizer should not
130130
// disturb the timer in either direction.
131131
condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{
132132
FDv2SourceResult::ErrorInfo{
133-
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom",
134-
std::chrono::system_clock::now()},
133+
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError,
134+
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
135135
false,
136136
}});
137137
condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{
@@ -150,7 +150,7 @@ TEST(RecoveryConditionTest, InformDoesNotAffectTimer) {
150150

151151
TEST(RecoveryConditionTest, CloseCancelsActiveTimer) {
152152
RunningIoContext ioc;
153-
RecoveryCondition condition(ioc.GetExecutor(), 100ms);
153+
RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms);
154154
auto future = condition.Execute();
155155

156156
condition.Close();
@@ -176,9 +176,10 @@ TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) {
176176

177177
// Recovery's timer is much shorter, so it should win the race.
178178
std::vector<std::unique_ptr<IFDv2Condition>> conds;
179-
conds.push_back(std::make_unique<FallbackCondition>(ioc.GetExecutor(), 1s));
180179
conds.push_back(
181-
std::make_unique<RecoveryCondition>(ioc.GetExecutor(), 100ms));
180+
std::make_unique<FallbackCondition>(ioc.GetExecutor(), /*timeout=*/1s));
181+
conds.push_back(std::make_unique<RecoveryCondition>(ioc.GetExecutor(),
182+
/*timeout=*/100ms));
182183
Conditions conditions(std::move(conds));
183184

184185
auto result = conditions.GetFuture().WaitForResult(1s);
@@ -193,15 +194,16 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) {
193194
// Fallback's timer is shorter than recovery's; informing Interrupted arms
194195
// the fallback timer, which will then beat recovery.
195196
std::vector<std::unique_ptr<IFDv2Condition>> conds;
197+
conds.push_back(std::make_unique<FallbackCondition>(ioc.GetExecutor(),
198+
/*timeout=*/100ms));
196199
conds.push_back(
197-
std::make_unique<FallbackCondition>(ioc.GetExecutor(), 100ms));
198-
conds.push_back(std::make_unique<RecoveryCondition>(ioc.GetExecutor(), 1s));
200+
std::make_unique<RecoveryCondition>(ioc.GetExecutor(), /*timeout=*/1s));
199201
Conditions conditions(std::move(conds));
200202

201203
conditions.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{
202204
FDv2SourceResult::ErrorInfo{
203-
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom",
204-
std::chrono::system_clock::now()},
205+
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError,
206+
/*status_code=*/0, "boom", std::chrono::system_clock::now()},
205207
false,
206208
}});
207209

@@ -215,10 +217,10 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) {
215217
RunningIoContext ioc;
216218

217219
std::vector<std::unique_ptr<IFDv2Condition>> conds;
218-
conds.push_back(
219-
std::make_unique<RecoveryCondition>(ioc.GetExecutor(), 100ms));
220-
conds.push_back(
221-
std::make_unique<RecoveryCondition>(ioc.GetExecutor(), 100ms));
220+
conds.push_back(std::make_unique<RecoveryCondition>(ioc.GetExecutor(),
221+
/*timeout=*/100ms));
222+
conds.push_back(std::make_unique<RecoveryCondition>(ioc.GetExecutor(),
223+
/*timeout=*/100ms));
222224
Conditions conditions(std::move(conds));
223225

224226
conditions.Close();

libs/server-sdk/tests/fdv2_data_system_test.cpp

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ class StalledSynchronizer : public IFDv2Synchronizer {
183183
}
184184

185185
void Close() override {
186+
promise_.Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}});
186187
if (closed_flag_) {
187188
*closed_flag_ = true;
188189
}
@@ -226,8 +227,9 @@ TEST(FDv2DataSystemTest, OfflineMode_NoFactories_StatusValid) {
226227
boost::asio::io_context ioc;
227228
data_components::DataSourceStatusManager status_manager;
228229

229-
FDv2DataSystem ds({}, {}, nullptr, nullptr, ioc.get_executor(),
230-
&status_manager, logger);
230+
FDv2DataSystem ds({}, {}, /*fallback_condition_factory=*/nullptr,
231+
/*recovery_condition_factory=*/nullptr,
232+
ioc.get_executor(), &status_manager, logger);
231233

232234
// Initialize with no sources; orchestration should not be posted.
233235
ds.Initialize();
@@ -244,8 +246,9 @@ TEST(FDv2DataSystemTest, Destructor_TransitionsStatusToOff) {
244246
data_components::DataSourceStatusManager status_manager;
245247

246248
{
247-
FDv2DataSystem ds({}, {}, nullptr, nullptr, ioc.get_executor(),
248-
&status_manager, logger);
249+
FDv2DataSystem ds({}, {}, /*fallback_condition_factory=*/nullptr,
250+
/*recovery_condition_factory=*/nullptr,
251+
ioc.get_executor(), &status_manager, logger);
249252
ds.Initialize();
250253
ASSERT_EQ(status_manager.Status().State(),
251254
DataSourceStatus::DataSourceState::kValid);
@@ -280,7 +283,9 @@ TEST(FDv2DataSystemTest, InitializerWithBasis_AppliesAndStatusValid) {
280283
initializers.push_back(
281284
std::make_unique<OneShotInitializerFactory>(std::move(initializer)));
282285

283-
FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr,
286+
FDv2DataSystem ds(std::move(initializers), {},
287+
/*fallback_condition_factory=*/nullptr,
288+
/*recovery_condition_factory=*/nullptr,
284289
ioc.get_executor(), &status_manager, logger);
285290

286291
// Run the initializer to completion.
@@ -329,7 +334,9 @@ TEST(FDv2DataSystemTest, InitializerInterrupted_AdvancesToNextInitializer) {
329334
initializers.push_back(std::move(first_factory));
330335
initializers.push_back(std::move(second_factory));
331336

332-
FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr,
337+
FDv2DataSystem ds(std::move(initializers), {},
338+
/*fallback_condition_factory=*/nullptr,
339+
/*recovery_condition_factory=*/nullptr,
333340
ioc.get_executor(), &status_manager, logger);
334341

335342
// Run; first initializer fails, orchestrator should fall through to
@@ -392,7 +399,9 @@ TEST(FDv2DataSystemTest,
392399
initializers.push_back(std::move(first_factory));
393400
initializers.push_back(std::move(second_factory));
394401

395-
FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr,
402+
FDv2DataSystem ds(std::move(initializers), {},
403+
/*fallback_condition_factory=*/nullptr,
404+
/*recovery_condition_factory=*/nullptr,
396405
ioc.get_executor(), &status_manager, logger);
397406

398407
ds.Initialize();
@@ -430,7 +439,9 @@ TEST(FDv2DataSystemTest,
430439
initializers.push_back(std::move(first_factory));
431440
initializers.push_back(std::move(second_factory));
432441

433-
FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr,
442+
FDv2DataSystem ds(std::move(initializers), {},
443+
/*fallback_condition_factory=*/nullptr,
444+
/*recovery_condition_factory=*/nullptr,
434445
ioc.get_executor(), &status_manager, logger);
435446

436447
ds.Initialize();
@@ -459,7 +470,9 @@ TEST(FDv2DataSystemTest, InitializerOnly_AllFail_TransitionsToOff) {
459470
initializers.push_back(
460471
std::make_unique<OneShotInitializerFactory>(std::move(init)));
461472

462-
FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr,
473+
FDv2DataSystem ds(std::move(initializers), {},
474+
/*fallback_condition_factory=*/nullptr,
475+
/*recovery_condition_factory=*/nullptr,
463476
ioc.get_executor(), &status_manager, logger);
464477

465478
// Run: initializer fails and there are no synchronizers to fall through to.
@@ -497,7 +510,9 @@ TEST(FDv2DataSystemTest, SynchronizerChangeSet_AppliesAndStatusValid) {
497510
synchronizers.push_back(
498511
std::make_unique<OneShotSynchronizerFactory>(std::move(sync)));
499512

500-
FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr,
513+
FDv2DataSystem ds({}, std::move(synchronizers),
514+
/*fallback_condition_factory=*/nullptr,
515+
/*recovery_condition_factory=*/nullptr,
501516
ioc.get_executor(), &status_manager, logger);
502517

503518
// No initializers; orchestrator should hand directly to the synchronizer.
@@ -538,7 +553,9 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_StaysOnSameSynchronizer) {
538553
synchronizers.push_back(std::move(first_factory));
539554
synchronizers.push_back(std::move(second_factory));
540555

541-
FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr,
556+
FDv2DataSystem ds({}, std::move(synchronizers),
557+
/*fallback_condition_factory=*/nullptr,
558+
/*recovery_condition_factory=*/nullptr,
542559
ioc.get_executor(), &status_manager, logger);
543560

544561
ds.Initialize();
@@ -575,7 +592,9 @@ TEST(FDv2DataSystemTest, SynchronizerInterrupted_RetriesSameSynchronizer) {
575592
auto* factory_ptr = factory.get();
576593
synchronizers.push_back(std::move(factory));
577594

578-
FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr,
595+
FDv2DataSystem ds({}, std::move(synchronizers),
596+
/*fallback_condition_factory=*/nullptr,
597+
/*recovery_condition_factory=*/nullptr,
579598
ioc.get_executor(), &status_manager, logger);
580599

581600
ds.Initialize();
@@ -613,16 +632,17 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) {
613632
},
614633
false,
615634
}});
616-
auto sync = std::make_unique<MockSynchronizer>(std::move(results), nullptr,
617-
&next_calls);
635+
auto sync = std::make_unique<MockSynchronizer>(
636+
std::move(results), /*closed_flag=*/nullptr, &next_calls);
618637

619638
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
620639
synchronizers.push_back(
621640
std::make_unique<OneShotSynchronizerFactory>(std::move(sync)));
622641

623642
FDv2DataSystem ds(std::move(initializers), std::move(synchronizers),
624-
nullptr, nullptr, ioc.get_executor(), &status_manager,
625-
logger);
643+
/*fallback_condition_factory=*/nullptr,
644+
/*recovery_condition_factory=*/nullptr,
645+
ioc.get_executor(), &status_manager, logger);
626646

627647
ds.Initialize();
628648
ioc.run();
@@ -671,16 +691,17 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) {
671691
}});
672692
results.push_back(
673693
FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}});
674-
auto sync = std::make_unique<MockSynchronizer>(std::move(results), nullptr,
675-
&next_calls);
694+
auto sync = std::make_unique<MockSynchronizer>(
695+
std::move(results), /*closed_flag=*/nullptr, &next_calls);
676696

677697
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
678698
synchronizers.push_back(
679699
std::make_unique<OneShotSynchronizerFactory>(std::move(sync)));
680700

681701
FDv2DataSystem ds(std::move(initializers), std::move(synchronizers),
682-
nullptr, nullptr, ioc.get_executor(), &status_manager,
683-
logger);
702+
/*fallback_condition_factory=*/nullptr,
703+
/*recovery_condition_factory=*/nullptr,
704+
ioc.get_executor(), &status_manager, logger);
684705

685706
ds.Initialize();
686707
ioc.run();
@@ -734,7 +755,9 @@ TEST(FDv2DataSystemTest,
734755
synchronizers.push_back(std::move(first_factory));
735756
synchronizers.push_back(std::move(second_factory));
736757

737-
FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr,
758+
FDv2DataSystem ds({}, std::move(synchronizers),
759+
/*fallback_condition_factory=*/nullptr,
760+
/*recovery_condition_factory=*/nullptr,
738761
ioc.get_executor(), &status_manager, logger);
739762

740763
ds.Initialize();
@@ -771,7 +794,9 @@ TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) {
771794
synchronizers.push_back(
772795
std::make_unique<OneShotSynchronizerFactory>(std::move(sync)));
773796

774-
FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr,
797+
FDv2DataSystem ds({}, std::move(synchronizers),
798+
/*fallback_condition_factory=*/nullptr,
799+
/*recovery_condition_factory=*/nullptr,
775800
ioc.get_executor(), &status_manager, logger);
776801

777802
// Synchronizer fails terminally; no more factories to try.
@@ -798,12 +823,14 @@ TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) {
798823
std::vector<FDv2SourceResult>{
799824
FDv2SourceResult{FDv2SourceResult::Interrupted{
800825
FDv2SourceResult::ErrorInfo{
801-
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0,
802-
"boom", std::chrono::system_clock::now()},
826+
FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError,
827+
/*status_code=*/0, "boom",
828+
std::chrono::system_clock::now()},
803829
false,
804830
}},
805831
},
806-
nullptr, nullptr, /*stall_after_results=*/true);
832+
/*closed_flag=*/nullptr, /*next_calls=*/nullptr,
833+
/*stall_after_results=*/true);
807834
auto primary_factory =
808835
std::make_unique<OneShotSynchronizerFactory>(std::move(primary_sync));
809836

@@ -819,10 +846,12 @@ TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) {
819846
synchronizers.push_back(std::move(secondary_factory));
820847

821848
auto fallback_factory =
822-
std::make_unique<FallbackConditionFactory>(ioc.get_executor(), 50ms);
849+
std::make_unique<FallbackConditionFactory>(ioc.get_executor(),
850+
/*timeout=*/50ms);
823851

824852
FDv2DataSystem ds({}, std::move(synchronizers), std::move(fallback_factory),
825-
nullptr, ioc.get_executor(), &status_manager, logger);
853+
/*recovery_condition_factory=*/nullptr,
854+
ioc.get_executor(), &status_manager, logger);
826855

827856
ds.Initialize();
828857
ioc.run();
@@ -858,7 +887,9 @@ TEST(FDv2DataSystemTest,
858887
std::make_unique<OneShotInitializerFactory>(std::move(initializer)));
859888

860889
{
861-
FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr,
890+
FDv2DataSystem ds(std::move(initializers), {},
891+
/*fallback_condition_factory=*/nullptr,
892+
/*recovery_condition_factory=*/nullptr,
862893
ioc.get_executor(), &status_manager, logger);
863894
ds.Initialize();
864895
// RunNextInitializer runs, builds the source, calls Run().Then(...).
@@ -888,7 +919,9 @@ TEST(FDv2DataSystemTest,
888919
std::make_unique<OneShotSynchronizerFactory>(std::move(synchronizer)));
889920

890921
{
891-
FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr,
922+
FDv2DataSystem ds({}, std::move(synchronizers),
923+
/*fallback_condition_factory=*/nullptr,
924+
/*recovery_condition_factory=*/nullptr,
892925
ioc.get_executor(), &status_manager, logger);
893926
ds.Initialize();
894927
// No initializers -> RunNextInitializer immediately exhausts ->

0 commit comments

Comments
 (0)