Skip to content

Commit 6b8d925

Browse files
committed
Start refactoring backoff.
1 parent e7622d0 commit 6b8d925

5 files changed

Lines changed: 328 additions & 61 deletions

File tree

libs/server-sent-events/src/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ set(SSE_SOURCES
1212
event.cpp
1313
error.cpp
1414
backoff_detail.cpp
15-
backoff.cpp)
15+
backoff.cpp
16+
curl_client.cpp
17+
backoff_timer.cpp)
1618

1719
if (LD_CURL_NETWORKING)
1820
message(STATUS "LaunchDarkly SSE: CURL networking enabled")
1921
find_package(CURL REQUIRED)
20-
list(APPEND SSE_SOURCES curl_client.cpp)
22+
list(APPEND SSE_SOURCES)
2123
endif()
2224

2325
add_library(${LIBNAME} OBJECT ${SSE_SOURCES})
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "backoff_timer.hpp"
2+
3+
#include <boost/asio/post.hpp>
4+
5+
namespace launchdarkly::sse {
6+
7+
BackoffTimer::BackoffTimer(boost::asio::any_io_executor executor)
8+
: executor_(std::move(executor)) {
9+
std::thread t([this]() { timer_thread_func(); });
10+
t.detach();
11+
}
12+
13+
BackoffTimer::~BackoffTimer() {
14+
// Signal shutdown and wake up the timer thread
15+
{
16+
std::lock_guard<std::mutex> lock(mutex_);
17+
shutdown_ = true;
18+
cancelled_ = true;
19+
}
20+
cv_.notify_one();
21+
22+
// Wait for the timer thread to complete
23+
// if (timer_thread_.joinable()) {
24+
// timer_thread_.join();
25+
// }
26+
}
27+
28+
void BackoffTimer::expires_after(std::chrono::milliseconds duration,
29+
std::function<void(bool cancelled)> callback) {
30+
{
31+
std::lock_guard<std::mutex> lock(mutex_);
32+
33+
// Cancel any existing timer
34+
cancelled_ = timer_active_;
35+
36+
// Set up the new timer
37+
expiry_time_ = std::chrono::steady_clock::now() + duration;
38+
callback_ = std::move(callback);
39+
timer_active_ = true;
40+
cancelled_ = false;
41+
}
42+
43+
// Wake up the timer thread
44+
cv_.notify_one();
45+
}
46+
47+
void BackoffTimer::cancel() {
48+
{
49+
std::lock_guard<std::mutex> lock(mutex_);
50+
if (timer_active_) {
51+
cancelled_ = true;
52+
}
53+
}
54+
55+
// Wake up the timer thread
56+
cv_.notify_one();
57+
}
58+
59+
boost::asio::any_io_executor BackoffTimer::get_executor() const {
60+
return executor_;
61+
}
62+
63+
void BackoffTimer::timer_thread_func() {
64+
while (true) {
65+
std::function<void(bool cancelled)> callback_to_invoke;
66+
bool should_invoke = false;
67+
bool was_cancelled = false;
68+
69+
{
70+
std::unique_lock<std::mutex> lock(mutex_);
71+
72+
// Wait for either:
73+
// 1. A timer to be set (timer_active_ becomes true)
74+
// 2. Shutdown signal
75+
while (!timer_active_ && !shutdown_) {
76+
cv_.wait(lock);
77+
}
78+
79+
// Check if we're shutting down
80+
if (shutdown_) {
81+
return;
82+
}
83+
84+
// Wait until the timer expires or is cancelled
85+
while (timer_active_ && !cancelled_ && !shutdown_) {
86+
auto now = std::chrono::steady_clock::now();
87+
if (now >= expiry_time_) {
88+
// Timer expired
89+
should_invoke = true;
90+
was_cancelled = false;
91+
callback_to_invoke = std::move(callback_);
92+
timer_active_ = false;
93+
break;
94+
}
95+
96+
// Wait until expiry time or until notified
97+
cv_.wait_until(lock, expiry_time_);
98+
}
99+
100+
// Check if timer was cancelled
101+
if (cancelled_ && timer_active_) {
102+
should_invoke = true;
103+
was_cancelled = true;
104+
callback_to_invoke = std::move(callback_);
105+
timer_active_ = false;
106+
cancelled_ = false;
107+
}
108+
109+
// Check if we're shutting down
110+
if (shutdown_) {
111+
return;
112+
}
113+
}
114+
115+
// Invoke callback outside of lock by posting to the executor
116+
if (should_invoke && callback_to_invoke) {
117+
boost::asio::post(executor_, [callback = std::move(callback_to_invoke),
118+
was_cancelled]() {
119+
callback(was_cancelled);
120+
});
121+
}
122+
}
123+
}
124+
125+
} // namespace launchdarkly::sse
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#pragma once
2+
3+
#include <boost/asio/any_io_executor.hpp>
4+
5+
#include <atomic>
6+
#include <chrono>
7+
#include <condition_variable>
8+
#include <functional>
9+
#include <memory>
10+
#include <mutex>
11+
#include <thread>
12+
13+
namespace launchdarkly::sse {
14+
15+
/**
16+
* A thread-based timer that waits for a specified duration and then posts
17+
* a callback to an ASIO executor. The timer can be cancelled, making it
18+
* suitable for use in backoff scenarios where cleanup is required during
19+
* destruction.
20+
*
21+
* The timer uses a dedicated thread for waiting (via condition_variable)
22+
* and posts the callback to the provided ASIO executor when the timer expires.
23+
* This avoids blocking the ASIO thread pool during backoff periods.
24+
*/
25+
class BackoffTimer {
26+
public:
27+
/**
28+
* Construct a BackoffTimer with the given ASIO executor.
29+
* @param executor The ASIO executor to post callbacks to when timer expires.
30+
*/
31+
explicit BackoffTimer(boost::asio::any_io_executor executor);
32+
33+
/**
34+
* Destructor. Cancels any pending timer and waits for the timer thread
35+
* to complete.
36+
*/
37+
~BackoffTimer();
38+
39+
// Non-copyable and non-movable
40+
BackoffTimer(const BackoffTimer&) = delete;
41+
BackoffTimer& operator=(const BackoffTimer&) = delete;
42+
BackoffTimer(BackoffTimer&&) = delete;
43+
BackoffTimer& operator=(BackoffTimer&&) = delete;
44+
45+
/**
46+
* Start an asynchronous wait. When the duration expires, the callback
47+
* will be posted to the executor provided in the constructor.
48+
*
49+
* If a timer is already running, it will be cancelled before starting
50+
* the new timer.
51+
*
52+
* @param duration The duration to wait before invoking the callback.
53+
* @param callback The callback to invoke when the timer expires.
54+
* The callback receives a boolean indicating whether
55+
* the timer was cancelled (true) or expired normally (false).
56+
*/
57+
void expires_after(std::chrono::milliseconds duration,
58+
std::function<void(bool cancelled)> callback);
59+
60+
/**
61+
* Cancel any pending timer. If a timer is running, the callback will
62+
* be invoked with cancelled=true.
63+
*/
64+
void cancel();
65+
66+
/**
67+
* Get the executor used by this timer.
68+
*/
69+
boost::asio::any_io_executor get_executor() const;
70+
71+
private:
72+
void timer_thread_func();
73+
74+
boost::asio::any_io_executor executor_;
75+
76+
std::mutex mutex_;
77+
std::condition_variable cv_;
78+
std::atomic<bool> shutdown_{false};
79+
std::atomic<bool> cancelled_{false};
80+
81+
std::chrono::steady_clock::time_point expiry_time_;
82+
std::function<void(bool cancelled)> callback_;
83+
bool timer_active_{false};
84+
85+
std::thread timer_thread_;
86+
};
87+
88+
} // namespace launchdarkly::sse

libs/server-sent-events/src/curl_client.cpp

Lines changed: 63 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -62,32 +62,12 @@ CurlClient::CurlClient(boost::asio::any_io_executor executor,
6262
write_timeout,
6363
std::move(custom_ca_file),
6464
std::move(proxy_url),
65-
skip_verify_peer,
66-
[this](const std::string& message) {
67-
boost::asio::post(backoff_timer_.get_executor(), [this, message]() {
68-
async_backoff(message);
69-
});
70-
},
71-
[this](const Event& event) {
72-
boost::asio::post(backoff_timer_.get_executor(), [this, event]() {
73-
event_receiver_(event);
74-
});
75-
},
76-
[this](const Error& error) {
77-
boost::asio::post(backoff_timer_.get_executor(), [this, error]() {
78-
report_error(error);
79-
});
80-
},
81-
[this]() {
82-
boost::asio::post(backoff_timer_.get_executor(), [this]() {
83-
backoff_.succeed();
84-
});
85-
}, [this](const std::string& message) {
86-
log_message(message);
87-
});
65+
skip_verify_peer);
66+
std::cout << "Curl client created: " << host << std::endl;
8867
}
8968

9069
CurlClient::~CurlClient() {
70+
std::cout << "Curl client destructing: " << request_context_->url << std::endl;
9171
request_context_->shutdown();
9272
backoff_timer_.cancel();
9373
}
@@ -97,12 +77,58 @@ void CurlClient::async_connect() {
9777
[self = shared_from_this()]() { self->do_run(); });
9878
}
9979

100-
void CurlClient::do_run() const {
80+
void CurlClient::do_run() {
10181
if (request_context_->is_shutting_down()) {
10282
return;
10383
}
10484

10585
auto ctx = request_context_;
86+
auto weak_self = weak_from_this();
87+
ctx->set_callbacks(Callbacks([weak_self, ctx](const std::string& message) {
88+
std::cout << "Backoff " << ctx->url << " " << message << std::endl;
89+
if (auto self = weak_self.lock()) {
90+
boost::asio::post(
91+
self->backoff_timer_.
92+
get_executor(),
93+
[self, message]() {
94+
self->async_backoff(message);
95+
});
96+
}
97+
},
98+
[weak_self, ctx](const Event& event) {
99+
std::cout << "Event " << ctx->url << " " << event.data() << std::endl;
100+
if (auto self = weak_self.lock()) {
101+
boost::asio::post(
102+
self->backoff_timer_.
103+
get_executor(),
104+
[self, event]() {
105+
self->event_receiver_(event);
106+
});
107+
}
108+
},
109+
[weak_self, ctx](const Error& error) {
110+
std::cout << "Error " << ctx->url << " " << error << std::endl;
111+
if (const auto self = weak_self.lock()) {
112+
// report_error does an asio post.
113+
self->report_error(error);
114+
}
115+
},
116+
[weak_self, ctx]() {
117+
std::cout << "Reset backoff" << ctx->url << std::endl;
118+
if (const auto self = weak_self.lock()) {
119+
boost::asio::post(
120+
self->backoff_timer_.
121+
get_executor(),
122+
[self]() {
123+
self->backoff_.succeed();
124+
});
125+
}
126+
}, [weak_self, ctx](const std::string& message) {
127+
std::cout << "Log " << ctx->url << " " << message << std::endl;
128+
if (const auto self = weak_self.lock()) {
129+
self->log_message(message);
130+
}
131+
}));
106132
// Start request in a separate thread since CURL blocks
107133
// Capture only raw 'this' pointer, not shared_ptr
108134
std::thread t(
@@ -122,19 +148,21 @@ void CurlClient::async_backoff(std::string const& reason) {
122148

123149
log_message(msg.str());
124150

125-
backoff_timer_.expires_after(backoff_.delay());
126-
backoff_timer_.async_wait([self = shared_from_this()](
127-
const boost::system::error_code& ec) {
128-
self->on_backoff(ec);
151+
// auto weak_self = weak_from_this();
152+
backoff_timer_.expires_after(backoff_.delay(),
153+
[this](bool cancelled) {
154+
if (request_context_->is_shutting_down()) {
155+
return;
156+
}
157+
// if (const auto self = weak_self.lock()) {
158+
on_backoff(cancelled);
159+
// }
129160
});
130161
}
131162

132-
void CurlClient::on_backoff(const boost::system::error_code& ec) const {
133-
{
134-
if (ec == boost::asio::error::operation_aborted || request_context_->
135-
is_shutting_down()) {
136-
return;
137-
}
163+
void CurlClient::on_backoff(bool cancelled) {
164+
if (cancelled || request_context_->is_shutting_down()) {
165+
return;
138166
}
139167
do_run();
140168
}
@@ -492,6 +520,7 @@ size_t CurlClient::HeaderCallback(const char* buffer,
492520
}
493521

494522
void CurlClient::PerformRequest(std::shared_ptr<RequestContext> context) {
523+
std::cout << "CurlClient::PerformRequest: " << context->url << std::endl;
495524
if (context->is_shutting_down()) {
496525
return;
497526
}

0 commit comments

Comments
 (0)