-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathhttp_client.cpp
More file actions
206 lines (188 loc) · 7.81 KB
/
http_client.cpp
File metadata and controls
206 lines (188 loc) · 7.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#include "databento/detail/http_client.hpp"
#include <sys/types.h>
#include <chrono> // seconds
#include <memory> // make_unique
#include <sstream> // ostringstream
#include "databento/constants.hpp" // kUserAgent
#include "databento/exceptions.hpp" // HttpResponseError, HttpRequestError, JsonResponseError
#include "databento/ireadable.hpp"
#include "databento/log.hpp" // ILogReceiver, LogLevel
#include "detail/http_stream_reader.hpp" // HttpStreamReader
using databento::detail::HttpClient;
constexpr std::chrono::seconds kTimeout{100};
const httplib::Headers& HttpClient::BaseHeaders() {
static const httplib::Headers kHeaders{
{"accept", "application/json"},
{"user-agent", kUserAgent},
};
return kHeaders;
}
HttpClient::HttpClient(databento::ILogReceiver* log_receiver, const std::string& key,
const std::string& gateway)
: log_receiver_{log_receiver}, client_{gateway} {
auto headers = HttpClient::BaseHeaders();
headers.insert(httplib::make_basic_authentication_header(key, ""));
client_.set_default_headers(headers);
client_.set_basic_auth(key, "");
client_.set_read_timeout(kTimeout);
client_.set_write_timeout(kTimeout);
}
HttpClient::HttpClient(databento::ILogReceiver* log_receiver, const std::string& key,
const std::string& gateway, std::uint16_t port)
: log_receiver_{log_receiver}, client_{gateway, port} {
auto headers = HttpClient::BaseHeaders();
headers.insert(httplib::make_basic_authentication_header(key, ""));
client_.set_default_headers(headers);
client_.set_basic_auth(key, "");
client_.set_read_timeout(kTimeout);
client_.set_write_timeout(kTimeout);
}
nlohmann::json HttpClient::GetJson(const std::string& path,
const httplib::Params& params) {
httplib::Result res = client_.Get(path, params, httplib::Headers{});
return HttpClient::CheckAndParseResponse(path, std::move(res));
}
nlohmann::json HttpClient::PostJson(const std::string& path,
const httplib::Params& form_params) {
// params will be encoded as form data
httplib::Result res = client_.Post(path, {}, form_params);
return HttpClient::CheckAndParseResponse(path, std::move(res));
}
void HttpClient::GetRawStream(const std::string& path, const httplib::Headers& headers,
const httplib::ContentReceiver& callback) {
std::string err_body{};
int err_status{};
const httplib::Result res = client_.Get(
path, headers, MakeStreamResponseHandler(err_status),
[&callback, &err_body, &err_status](const char* data, std::size_t length) {
// if an error response was received, read all content into
// err_body
if (err_status > 0) {
err_body.append(data, length);
return true;
}
return callback(data, length);
});
CheckStatusAndStreamRes(path, err_status, std::move(err_body), res);
}
void HttpClient::PostRawStream(const std::string& path,
const httplib::Params& form_params,
const httplib::ContentReceiver& callback) {
std::string err_body{};
int err_status{};
httplib::Request req;
req.method = "POST";
req.set_header("Content-Type", "application/x-www-form-urlencoded");
req.path = path;
req.body = httplib::detail::params_to_query_str(form_params);
req.response_handler = MakeStreamResponseHandler(err_status);
req.content_receiver = [&callback, &err_body, &err_status](
const char* data, std::size_t length, std::uint64_t,
std::uint64_t) {
// if an error response was received, read all content into
// err_body
if (err_status > 0) {
err_body.append(data, length);
return true;
}
return callback(data, length);
};
// NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection): dependency code
const httplib::Result res = client_.send(req);
CheckStatusAndStreamRes(path, err_status, std::move(err_body), res);
}
std::unique_ptr<databento::IReadable> HttpClient::OpenPostStream(
const std::string& path, const httplib::Params& form_params) {
const auto body = httplib::detail::params_to_query_str(form_params);
// NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection): dependency code
auto handle = client_.open_stream("POST", path, {}, {}, body,
"application/x-www-form-urlencoded");
if (handle.error != httplib::Error::Success) {
throw HttpRequestError{path, handle.error};
}
if (!handle.is_valid()) {
throw HttpRequestError{path, httplib::Error::Connection};
}
CheckWarnings(*handle.response);
if (IsErrorStatus(handle.response->status)) {
// Read the full error body
std::string err_body;
std::array<char, 4096> buf{};
ssize_t n;
while ((n = handle.read(buf.data(), buf.size())) > 0) {
err_body.append(buf.data(), static_cast<std::size_t>(n));
}
throw HttpResponseError{path, handle.response->status, std::move(err_body)};
}
return std::make_unique<HttpStreamReader>(std::move(handle));
}
httplib::ResponseHandler HttpClient::MakeStreamResponseHandler(int& out_status) {
return [this, &out_status](const httplib::Response& resp) {
if (HttpClient::IsErrorStatus(resp.status)) {
out_status = resp.status;
}
CheckWarnings(resp);
return true;
};
}
void HttpClient::CheckStatusAndStreamRes(const std::string& path, int status_code,
std::string&& err_body,
const httplib::Result& res) {
if (status_code > 0) {
throw HttpResponseError{path, status_code, std::move(err_body)};
}
if (res.error() != httplib::Error::Success &&
// canceled happens if `callback` returns false, which is based on the
// user input, and therefore not exceptional
res.error() != httplib::Error::Canceled) {
throw HttpRequestError{path, res.error()};
}
}
nlohmann::json HttpClient::CheckAndParseResponse(const std::string& path,
httplib::Result&& res) const {
if (res.error() != httplib::Error::Success) {
throw HttpRequestError{path, res.error()};
}
auto& response = res.value();
const auto status_code = response.status;
if (HttpClient::IsErrorStatus(status_code)) {
throw HttpResponseError{path, status_code, std::move(response.body)};
}
CheckWarnings(response);
try {
return nlohmann::json::parse(std::move(response.body));
} catch (const nlohmann::json::parse_error& parse_err) {
throw JsonResponseError::ParseError(path, parse_err);
}
}
void HttpClient::CheckWarnings(const httplib::Response& response) const {
// Returns empty string if not found. `get_header_value` is case insensitive
const auto raw = response.get_header_value("X-Warning");
if (!raw.empty()) {
try {
const auto json = nlohmann::json::parse(raw);
if (json.is_array()) {
for (const auto& warning_json : json.items()) {
const std::string warning = warning_json.value();
std::ostringstream msg;
msg << "[HttpClient::CheckWarnings] Server " << warning;
log_receiver_->Receive(LogLevel::Warning, msg.str());
}
return;
}
} catch (const std::exception& exc) {
std::ostringstream msg;
msg << "[HttpClient::CheckWarnings] Failed to parse warnings from HTTP "
"header: "
<< exc.what() << ". Raw contents: " << raw;
log_receiver_->Receive(LogLevel::Warning, msg.str());
return;
}
std::ostringstream msg;
msg << "[HttpClient::CheckWarnings] Failed to parse warnings from HTTP "
"header. Raw contents: "
<< raw;
log_receiver_->Receive(LogLevel::Warning, msg.str());
}
}
bool HttpClient::IsErrorStatus(int status_code) { return status_code >= 400; }