Skip to content

Commit 5e5882d

Browse files
committed
Protect websockets_ by mutex
1 parent 80ba18c commit 5e5882d

3 files changed

Lines changed: 114 additions & 1 deletion

File tree

CMakeLists.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ option(CROW_RETURNS_OK_ON_HTTP_OPTIONS_REQUEST
5757

5858
option(CROW_ENABLE_SSL "Enable Crow's SSL feature for supporting https" OFF)
5959
option(CROW_ENABLE_COMPRESSION "Enable Crow's Compression feature for supporting compressed http content" OFF)
60+
option(ENABLE_TSAN "Enable ThreadSanitizer" OFF)
6061

6162
if(CROW_GENERATE_SBOM OR CROW_BUILD_TESTS)
6263
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/CPM.cmake)
@@ -82,6 +83,17 @@ if(CROW_GENERATE_SBOM)
8283
)
8384
endif()
8485

86+
if(ENABLE_TSAN)
87+
message(STATUS "ThreadSanitizer enabled")
88+
add_compile_options(
89+
-fsanitize=thread
90+
-fno-omit-frame-pointer
91+
-O1
92+
-g
93+
)
94+
add_link_options(-fsanitize=thread)
95+
endif()
96+
8597
#####################################
8698
# Define Targets
8799
#####################################

include/crow/app.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ namespace crow
619619

620620
void close_websockets()
621621
{
622+
std::lock_guard<std::mutex> lock{websockets_mutex_};
622623
for (auto websocket : websockets_)
623624
{
624625
CROW_LOG_INFO << "Quitting Websocket: " << websocket;
@@ -629,11 +630,13 @@ namespace crow
629630

630631
void add_websocket(std::shared_ptr<websocket::connection> conn)
631632
{
633+
std::lock_guard<std::mutex> lock{websockets_mutex_};
632634
websockets_.push_back(conn);
633635
}
634636

635637
void remove_websocket(std::shared_ptr<websocket::connection> conn)
636638
{
639+
std::lock_guard<std::mutex> lock{websockets_mutex_};
637640
websockets_.erase(std::remove(websockets_.begin(), websockets_.end(), conn), websockets_.end());
638641
}
639642

@@ -846,6 +849,7 @@ namespace crow
846849
bool server_started_{false};
847850
std::condition_variable cv_started_;
848851
std::mutex start_mutex_;
852+
std::mutex websockets_mutex_; ///< \brief mutex to protect websockets_
849853
std::vector<std::shared_ptr<websocket::connection>> websockets_;
850854
};
851855

tests/unit_tests/test_websocket.cpp

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "catch2/catch_all.hpp"
22

33
#include "crow.h"
4+
#include <cstddef>
45

56
using namespace std;
67
using namespace crow;
@@ -649,4 +650,100 @@ TEST_CASE("mirror_websocket_subprotocols", "[websocket]")
649650
}
650651

651652
app.stop();
652-
}
653+
}
654+
655+
TEST_CASE("multithreaded_websockets_open_close", "[websocket]")
656+
{
657+
static std::string http_message =
658+
"GET /ws HTTP/1.1\r\n"
659+
"Connection: keep-alive, Upgrade\r\n"
660+
"upgrade: websocket\r\n"
661+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
662+
"Sec-WebSocket-Version: 13\r\n"
663+
"Host: localhost\r\n"
664+
"\r\n";
665+
666+
CROW_LOG_INFO << "Setting up app!\n";
667+
SimpleApp app;
668+
669+
CROW_WEBSOCKET_ROUTE(app, "/ws")
670+
.onaccept([&](const crow::request& req, void**) {
671+
CROW_LOG_INFO << "Accepted websocket with URL " << req.url;
672+
return true;
673+
})
674+
.onopen([&](websocket::connection&) {
675+
CROW_LOG_INFO << "Connected websocket";
676+
})
677+
.onmessage([&](websocket::connection& conn, const std::string& message, bool) {
678+
CROW_LOG_INFO << "Message is \"" << message << '\"';
679+
if (message == "quit-default")
680+
conn.close();
681+
else if (message == "quit-custom")
682+
conn.close("custom", crow::websocket::StartStatusCodesForPrivateUse + 10u);
683+
})
684+
.onclose([&](websocket::connection& conn, const std::string&, uint16_t) {
685+
// There should just be one connection
686+
CHECK_FALSE(conn.get_remote_ip().empty());
687+
CROW_LOG_INFO << "Closing websocket";
688+
});
689+
690+
app.validate();
691+
692+
CROW_LOG_WARNING << "Starting app!\n";
693+
auto _ = app.bindaddr(LOCALHOST_ADDRESS).port(45453).run_async();
694+
app.wait_for_server_start();
695+
CROW_LOG_WARNING << "App started!\n";
696+
asio::io_context ic;
697+
698+
const auto thread_function = [&]()
699+
{
700+
asio::ip::tcp::socket c(ic);
701+
c.connect(asio::ip::tcp::endpoint(
702+
asio::ip::make_address(LOCALHOST_ADDRESS), 45453));
703+
704+
CROW_LOG_WARNING << "Connected!\n";
705+
706+
char buf[2048];
707+
708+
//----------Handshake----------
709+
{
710+
std::fill_n(buf, 2048, 0);
711+
c.send(asio::buffer(http_message));
712+
713+
c.receive(asio::buffer(buf, 2048));
714+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
715+
}
716+
717+
//----------Close websocket----------
718+
std::fill_n(buf, 2048, 0);
719+
// Close message with, len = 2, status code = 1000
720+
char close_message[5]("\x88\x02\x03\xE8");
721+
c.send(asio::buffer(close_message, 4));
722+
c.receive(asio::buffer(buf, 2048));
723+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
724+
CHECK((int)(unsigned char)buf[0] == 0x88);
725+
CHECK((int)(unsigned char)buf[1] == 0x02);
726+
CHECK((int)(unsigned char)buf[2] == 0x03);
727+
CHECK((int)(unsigned char)buf[3] == 0xE8);
728+
};
729+
730+
const auto multiple_run_thread_function = [&](const std::size_t count){
731+
for(std::size_t i = 0; i < count; ++i) {
732+
thread_function();
733+
}
734+
};
735+
736+
constexpr std::size_t threads_count = 3;
737+
constexpr std::size_t thread_run = 10;
738+
739+
std::vector<std::thread> threads;
740+
for(std::size_t i = 0; i < threads_count; ++i) {
741+
threads.emplace_back(multiple_run_thread_function, thread_run);
742+
}
743+
for(std::thread& thread : threads) {
744+
thread.join();
745+
}
746+
747+
CROW_LOG_WARNING << "Stopping app!\n";
748+
app.stop();
749+
}

0 commit comments

Comments
 (0)