Skip to content

Commit 09ba09d

Browse files
committed
Contract tests passing with multi support.
1 parent 27a498f commit 09ba09d

6 files changed

Lines changed: 311 additions & 97 deletions

File tree

libs/internal/include/launchdarkly/network/curl_multi_manager.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ class CurlMultiManager : public std::enable_shared_from_this<CurlMultiManager> {
5454
/**
5555
* Add an easy handle to be managed.
5656
* @param easy The CURL easy handle (must be configured)
57+
* @param headers The curl_slist headers (will be freed automatically)
5758
* @param callback Called when the transfer completes
5859
*/
59-
void add_handle(CURL* easy, CompletionCallback callback);
60+
void add_handle(CURL* easy, curl_slist* headers, CompletionCallback callback);
6061

6162
/**
6263
* Remove an easy handle from management.
@@ -86,7 +87,7 @@ class CurlMultiManager : public std::enable_shared_from_this<CurlMultiManager> {
8687
// Per-socket data
8788
struct SocketInfo {
8889
curl_socket_t sockfd;
89-
std::unique_ptr<boost::asio::posix::stream_descriptor> descriptor;
90+
std::shared_ptr<boost::asio::posix::stream_descriptor> descriptor;
9091
int action{0}; // CURL_POLL_IN, CURL_POLL_OUT, etc.
9192
};
9293

@@ -99,6 +100,7 @@ class CurlMultiManager : public std::enable_shared_from_this<CurlMultiManager> {
99100

100101
std::mutex mutex_;
101102
std::map<CURL*, CompletionCallback> callbacks_;
103+
std::map<CURL*, curl_slist*> headers_;
102104
int still_running_{0};
103105
};
104106

libs/internal/src/network/curl_multi_manager.cpp

Lines changed: 112 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,54 @@ CurlMultiManager::CurlMultiManager(boost::asio::any_io_executor executor)
3333

3434
CurlMultiManager::~CurlMultiManager() {
3535
if (multi_handle_) {
36+
// Extract and clear pending handles, callbacks, and headers
37+
std::map<CURL*, CompletionCallback> pending_callbacks;
38+
std::map<CURL*, curl_slist*> pending_headers;
39+
{
40+
std::lock_guard<std::mutex> lock(mutex_);
41+
pending_callbacks = std::move(callbacks_);
42+
pending_headers = std::move(headers_);
43+
callbacks_.clear();
44+
headers_.clear();
45+
}
46+
47+
// Remove handles from multi and cleanup resources
48+
// Do NOT invoke callbacks as they may access destroyed objects
49+
for (auto& [easy, callback] : pending_callbacks) {
50+
curl_multi_remove_handle(multi_handle_, easy);
51+
52+
// Free headers if they exist for this handle
53+
auto header_it = pending_headers.find(easy);
54+
if (header_it != pending_headers.end() && header_it->second) {
55+
curl_slist_free_all(header_it->second);
56+
}
57+
58+
curl_easy_cleanup(easy);
59+
}
60+
3661
curl_multi_cleanup(multi_handle_);
3762
}
3863
}
3964

40-
void CurlMultiManager::add_handle(CURL* easy, CompletionCallback callback) {
65+
void CurlMultiManager::add_handle(CURL* easy, curl_slist* headers, CompletionCallback callback) {
4166
{
4267
std::lock_guard<std::mutex> lock(mutex_);
4368
callbacks_[easy] = std::move(callback);
69+
headers_[easy] = headers;
4470
}
4571

4672
CURLMcode rc = curl_multi_add_handle(multi_handle_, easy);
4773
if (rc != CURLM_OK) {
4874
std::lock_guard<std::mutex> lock(mutex_);
4975
callbacks_.erase(easy);
76+
77+
// Free headers on error
78+
auto header_it = headers_.find(easy);
79+
if (header_it != headers_.end() && header_it->second) {
80+
curl_slist_free_all(header_it->second);
81+
}
82+
headers_.erase(easy);
83+
5084
std::cerr << "Failed to add handle to multi: "
5185
<< curl_multi_strerror(rc) << std::endl;
5286
}
@@ -57,6 +91,13 @@ void CurlMultiManager::remove_handle(CURL* easy) {
5791

5892
std::lock_guard<std::mutex> lock(mutex_);
5993
callbacks_.erase(easy);
94+
95+
// Free headers if they exist
96+
auto header_it = headers_.find(easy);
97+
if (header_it != headers_.end() && header_it->second) {
98+
curl_slist_free_all(header_it->second);
99+
}
100+
headers_.erase(easy);
60101
}
61102

62103
int CurlMultiManager::socket_callback(CURL* easy, curl_socket_t s, int what,
@@ -144,18 +185,31 @@ void CurlMultiManager::check_multi_info() {
144185
CURLcode result = msg->data.result;
145186

146187
CompletionCallback callback;
188+
curl_slist* headers = nullptr;
147189
{
148190
std::lock_guard<std::mutex> lock(mutex_);
149191
auto it = callbacks_.find(easy);
150192
if (it != callbacks_.end()) {
151193
callback = std::move(it->second);
152194
callbacks_.erase(it);
153195
}
196+
197+
// Get and remove headers
198+
auto header_it = headers_.find(easy);
199+
if (header_it != headers_.end()) {
200+
headers = header_it->second;
201+
headers_.erase(header_it);
202+
}
154203
}
155204

156205
// Remove from multi handle
157206
curl_multi_remove_handle(multi_handle_, easy);
158207

208+
// Free headers
209+
if (headers) {
210+
curl_slist_free_all(headers);
211+
}
212+
159213
// Invoke completion callback
160214
if (callback) {
161215
boost::asio::post(executor_, [callback = std::move(callback),
@@ -170,7 +224,7 @@ void CurlMultiManager::check_multi_info() {
170224
void CurlMultiManager::start_socket_monitor(SocketInfo* socket_info, int action) {
171225
if (!socket_info->descriptor) {
172226
// Create descriptor for this socket
173-
socket_info->descriptor = std::make_unique<
227+
socket_info->descriptor = std::make_shared<
174228
boost::asio::posix::stream_descriptor>(executor_);
175229
socket_info->descriptor->assign(socket_info->sockfd);
176230
}
@@ -182,28 +236,72 @@ void CurlMultiManager::start_socket_monitor(SocketInfo* socket_info, int action)
182236

183237
// Monitor for read events
184238
if (action & CURL_POLL_IN) {
185-
socket_info->descriptor->async_wait(
186-
boost::asio::posix::stream_descriptor::wait_read,
187-
[weak_self, sockfd](const boost::system::error_code& ec) {
188-
if (!ec) {
239+
// Use weak_ptr to safely detect when descriptor is deleted
240+
std::weak_ptr<boost::asio::posix::stream_descriptor> weak_descriptor = socket_info->descriptor;
241+
242+
// Use shared_ptr for recursive lambda
243+
auto read_handler = std::make_shared<std::function<void()>>();
244+
*read_handler = [weak_self, sockfd, weak_descriptor, read_handler]() {
245+
// Check if manager and descriptor are still valid
246+
auto self = weak_self.lock();
247+
auto descriptor = weak_descriptor.lock();
248+
if (!self || !descriptor) {
249+
return;
250+
}
251+
252+
descriptor->async_wait(
253+
boost::asio::posix::stream_descriptor::wait_read,
254+
[weak_self, sockfd, weak_descriptor, read_handler](const boost::system::error_code& ec) {
255+
// If operation was canceled or had an error, don't re-register
256+
if (ec) {
257+
return;
258+
}
259+
189260
if (auto self = weak_self.lock()) {
190261
self->handle_socket_action(sockfd, CURL_CSELECT_IN);
262+
263+
// Always try to re-register for continuous monitoring
264+
// The validity check at the top of read_handler will stop it if needed
265+
(*read_handler)(); // Recursive call
191266
}
192-
}
193-
});
267+
});
268+
};
269+
(*read_handler)(); // Initial call
194270
}
195271

196272
// Monitor for write events
197273
if (action & CURL_POLL_OUT) {
198-
socket_info->descriptor->async_wait(
199-
boost::asio::posix::stream_descriptor::wait_write,
200-
[weak_self, sockfd](const boost::system::error_code& ec) {
201-
if (!ec) {
274+
// Use weak_ptr to safely detect when descriptor is deleted
275+
std::weak_ptr<boost::asio::posix::stream_descriptor> weak_descriptor = socket_info->descriptor;
276+
277+
// Use shared_ptr for recursive lambda
278+
auto write_handler = std::make_shared<std::function<void()>>();
279+
*write_handler = [weak_self, sockfd, weak_descriptor, write_handler]() {
280+
// Check if manager and descriptor are still valid
281+
auto self = weak_self.lock();
282+
auto descriptor = weak_descriptor.lock();
283+
if (!self || !descriptor) {
284+
return;
285+
}
286+
287+
descriptor->async_wait(
288+
boost::asio::posix::stream_descriptor::wait_write,
289+
[weak_self, sockfd, weak_descriptor, write_handler](const boost::system::error_code& ec) {
290+
// If operation was canceled or had an error, don't re-register
291+
if (ec) {
292+
return;
293+
}
294+
202295
if (auto self = weak_self.lock()) {
203296
self->handle_socket_action(sockfd, CURL_CSELECT_OUT);
297+
298+
// Always try to re-register for continuous monitoring
299+
// The validity check at the top of write_handler will stop it if needed
300+
(*write_handler)(); // Recursive call
204301
}
205-
}
206-
});
302+
});
303+
};
304+
(*write_handler)(); // Initial call
207305
}
208306
}
209307

libs/internal/src/network/curl_requester.cpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,14 @@ void CurlRequester::PerformRequestWithMulti(std::shared_ptr<CurlMultiManager> mu
103103
// This will be cleaned up in the completion callback
104104
struct RequestContext {
105105
CURL* curl;
106-
curl_slist* headers;
107106
std::string url;
108107
std::string body; // Keep body alive
109108
std::string response_body;
110109
HttpResult::HeadersType response_headers;
111110
std::function<void(const HttpResult&)> callback;
112111

113112
~RequestContext() {
114-
if (headers) {
115-
curl_slist_free_all(headers);
116-
}
113+
// Headers are managed by CurlMultiManager
117114
if (curl) {
118115
curl_easy_cleanup(curl);
119116
}
@@ -122,9 +119,11 @@ void CurlRequester::PerformRequestWithMulti(std::shared_ptr<CurlMultiManager> mu
122119

123120
auto ctx = std::make_shared<RequestContext>();
124121
ctx->curl = curl;
125-
ctx->headers = nullptr;
126122
ctx->callback = std::move(cb);
127123

124+
// Headers will be managed by CurlMultiManager
125+
curl_slist* headers = nullptr;
126+
128127
// Helper macro to check curl_easy_setopt return values
129128
#define CURL_SETOPT_CHECK(handle, option, parameter) \
130129
do { \
@@ -133,6 +132,9 @@ void CurlRequester::PerformRequestWithMulti(std::shared_ptr<CurlMultiManager> mu
133132
std::string error_message = kErrorCurlPrefix; \
134133
error_message += "curl_easy_setopt failed for " #option ": "; \
135134
error_message += curl_easy_strerror(code); \
135+
if (headers) { \
136+
curl_slist_free_all(headers); \
137+
} \
136138
ctx->callback(HttpResult(error_message)); \
137139
return; \
138140
} \
@@ -170,19 +172,18 @@ void CurlRequester::PerformRequestWithMulti(std::shared_ptr<CurlMultiManager> mu
170172
auto const& base_headers = request.Properties().BaseHeaders();
171173
for (auto const& [key, value] : base_headers) {
172174
std::string header = key + kHeaderSeparator + value;
173-
const auto appendResult = curl_slist_append(ctx->headers, header.c_str());
175+
const auto appendResult = curl_slist_append(headers, header.c_str());
174176
if (!appendResult) {
175-
if (ctx->headers) {
176-
curl_slist_free_all(ctx->headers);
177-
ctx->headers = nullptr;
177+
if (headers) {
178+
curl_slist_free_all(headers);
178179
}
179180
ctx->callback(HttpResult(kErrorHeaderAppend));
180181
return;
181182
}
182-
ctx->headers = appendResult;
183+
headers = appendResult;
183184
}
184-
if (ctx->headers) {
185-
CURL_SETOPT_CHECK(curl, CURLOPT_HTTPHEADER, ctx->headers);
185+
if (headers) {
186+
CURL_SETOPT_CHECK(curl, CURLOPT_HTTPHEADER, headers);
186187
}
187188

188189
// Set timeouts with millisecond precision
@@ -232,7 +233,8 @@ void CurlRequester::PerformRequestWithMulti(std::shared_ptr<CurlMultiManager> mu
232233
#undef CURL_SETOPT_CHECK
233234

234235
// Add handle to multi manager for async processing
235-
multi_manager->add_handle(curl, [ctx](CURL* easy, CURLcode result) {
236+
// Headers will be freed automatically by CurlMultiManager
237+
multi_manager->add_handle(curl, headers, [ctx](CURL* easy, CURLcode result) {
236238
// This callback runs on the executor when the request completes
237239

238240
// Check for errors

0 commit comments

Comments
 (0)