Skip to content

Commit fafc2bc

Browse files
committed
chore: add orchestration logging for FDv2 data system
1 parent 89c06c7 commit fafc2bc

2 files changed

Lines changed: 44 additions & 27 deletions

File tree

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ FDv2DataSystem::FDv2DataSystem(
4343
store_(),
4444
change_notifier_(store_, store_),
4545
initialize_called_(false),
46+
last_logged_synchronizer_interrupted_(false),
4647
closed_(false),
4748
selector_(),
4849
initializer_index_(0),
@@ -121,6 +122,9 @@ void FDv2DataSystem::RunNextInitializer() {
121122
} else {
122123
auto& factory = initializer_factories_[initializer_index_++];
123124
active_initializer_ = factory->Build();
125+
LD_LOG(logger_, LogLevel::kInfo)
126+
<< Identity() << ": starting initializer "
127+
<< active_initializer_->Identity();
124128
active_initializer_->Run().Then(
125129
[this](data_interfaces::FDv2SourceResult const& result)
126130
-> std::monostate {
@@ -152,6 +156,8 @@ void FDv2DataSystem::OnInitializerResult(
152156
cs.change_set.selector.value.has_value();
153157
ApplyChangeSet(std::move(cs.change_set));
154158
if (has_selector) {
159+
LD_LOG(logger_, LogLevel::kInfo)
160+
<< Identity() << ": initializer succeeded";
155161
got_basis = true;
156162
}
157163
},
@@ -211,6 +217,10 @@ void FDv2DataSystem::StartSynchronizers() {
211217
}
212218
active_synchronizer_ = source_manager_.NextSynchronizer();
213219
if (active_synchronizer_) {
220+
LD_LOG(logger_, LogLevel::kInfo)
221+
<< Identity() << ": starting synchronizer "
222+
<< active_synchronizer_->Identity();
223+
last_logged_synchronizer_interrupted_.store(false);
214224
active_conditions_ = BuildActiveConditions();
215225
} else {
216226
exhausted = true;
@@ -320,33 +330,37 @@ void FDv2DataSystem::OnSynchronizerResult(
320330
bool got_shutdown = false;
321331
bool advance = false;
322332

323-
std::visit(overloaded{
324-
[&](Result::ChangeSet& cs) {
325-
ApplyChangeSet(std::move(cs.change_set));
326-
},
327-
[&](Result::Shutdown&) { got_shutdown = true; },
328-
[&](Result::Interrupted const& iv) {
329-
LD_LOG(logger_, LogLevel::kWarn)
330-
<< Identity() << ": synchronizer interrupted: "
331-
<< iv.error.Message();
332-
status_manager_->SetState(
333-
DataSourceStatus::DataSourceState::kInterrupted,
334-
iv.error.Kind(), iv.error.Message());
335-
},
336-
[&](Result::TerminalError const& te) {
337-
LD_LOG(logger_, LogLevel::kWarn)
338-
<< Identity() << ": synchronizer terminal error: "
339-
<< te.error.Message();
340-
status_manager_->SetState(
341-
DataSourceStatus::DataSourceState::kInterrupted,
342-
te.error.Kind(), te.error.Message());
343-
advance = true;
344-
},
345-
[&](Result::Goodbye const&) {
346-
// The synchronizer handles this internally.
347-
},
348-
},
349-
result.value);
333+
std::visit(
334+
overloaded{
335+
[&](Result::ChangeSet& cs) {
336+
last_logged_synchronizer_interrupted_.store(false);
337+
ApplyChangeSet(std::move(cs.change_set));
338+
},
339+
[&](Result::Shutdown&) { got_shutdown = true; },
340+
[&](Result::Interrupted const& iv) {
341+
if (!last_logged_synchronizer_interrupted_.exchange(true)) {
342+
LD_LOG(logger_, LogLevel::kInfo)
343+
<< Identity()
344+
<< ": synchronizer interrupted: " << iv.error.Message();
345+
}
346+
status_manager_->SetState(
347+
DataSourceStatus::DataSourceState::kInterrupted,
348+
iv.error.Kind(), iv.error.Message());
349+
},
350+
[&](Result::TerminalError const& te) {
351+
LD_LOG(logger_, LogLevel::kWarn)
352+
<< Identity()
353+
<< ": synchronizer terminal error: " << te.error.Message();
354+
status_manager_->SetState(
355+
DataSourceStatus::DataSourceState::kInterrupted,
356+
te.error.Kind(), te.error.Message());
357+
advance = true;
358+
},
359+
[&](Result::Goodbye const&) {
360+
// The synchronizer handles this internally.
361+
},
362+
},
363+
result.value);
350364

351365
{
352366
std::lock_guard<std::mutex> lock(mutex_);

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
283283
// Set by Initialize() to detect repeat or concurrent calls.
284284
std::atomic_bool initialize_called_;
285285

286+
// Suppresses consecutive "interrupted" logs from the active synchronizer.
287+
std::atomic_bool last_logged_synchronizer_interrupted_;
288+
286289
// Orchestration state, guarded by mutex_.
287290
std::mutex mutex_;
288291
bool closed_;

0 commit comments

Comments
 (0)