Skip to content

Commit 4902ba6

Browse files
harshal24-chavanMaratha96
authored andcommitted
Fix #2530: Replace blocking requestsBufferSize future with lock-free atomic
- Introduced `outstandingRequests()` to `HttpClient` to reflect total connection load (buffered requests + in-flight pipelined requests). - Implemented an `std::atomic<std::size_t>` tracker for `pipeliningCallbacks_` for inspection of in-flight requests from the main thread. - Retained helper functions for `requestsBuffer_` to safely guard `std::list` mutations. - Added `HttpClientOutstandingRequests` integration test using `PipeliningTest::normalPipe`.
1 parent e890559 commit 4902ba6

5 files changed

Lines changed: 122 additions & 35 deletions

File tree

lib/inc/drogon/HttpClient.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ class DROGON_EXPORT HttpClient : public trantor::NonCopyable
185185
*/
186186
virtual std::size_t requestsBufferSize() = 0;
187187

188+
/**
189+
* @brief Get the total number of outstanding requests (buffered + in-flight).
190+
*/
191+
virtual std::size_t outstandingRequests() const = 0;
192+
188193
/// Set the pipelining depth, which is the number of requests that are not
189194
/// responding.
190195
/**

lib/src/HttpClientImpl.cc

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ void HttpClientImpl::createTcpClient()
7979
thisPtr->requestsBuffer_.front().first);
8080
thisPtr->pipeliningCallbacks_.push(
8181
std::move(thisPtr->requestsBuffer_.front()));
82-
thisPtr->requestsBuffer_.pop_front();
82+
thisPtr->pipeliningCallbacksSize_.fetch_add(
83+
1, std::memory_order_relaxed);
84+
85+
thisPtr->popFrontRequest();
8386
}
8487
}
8588
else
@@ -340,7 +343,7 @@ void HttpClientImpl::sendRequestInLoop(const HttpRequestPtr &req,
340343
{
341344
if (iter->first == callbackParamsPtr->requestPtr)
342345
{
343-
thisPtr->requestsBuffer_.erase(iter);
346+
thisPtr->eraseRequest(iter);
344347
break;
345348
}
346349
}
@@ -419,12 +422,12 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
419422
{
420423
auto callbackPtr =
421424
std::make_shared<drogon::HttpReqCallback>(std::move(callback));
422-
requestsBuffer_.push_back(
423-
{req,
424-
[thisPtr = shared_from_this(),
425-
callbackPtr](ReqResult result, const HttpResponsePtr &response) {
426-
(*callbackPtr)(result, response);
427-
}});
425+
enqueueRequest(req,
426+
[thisPtr = shared_from_this(),
427+
callbackPtr](ReqResult result,
428+
const HttpResponsePtr &response) {
429+
(*callbackPtr)(result, response);
430+
});
428431

429432
if (domain_.empty() || !isDomainName_)
430433
{
@@ -436,7 +439,7 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
436439
// No ip address and no domain, respond with BadServerAddress
437440
else
438441
{
439-
requestsBuffer_.pop_front();
442+
popFrontRequest();
440443
(*callbackPtr)(ReqResult::BadServerAddress, nullptr);
441444
assert(requestsBuffer_.empty());
442445
}
@@ -480,7 +483,8 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
480483
{
481484
auto &reqAndCb = (thisPtr->requestsBuffer_).front();
482485
reqAndCb.second(ReqResult::BadServerAddress, nullptr);
483-
(thisPtr->requestsBuffer_).pop_front();
486+
487+
thisPtr->popFrontRequest();
484488
}
485489
});
486490
});
@@ -495,13 +499,11 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
495499
// Not connected, push request to buffer and wait for connection
496500
if (!connPtr || connPtr->disconnected())
497501
{
498-
requestsBuffer_.push_back(
499-
{req,
500-
[thisPtr,
501-
callback = std::move(callback)](ReqResult result,
502-
const HttpResponsePtr &response) {
503-
callback(result, response);
504-
}});
502+
enqueueRequest(req,
503+
[thisPtr, callback = std::move(callback)](
504+
ReqResult result, const HttpResponsePtr &response) {
505+
callback(result, response);
506+
});
505507
return;
506508
}
507509

@@ -517,16 +519,15 @@ void HttpClientImpl::sendRequestInLoop(const drogon::HttpRequestPtr &req,
517519
const HttpResponsePtr &response) {
518520
callback(result, response);
519521
}});
522+
pipeliningCallbacksSize_.fetch_add(1, std::memory_order_relaxed);
520523
}
521524
else
522525
{
523-
requestsBuffer_.push_back(
524-
{req,
525-
[thisPtr,
526-
callback = std::move(callback)](ReqResult result,
527-
const HttpResponsePtr &response) {
528-
callback(result, response);
529-
}});
526+
enqueueRequest(req,
527+
[thisPtr, callback = std::move(callback)](
528+
ReqResult result, const HttpResponsePtr &response) {
529+
callback(result, response);
530+
});
530531
}
531532
}
532533

@@ -562,6 +563,7 @@ void HttpClientImpl::handleResponse(
562563
#endif
563564
auto cb = std::move(reqAndCb);
564565
pipeliningCallbacks_.pop();
566+
pipeliningCallbacksSize_.fetch_sub(1, std::memory_order_relaxed);
565567
handleCookies(resp);
566568
cb.second(ReqResult::Ok, resp);
567569

@@ -576,7 +578,8 @@ void HttpClientImpl::handleResponse(
576578
auto &reqAndCallback = requestsBuffer_.front();
577579
sendReq(connPtr, reqAndCallback.first);
578580
pipeliningCallbacks_.push(std::move(reqAndCallback));
579-
requestsBuffer_.pop_front();
581+
pipeliningCallbacksSize_.fetch_add(1, std::memory_order_relaxed);
582+
popFrontRequest();
580583
}
581584
else
582585
{
@@ -592,6 +595,7 @@ void HttpClientImpl::handleResponse(
592595
{
593596
auto cb = std::move(pipeliningCallbacks_.front());
594597
pipeliningCallbacks_.pop();
598+
pipeliningCallbacksSize_.fetch_sub(1, std::memory_order_relaxed);
595599
cb.second(ReqResult::NetworkFailure, nullptr);
596600
}
597601
}
@@ -674,12 +678,13 @@ void HttpClientImpl::onError(ReqResult result)
674678
{
675679
auto cb = std::move(pipeliningCallbacks_.front());
676680
pipeliningCallbacks_.pop();
681+
pipeliningCallbacksSize_.fetch_sub(1, std::memory_order_relaxed);
677682
cb.second(result, nullptr);
678683
}
679684
while (!requestsBuffer_.empty())
680685
{
681686
auto cb = std::move(requestsBuffer_.front().second);
682-
requestsBuffer_.pop_front();
687+
popFrontRequest();
683688
cb(result, nullptr);
684689
}
685690
tcpClientPtr_.reset();

lib/src/HttpClientImpl.h

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <trantor/net/EventLoop.h>
2020
#include <trantor/net/Resolver.h>
2121
#include <trantor/net/TcpClient.h>
22+
#include <atomic>
2223
#include <cstddef>
2324
#include <functional>
2425
#include <future>
@@ -118,21 +119,42 @@ class HttpClientImpl final : public HttpClient,
118119
sockOptCallback_ = std::move(cb);
119120
}
120121

122+
std::size_t outstandingRequests() const override
123+
{
124+
return requestsBufferSize_.load(std::memory_order_relaxed) +
125+
pipeliningCallbacksSize_.load(std::memory_order_relaxed);
126+
}
127+
121128
std::size_t requestsBufferSize() override
122129
{
123-
if (loop_->isInLoopThread())
124-
{
125-
return requestsBuffer_.size();
126-
}
127-
else
130+
return requestsBufferSize_.load(std::memory_order_relaxed);
131+
}
132+
133+
void enqueueRequest(const HttpRequestPtr &req,
134+
const HttpReqCallback &callback)
135+
{
136+
requestsBuffer_.push_back({req, callback});
137+
requestsBufferSize_.fetch_add(1, std::memory_order_relaxed);
138+
}
139+
140+
void popFrontRequest()
141+
{
142+
if (!requestsBuffer_.empty())
128143
{
129-
std::promise<std::size_t> bufferSize;
130-
loop_->queueInLoop(
131-
[&] { bufferSize.set_value(requestsBuffer_.size()); });
132-
return bufferSize.get_future().get();
144+
requestsBuffer_.pop_front();
145+
requestsBufferSize_.fetch_sub(1, std::memory_order_relaxed);
133146
}
134147
}
135148

149+
using RequestBufferIter =
150+
std::list<std::pair<HttpRequestPtr, HttpReqCallback>>::iterator;
151+
152+
void eraseRequest(RequestBufferIter iter)
153+
{
154+
requestsBuffer_.erase(iter);
155+
requestsBufferSize_.fetch_sub(1, std::memory_order_relaxed);
156+
}
157+
136158
private:
137159
std::shared_ptr<trantor::TcpClient> tcpClientPtr_;
138160
trantor::EventLoop *loop_;
@@ -157,6 +179,8 @@ class HttpClientImpl final : public HttpClient,
157179
void onError(ReqResult result);
158180
std::string domain_;
159181
bool isDomainName_{true}; // true if domain_ is name
182+
std::atomic<std::size_t> requestsBufferSize_{0};
183+
std::atomic<std::size_t> pipeliningCallbacksSize_{0};
160184
size_t pipeliningDepth_{0};
161185
bool enableCookies_{false};
162186
std::vector<Cookie> validCookies_;

lib/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ if (BUILD_CTL)
6464
integration_test/server/CustomHeaderFilter.cc
6565
integration_test/server/DoNothingPlugin.cc
6666
integration_test/server/ForwardCtrl.cc
67+
integration_test/server/HttpClientOutstandingRequests.cc
6768
integration_test/server/JsonTestController.cc
6869
integration_test/server/ListParaCtl.cc
6970
integration_test/server/PipeliningTest.cc
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#include <drogon/drogon_test.h>
2+
#include <drogon/HttpClient.h>
3+
#include <atomic>
4+
#include <thread>
5+
#include <chrono>
6+
7+
using namespace drogon;
8+
9+
DROGON_TEST(HttpClientOutstandingRequests)
10+
{
11+
auto client = HttpClient::newHttpClient("http://127.0.0.1:8848");
12+
13+
// Enable pipelining to allow multiple in-flight requests simultaneously
14+
client->setPipeliningDepth(64);
15+
16+
CHECK(client->requestsBufferSize() == 0);
17+
CHECK(client->outstandingRequests() == 0);
18+
19+
const int totalRequests = 50;
20+
std::atomic<int> completedRequests{0};
21+
22+
for (int i = 0; i < totalRequests; ++i)
23+
{
24+
auto req = HttpRequest::newHttpRequest();
25+
req->setPath("/PipeliningTest/normalPipe");
26+
27+
client->sendRequest(
28+
req,
29+
[&completedRequests](ReqResult result,
30+
const HttpResponsePtr &response) {
31+
if (result == ReqResult::Ok)
32+
{
33+
completedRequests++;
34+
}
35+
});
36+
}
37+
38+
size_t currentOutstanding = client->outstandingRequests();
39+
size_t currentBuffer = client->outstandingRequests();
40+
41+
CHECK(currentOutstanding > 0);
42+
43+
CHECK(currentOutstanding >= currentBuffer);
44+
45+
while (completedRequests < totalRequests)
46+
{
47+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
48+
}
49+
50+
CHECK(client->requestsBufferSize() == 0);
51+
CHECK(client->outstandingRequests() == 0);
52+
}

0 commit comments

Comments
 (0)