Skip to content

Commit 31d4a4a

Browse files
authored
refactor(rest): switch HttpClient to use connection pool (#530)
1 parent 9ffb29f commit 31d4a4a

File tree

2 files changed

+23
-88
lines changed

2 files changed

+23
-88
lines changed

src/iceberg/catalog/rest/http_client.cc

Lines changed: 21 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -134,41 +134,9 @@ Status HandleFailureResponse(const cpr::Response& response,
134134

135135
} // namespace
136136

137-
void HttpClient::PrepareSession(
138-
const std::string& path, HttpMethod method,
139-
const std::unordered_map<std::string, std::string>& params,
140-
const std::unordered_map<std::string, std::string>& headers) {
141-
session_->SetUrl(cpr::Url{path});
142-
session_->SetParameters(GetParameters(params));
143-
session_->RemoveContent();
144-
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
145-
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
146-
// manually enforce HTTP GET to clear it.
147-
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
148-
switch (method) {
149-
case HttpMethod::kGet:
150-
session_->PrepareGet();
151-
break;
152-
case HttpMethod::kPost:
153-
session_->PreparePost();
154-
break;
155-
case HttpMethod::kPut:
156-
session_->PreparePut();
157-
break;
158-
case HttpMethod::kDelete:
159-
session_->PrepareDelete();
160-
break;
161-
case HttpMethod::kHead:
162-
session_->PrepareHead();
163-
break;
164-
}
165-
auto final_headers = MergeHeaders(default_headers_, headers);
166-
session_->SetHeader(final_headers);
167-
}
168-
169137
HttpClient::HttpClient(std::unordered_map<std::string, std::string> default_headers)
170138
: default_headers_{std::move(default_headers)},
171-
session_{std::make_unique<cpr::Session>()} {
139+
connection_pool_{std::make_unique<cpr::ConnectionPool>()} {
172140
// Set default Content-Type for all requests (including GET/HEAD/DELETE).
173141
// Many systems require that content type is set regardless and will fail,
174142
// even on an empty bodied request.
@@ -182,12 +150,9 @@ Result<HttpResponse> HttpClient::Get(
182150
const std::string& path, const std::unordered_map<std::string, std::string>& params,
183151
const std::unordered_map<std::string, std::string>& headers,
184152
const ErrorHandler& error_handler) {
185-
cpr::Response response;
186-
{
187-
std::lock_guard guard(session_mutex_);
188-
PrepareSession(path, HttpMethod::kGet, params, headers);
189-
response = session_->Get();
190-
}
153+
auto final_headers = MergeHeaders(default_headers_, headers);
154+
cpr::Response response =
155+
cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, *connection_pool_);
191156

192157
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
193158
HttpResponse http_response;
@@ -199,13 +164,9 @@ Result<HttpResponse> HttpClient::Post(
199164
const std::string& path, const std::string& body,
200165
const std::unordered_map<std::string, std::string>& headers,
201166
const ErrorHandler& error_handler) {
202-
cpr::Response response;
203-
{
204-
std::lock_guard guard(session_mutex_);
205-
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
206-
session_->SetBody(cpr::Body{body});
207-
response = session_->Post();
208-
}
167+
auto final_headers = MergeHeaders(default_headers_, headers);
168+
cpr::Response response =
169+
cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, *connection_pool_);
209170

210171
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
211172
HttpResponse http_response;
@@ -218,25 +179,16 @@ Result<HttpResponse> HttpClient::PostForm(
218179
const std::unordered_map<std::string, std::string>& form_data,
219180
const std::unordered_map<std::string, std::string>& headers,
220181
const ErrorHandler& error_handler) {
221-
cpr::Response response;
222-
223-
{
224-
std::lock_guard guard(session_mutex_);
225-
226-
// Override default Content-Type (application/json) with form-urlencoded
227-
auto form_headers = headers;
228-
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
229-
230-
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
231-
std::vector<cpr::Pair> pair_list;
232-
pair_list.reserve(form_data.size());
233-
for (const auto& [key, val] : form_data) {
234-
pair_list.emplace_back(key, val);
235-
}
236-
session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));
237-
238-
response = session_->Post();
182+
auto final_headers = MergeHeaders(default_headers_, headers);
183+
final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded);
184+
std::vector<cpr::Pair> pair_list;
185+
pair_list.reserve(form_data.size());
186+
for (const auto& [key, val] : form_data) {
187+
pair_list.emplace_back(key, val);
239188
}
189+
cpr::Response response =
190+
cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), pair_list.end()),
191+
final_headers, *connection_pool_);
240192

241193
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
242194
HttpResponse http_response;
@@ -247,12 +199,8 @@ Result<HttpResponse> HttpClient::PostForm(
247199
Result<HttpResponse> HttpClient::Head(
248200
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
249201
const ErrorHandler& error_handler) {
250-
cpr::Response response;
251-
{
252-
std::lock_guard guard(session_mutex_);
253-
PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
254-
response = session_->Head();
255-
}
202+
auto final_headers = MergeHeaders(default_headers_, headers);
203+
cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, *connection_pool_);
256204

257205
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
258206
HttpResponse http_response;
@@ -264,12 +212,9 @@ Result<HttpResponse> HttpClient::Delete(
264212
const std::string& path, const std::unordered_map<std::string, std::string>& params,
265213
const std::unordered_map<std::string, std::string>& headers,
266214
const ErrorHandler& error_handler) {
267-
cpr::Response response;
268-
{
269-
std::lock_guard guard(session_mutex_);
270-
PrepareSession(path, HttpMethod::kDelete, params, headers);
271-
response = session_->Delete();
272-
}
215+
auto final_headers = MergeHeaders(default_headers_, headers);
216+
cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params),
217+
final_headers, *connection_pool_);
273218

274219
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
275220
HttpResponse http_response;

src/iceberg/catalog/rest/http_client.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@
2121

2222
#include <cstdint>
2323
#include <memory>
24-
#include <mutex>
2524
#include <string>
2625
#include <unordered_map>
2726

28-
#include "iceberg/catalog/rest/endpoint.h"
2927
#include "iceberg/catalog/rest/iceberg_rest_export.h"
3028
#include "iceberg/catalog/rest/type_fwd.h"
3129
#include "iceberg/result.h"
@@ -34,7 +32,7 @@
3432
/// \brief Http client for Iceberg REST API.
3533

3634
namespace cpr {
37-
class Session;
35+
class ConnectionPool;
3836
} // namespace cpr
3937

4038
namespace iceberg::rest {
@@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient {
110108
const ErrorHandler& error_handler);
111109

112110
private:
113-
void PrepareSession(const std::string& path, HttpMethod method,
114-
const std::unordered_map<std::string, std::string>& params,
115-
const std::unordered_map<std::string, std::string>& headers);
116-
117111
std::unordered_map<std::string, std::string> default_headers_;
118-
119-
// TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent
120-
// calls
121-
std::unique_ptr<cpr::Session> session_;
122-
mutable std::mutex session_mutex_;
112+
std::unique_ptr<cpr::ConnectionPool> connection_pool_;
123113
};
124114

125115
} // namespace iceberg::rest

0 commit comments

Comments
 (0)