@@ -33,15 +33,23 @@ CurlMultiManager::CurlMultiManager(boost::asio::any_io_executor executor)
3333
3434CurlMultiManager::~CurlMultiManager () {
3535 if (multi_handle_) {
36- // Extract and clear pending handles, callbacks, and headers
36+ // Extract and clear pending handles, callbacks, headers, and sockets
3737 std::map<CURL *, CompletionCallback> pending_callbacks;
3838 std::map<CURL *, curl_slist*> pending_headers;
39+ std::map<curl_socket_t , SocketInfo> pending_sockets;
3940 {
4041 std::lock_guard<std::mutex> lock (mutex_);
4142 pending_callbacks = std::move (callbacks_);
4243 pending_headers = std::move (headers_);
44+ pending_sockets = std::move (sockets_);
4345 callbacks_.clear ();
4446 headers_.clear ();
47+ sockets_.clear ();
48+ }
49+
50+ // Clean up all remaining sockets
51+ for (auto & [sockfd, socket_info] : pending_sockets) {
52+ stop_socket_monitor (&socket_info);
4553 }
4654
4755 // Remove handles from multi and cleanup resources
@@ -103,22 +111,20 @@ void CurlMultiManager::remove_handle(CURL* easy) {
103111int CurlMultiManager::socket_callback (CURL * easy, curl_socket_t s, int what,
104112 void * userp, void * socketp) {
105113 auto * manager = static_cast <CurlMultiManager*>(userp);
106- auto * socket_info = static_cast <SocketInfo*>(socketp);
114+
115+ std::lock_guard<std::mutex> lock (manager->mutex_ );
107116
108117 if (what == CURL_POLL_REMOVE ) {
109- if (socket_info) {
110- manager->stop_socket_monitor (socket_info);
111- curl_multi_assign (manager->multi_handle_ , s, nullptr );
112- delete socket_info;
118+ // Remove socket from managed container
119+ auto it = manager->sockets_ .find (s);
120+ if (it != manager->sockets_ .end ()) {
121+ manager->stop_socket_monitor (&it->second );
122+ manager->sockets_ .erase (it);
113123 }
114124 } else {
115- if (!socket_info) {
116- // New socket
117- socket_info = new SocketInfo{s, nullptr , 0 };
118- curl_multi_assign (manager->multi_handle_ , s, socket_info);
119- }
120-
121- manager->start_socket_monitor (socket_info, what);
125+ // Add or update socket in managed container
126+ auto [it, inserted] = manager->sockets_ .try_emplace (s, SocketInfo{s, nullptr , 0 });
127+ manager->start_socket_monitor (&it->second , what);
122128 }
123129
124130 return 0 ;
@@ -229,79 +235,95 @@ void CurlMultiManager::start_socket_monitor(SocketInfo* socket_info, int action)
229235 socket_info->descriptor ->assign (socket_info->sockfd );
230236 }
231237
238+ // Check if action has changed
239+ bool action_changed = (socket_info->action != action);
232240 socket_info->action = action;
233241
234242 auto weak_self = weak_from_this ();
235243 curl_socket_t sockfd = socket_info->sockfd ;
236244
237245 // Monitor for read events
238246 if (action & CURL_POLL_IN ) {
239- // Use weak_ptr to safely detect when descriptor is deleted
240- std::weak_ptr<boost::asio::posix::stream_descriptor> weak_descriptor = socket_info->descriptor ;
241-
242- // Use shared_ptr for recursive lambda
243- auto read_handler = std::make_shared<std::function<void ()>>();
244- *read_handler = [weak_self, sockfd, weak_descriptor, read_handler]() {
245- // Check if manager and descriptor are still valid
246- auto self = weak_self.lock ();
247- auto descriptor = weak_descriptor.lock ();
248- if (!self || !descriptor) {
249- return ;
250- }
247+ // Only create new handler if we don't have one or if action changed
248+ if (!socket_info->read_handler || action_changed) {
249+ // Use weak_ptr to safely detect when descriptor is deleted
250+ std::weak_ptr<boost::asio::posix::stream_descriptor> weak_descriptor = socket_info->descriptor ;
251+
252+ // Create and store handler in SocketInfo to keep it alive
253+ // Use weak_ptr in capture to avoid circular reference
254+ socket_info->read_handler = std::make_shared<std::function<void ()>>();
255+ std::weak_ptr<std::function<void ()>> weak_read_handler = socket_info->read_handler ;
256+ *socket_info->read_handler = [weak_self, sockfd, weak_descriptor, weak_read_handler]() {
257+ // Check if manager and descriptor are still valid
258+ auto self = weak_self.lock ();
259+ auto descriptor = weak_descriptor.lock ();
260+ if (!self || !descriptor) {
261+ return ;
262+ }
251263
252- descriptor->async_wait (
253- boost::asio::posix::stream_descriptor::wait_read,
254- [weak_self, sockfd, weak_descriptor, read_handler](const boost::system::error_code& ec) {
255- // If operation was canceled or had an error, don't re-register
256- if (ec) {
257- return ;
258- }
259-
260- if (auto self = weak_self.lock ()) {
261- self->handle_socket_action (sockfd, CURL_CSELECT_IN );
262-
263- // Always try to re-register for continuous monitoring
264- // The validity check at the top of read_handler will stop it if needed
265- (*read_handler)(); // Recursive call
266- }
267- });
268- };
269- (*read_handler)(); // Initial call
264+ descriptor->async_wait (
265+ boost::asio::posix::stream_descriptor::wait_read,
266+ [weak_self, sockfd, weak_descriptor, weak_read_handler](const boost::system::error_code& ec) {
267+ // If operation was canceled or had an error, don't re-register
268+ if (ec) {
269+ return ;
270+ }
271+
272+ if (auto self = weak_self.lock ()) {
273+ self->handle_socket_action (sockfd, CURL_CSELECT_IN );
274+
275+ // Always try to re-register for continuous monitoring
276+ // The validity check at the top of read_handler will stop it if needed
277+ if (auto handler = weak_read_handler.lock ()) {
278+ (*handler)(); // Recursive call
279+ }
280+ }
281+ });
282+ };
283+ (*socket_info->read_handler )(); // Initial call
284+ }
270285 }
271286
272287 // Monitor for write events
273288 if (action & CURL_POLL_OUT ) {
274- // Use weak_ptr to safely detect when descriptor is deleted
275- std::weak_ptr<boost::asio::posix::stream_descriptor> weak_descriptor = socket_info->descriptor ;
276-
277- // Use shared_ptr for recursive lambda
278- auto write_handler = std::make_shared<std::function<void ()>>();
279- *write_handler = [weak_self, sockfd, weak_descriptor, write_handler]() {
280- // Check if manager and descriptor are still valid
281- auto self = weak_self.lock ();
282- auto descriptor = weak_descriptor.lock ();
283- if (!self || !descriptor) {
284- return ;
285- }
289+ // Only create new handler if we don't have one or if action changed
290+ if (!socket_info->write_handler || action_changed) {
291+ // Use weak_ptr to safely detect when descriptor is deleted
292+ std::weak_ptr<boost::asio::posix::stream_descriptor> weak_descriptor = socket_info->descriptor ;
293+
294+ // Create and store handler in SocketInfo to keep it alive
295+ // Use weak_ptr in capture to avoid circular reference
296+ socket_info->write_handler = std::make_shared<std::function<void ()>>();
297+ std::weak_ptr<std::function<void ()>> weak_write_handler = socket_info->write_handler ;
298+ *socket_info->write_handler = [weak_self, sockfd, weak_descriptor, weak_write_handler]() {
299+ // Check if manager and descriptor are still valid
300+ auto self = weak_self.lock ();
301+ auto descriptor = weak_descriptor.lock ();
302+ if (!self || !descriptor) {
303+ return ;
304+ }
286305
287- descriptor->async_wait (
288- boost::asio::posix::stream_descriptor::wait_write,
289- [weak_self, sockfd, weak_descriptor, write_handler](const boost::system::error_code& ec) {
290- // If operation was canceled or had an error, don't re-register
291- if (ec) {
292- return ;
293- }
294-
295- if (auto self = weak_self.lock ()) {
296- self->handle_socket_action (sockfd, CURL_CSELECT_OUT );
297-
298- // Always try to re-register for continuous monitoring
299- // The validity check at the top of write_handler will stop it if needed
300- (*write_handler)(); // Recursive call
301- }
302- });
303- };
304- (*write_handler)(); // Initial call
306+ descriptor->async_wait (
307+ boost::asio::posix::stream_descriptor::wait_write,
308+ [weak_self, sockfd, weak_descriptor, weak_write_handler](const boost::system::error_code& ec) {
309+ // If operation was canceled or had an error, don't re-register
310+ if (ec) {
311+ return ;
312+ }
313+
314+ if (auto self = weak_self.lock ()) {
315+ self->handle_socket_action (sockfd, CURL_CSELECT_OUT );
316+
317+ // Always try to re-register for continuous monitoring
318+ // The validity check at the top of write_handler will stop it if needed
319+ if (auto handler = weak_write_handler.lock ()) {
320+ (*handler)(); // Recursive call
321+ }
322+ }
323+ });
324+ };
325+ (*socket_info->write_handler )(); // Initial call
326+ }
305327 }
306328}
307329
@@ -311,6 +333,9 @@ void CurlMultiManager::stop_socket_monitor(SocketInfo* socket_info) {
311333 socket_info->descriptor ->release ();
312334 socket_info->descriptor .reset ();
313335 }
336+ // Clear handlers to break any weak_ptr references
337+ socket_info->read_handler .reset ();
338+ socket_info->write_handler .reset ();
314339}
315340
316341} // namespace launchdarkly::network
0 commit comments