Skip to content

Commit e9c3c04

Browse files
authored
fix: FDv2 fixes surfaced by v3 contract test harness (#547)
FDv2 fixes for issues found running the v3 contract test harness against the server SDK. - Polling URL trailing-slash handling. - Filter key validation in FDv2 polling and streaming. - Protocol handler accepts incremental updates after payload-transferred. - Stream restart on parse failures and protocol errors. - Unknown SSE event types skipped without parsing. - Error events preserve protocol state (only accumulated changes cleared). <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes core FDv2 sync behavior (multi-payload streams, error recovery, and reconnect triggers); mistakes could cause missed updates or extra reconnects, but scope is FDv2 data paths with expanded unit tests. > > **Overview** > **FDv2 protocol handler** now matches streaming incremental semantics: after `payload-transferred` it stays active in `kPartial` instead of resetting, so later put/delete + `payload-transferred` cycles emit **partial** changesets without a new `server-intent`. Put/delete are no longer dropped solely because state was inactive; **`error`** clears in-flight changes but keeps protocol state so updates can continue. Adds **`IsKnownEvent`** for spec-defined ignore behavior. > > **Polling and streaming** build `/sdk/poll` and `/sdk/stream` by stripping a trailing-slash empty URL segment (avoids `//`), and apply the same **`ValidateFilterKey`** used elsewhere—invalid filters are omitted with an error log and a full-environment request. **`MakeFDv2PollRequest`** takes a **`Logger`** for that path. > > **FDv2 streaming** skips unknown SSE event types before JSON parsing, and calls **`async_restart`** on malformed JSON or non-server **`HandleEvent`** protocol errors so the next connection starts clean. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 3488e09. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 4f9f8c4 commit e9c3c04

10 files changed

Lines changed: 248 additions & 23 deletions

libs/internal/include/launchdarkly/fdv2_protocol_handler.hpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ namespace launchdarkly {
1414
/**
1515
* Protocol state machine for the FDv2 wire format.
1616
*
17-
* Accumulates put-object and delete-object events between a server-intent
18-
* and payload-transferred event, then emits a complete FDv2ChangeSet.
17+
* A server-intent opens a transfer cycle: put-object and delete-object
18+
* events accumulate until a payload-transferred event, which emits an
19+
* FDv2ChangeSet. The handler then remains active — subsequent put/delete
20+
* + payload-transferred cycles emit kPartial changesets reusing the prior
21+
* intent, until the server sends a new server-intent, error, or goodbye.
1922
*
2023
* Shared between the polling and streaming synchronizers.
2124
*/
@@ -95,6 +98,13 @@ class FDv2ProtocolHandler {
9598
*/
9699
void Reset();
97100

101+
/**
102+
* @return true if event_type is one that the protocol handler recognizes
103+
* and may dispatch on. Events outside this set are spec-defined as
104+
* "unrecognized data that can be safely ignored".
105+
*/
106+
static bool IsKnownEvent(std::string_view event_type);
107+
98108
FDv2ProtocolHandler() = default;
99109

100110
private:

libs/internal/src/fdv2_protocol_handler.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ static char const* const kGoodbye = "goodbye";
1414

1515
using Error = FDv2ProtocolHandler::Error;
1616

17+
bool FDv2ProtocolHandler::IsKnownEvent(std::string_view event_type) {
18+
return event_type == kServerIntent || event_type == kPutObject ||
19+
event_type == kDeleteObject || event_type == kPayloadTransferred ||
20+
event_type == kError || event_type == kGoodbye;
21+
}
22+
1723
FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleEvent(
1824
std::string_view event_type,
1925
boost::json::value const& data) {
@@ -78,9 +84,6 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleServerIntent(
7884

7985
FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject(
8086
boost::json::value const& data) {
81-
if (state_ == State::kInactive) {
82-
return std::monostate{};
83-
}
8487
auto result = boost::json::value_to<
8588
tl::expected<std::optional<PutObject>, JsonError>>(data);
8689
if (!result) {
@@ -101,9 +104,6 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePutObject(
101104

102105
FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleDeleteObject(
103106
boost::json::value const& data) {
104-
if (state_ == State::kInactive) {
105-
return std::monostate{};
106-
}
107107
auto result = boost::json::value_to<
108108
tl::expected<std::optional<DeleteObject>, JsonError>>(data);
109109
if (!result) {
@@ -152,15 +152,20 @@ FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandlePayloadTransferred(
152152
type, std::move(changes_),
153153
data_model::Selector{data_model::Selector::State{transferred.version,
154154
transferred.state}}};
155-
Reset();
155+
// Transition to kPartial so subsequent put-object/payload-transferred
156+
// cycles work without requiring a new server-intent.
157+
changes_.clear();
158+
state_ = State::kPartial;
156159
return changeset;
157160
}
158161

159162
FDv2ProtocolHandler::Result FDv2ProtocolHandler::HandleError(
160163
boost::json::value const& data) {
161164
auto result = boost::json::value_to<
162165
tl::expected<std::optional<FDv2Error>, JsonError>>(data);
163-
Reset();
166+
// Discard any partial-payload accumulation but keep state intact so
167+
// the next put-object/payload-transferred cycle continues normally.
168+
changes_.clear();
164169
if (!result) {
165170
return Error::JsonParseError(result.error(),
166171
"could not deserialize error event");

libs/internal/tests/fdv2_protocol_handler_test.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,29 @@ TEST(FDv2ProtocolHandlerTest,
235235
EXPECT_TRUE(cs->changes.empty());
236236
}
237237

238+
TEST(FDv2ProtocolHandlerTest, ErrorMidPayloadDiscardsPartialAcceptsSubsequent) {
239+
FDv2ProtocolHandler handler;
240+
241+
handler.HandleEvent("server-intent", MakeServerIntent("xfer-full"));
242+
handler.HandleEvent("put-object",
243+
MakePutObject("flag", "abandoned", kFlagJson));
244+
handler.HandleEvent(
245+
"error", boost::json::parse(R"({"reason":"something went wrong"})"));
246+
247+
// After the error, a fresh put + payload-transferred (without an
248+
// intervening server-intent) emits a changeset containing only the
249+
// post-error put.
250+
handler.HandleEvent("put-object",
251+
MakePutObject("flag", "fresh", kFlagJson));
252+
auto result = handler.HandleEvent("payload-transferred",
253+
MakePayloadTransferred("s", 1));
254+
255+
auto* cs = std::get_if<data_model::FDv2ChangeSet>(&result);
256+
ASSERT_NE(cs, nullptr);
257+
ASSERT_EQ(cs->changes.size(), 1u);
258+
EXPECT_EQ(cs->changes[0].key, "fresh");
259+
}
260+
238261
TEST(FDv2ProtocolHandlerTest, ErrorEventWithIdSetsServerId) {
239262
FDv2ProtocolHandler handler;
240263

@@ -353,3 +376,30 @@ TEST(FDv2ProtocolHandlerTest, PayloadTransferredWithoutServerIntentIsError) {
353376
ASSERT_NE(err, nullptr);
354377
EXPECT_EQ(err->kind, FDv2ProtocolHandler::Error::Kind::kProtocolError);
355378
}
379+
380+
TEST(FDv2ProtocolHandlerTest, ConsecutivePayloadsWithoutNewServerIntent) {
381+
FDv2ProtocolHandler handler;
382+
383+
handler.HandleEvent("server-intent", MakeServerIntent("xfer-full"));
384+
handler.HandleEvent("put-object",
385+
MakePutObject("flag", "first", kFlagJson));
386+
auto first = handler.HandleEvent("payload-transferred",
387+
MakePayloadTransferred("s1", 1));
388+
auto* cs1 = std::get_if<data_model::FDv2ChangeSet>(&first);
389+
ASSERT_NE(cs1, nullptr);
390+
ASSERT_EQ(cs1->changes.size(), 1u);
391+
EXPECT_EQ(cs1->changes[0].key, "first");
392+
393+
// A subsequent payload arrives without an intervening server-intent
394+
// (streaming incremental update): the handler emits a kPartial
395+
// changeset reusing the prior intent.
396+
handler.HandleEvent("put-object",
397+
MakePutObject("flag", "second", kFlagJson));
398+
auto second = handler.HandleEvent("payload-transferred",
399+
MakePayloadTransferred("s2", 2));
400+
auto* cs2 = std::get_if<data_model::FDv2ChangeSet>(&second);
401+
ASSERT_NE(cs2, nullptr);
402+
ASSERT_EQ(cs2->changes.size(), 1u);
403+
EXPECT_EQ(cs2->changes[0].key, "second");
404+
EXPECT_EQ(cs2->type, data_model::ChangeSetType::kPartial);
405+
}

libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "fdv2_polling_impl.hpp"
2+
#include "../background_sync/detail/payload_filter_validation/payload_filter_validation.hpp"
23
#include "fdv2_changeset_translation.hpp"
34

45
#include <launchdarkly/network/http_error_messages.hpp>
@@ -11,7 +12,6 @@
1112

1213
namespace launchdarkly::server_side::data_systems {
1314

14-
static char const* const kFDv2PollPath = "/sdk/poll";
1515
static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback";
1616

1717
static char const* const kErrorParsingBody =
@@ -44,7 +44,8 @@ network::HttpRequest MakeFDv2PollRequest(
4444
config::built::ServiceEndpoints const& endpoints,
4545
config::built::HttpProperties const& http_properties,
4646
data_model::Selector const& selector,
47-
std::optional<std::string> const& filter_key) {
47+
std::optional<std::string> const& filter_key,
48+
Logger const& logger) {
4849
config::builders::HttpPropertiesBuilder const builder(http_properties);
4950

5051
auto parsed = boost::urls::parse_uri(endpoints.PollingBaseUrl());
@@ -54,12 +55,25 @@ network::HttpRequest MakeFDv2PollRequest(
5455
}
5556

5657
boost::urls::url u = parsed.value();
57-
u.set_path(u.path() + kFDv2PollPath);
58+
// A trailing '/' on the base URL appears as an empty final segment;
59+
// remove it so subsequent push_backs don't produce a double slash.
60+
auto segs = u.segments();
61+
if (!segs.empty() && segs.back().empty()) {
62+
segs.pop_back();
63+
}
64+
segs.push_back("sdk");
65+
segs.push_back("poll");
5866
if (selector.value) {
5967
u.params().append({"basis", selector.value->state});
6068
}
6169
if (filter_key) {
62-
u.params().append({"filter", *filter_key});
70+
if (detail::ValidateFilterKey(*filter_key)) {
71+
u.params().append({"filter", *filter_key});
72+
} else {
73+
LD_LOG(logger, LogLevel::kError)
74+
<< "data source config: provided filter is invalid, will "
75+
"request full environment instead";
76+
}
6377
}
6478

6579
return {std::string(u.buffer()), network::HttpMethod::kGet, builder.Build(),

libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ network::HttpRequest MakeFDv2PollRequest(
1919
config::built::ServiceEndpoints const& endpoints,
2020
config::built::HttpProperties const& http_properties,
2121
data_model::Selector const& selector,
22-
std::optional<std::string> const& filter_key);
22+
std::optional<std::string> const& filter_key,
23+
Logger const& logger);
2324

2425
// Parse an HTTP response from the FDv2 polling endpoint through the protocol
2526
// handler and return the appropriate result. identity is used in log messages

libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ FDv2PollingInitializer::FDv2PollingInitializer(
2020
: request_(MakeFDv2PollRequest(endpoints,
2121
http_properties,
2222
std::move(selector),
23-
std::move(filter_key))),
23+
std::move(filter_key),
24+
logger)),
2425
requester_(executor, http_properties.Tls()),
2526
state_(std::make_shared<State>(logger)) {}
2627

libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ FDv2PollingSynchronizer::State::State(
3434
async::Future<network::HttpResult> FDv2PollingSynchronizer::State::Request(
3535
data_model::Selector const& selector) const {
3636
auto request = MakeFDv2PollRequest(endpoints_, http_properties_, selector,
37-
filter_key_);
37+
filter_key_, logger_);
3838

3939
// Promise must be in a shared_ptr because Requester requires callbacks
4040
// to be copy-constructible (stored in std::function).

libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "streaming_synchronizer.hpp"
2+
#include "../background_sync/detail/payload_filter_validation/payload_filter_validation.hpp"
23
#include "fdv2_changeset_translation.hpp"
34

45
#include <launchdarkly/async/timer.hpp>
@@ -74,10 +75,14 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
7475

7576
boost::urls::url u = parsed.value();
7677

77-
// Safer way of appending /sdk/stream than string concatenation: avoids
78-
// double slashes if the base URL has a trailing slash.
79-
u.segments().push_back("sdk");
80-
u.segments().push_back("stream");
78+
// A trailing '/' on the base URL appears as an empty final segment;
79+
// remove it so subsequent push_backs don't produce a double slash.
80+
auto segs = u.segments();
81+
if (!segs.empty() && segs.back().empty()) {
82+
segs.pop_back();
83+
}
84+
segs.push_back("sdk");
85+
segs.push_back("stream");
8186

8287
// basis and filter are not added here — they are appended per-connect by
8388
// the on_connect hook (OnConnect), so that each (re)connection uses the
@@ -168,7 +173,13 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) {
168173
u.params().set("basis", latest_selector_.value->state);
169174
}
170175
if (filter_key_) {
171-
u.params().set("filter", *filter_key_);
176+
if (detail::ValidateFilterKey(*filter_key_)) {
177+
u.params().set("filter", *filter_key_);
178+
} else {
179+
LD_LOG(logger_, LogLevel::kError)
180+
<< "data source config: provided filter is invalid, will "
181+
"request full environment instead";
182+
}
172183
}
173184
req->target(u.encoded_target());
174185
}
@@ -183,6 +194,10 @@ void FDv2StreamingSynchronizer::State::OnResponse(
183194
}
184195

185196
void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
197+
if (!FDv2ProtocolHandler::IsKnownEvent(event.type())) {
198+
return;
199+
}
200+
186201
boost::system::error_code ec;
187202
auto data = boost::json::parse(event.data(), ec);
188203
if (ec) {
@@ -191,6 +206,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
191206
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
192207
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
193208
MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}});
209+
std::lock_guard lock(mutex_);
210+
if (sse_client_) {
211+
sse_client_->async_restart("FDv2 parse error");
212+
}
194213
return;
195214
}
196215

@@ -250,6 +269,10 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
250269
<< kIdentity << ": " << r.message;
251270
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
252271
MakeError(ErrorKind::kInvalidData, 0, r.message)}});
272+
std::lock_guard lock(mutex_);
273+
if (sse_client_) {
274+
sse_client_->async_restart("FDv2 protocol error");
275+
}
253276
} else {
254277
static_assert(always_false_v<T>, "non-exhaustive visitor");
255278
}

libs/server-sdk/tests/fdv2_polling_impl_test.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <data_systems/fdv2/fdv2_polling_impl.hpp>
44

5+
#include <launchdarkly/config/shared/defaults.hpp>
56
#include <launchdarkly/fdv2_protocol_handler.hpp>
67
#include <launchdarkly/logging/logger.hpp>
78
#include <launchdarkly/network/http_requester.hpp>
@@ -107,3 +108,47 @@ TEST(HandleFDv2PollResponseTest, NetworkErrorDoesNotSetFlag) {
107108
std::holds_alternative<FDv2SourceResult::Interrupted>(result.value));
108109
EXPECT_FALSE(result.fdv1_fallback);
109110
}
111+
112+
TEST(MakeFDv2PollRequestTest, BaseWithTrailingSlashDoesNotProduceDoubleSlash) {
113+
auto logger = MakeNullLogger();
114+
config::shared::built::ServiceEndpoints endpoints{"http://example.com/", "",
115+
""};
116+
auto props =
117+
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
118+
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
119+
std::nullopt, logger);
120+
EXPECT_EQ(req.Url(), "http://example.com/sdk/poll");
121+
}
122+
123+
TEST(MakeFDv2PollRequestTest, BaseWithSubpathTrailingSlashJoinsCleanly) {
124+
auto logger = MakeNullLogger();
125+
config::shared::built::ServiceEndpoints endpoints{
126+
"http://example.com/relay/", "", ""};
127+
auto props =
128+
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
129+
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
130+
std::nullopt, logger);
131+
EXPECT_EQ(req.Url(), "http://example.com/relay/sdk/poll");
132+
}
133+
134+
TEST(MakeFDv2PollRequestTest, ValidFilterKeyIsIncluded) {
135+
auto logger = MakeNullLogger();
136+
config::shared::built::ServiceEndpoints endpoints{"http://example.com", "",
137+
""};
138+
auto props =
139+
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
140+
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
141+
std::string{"my-filter_1.0"}, logger);
142+
EXPECT_EQ(req.Url(), "http://example.com/sdk/poll?filter=my-filter_1.0");
143+
}
144+
145+
TEST(MakeFDv2PollRequestTest, InvalidFilterKeyIsDropped) {
146+
auto logger = MakeNullLogger();
147+
config::shared::built::ServiceEndpoints endpoints{"http://example.com", "",
148+
""};
149+
auto props =
150+
config::shared::Defaults<config::shared::ServerSDK>::HttpProperties();
151+
auto req = MakeFDv2PollRequest(endpoints, props, data_model::Selector{},
152+
std::string{"has spaces"}, logger);
153+
EXPECT_EQ(req.Url(), "http://example.com/sdk/poll");
154+
}

0 commit comments

Comments
 (0)