Skip to content

Commit 08b472d

Browse files
committed
refactor(rest): Switch HttpClient to use connection pool
1 parent 883a43f commit 08b472d

File tree

3 files changed

+25
-89
lines changed

3 files changed

+25
-89
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ function(resolve_cpr_dependency)
449449
if(DEFINED ENV{ICEBERG_CPR_URL})
450450
set(CPR_URL "$ENV{ICEBERG_CPR_URL}")
451451
else()
452-
set(CPR_URL "https://github.com/libcpr/cpr/archive/refs/tags/1.12.0.tar.gz")
452+
set(CPR_URL "https://github.com/libcpr/cpr/archive/refs/tags/1.14.1.tar.gz")
453453
endif()
454454

455455
fetchcontent_declare(cpr

src/iceberg/catalog/rest/http_client.cc

Lines changed: 22 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -134,41 +134,10 @@ Status HandleFailureResponse(const cpr::Response& response,
134134

135135
} // namespace
136136

137-
void HttpClient::PrepareSession(
138-
const std::string& path, HttpMethod method,
139-
const std::unordered_map<std::string, std::string>& params,
140-
const std::unordered_map<std::string, std::string>& headers) {
141-
session_->SetUrl(cpr::Url{path});
142-
session_->SetParameters(GetParameters(params));
143-
session_->RemoveContent();
144-
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
145-
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
146-
// manually enforce HTTP GET to clear it.
147-
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
148-
switch (method) {
149-
case HttpMethod::kGet:
150-
session_->PrepareGet();
151-
break;
152-
case HttpMethod::kPost:
153-
session_->PreparePost();
154-
break;
155-
case HttpMethod::kPut:
156-
session_->PreparePut();
157-
break;
158-
case HttpMethod::kDelete:
159-
session_->PrepareDelete();
160-
break;
161-
case HttpMethod::kHead:
162-
session_->PrepareHead();
163-
break;
164-
}
165-
auto final_headers = MergeHeaders(default_headers_, headers);
166-
session_->SetHeader(final_headers);
167-
}
168-
169137
HttpClient::HttpClient(std::unordered_map<std::string, std::string> default_headers)
170138
: default_headers_{std::move(default_headers)},
171-
session_{std::make_unique<cpr::Session>()} {
139+
// session_{std::make_unique<cpr::Session>()} {
140+
connection_pool_{std::make_unique<cpr::ConnectionPool>()} {
172141
// Set default Content-Type for all requests (including GET/HEAD/DELETE).
173142
// Many systems require that content type is set regardless and will fail,
174143
// even on an empty bodied request.
@@ -182,12 +151,9 @@ Result<HttpResponse> HttpClient::Get(
182151
const std::string& path, const std::unordered_map<std::string, std::string>& params,
183152
const std::unordered_map<std::string, std::string>& headers,
184153
const ErrorHandler& error_handler) {
185-
cpr::Response response;
186-
{
187-
std::lock_guard guard(session_mutex_);
188-
PrepareSession(path, HttpMethod::kGet, params, headers);
189-
response = session_->Get();
190-
}
154+
auto final_headers = MergeHeaders(default_headers_, headers);
155+
cpr::Response response =
156+
cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, *connection_pool_);
191157

192158
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
193159
HttpResponse http_response;
@@ -199,13 +165,9 @@ Result<HttpResponse> HttpClient::Post(
199165
const std::string& path, const std::string& body,
200166
const std::unordered_map<std::string, std::string>& headers,
201167
const ErrorHandler& error_handler) {
202-
cpr::Response response;
203-
{
204-
std::lock_guard guard(session_mutex_);
205-
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
206-
session_->SetBody(cpr::Body{body});
207-
response = session_->Post();
208-
}
168+
auto final_headers = MergeHeaders(default_headers_, headers);
169+
cpr::Response response =
170+
cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, *connection_pool_);
209171

210172
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
211173
HttpResponse http_response;
@@ -218,25 +180,16 @@ Result<HttpResponse> HttpClient::PostForm(
218180
const std::unordered_map<std::string, std::string>& form_data,
219181
const std::unordered_map<std::string, std::string>& headers,
220182
const ErrorHandler& error_handler) {
221-
cpr::Response response;
222-
223-
{
224-
std::lock_guard guard(session_mutex_);
225-
226-
// Override default Content-Type (application/json) with form-urlencoded
227-
auto form_headers = headers;
228-
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
229-
230-
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
231-
std::vector<cpr::Pair> pair_list;
232-
pair_list.reserve(form_data.size());
233-
for (const auto& [key, val] : form_data) {
234-
pair_list.emplace_back(key, val);
235-
}
236-
session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));
237-
238-
response = session_->Post();
183+
auto final_headers = MergeHeaders(default_headers_, headers);
184+
final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded);
185+
std::vector<cpr::Pair> pair_list;
186+
pair_list.reserve(form_data.size());
187+
for (const auto& [key, val] : form_data) {
188+
pair_list.emplace_back(key, val);
239189
}
190+
cpr::Response response =
191+
cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), pair_list.end()),
192+
final_headers, *connection_pool_);
240193

241194
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
242195
HttpResponse http_response;
@@ -247,12 +200,8 @@ Result<HttpResponse> HttpClient::PostForm(
247200
Result<HttpResponse> HttpClient::Head(
248201
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
249202
const ErrorHandler& error_handler) {
250-
cpr::Response response;
251-
{
252-
std::lock_guard guard(session_mutex_);
253-
PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
254-
response = session_->Head();
255-
}
203+
auto final_headers = MergeHeaders(default_headers_, headers);
204+
cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, *connection_pool_);
256205

257206
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
258207
HttpResponse http_response;
@@ -264,12 +213,9 @@ Result<HttpResponse> HttpClient::Delete(
264213
const std::string& path, const std::unordered_map<std::string, std::string>& params,
265214
const std::unordered_map<std::string, std::string>& headers,
266215
const ErrorHandler& error_handler) {
267-
cpr::Response response;
268-
{
269-
std::lock_guard guard(session_mutex_);
270-
PrepareSession(path, HttpMethod::kDelete, params, headers);
271-
response = session_->Delete();
272-
}
216+
auto final_headers = MergeHeaders(default_headers_, headers);
217+
cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params),
218+
final_headers, *connection_pool_);
273219

274220
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
275221
HttpResponse http_response;

src/iceberg/catalog/rest/http_client.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@
2121

2222
#include <cstdint>
2323
#include <memory>
24-
#include <mutex>
2524
#include <string>
2625
#include <unordered_map>
2726

28-
#include "iceberg/catalog/rest/endpoint.h"
2927
#include "iceberg/catalog/rest/iceberg_rest_export.h"
3028
#include "iceberg/catalog/rest/type_fwd.h"
3129
#include "iceberg/result.h"
@@ -34,7 +32,7 @@
3432
/// \brief Http client for Iceberg REST API.
3533

3634
namespace cpr {
37-
class Session;
35+
class ConnectionPool;
3836
} // namespace cpr
3937

4038
namespace iceberg::rest {
@@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient {
110108
const ErrorHandler& error_handler);
111109

112110
private:
113-
void PrepareSession(const std::string& path, HttpMethod method,
114-
const std::unordered_map<std::string, std::string>& params,
115-
const std::unordered_map<std::string, std::string>& headers);
116-
117111
std::unordered_map<std::string, std::string> default_headers_;
118-
119-
// TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent
120-
// calls
121-
std::unique_ptr<cpr::Session> session_;
122-
mutable std::mutex session_mutex_;
112+
std::unique_ptr<cpr::ConnectionPool> connection_pool_;
123113
};
124114

125115
} // namespace iceberg::rest

0 commit comments

Comments
 (0)