Skip to content

Commit 7b649ad

Browse files
authored
fix: add protections against dataraces in socket engines, cluster timer and queues. (#1587)
1 parent 7e4e74d commit 7b649ad

5 files changed

Lines changed: 28 additions & 10 deletions

File tree

include/dpp/queues.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,18 @@ class DPP_EXPORT http_request {
229229
/**
230230
* @brief True if request has been made.
231231
*/
232-
bool completed;
232+
std::atomic<bool> completed;
233233

234234
/**
235235
* @brief True for requests that are not going to discord (rate limits code skipped).
236236
*/
237237
bool non_discord;
238238

239+
/**
240+
* @brief Client mutex
241+
*/
242+
std::mutex cli_mutex;
243+
239244
/**
240245
* @brief HTTPS client
241246
*/
@@ -460,6 +465,11 @@ class DPP_EXPORT request_concurrency_queue {
460465
*/
461466
std::shared_mutex in_mutex;
462467

468+
/**
469+
* @brief Removals queue mutex thread safety.
470+
*/
471+
std::mutex rem_mutex;
472+
463473
/**
464474
* @brief Inbound queue timer. The timer is called every second,
465475
* and when it wakes up it checks for requests pending to be sent in the queue.
@@ -589,7 +599,7 @@ class DPP_EXPORT request_queue {
589599
* When globally rate limited the concurrency queues associated with this request queue
590600
* will not process any requests in their timers until the global rate limit expires.
591601
*/
592-
bool globally_ratelimited;
602+
std::atomic<bool> globally_ratelimited;
593603

594604
/**
595605
* @brief When we are globally rate limited until (unix epoch)

src/dpp/cluster/timer.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,11 @@ bool cluster::stop_timer(timer t) {
5757
void cluster::tick_timers() {
5858
time_t now = time(nullptr);
5959

60-
if (next_timer.empty()) {
61-
return;
60+
{
61+
std::lock_guard<std::mutex> l(timer_guard);
62+
if (next_timer.empty()) {
63+
return;
64+
}
6265
}
6366
do {
6467
timer_t cur_timer;

src/dpp/queues.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor
225225
}
226226
http_connect_info hci = https_client::get_host_info(_host);
227227
try {
228-
cli = std::make_unique<https_client>(
228+
std::unique_ptr<https_client> tmp = std::make_unique<https_client>(
229229
owner,
230230
hci.hostname,
231231
hci.port,
@@ -286,6 +286,10 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor
286286
});
287287
}
288288
);
289+
{
290+
std::lock_guard<std::mutex> client(this->cli_mutex);
291+
cli = std::move(tmp);
292+
}
289293
}
290294
catch (const std::exception& e) {
291295
owner->log(ll_error, "HTTP(S) error on " + hci.scheme + " connection to " + hci.hostname + ":" + std::to_string(hci.port) + ": " + std::string(e.what()));
@@ -313,7 +317,7 @@ request_concurrency_queue::request_concurrency_queue(class cluster* owner, class
313317
tick_and_deliver_requests(in_index);
314318
/* Clear pending removals in the removals queue */
315319
if (time(nullptr) % 90 == 0) {
316-
std::scoped_lock lock1{in_mutex};
320+
std::scoped_lock lock1{rem_mutex};
317321
for (auto it = removals.cbegin(); it != removals.cend();) {
318322
if ((*it)->is_completed()) {
319323
it = removals.erase(it);
@@ -404,7 +408,8 @@ void request_concurrency_queue::tick_and_deliver_requests(uint32_t index)
404408
std::unique_ptr<http_request> rq;
405409
{
406410
/* Find the owned pointer in requests_in */
407-
std::scoped_lock lock1{in_mutex};
411+
std::scoped_lock requests_in_lock{in_mutex};
412+
std::scoped_lock removals_queue_lock{rem_mutex};
408413

409414
const std::string &key = request_view->endpoint;
410415
auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{});

src/dpp/socketengines/epoll.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
5555
int epoll_handle{INVALID_SOCKET};
5656
static constexpr size_t MAX_EVENTS = 65536;
5757
std::array<struct epoll_event, MAX_EVENTS> events{};
58-
int sockets{0};
58+
std::mutex sockets_mutex;
5959

6060
socket_engine_epoll(const socket_engine_epoll&) = delete;
6161
socket_engine_epoll(socket_engine_epoll&&) = delete;
@@ -135,6 +135,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
135135

136136
if ((eh->flags & WANT_DELETION) != 0L) {
137137
remove_socket(fd);
138+
std::lock_guard<std::shared_mutex> lg(this->fds_mutex);
138139
fds.erase(fd);
139140
}
140141
}
@@ -143,7 +144,6 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
143144

144145
bool register_socket(const socket_events& e) final {
145146
bool r = socket_engine_base::register_socket(e);
146-
sockets++;
147147
if (r) {
148148
struct epoll_event ev{};
149149
ev.events = EPOLLET;
@@ -186,7 +186,6 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
186186

187187
bool remove_socket(dpp::socket fd) final {
188188
struct epoll_event ev{};
189-
sockets--;
190189
epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev);
191190
if (!owner->on_socket_close.empty()) {
192191
socket_close_t event(owner, 0, "");

src/dpp/socketengines/kqueue.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
108108

109109
if ((eh->flags & WANT_DELETION) != 0L) {
110110
remove_socket(kev.ident);
111+
std::lock_guard<std::shared_mutex> lg(this->fds_mutex);
111112
fds.erase(kev.ident);
112113
}
113114
}

0 commit comments

Comments
 (0)