Skip to content

Commit 7de31e8

Browse files
committed
refactor: minor cleanup
1 parent b5fc6db commit 7de31e8

2 files changed

Lines changed: 46 additions & 26 deletions

File tree

libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,8 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
157157

158158
void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) {
159159
std::lock_guard lock(mutex_);
160-
if (!base_url_) {
161-
return;
162-
}
160+
// base_url_ is guaranteed populated: EnsureStarted publishes it before
161+
// calling async_connect, which is what eventually triggers this hook.
163162
boost::urls::url u = *base_url_;
164163
if (latest_selector_.value) {
165164
u.params().set("basis", latest_selector_.value->state);
@@ -252,29 +251,19 @@ void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) {
252251

253252
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
254253

255-
std::visit(
256-
[this, &msg](auto const& e) {
257-
using T = std::decay_t<decltype(e)>;
258-
if constexpr (std::is_same_v<
259-
T, sse::errors::UnrecoverableClientError>) {
260-
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
261-
MakeError(ErrorKind::kErrorResponse,
262-
static_cast<ErrorInfo::StatusCodeType>(e.status),
263-
std::move(msg)),
264-
false}});
265-
} else if constexpr (std::is_same_v<
266-
T, sse::errors::InvalidRedirectLocation> ||
267-
std::is_same_v<T,
268-
sse::errors::NotRedirectable> ||
269-
std::is_same_v<T, sse::errors::ReadTimeout>) {
270-
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
271-
MakeError(ErrorKind::kNetworkError, 0, std::move(msg)),
272-
false}});
273-
} else {
274-
static_assert(always_false_v<T>, "non-exhaustive visitor");
275-
}
276-
},
277-
error);
254+
if (auto const* client_error =
255+
std::get_if<sse::errors::UnrecoverableClientError>(&error)) {
256+
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
257+
MakeError(
258+
ErrorKind::kErrorResponse,
259+
static_cast<ErrorInfo::StatusCodeType>(client_error->status),
260+
std::move(msg)),
261+
false}});
262+
return;
263+
}
264+
265+
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
266+
MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}});
278267
}
279268

280269
void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) {
@@ -377,6 +366,9 @@ async::Future<FDv2SourceResult> FDv2StreamingSynchronizer::Next(
377366
}
378367
if (idx == 1) {
379368
state->ClearPendingPromise();
369+
if (result_future.IsFinished()) {
370+
return *result_future.GetResult();
371+
}
380372
return FDv2SourceResult{FDv2SourceResult::Timeout{}};
381373
}
382374
return *result_future.GetResult();

libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,34 @@ TEST(FDv2StreamingSynchronizerTest, OnConnectReconnectUsesLatestSelector) {
319319
EXPECT_EQ(req2.target(), "/sdk/stream?basis=s1");
320320
}
321321

322+
TEST(FDv2StreamingSynchronizerTest, OnConnectSelectorStateIsPercentEncoded) {
323+
auto logger = MakeNullLogger();
324+
IoContextRunner runner;
325+
326+
FDv2StreamingSynchronizer synchronizer(
327+
runner.context().get_executor(), logger,
328+
MakeEndpoints("https://stream.example.com"), MakeHttpProperties(),
329+
std::nullopt, 1s);
330+
331+
boost::urls::url base =
332+
boost::urls::parse_uri("https://stream.example.com").value();
333+
base.segments().push_back("sdk");
334+
base.segments().push_back("stream");
335+
FDv2StreamingSynchronizerTestPeer::SetBaseUrl(synchronizer, base);
336+
FDv2StreamingSynchronizerTestPeer::SetLatestSelector(
337+
synchronizer,
338+
data_model::Selector{data_model::Selector::State{1, "a&b"}});
339+
340+
boost::beast::http::request<boost::beast::http::string_body> req;
341+
342+
// Act: invoke the on_connect hook with a selector state containing '&',
343+
// which would terminate the basis value if left raw.
344+
FDv2StreamingSynchronizerTestPeer::OnConnect(synchronizer, &req);
345+
346+
// Assert: '&' is percent-encoded into the basis query param.
347+
EXPECT_EQ(req.target(), "/sdk/stream?basis=a%26b");
348+
}
349+
322350
// ============================================================================
323351
// SSE event translation
324352
// ============================================================================

0 commit comments

Comments
 (0)