Skip to content

Commit 6e5977f

Browse files
committed
Protect websockets_ by mutex
1 parent 80ba18c commit 6e5977f

3 files changed

Lines changed: 100 additions & 1 deletion

File tree

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ option(CROW_RETURNS_OK_ON_HTTP_OPTIONS_REQUEST
5757

5858
option(CROW_ENABLE_SSL "Enable Crow's SSL feature for supporting https" OFF)
5959
option(CROW_ENABLE_COMPRESSION "Enable Crow's Compression feature for supporting compressed http content" OFF)
60+
option(CROW_ENABLE_TSAN "Enable ThreadSanitizer" OFF)
6061

6162
if(CROW_GENERATE_SBOM OR CROW_BUILD_TESTS)
6263
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CPM.cmake)
@@ -82,6 +83,12 @@ if(CROW_GENERATE_SBOM)
8283
)
8384
endif()
8485

86+
if(CROW_ENABLE_TSAN)
87+
message(STATUS "ThreadSanitizer enabled")
88+
add_compile_options(-fsanitize=thread -fno-omit-frame-pointer)
89+
add_link_options(-fsanitize=thread)
90+
endif()
91+
8592
#####################################
8693
# Define Targets
8794
#####################################

include/crow/app.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ namespace crow
619619

620620
void close_websockets()
621621
{
622+
std::lock_guard<std::mutex> lock{websockets_mutex_};
622623
for (auto websocket : websockets_)
623624
{
624625
CROW_LOG_INFO << "Quitting Websocket: " << websocket;
@@ -629,11 +630,13 @@ namespace crow
629630

630631
void add_websocket(std::shared_ptr<websocket::connection> conn)
631632
{
633+
std::lock_guard<std::mutex> lock{websockets_mutex_};
632634
websockets_.push_back(conn);
633635
}
634636

635637
void remove_websocket(std::shared_ptr<websocket::connection> conn)
636638
{
639+
std::lock_guard<std::mutex> lock{websockets_mutex_};
637640
websockets_.erase(std::remove(websockets_.begin(), websockets_.end(), conn), websockets_.end());
638641
}
639642

@@ -846,6 +849,7 @@ namespace crow
846849
bool server_started_{false};
847850
std::condition_variable cv_started_;
848851
std::mutex start_mutex_;
852+
std::mutex websockets_mutex_; ///< \brief mutex to protect websockets_
849853
std::vector<std::shared_ptr<websocket::connection>> websockets_;
850854
};
851855

tests/unit_tests/test_websocket.cpp

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "catch2/catch_all.hpp"
22

33
#include "crow.h"
4+
#include <cstddef>
5+
#include <thread>
46

57
using namespace std;
68
using namespace crow;
@@ -649,4 +651,90 @@ TEST_CASE("mirror_websocket_subprotocols", "[websocket]")
649651
}
650652

651653
app.stop();
652-
}
654+
}
655+
656+
TEST_CASE("multithreaded_websockets_open_close", "[websocket]")
657+
{
658+
static std::string http_message =
659+
"GET /ws HTTP/1.1\r\n"
660+
"Connection: keep-alive, Upgrade\r\n"
661+
"upgrade: websocket\r\n"
662+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
663+
"Sec-WebSocket-Version: 13\r\n"
664+
"Host: localhost\r\n"
665+
"\r\n";
666+
667+
CROW_LOG_INFO << "Setting up app!\n";
668+
SimpleApp app;
669+
670+
CROW_WEBSOCKET_ROUTE(app, "/ws")
671+
.onaccept([&](const crow::request& req, void**) {
672+
CROW_LOG_INFO << "Accepted websocket with URL " << req.url;
673+
return true;
674+
})
675+
.onopen([&](websocket::connection&) {
676+
CROW_LOG_INFO << "Connected websocket";
677+
})
678+
.onmessage([&](websocket::connection&, const std::string&, bool) {})
679+
.onclose([&](websocket::connection& conn, const std::string&, uint16_t) {
680+
// There should just be one connection
681+
CHECK_FALSE(conn.get_remote_ip().empty());
682+
CROW_LOG_INFO << "Closing websocket";
683+
});
684+
685+
app.validate();
686+
687+
CROW_LOG_WARNING << "Starting app!\n";
688+
auto _ = app.bindaddr(LOCALHOST_ADDRESS).port(45453).run_async();
689+
app.wait_for_server_start();
690+
CROW_LOG_WARNING << "App started!\n";
691+
asio::io_context ic;
692+
693+
const auto thread_function = [&]()
694+
{
695+
asio::ip::tcp::socket c(ic);
696+
c.connect(asio::ip::tcp::endpoint(
697+
asio::ip::make_address(LOCALHOST_ADDRESS), 45453));
698+
699+
CROW_LOG_WARNING << "Connected!\n";
700+
701+
char buf[2048];
702+
703+
//----------Handshake----------
704+
{
705+
std::fill_n(buf, 2048, 0);
706+
c.send(asio::buffer(http_message));
707+
708+
c.receive(asio::buffer(buf, 2048));
709+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
710+
}
711+
712+
//----------Close websocket----------
713+
std::fill_n(buf, 2048, 0);
714+
// Close message with, len = 2, status code = 1000
715+
char close_message[5]("\x88\x02\x03\xE8");
716+
c.send(asio::buffer(close_message, 4));
717+
c.receive(asio::buffer(buf, 2048));
718+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
719+
};
720+
721+
const auto multiple_run_thread_function = [&](const std::size_t count){
722+
for(std::size_t i = 0; i < count; ++i) {
723+
thread_function();
724+
}
725+
};
726+
727+
constexpr std::size_t threads_count = 3;
728+
constexpr std::size_t thread_run = 10;
729+
730+
std::vector<std::thread> threads;
731+
for(std::size_t i = 0; i < threads_count; ++i) {
732+
threads.emplace_back(multiple_run_thread_function, thread_run);
733+
}
734+
for(std::thread& thread : threads) {
735+
thread.join();
736+
}
737+
738+
CROW_LOG_WARNING << "Stopping app!\n";
739+
app.stop();
740+
}

0 commit comments

Comments
 (0)