|
| 1 | +#ifdef LD_CURL_NETWORKING |
| 2 | + |
| 3 | +#include "launchdarkly/network/curl_multi_manager.hpp" |
| 4 | + |
| 5 | +#include <boost/asio/post.hpp> |
| 6 | + |
| 7 | +#include <iostream> |
| 8 | + |
| 9 | +namespace launchdarkly::network { |
| 10 | + |
| 11 | +std::shared_ptr<CurlMultiManager> CurlMultiManager::create( |
| 12 | + boost::asio::any_io_executor executor) { |
| 13 | + // Can't use make_shared because constructor is private |
| 14 | + return std::shared_ptr<CurlMultiManager>( |
| 15 | + new CurlMultiManager(std::move(executor))); |
| 16 | +} |
| 17 | + |
| 18 | +CurlMultiManager::CurlMultiManager(boost::asio::any_io_executor executor) |
| 19 | + : executor_(std::move(executor)), |
| 20 | + multi_handle_(curl_multi_init()), |
| 21 | + timer_(executor_) { |
| 22 | + |
| 23 | + if (!multi_handle_) { |
| 24 | + throw std::runtime_error("Failed to initialize CURL multi handle"); |
| 25 | + } |
| 26 | + |
| 27 | + // Set callbacks for socket and timer notifications |
| 28 | + curl_multi_setopt(multi_handle_, CURLMOPT_SOCKETFUNCTION, socket_callback); |
| 29 | + curl_multi_setopt(multi_handle_, CURLMOPT_SOCKETDATA, this); |
| 30 | + curl_multi_setopt(multi_handle_, CURLMOPT_TIMERFUNCTION, timer_callback); |
| 31 | + curl_multi_setopt(multi_handle_, CURLMOPT_TIMERDATA, this); |
| 32 | +} |
| 33 | + |
| 34 | +CurlMultiManager::~CurlMultiManager() { |
| 35 | + if (multi_handle_) { |
| 36 | + curl_multi_cleanup(multi_handle_); |
| 37 | + } |
| 38 | +} |
| 39 | + |
| 40 | +void CurlMultiManager::add_handle(CURL* easy, CompletionCallback callback) { |
| 41 | + { |
| 42 | + std::lock_guard<std::mutex> lock(mutex_); |
| 43 | + callbacks_[easy] = std::move(callback); |
| 44 | + } |
| 45 | + |
| 46 | + CURLMcode rc = curl_multi_add_handle(multi_handle_, easy); |
| 47 | + if (rc != CURLM_OK) { |
| 48 | + std::lock_guard<std::mutex> lock(mutex_); |
| 49 | + callbacks_.erase(easy); |
| 50 | + std::cerr << "Failed to add handle to multi: " |
| 51 | + << curl_multi_strerror(rc) << std::endl; |
| 52 | + } |
| 53 | +} |
| 54 | + |
| 55 | +void CurlMultiManager::remove_handle(CURL* easy) { |
| 56 | + curl_multi_remove_handle(multi_handle_, easy); |
| 57 | + |
| 58 | + std::lock_guard<std::mutex> lock(mutex_); |
| 59 | + callbacks_.erase(easy); |
| 60 | +} |
| 61 | + |
| 62 | +int CurlMultiManager::socket_callback(CURL* easy, curl_socket_t s, int what, |
| 63 | + void* userp, void* socketp) { |
| 64 | + auto* manager = static_cast<CurlMultiManager*>(userp); |
| 65 | + auto* socket_info = static_cast<SocketInfo*>(socketp); |
| 66 | + |
| 67 | + if (what == CURL_POLL_REMOVE) { |
| 68 | + if (socket_info) { |
| 69 | + manager->stop_socket_monitor(socket_info); |
| 70 | + curl_multi_assign(manager->multi_handle_, s, nullptr); |
| 71 | + delete socket_info; |
| 72 | + } |
| 73 | + } else { |
| 74 | + if (!socket_info) { |
| 75 | + // New socket |
| 76 | + socket_info = new SocketInfo{s, nullptr, 0}; |
| 77 | + curl_multi_assign(manager->multi_handle_, s, socket_info); |
| 78 | + } |
| 79 | + |
| 80 | + manager->start_socket_monitor(socket_info, what); |
| 81 | + } |
| 82 | + |
| 83 | + return 0; |
| 84 | +} |
| 85 | + |
| 86 | +int CurlMultiManager::timer_callback(CURLM* multi, long timeout_ms, void* userp) { |
| 87 | + auto* manager = static_cast<CurlMultiManager*>(userp); |
| 88 | + |
| 89 | + // Cancel any existing timer |
| 90 | + manager->timer_.cancel(); |
| 91 | + |
| 92 | + if (timeout_ms > 0) { |
| 93 | + // Set new timer |
| 94 | + manager->timer_.expires_after(std::chrono::milliseconds(timeout_ms)); |
| 95 | + manager->timer_.async_wait([weak_self = manager->weak_from_this()]( |
| 96 | + const boost::system::error_code& ec) { |
| 97 | + if (!ec) { |
| 98 | + if (auto self = weak_self.lock()) { |
| 99 | + self->handle_timeout(); |
| 100 | + } |
| 101 | + } |
| 102 | + }); |
| 103 | + } else if (timeout_ms == 0) { |
| 104 | + // Call socket_action immediately |
| 105 | + boost::asio::post(manager->executor_, [weak_self = manager->weak_from_this()]() { |
| 106 | + if (auto self = weak_self.lock()) { |
| 107 | + self->handle_timeout(); |
| 108 | + } |
| 109 | + }); |
| 110 | + } |
| 111 | + // If timeout_ms < 0, no timeout (delete timer) |
| 112 | + |
| 113 | + return 0; |
| 114 | +} |
| 115 | + |
| 116 | +void CurlMultiManager::handle_socket_action(curl_socket_t s, int event_bitmask) { |
| 117 | + int running_handles = 0; |
| 118 | + CURLMcode rc = curl_multi_socket_action(multi_handle_, s, event_bitmask, |
| 119 | + &running_handles); |
| 120 | + |
| 121 | + if (rc != CURLM_OK) { |
| 122 | + std::cerr << "curl_multi_socket_action failed: " |
| 123 | + << curl_multi_strerror(rc) << std::endl; |
| 124 | + } |
| 125 | + |
| 126 | + check_multi_info(); |
| 127 | + |
| 128 | + if (running_handles != still_running_) { |
| 129 | + still_running_ = running_handles; |
| 130 | + } |
| 131 | +} |
| 132 | + |
| 133 | +void CurlMultiManager::handle_timeout() { |
| 134 | + handle_socket_action(CURL_SOCKET_TIMEOUT, 0); |
| 135 | +} |
| 136 | + |
| 137 | +void CurlMultiManager::check_multi_info() { |
| 138 | + int msgs_in_queue; |
| 139 | + CURLMsg* msg; |
| 140 | + |
| 141 | + while ((msg = curl_multi_info_read(multi_handle_, &msgs_in_queue))) { |
| 142 | + if (msg->msg == CURLMSG_DONE) { |
| 143 | + CURL* easy = msg->easy_handle; |
| 144 | + CURLcode result = msg->data.result; |
| 145 | + |
| 146 | + CompletionCallback callback; |
| 147 | + { |
| 148 | + std::lock_guard<std::mutex> lock(mutex_); |
| 149 | + auto it = callbacks_.find(easy); |
| 150 | + if (it != callbacks_.end()) { |
| 151 | + callback = std::move(it->second); |
| 152 | + callbacks_.erase(it); |
| 153 | + } |
| 154 | + } |
| 155 | + |
| 156 | + // Remove from multi handle |
| 157 | + curl_multi_remove_handle(multi_handle_, easy); |
| 158 | + |
| 159 | + // Invoke completion callback |
| 160 | + if (callback) { |
| 161 | + boost::asio::post(executor_, [callback = std::move(callback), |
| 162 | + easy, result]() { |
| 163 | + callback(easy, result); |
| 164 | + }); |
| 165 | + } |
| 166 | + } |
| 167 | + } |
| 168 | +} |
| 169 | + |
| 170 | +void CurlMultiManager::start_socket_monitor(SocketInfo* socket_info, int action) { |
| 171 | + if (!socket_info->descriptor) { |
| 172 | + // Create descriptor for this socket |
| 173 | + socket_info->descriptor = std::make_unique< |
| 174 | + boost::asio::posix::stream_descriptor>(executor_); |
| 175 | + socket_info->descriptor->assign(socket_info->sockfd); |
| 176 | + } |
| 177 | + |
| 178 | + socket_info->action = action; |
| 179 | + |
| 180 | + auto weak_self = weak_from_this(); |
| 181 | + curl_socket_t sockfd = socket_info->sockfd; |
| 182 | + |
| 183 | + // Monitor for read events |
| 184 | + if (action & CURL_POLL_IN) { |
| 185 | + socket_info->descriptor->async_wait( |
| 186 | + boost::asio::posix::stream_descriptor::wait_read, |
| 187 | + [weak_self, sockfd](const boost::system::error_code& ec) { |
| 188 | + if (!ec) { |
| 189 | + if (auto self = weak_self.lock()) { |
| 190 | + self->handle_socket_action(sockfd, CURL_CSELECT_IN); |
| 191 | + } |
| 192 | + } |
| 193 | + }); |
| 194 | + } |
| 195 | + |
| 196 | + // Monitor for write events |
| 197 | + if (action & CURL_POLL_OUT) { |
| 198 | + socket_info->descriptor->async_wait( |
| 199 | + boost::asio::posix::stream_descriptor::wait_write, |
| 200 | + [weak_self, sockfd](const boost::system::error_code& ec) { |
| 201 | + if (!ec) { |
| 202 | + if (auto self = weak_self.lock()) { |
| 203 | + self->handle_socket_action(sockfd, CURL_CSELECT_OUT); |
| 204 | + } |
| 205 | + } |
| 206 | + }); |
| 207 | + } |
| 208 | +} |
| 209 | + |
| 210 | +void CurlMultiManager::stop_socket_monitor(SocketInfo* socket_info) { |
| 211 | + if (socket_info->descriptor) { |
| 212 | + socket_info->descriptor->cancel(); |
| 213 | + socket_info->descriptor->release(); |
| 214 | + socket_info->descriptor.reset(); |
| 215 | + } |
| 216 | +} |
| 217 | + |
| 218 | +} // namespace launchdarkly::network |
| 219 | + |
| 220 | +#endif // LD_CURL_NETWORKING |
0 commit comments