Skip to content

Commit 716b270

Browse files
committed
fix: restart FDv2 stream on parse or protocol error
1 parent 7695461 commit 716b270

2 files changed

Lines changed: 42 additions & 1 deletion

File tree

libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
192192
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
193193
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
194194
MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}});
195+
std::lock_guard lock(mutex_);
196+
if (sse_client_) {
197+
sse_client_->async_restart("FDv2 parse error");
198+
}
195199
return;
196200
}
197201

@@ -251,6 +255,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
251255
<< kIdentity << ": " << r.message;
252256
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
253257
MakeError(ErrorKind::kInvalidData, 0, r.message)}});
258+
std::lock_guard lock(mutex_);
259+
if (sse_client_) {
260+
sse_client_->async_restart("FDv2 protocol error");
261+
}
254262
} else {
255263
static_assert(always_false_v<T>, "non-exhaustive visitor");
256264
}

libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,9 @@ TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) {
584584
1s);
585585
FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer);
586586

587+
auto mock_client = std::make_shared<MockSseClient>();
588+
FDv2StreamingSynchronizerTestPeer::SetSseClient(synchronizer, mock_client);
589+
587590
sse::Event bad_event("server-intent", "this is not json");
588591

589592
// Act: deliver an event whose data field cannot be parsed as JSON.
@@ -592,13 +595,43 @@ TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) {
592595
auto result = future.WaitForResult(2s);
593596

594597
// Assert: the synchronizer reports Interrupted{kInvalidData} so the
595-
// orchestrator knows the stream produced unparseable bytes.
598+
// orchestrator knows the stream produced unparseable bytes, and drives
599+
// the SSE client to restart so the next connection starts clean.
596600
ASSERT_TRUE(result.has_value());
597601
auto* interrupted =
598602
std::get_if<FDv2SourceResult::Interrupted>(&result->value);
599603
ASSERT_NE(interrupted, nullptr);
600604
EXPECT_EQ(interrupted->error.Kind(),
601605
FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData);
606+
EXPECT_EQ(mock_client->restart_count_, 1);
607+
}
608+
609+
TEST(FDv2StreamingSynchronizerTest,
610+
SchemaViolationServerIntentTriggersRestart) {
611+
auto logger = MakeNullLogger();
612+
IoContextRunner runner;
613+
614+
FDv2StreamingSynchronizer synchronizer(
615+
runner.context().get_executor(), logger,
616+
MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt,
617+
1s);
618+
FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer);
619+
620+
auto mock_client = std::make_shared<MockSseClient>();
621+
FDv2StreamingSynchronizerTestPeer::SetSseClient(synchronizer, mock_client);
622+
623+
// Well-formed JSON, but the shape doesn't match a server-intent payload.
624+
sse::Event bad_event("server-intent",
625+
R"({"data":{"flags":true,"segments":{}}})");
626+
627+
FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, bad_event);
628+
auto future = synchronizer.Next(data_model::Selector{});
629+
auto result = future.WaitForResult(2s);
630+
631+
ASSERT_TRUE(result.has_value());
632+
ASSERT_NE(std::get_if<FDv2SourceResult::Interrupted>(&result->value),
633+
nullptr);
634+
EXPECT_EQ(mock_client->restart_count_, 1);
602635
}
603636

604637
TEST(FDv2StreamingSynchronizerTest, TranslationFailureReturnsInterrupted) {

0 commit comments

Comments
 (0)