From d25f421c7341fbf0c5ef48a43b290b9b4ffbd9d9 Mon Sep 17 00:00:00 2001 From: tk <> Date: Wed, 2 Jul 2025 22:34:20 +0000 Subject: [PATCH 1/6] tmp --- include/ccapi_cpp/ccapi_fix_connection.h | 2 +- include/ccapi_cpp/ccapi_http_connection.h | 2 +- include/ccapi_cpp/ccapi_ws_connection.h | 22 +- .../ccapi_execution_management_service.h | 14 +- ...pi_execution_management_service_ascendex.h | 13 +- ...agement_service_gateio_perpetual_futures.h | 11 +- .../service/ccapi_market_data_service.h | 16 +- include/ccapi_cpp/service/ccapi_service.h | 400 +++++++++++------- 8 files changed, 270 insertions(+), 210 deletions(-) diff --git a/include/ccapi_cpp/ccapi_fix_connection.h b/include/ccapi_cpp/ccapi_fix_connection.h index e81de28c..bf510846 100644 --- a/include/ccapi_cpp/ccapi_fix_connection.h +++ b/include/ccapi_cpp/ccapi_fix_connection.h @@ -69,7 +69,7 @@ class FixConnection { std::string url; Subscription subscription; Status status{Status::UNKNOWN}; - std::shared_ptr streamPtr; + std::shared_ptr streamPtr{nullptr}; }; } /* namespace ccapi */ diff --git a/include/ccapi_cpp/ccapi_http_connection.h b/include/ccapi_cpp/ccapi_http_connection.h index 566213b5..ffc395f1 100644 --- a/include/ccapi_cpp/ccapi_http_connection.h +++ b/include/ccapi_cpp/ccapi_http_connection.h @@ -30,7 +30,7 @@ class HttpConnection { std::string host; std::string port; - std::shared_ptr> streamPtr; + std::shared_ptr> streamPtr{nullptr}; TimePoint lastReceiveDataTp{std::chrono::seconds{0}}; boost::beast::flat_buffer buffer; diff --git a/include/ccapi_cpp/ccapi_ws_connection.h b/include/ccapi_cpp/ccapi_ws_connection.h index 161a554f..085cbb01 100644 --- a/include/ccapi_cpp/ccapi_ws_connection.h +++ b/include/ccapi_cpp/ccapi_ws_connection.h @@ -2,7 +2,7 @@ #define INCLUDE_CCAPI_CPP_CCAPI_WS_CONNECTION_H_ #include - +#include #include "ccapi_cpp/ccapi_logger.h" #include "ccapi_cpp/ccapi_subscription.h" @@ -16,9 +16,8 @@ class WsConnection { WsConnection(const WsConnection&) = delete; WsConnection& operator=(const WsConnection&) = delete; - WsConnection(std::string url, std::string group, std::vector subscriptionList, std::map credential, - std::shared_ptr>> streamPtr) - : url(url), group(group), subscriptionList(subscriptionList), credential(credential), streamPtr(streamPtr) { + WsConnection(const std::string& url, const std::string& group, const std::vector& subscriptionList, const std::map& credential) + : url(url), group(group), subscriptionList(subscriptionList), credential(credential) { std::map shortCredential; for (const auto& x : credential) { shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH))); @@ -39,7 +38,13 @@ class WsConnection { shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH))); } std::ostringstream oss; - oss << streamPtr; + std::visit([&oss](auto&& streamSharedPtr) { + if (streamSharedPtr) { + oss << streamSharedPtr.get(); + } else { + oss << "nullptr"; + } +}, streamPtr); std::string output = "WsConnection [longId = " + longId + ", id = " + id + ", url = " + url + ", group = " + group + ", subscriptionList = " + ccapi::toString(subscriptionList) + ", credential = " + ccapi::toString(shortCredential) + ", status = " + statusToString(status) + ", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() + @@ -116,6 +121,9 @@ class WsConnection { this->port = CCAPI_HTTP_PORT_DEFAULT; } } + if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") { + this->isSecure = true; + } } } @@ -133,7 +141,8 @@ class WsConnection { Status status{Status::UNKNOWN}; std::map headers; std::map credential; - std::shared_ptr>> streamPtr; + std::variant>>, + std::shared_ptr>> streamPtr; beast::websocket::close_code remoteCloseCode{}; beast::websocket::close_reason remoteCloseReason{}; std::string hostHttpHeaderValue; @@ -145,6 +154,7 @@ class WsConnection { std::array writeMessageBuffer; size_t writeMessageBufferWrittenLength{}; std::vector writeMessageBufferBoundary; + bool isSecure{}; }; } /* namespace ccapi */ diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service.h b/include/ccapi_cpp/service/ccapi_execution_management_service.h index 51188643..174786ce 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service.h @@ -61,21 +61,15 @@ class ExecutionManagementService : public Service { credential = that->credentialDefault; } - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()}); - return; - } const auto& fieldSet = subscription.getFieldSet(); if (fieldSet.find(CCAPI_EM_WEBSOCKET_ORDER_ENTRY) != fieldSet.end()) { - std::shared_ptr wsConnectionPtr(new WsConnection(that->baseUrlWsOrderEntry, "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWsOrderEntry, "", std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } else { - std::shared_ptr wsConnectionPtr(new WsConnection(that->baseUrlWs, "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs, "", std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h b/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h index b01a05fa..99eda574 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h @@ -363,16 +363,9 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService { } const auto& accountGroup = mapGetWithDefault(credential, that->apiAccountGroupName); - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()}); - return; - } - std::shared_ptr wsConnectionPtr( - new WsConnection(that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared( + that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "",std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); }); diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h b/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h index e88bb36f..0318c77e 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h @@ -101,15 +101,8 @@ class ExecutionManagementServiceGateioPerpetualFutures : public ExecutionManagem credential = that->credentialDefault; } - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()}); - return; - } - std::shared_ptr wsConnectionPtr(new WsConnection(that->baseUrlWs + settle, "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs + settle, "",std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } diff --git a/include/ccapi_cpp/service/ccapi_market_data_service.h b/include/ccapi_cpp/service/ccapi_market_data_service.h index 2130e0f9..42655f23 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service.h @@ -92,19 +92,9 @@ class MarketDataService : public Service { if (credential.empty()) { credential = that->credentialDefault; } - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - std::vector correlationIdList; - correlationIdList.reserve(subscriptionListGivenInstrumentGroup.size()); - std::transform(subscriptionListGivenInstrumentGroup.cbegin(), subscriptionListGivenInstrumentGroup.cend(), std::back_inserter(correlationIdList), - [](Subscription subscription) { return subscription.getCorrelationId(); }); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", correlationIdList); - return; - } - std::shared_ptr wsConnectionPtr(new WsConnection(url, instrumentGroup, subscriptionListGivenInstrumentGroup, credential, streamPtr)); + + auto wsConnectionPtr = std::make_shared(url, instrumentGroup, subscriptionListGivenInstrumentGroup, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index 5da4897f..4f338933 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -525,10 +525,6 @@ class Service : public std::enable_shared_from_this { return streamPtr; } - std::shared_ptr>> createWsStream(net::io_context* iocPtr, net::ssl::context* ctxPtr) { - return std::make_shared>>(*iocPtr, *ctxPtr); - } - void performRequestWithNewHttpConnection(std::shared_ptr httpConnectionPtr, const Request& request, http::request& req, const HttpRetry& retry, Queue* eventQueuePtr) { CCAPI_LOGGER_FUNCTION_ENTER; @@ -948,16 +944,19 @@ class Service : public std::enable_shared_from_this { return http::string_to_verb(methodStringUpper); } - void close(std::shared_ptr wsConnectionPtr, beast::websocket::close_code const code, beast::websocket::close_reason reason, ErrorCode& ec) { - if (wsConnectionPtr->status == WsConnection::Status::CLOSING) { - CCAPI_LOGGER_WARN("websocket connection is already in the state of closing"); - return; - } - wsConnectionPtr->status = WsConnection::Status::CLOSING; - wsConnectionPtr->remoteCloseCode = code; - wsConnectionPtr->remoteCloseReason = reason; - wsConnectionPtr->streamPtr->async_close(code, beast::bind_front_handler(&Service::onClose, shared_from_this(), wsConnectionPtr)); +void close(std::shared_ptr wsConnectionPtr, beast::websocket::close_code const code, beast::websocket::close_reason reason, ErrorCode& ec) { + if (wsConnectionPtr->status == WsConnection::Status::CLOSING) { + CCAPI_LOGGER_WARN("websocket connection is already in the state of closing"); + return; } + wsConnectionPtr->status = WsConnection::Status::CLOSING; + wsConnectionPtr->remoteCloseCode = code; + wsConnectionPtr->remoteCloseReason = reason; + + std::visit([&](auto& streamPtr) { + streamPtr->async_close(code, beast::bind_front_handler(&Service::onClose, shared_from_this(), wsConnectionPtr)); + }, wsConnectionPtr->streamPtr); +} virtual void prepareConnect(std::shared_ptr wsConnectionPtr) { this->connect(wsConnectionPtr); } @@ -991,92 +990,152 @@ class Service : public std::enable_shared_from_this { } void startConnectWs(std::shared_ptr wsConnectionPtr, long timeoutMilliseconds, tcp::resolver::results_type tcpResolverResults) { - beast::websocket::stream>& stream = *wsConnectionPtr->streamPtr; - if (timeoutMilliseconds > 0) { - beast::get_lowest_layer(stream).expires_after(std::chrono::milliseconds(timeoutMilliseconds)); - } - // Set SNI Hostname (many hosts need this to handshake successfully) - CCAPI_LOGGER_TRACE("wsConnectionPtr->host = " + wsConnectionPtr->host) - if (!SSL_set_tlsext_host_name(stream.next_layer().native_handle(), wsConnectionPtr->host.c_str())) { - beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; - CCAPI_LOGGER_DEBUG("error SSL_set_tlsext_host_name: " + ec.message()); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "set SNI Hostname", wsConnectionPtr->correlationIdList); - return; - } - CCAPI_LOGGER_TRACE("before async_connect"); - beast::get_lowest_layer(stream).async_connect(tcpResolverResults, beast::bind_front_handler(&Service::onConnectWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after async_connect"); + std::visit([&](auto& streamPtr) { + using StreamType = std::decay_t; + + if (timeoutMilliseconds > 0) { + beast::get_lowest_layer(*streamPtr).expires_after(std::chrono::milliseconds(timeoutMilliseconds)); + } + + if constexpr (std::is_same_v>>) { + // Set SNI hostname (only for WSS) + if (!SSL_set_tlsext_host_name(streamPtr->next_layer().native_handle(), wsConnectionPtr->host.c_str())) { + beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; + CCAPI_LOGGER_DEBUG("error SSL_set_tlsext_host_name: " + ec.message()); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, + "set SNI Hostname", wsConnectionPtr->correlationIdList); + return; + } + } + + CCAPI_LOGGER_TRACE("before async_connect"); + + beast::get_lowest_layer(*streamPtr).async_connect( + tcpResolverResults, + beast::bind_front_handler(&Service::onConnectWs, shared_from_this(), wsConnectionPtr) + ); + + CCAPI_LOGGER_TRACE("after async_connect"); + }, wsConnectionPtr->streamPtr); } void onConnectWs(std::shared_ptr wsConnectionPtr, beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) { - CCAPI_LOGGER_TRACE("async_connect callback start"); - if (ec) { - CCAPI_LOGGER_TRACE("fail"); - this->onFail(wsConnectionPtr); - return; - } - CCAPI_LOGGER_TRACE("connected"); - CCAPI_LOGGER_TRACE("ep.port() = " + std::to_string(ep.port())); - wsConnectionPtr->hostHttpHeaderValue = - this->hostHttpHeaderValueIgnorePort ? wsConnectionPtr->host : wsConnectionPtr->host + ':' + std::to_string(ep.port()); - CCAPI_LOGGER_TRACE("wsConnectionPtr->hostHttpHeaderValue = " + wsConnectionPtr->hostHttpHeaderValue); - beast::websocket::stream>& stream = *wsConnectionPtr->streamPtr; - beast::get_lowest_layer(stream).socket().set_option(tcp::no_delay(true)); - CCAPI_LOGGER_TRACE("before ssl async_handshake"); - stream.next_layer().async_handshake(ssl::stream_base::client, beast::bind_front_handler(&Service::onSslHandshakeWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after ssl async_handshake"); + CCAPI_LOGGER_TRACE("async_connect callback start"); + if (ec) { + CCAPI_LOGGER_TRACE("fail"); + this->onFail(wsConnectionPtr); + return; + } + + CCAPI_LOGGER_TRACE("connected"); + CCAPI_LOGGER_TRACE("ep.port() = " + std::to_string(ep.port())); + + wsConnectionPtr->hostHttpHeaderValue = + this->hostHttpHeaderValueIgnorePort ? wsConnectionPtr->host + : wsConnectionPtr->host + ':' + std::to_string(ep.port()); + + CCAPI_LOGGER_TRACE("wsConnectionPtr->hostHttpHeaderValue = " + wsConnectionPtr->hostHttpHeaderValue); + + // Use std::visit to access the concrete stream + std::visit([&](auto& streamPtr) { + using StreamType = std::decay_t; + + // Set TCP_NODELAY + beast::get_lowest_layer(*streamPtr).socket().set_option(tcp::no_delay(true)); + + if constexpr (std::is_same_v>>) { + CCAPI_LOGGER_TRACE("before ssl async_handshake"); + + streamPtr->next_layer().async_handshake( + ssl::stream_base::client, + beast::bind_front_handler(&Service::onSslHandshakeWs, shared_from_this(), wsConnectionPtr) + ); + + CCAPI_LOGGER_TRACE("after ssl async_handshake"); + } else { + // Non-SSL streams skip SSL handshake and go straight to WebSocket handshake + this->onSslHandshakeWs(wsConnectionPtr, {}); + } + }, wsConnectionPtr->streamPtr); } void onSslHandshakeWs(std::shared_ptr wsConnectionPtr, beast::error_code ec) { - CCAPI_LOGGER_TRACE("ssl async_handshake callback start"); - if (ec) { - CCAPI_LOGGER_TRACE("ssl handshake fail"); - this->onFail(wsConnectionPtr); - return; - } - CCAPI_LOGGER_TRACE("ssl handshaked"); - beast::websocket::stream>& stream = *wsConnectionPtr->streamPtr; - beast::get_lowest_layer(stream).expires_never(); - beast::websocket::stream_base::timeout opt{std::chrono::milliseconds(this->sessionOptions.websocketConnectTimeoutMilliseconds), - std::chrono::milliseconds(this->sessionOptions.pongWebsocketProtocolLevelTimeoutMilliseconds), true}; - - stream.set_option(opt); - stream.set_option(beast::websocket::stream_base::decorator([wsConnectionPtr](beast::websocket::request_type& req) { - req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING)); - for (const auto& kv : wsConnectionPtr->headers) { - req.set(kv.first, kv.second); - } - })); - CCAPI_LOGGER_TRACE("before ws async_handshake"); - stream.async_handshake(wsConnectionPtr->hostHttpHeaderValue, wsConnectionPtr->path, - beast::bind_front_handler(&Service::onWsHandshakeWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after ws async_handshake"); + CCAPI_LOGGER_TRACE("ssl async_handshake callback start"); + if (ec) { + CCAPI_LOGGER_TRACE("ssl handshake fail"); + this->onFail(wsConnectionPtr); + return; + } + CCAPI_LOGGER_TRACE("ssl handshaked"); + + std::visit([&](auto& streamPtr) { + auto& stream = *streamPtr; + beast::get_lowest_layer(stream).expires_never(); + + beast::websocket::stream_base::timeout opt{ + std::chrono::milliseconds(this->sessionOptions.websocketConnectTimeoutMilliseconds), + std::chrono::milliseconds(this->sessionOptions.pongWebsocketProtocolLevelTimeoutMilliseconds), + true + }; + + stream.set_option(opt); + stream.set_option(beast::websocket::stream_base::decorator([wsConnectionPtr](beast::websocket::request_type& req) { + req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); + for (const auto& kv : wsConnectionPtr->headers) { + req.set(kv.first, kv.second); + } + })); + + CCAPI_LOGGER_TRACE("before ws async_handshake"); + stream.async_handshake( + wsConnectionPtr->hostHttpHeaderValue, + wsConnectionPtr->path, + beast::bind_front_handler(&Service::onWsHandshakeWs, shared_from_this(), wsConnectionPtr) + ); + CCAPI_LOGGER_TRACE("after ws async_handshake"); + + }, wsConnectionPtr->streamPtr); } void onWsHandshakeWs(std::shared_ptr wsConnectionPtr, beast::error_code ec) { - CCAPI_LOGGER_TRACE("ws async_handshake callback start"); - if (ec) { - CCAPI_LOGGER_TRACE("ws handshake fail"); - this->onFail(wsConnectionPtr); - return; - } - CCAPI_LOGGER_TRACE("ws handshaked"); - this->onOpen(wsConnectionPtr); - this->wsConnectionPtrByIdMap.insert(std::make_pair(wsConnectionPtr->id, wsConnectionPtr)); - CCAPI_LOGGER_TRACE("about to start read"); - this->startReadWs(wsConnectionPtr); - auto& stream = *wsConnectionPtr->streamPtr; - stream.control_callback([wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { - that->onControlCallback(wsConnectionPtr, kind, payload); - }); + CCAPI_LOGGER_TRACE("ws async_handshake callback start"); + if (ec) { + CCAPI_LOGGER_TRACE("ws handshake fail"); + this->onFail(wsConnectionPtr); + return; + } + CCAPI_LOGGER_TRACE("ws handshaked"); + + // Finalize connection setup + this->onOpen(wsConnectionPtr); + this->wsConnectionPtrByIdMap.insert({wsConnectionPtr->id, wsConnectionPtr}); + CCAPI_LOGGER_TRACE("about to start read"); + + // Start reading messages + this->startReadWs(wsConnectionPtr); + + // Setup control callback (ping/pong/close) + std::visit([&](auto& streamPtr) { + streamPtr->control_callback( + [wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { + that->onControlCallback(wsConnectionPtr, kind, payload); + } + ); + }, wsConnectionPtr->streamPtr); } void startReadWs(std::shared_ptr wsConnectionPtr) { - auto& stream = *wsConnectionPtr->streamPtr; - CCAPI_LOGGER_TRACE("before async_read"); - auto& readMessageBuffer = wsConnectionPtr->readMessageBuffer; - stream.async_read(readMessageBuffer, beast::bind_front_handler(&Service::onReadWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after async_read"); + CCAPI_LOGGER_TRACE("before async_read"); + auto& readMessageBuffer = wsConnectionPtr->readMessageBuffer; + + std::visit([&](auto& streamPtr) { + streamPtr->async_read( + readMessageBuffer, + beast::bind_front_handler(&Service::onReadWs, shared_from_this(), wsConnectionPtr) + ); + }, wsConnectionPtr->streamPtr); + + CCAPI_LOGGER_TRACE("after async_read"); } void onReadWs(std::shared_ptr wsConnectionPtr, const ErrorCode& ec, std::size_t n) { @@ -1175,14 +1234,21 @@ class Service : public std::enable_shared_from_this { CCAPI_LOGGER_TRACE("writeMessageBufferBoundary = " + toString(writeMessageBufferBoundary)); } - void startWriteWs(std::shared_ptr wsConnectionPtr, const char* data, size_t numBytesToWrite) { - auto& stream = *wsConnectionPtr->streamPtr; - CCAPI_LOGGER_TRACE("before async_write"); - CCAPI_LOGGER_TRACE("numBytesToWrite = " + toString(numBytesToWrite)); +void startWriteWs(std::shared_ptr wsConnectionPtr, const char* data, size_t numBytesToWrite) { + CCAPI_LOGGER_TRACE("before async_write"); + CCAPI_LOGGER_TRACE("numBytesToWrite = " + toString(numBytesToWrite)); + + std::visit([&](auto& streamPtr) { + auto& stream = *streamPtr; // dereference shared_ptr stream.binary(false); - stream.async_write(net::buffer(data, numBytesToWrite), beast::bind_front_handler(&Service::onWriteWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after async_write"); - } + stream.async_write( + net::buffer(data, numBytesToWrite), + beast::bind_front_handler(&Service::onWriteWs, shared_from_this(), wsConnectionPtr) + ); + }, wsConnectionPtr->streamPtr); + + CCAPI_LOGGER_TRACE("after async_write"); +} void onWriteWs(std::shared_ptr wsConnectionPtr, const ErrorCode& ec, std::size_t n) { CCAPI_LOGGER_FUNCTION_ENTER; @@ -1245,8 +1311,8 @@ class Service : public std::enable_shared_from_this { } else { CCAPI_LOGGER_INFO("about to retry"); try { - auto thatWsConnectionPtr = that->createWsConnectionPtr(wsConnectionPtr); - that->prepareConnect(thatWsConnectionPtr); + that->setWsConnectionStream(wsConnectionPtr); + that->prepareConnect(wsConnectionPtr); that->connectNumRetryOnFailByConnectionUrlMap[urlBase] += 1; } catch (const beast::error_code& ec) { CCAPI_LOGGER_TRACE("fail"); @@ -1259,16 +1325,13 @@ class Service : public std::enable_shared_from_this { this->connectRetryOnFailTimerByConnectionIdMap[wsConnectionId] = timerPtr; } - std::shared_ptr createWsConnectionPtr(std::shared_ptr wsConnectionPtr) { - std::shared_ptr thatWsConnectionPtr = wsConnectionPtr; - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = this->createWsStream(this->serviceContextPtr->ioContextPtr, this->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - throw ec; + void setWsConnectionStream(std::shared_ptr wsConnectionPtr) { + if (wsConnectionPtr->isSecure){ + wsConnectionPtr->streamPtr = std::make_shared>>( + *this->serviceContextPtr->ioContextPtr, *this->serviceContextPtr->sslContextPtr); + } else { + wsConnectionPtr->streamPtr = std::make_shared>(*this->serviceContextPtr->ioContextPtr); } - thatWsConnectionPtr->streamPtr = streamPtr; - return thatWsConnectionPtr; } virtual void onFail(std::shared_ptr wsConnectionPtr) { @@ -1333,63 +1396,76 @@ class Service : public std::enable_shared_from_this { this->eventHandler(event, nullptr); CCAPI_LOGGER_INFO("connection " + toString(*wsConnectionPtr) + " is closed"); this->clearStates(wsConnectionPtr); - auto thisWsConnectionPtr = this->createWsConnectionPtr(wsConnectionPtr); + this->setWsConnectionStream(wsConnectionPtr); this->wsConnectionPtrByIdMap.erase(wsConnectionPtr->id); if (this->shouldContinue.load()) { - this->prepareConnect(thisWsConnectionPtr); + this->prepareConnect(wsConnectionPtr); } CCAPI_LOGGER_FUNCTION_EXIT; } void onMessage(std::shared_ptr wsConnectionPtr, const char* data, size_t dataSize) { - auto now = UtilTime::now(); - CCAPI_LOGGER_DEBUG("received a message from connection " + toString(*wsConnectionPtr)); - if (wsConnectionPtr->status != WsConnection::Status::OPEN && !this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id]) { - CCAPI_LOGGER_WARN("should not process remaining message on closing"); - return; - } - auto& stream = *wsConnectionPtr->streamPtr; - if (stream.got_text()) { - boost::beast::string_view textMessage(data, dataSize); - CCAPI_LOGGER_DEBUG(std::string("received a text message: ") + std::string(textMessage)); - try { - this->onTextMessage(wsConnectionPtr, textMessage, now); - } catch (const std::exception& e) { - CCAPI_LOGGER_ERROR(std::string("textMessage = ") + std::string(textMessage)); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); - } - } else if (stream.got_binary()) { - CCAPI_LOGGER_DEBUG(std::string("received a binary message: ") + UtilAlgorithm::stringToHex(std::string(data, dataSize))); -#if defined(CCAPI_ENABLE_SERVICE_MARKET_DATA) && \ - (defined(CCAPI_ENABLE_EXCHANGE_HUOBI) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP)) || \ - defined(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT) && \ - (defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_BITMART)) - if (this->needDecompressWebsocketMessage) { - std::string decompressed; - boost::beast::string_view payload(data, dataSize); - try { - ErrorCode ec = this->inflater.decompress(reinterpret_cast(&payload[0]), payload.size(), decompressed); - if (ec) { - CCAPI_LOGGER_FATAL(ec.message()); - } - CCAPI_LOGGER_DEBUG("decompressed = " + decompressed); - this->onTextMessage(wsConnectionPtr, decompressed, now); - } catch (const std::exception& e) { - std::stringstream ss; - ss << std::hex << std::setfill('0'); - for (int i = 0; i < payload.size(); ++i) { - ss << std::setw(2) << static_cast(reinterpret_cast(&payload[0])[i]); - } - CCAPI_LOGGER_ERROR("binaryMessage = " + ss.str()); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); + auto now = UtilTime::now(); + CCAPI_LOGGER_DEBUG("received a message from connection " + toString(*wsConnectionPtr)); + if (wsConnectionPtr->status != WsConnection::Status::OPEN && + !this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id]) { + CCAPI_LOGGER_WARN("should not process remaining message on closing"); + return; } - ErrorCode ec = this->inflater.inflate_reset(); - if (ec) { - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "decompress"); + + std::visit([&](auto& streamPtr) { + auto& stream = *streamPtr; // dereference shared_ptr + + if (stream.got_text()) { + boost::beast::string_view textMessage(data, dataSize); + CCAPI_LOGGER_DEBUG("received a text message: " + std::string(textMessage)); + try { + this->onTextMessage(wsConnectionPtr, textMessage, now); + } catch (const std::exception& e) { + CCAPI_LOGGER_ERROR("textMessage = " + std::string(textMessage)); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); + } + } else if (stream.got_binary()) { + CCAPI_LOGGER_DEBUG("received a binary message: " + + UtilAlgorithm::stringToHex(std::string(data, dataSize))); + + #if defined(CCAPI_ENABLE_SERVICE_MARKET_DATA) && \ + (defined(CCAPI_ENABLE_EXCHANGE_HUOBI) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP)) || \ + defined(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT) && \ + (defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_BITMART)) + + if (this->needDecompressWebsocketMessage) { + std::string decompressed; + boost::beast::string_view payload(data, dataSize); + try { + ErrorCode ec = this->inflater.decompress(reinterpret_cast(&payload[0]), + payload.size(), decompressed); + if (ec) { + CCAPI_LOGGER_FATAL(ec.message()); + } + CCAPI_LOGGER_DEBUG("decompressed = " + decompressed); + this->onTextMessage(wsConnectionPtr, decompressed, now); + } catch (const std::exception& e) { + std::stringstream ss; + ss << std::hex << std::setfill('0'); + for (int i = 0; i < payload.size(); ++i) { + ss << std::setw(2) << static_cast(reinterpret_cast(&payload[0])[i]); + } + CCAPI_LOGGER_ERROR("binaryMessage = " + ss.str()); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); + } + + ErrorCode ec = this->inflater.inflate_reset(); + if (ec) { + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "decompress"); + } + } + + #endif // decompress block + } - } -#endif - } + + }, wsConnectionPtr->streamPtr); // <-- std::variant } void onControlCallback(std::shared_ptr wsConnectionPtr, boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { @@ -1427,14 +1503,18 @@ class Service : public std::enable_shared_from_this { this->writeMessage(wsConnectionPtr, payload.data(), payload.length()); } - void ping(std::shared_ptr wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) { - if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) { - auto& stream = *wsConnectionPtr->streamPtr; - stream.async_ping( - "", [that = this, wsConnectionPtr](ErrorCode const& ec) { that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; }); - this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true; - } +void ping(std::shared_ptr wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) { + if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) { + std::visit([&](auto& streamPtr) { + streamPtr->async_ping( + boost::beast::websocket::ping_data(payload), + [that = this, wsConnectionPtr](ErrorCode const& ec) { + that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; + }); + this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true; + }, wsConnectionPtr->streamPtr); } +} virtual void pingOnApplicationLevel(std::shared_ptr wsConnectionPtr, ErrorCode& ec) {} From 1554889cfc17a104f83ff2aae2b75db11974a39a Mon Sep 17 00:00:00 2001 From: ubuntu <> Date: Thu, 3 Jul 2025 00:16:40 +0000 Subject: [PATCH 2/6] use std::variant --- include/ccapi_cpp/ccapi_ws_connection.h | 24 +- ...pi_execution_management_service_ascendex.h | 4 +- ...agement_service_gateio_perpetual_futures.h | 2 +- include/ccapi_cpp/service/ccapi_service.h | 342 +++++++++--------- 4 files changed, 181 insertions(+), 191 deletions(-) diff --git a/include/ccapi_cpp/ccapi_ws_connection.h b/include/ccapi_cpp/ccapi_ws_connection.h index 085cbb01..75fbc7f9 100644 --- a/include/ccapi_cpp/ccapi_ws_connection.h +++ b/include/ccapi_cpp/ccapi_ws_connection.h @@ -3,6 +3,7 @@ #include #include + #include "ccapi_cpp/ccapi_logger.h" #include "ccapi_cpp/ccapi_subscription.h" @@ -16,7 +17,8 @@ class WsConnection { WsConnection(const WsConnection&) = delete; WsConnection& operator=(const WsConnection&) = delete; - WsConnection(const std::string& url, const std::string& group, const std::vector& subscriptionList, const std::map& credential) + WsConnection(const std::string& url, const std::string& group, const std::vector& subscriptionList, + const std::map& credential) : url(url), group(group), subscriptionList(subscriptionList), credential(credential) { std::map shortCredential; for (const auto& x : credential) { @@ -38,13 +40,15 @@ class WsConnection { shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH))); } std::ostringstream oss; - std::visit([&oss](auto&& streamSharedPtr) { - if (streamSharedPtr) { - oss << streamSharedPtr.get(); - } else { - oss << "nullptr"; - } -}, streamPtr); + std::visit( + [&oss](auto&& streamSharedPtr) { + if (streamSharedPtr) { + oss << streamSharedPtr.get(); + } else { + oss << "nullptr"; + } + }, + streamPtr); std::string output = "WsConnection [longId = " + longId + ", id = " + id + ", url = " + url + ", group = " + group + ", subscriptionList = " + ccapi::toString(subscriptionList) + ", credential = " + ccapi::toString(shortCredential) + ", status = " + statusToString(status) + ", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() + @@ -141,8 +145,8 @@ class WsConnection { Status status{Status::UNKNOWN}; std::map headers; std::map credential; - std::variant>>, - std::shared_ptr>> streamPtr; + std::variant>>, std::shared_ptr>> + streamPtr; beast::websocket::close_code remoteCloseCode{}; beast::websocket::close_reason remoteCloseReason{}; std::string hostHttpHeaderValue; diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h b/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h index 99eda574..abc3cd63 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h @@ -363,8 +363,8 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService { } const auto& accountGroup = mapGetWithDefault(credential, that->apiAccountGroupName); - auto wsConnectionPtr = std::make_shared( - that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "",std::vector{subscription}, credential); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "", + std::vector{subscription}, credential); that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h b/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h index 0318c77e..8eebdaf5 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h @@ -101,7 +101,7 @@ class ExecutionManagementServiceGateioPerpetualFutures : public ExecutionManagem credential = that->credentialDefault; } - auto wsConnectionPtr = std::make_shared(that->baseUrlWs + settle, "",std::vector{subscription}, credential); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs + settle, "", std::vector{subscription}, credential); that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index 4f338933..732f8d92 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -944,19 +944,18 @@ class Service : public std::enable_shared_from_this { return http::string_to_verb(methodStringUpper); } -void close(std::shared_ptr wsConnectionPtr, beast::websocket::close_code const code, beast::websocket::close_reason reason, ErrorCode& ec) { - if (wsConnectionPtr->status == WsConnection::Status::CLOSING) { - CCAPI_LOGGER_WARN("websocket connection is already in the state of closing"); - return; - } - wsConnectionPtr->status = WsConnection::Status::CLOSING; - wsConnectionPtr->remoteCloseCode = code; - wsConnectionPtr->remoteCloseReason = reason; + void close(std::shared_ptr wsConnectionPtr, beast::websocket::close_code const code, beast::websocket::close_reason reason, ErrorCode& ec) { + if (wsConnectionPtr->status == WsConnection::Status::CLOSING) { + CCAPI_LOGGER_WARN("websocket connection is already in the state of closing"); + return; + } + wsConnectionPtr->status = WsConnection::Status::CLOSING; + wsConnectionPtr->remoteCloseCode = code; + wsConnectionPtr->remoteCloseReason = reason; - std::visit([&](auto& streamPtr) { - streamPtr->async_close(code, beast::bind_front_handler(&Service::onClose, shared_from_this(), wsConnectionPtr)); - }, wsConnectionPtr->streamPtr); -} + std::visit([&](auto& streamPtr) { streamPtr->async_close(code, beast::bind_front_handler(&Service::onClose, shared_from_this(), wsConnectionPtr)); }, + wsConnectionPtr->streamPtr); + } virtual void prepareConnect(std::shared_ptr wsConnectionPtr) { this->connect(wsConnectionPtr); } @@ -990,152 +989,143 @@ void close(std::shared_ptr wsConnectionPtr, beast::websocket::clos } void startConnectWs(std::shared_ptr wsConnectionPtr, long timeoutMilliseconds, tcp::resolver::results_type tcpResolverResults) { - std::visit([&](auto& streamPtr) { - using StreamType = std::decay_t; + std::visit( + [&](auto& streamPtr) { + using StreamType = std::decay_t; - if (timeoutMilliseconds > 0) { + if (timeoutMilliseconds > 0) { beast::get_lowest_layer(*streamPtr).expires_after(std::chrono::milliseconds(timeoutMilliseconds)); - } + } - if constexpr (std::is_same_v>>) { + if constexpr (std::is_same_v>>) { // Set SNI hostname (only for WSS) if (!SSL_set_tlsext_host_name(streamPtr->next_layer().native_handle(), wsConnectionPtr->host.c_str())) { - beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; - CCAPI_LOGGER_DEBUG("error SSL_set_tlsext_host_name: " + ec.message()); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, - "set SNI Hostname", wsConnectionPtr->correlationIdList); - return; + beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; + CCAPI_LOGGER_DEBUG("error SSL_set_tlsext_host_name: " + ec.message()); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "set SNI Hostname", wsConnectionPtr->correlationIdList); + return; } - } + } - CCAPI_LOGGER_TRACE("before async_connect"); + CCAPI_LOGGER_TRACE("before async_connect"); - beast::get_lowest_layer(*streamPtr).async_connect( - tcpResolverResults, - beast::bind_front_handler(&Service::onConnectWs, shared_from_this(), wsConnectionPtr) - ); + beast::get_lowest_layer(*streamPtr) + .async_connect(tcpResolverResults, beast::bind_front_handler(&Service::onConnectWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after async_connect"); - }, wsConnectionPtr->streamPtr); + CCAPI_LOGGER_TRACE("after async_connect"); + }, + wsConnectionPtr->streamPtr); } void onConnectWs(std::shared_ptr wsConnectionPtr, beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) { - CCAPI_LOGGER_TRACE("async_connect callback start"); - if (ec) { - CCAPI_LOGGER_TRACE("fail"); - this->onFail(wsConnectionPtr); - return; - } + CCAPI_LOGGER_TRACE("async_connect callback start"); + if (ec) { + CCAPI_LOGGER_TRACE("fail"); + this->onFail(wsConnectionPtr); + return; + } - CCAPI_LOGGER_TRACE("connected"); - CCAPI_LOGGER_TRACE("ep.port() = " + std::to_string(ep.port())); + CCAPI_LOGGER_TRACE("connected"); + CCAPI_LOGGER_TRACE("ep.port() = " + std::to_string(ep.port())); - wsConnectionPtr->hostHttpHeaderValue = - this->hostHttpHeaderValueIgnorePort ? wsConnectionPtr->host - : wsConnectionPtr->host + ':' + std::to_string(ep.port()); + wsConnectionPtr->hostHttpHeaderValue = + this->hostHttpHeaderValueIgnorePort ? wsConnectionPtr->host : wsConnectionPtr->host + ':' + std::to_string(ep.port()); - CCAPI_LOGGER_TRACE("wsConnectionPtr->hostHttpHeaderValue = " + wsConnectionPtr->hostHttpHeaderValue); + CCAPI_LOGGER_TRACE("wsConnectionPtr->hostHttpHeaderValue = " + wsConnectionPtr->hostHttpHeaderValue); - // Use std::visit to access the concrete stream - std::visit([&](auto& streamPtr) { - using StreamType = std::decay_t; + // Use std::visit to access the concrete stream + std::visit( + [&](auto& streamPtr) { + using StreamType = std::decay_t; - // Set TCP_NODELAY - beast::get_lowest_layer(*streamPtr).socket().set_option(tcp::no_delay(true)); + // Set TCP_NODELAY + beast::get_lowest_layer(*streamPtr).socket().set_option(tcp::no_delay(true)); - if constexpr (std::is_same_v>>) { + if constexpr (std::is_same_v>>) { CCAPI_LOGGER_TRACE("before ssl async_handshake"); - streamPtr->next_layer().async_handshake( - ssl::stream_base::client, - beast::bind_front_handler(&Service::onSslHandshakeWs, shared_from_this(), wsConnectionPtr) - ); + streamPtr->next_layer().async_handshake(ssl::stream_base::client, + beast::bind_front_handler(&Service::onSslHandshakeWs, shared_from_this(), wsConnectionPtr)); CCAPI_LOGGER_TRACE("after ssl async_handshake"); - } else { + } else { // Non-SSL streams skip SSL handshake and go straight to WebSocket handshake this->onSslHandshakeWs(wsConnectionPtr, {}); - } - }, wsConnectionPtr->streamPtr); + } + }, + wsConnectionPtr->streamPtr); } void onSslHandshakeWs(std::shared_ptr wsConnectionPtr, beast::error_code ec) { - CCAPI_LOGGER_TRACE("ssl async_handshake callback start"); - if (ec) { - CCAPI_LOGGER_TRACE("ssl handshake fail"); - this->onFail(wsConnectionPtr); - return; - } - CCAPI_LOGGER_TRACE("ssl handshaked"); + CCAPI_LOGGER_TRACE("ssl async_handshake callback start"); + if (ec) { + CCAPI_LOGGER_TRACE("ssl handshake fail"); + this->onFail(wsConnectionPtr); + return; + } + CCAPI_LOGGER_TRACE("ssl handshaked"); - std::visit([&](auto& streamPtr) { - auto& stream = *streamPtr; - beast::get_lowest_layer(stream).expires_never(); + std::visit( + [&](auto& streamPtr) { + auto& stream = *streamPtr; + beast::get_lowest_layer(stream).expires_never(); - beast::websocket::stream_base::timeout opt{ - std::chrono::milliseconds(this->sessionOptions.websocketConnectTimeoutMilliseconds), - std::chrono::milliseconds(this->sessionOptions.pongWebsocketProtocolLevelTimeoutMilliseconds), - true - }; + beast::websocket::stream_base::timeout opt{std::chrono::milliseconds(this->sessionOptions.websocketConnectTimeoutMilliseconds), + std::chrono::milliseconds(this->sessionOptions.pongWebsocketProtocolLevelTimeoutMilliseconds), true}; - stream.set_option(opt); - stream.set_option(beast::websocket::stream_base::decorator([wsConnectionPtr](beast::websocket::request_type& req) { + stream.set_option(opt); + stream.set_option(beast::websocket::stream_base::decorator([wsConnectionPtr](beast::websocket::request_type& req) { req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); for (const auto& kv : wsConnectionPtr->headers) { - req.set(kv.first, kv.second); + req.set(kv.first, kv.second); } - })); - - CCAPI_LOGGER_TRACE("before ws async_handshake"); - stream.async_handshake( - wsConnectionPtr->hostHttpHeaderValue, - wsConnectionPtr->path, - beast::bind_front_handler(&Service::onWsHandshakeWs, shared_from_this(), wsConnectionPtr) - ); - CCAPI_LOGGER_TRACE("after ws async_handshake"); + })); - }, wsConnectionPtr->streamPtr); + CCAPI_LOGGER_TRACE("before ws async_handshake"); + stream.async_handshake(wsConnectionPtr->hostHttpHeaderValue, wsConnectionPtr->path, + beast::bind_front_handler(&Service::onWsHandshakeWs, shared_from_this(), wsConnectionPtr)); + CCAPI_LOGGER_TRACE("after ws async_handshake"); + }, + wsConnectionPtr->streamPtr); } void onWsHandshakeWs(std::shared_ptr wsConnectionPtr, beast::error_code ec) { - CCAPI_LOGGER_TRACE("ws async_handshake callback start"); - if (ec) { - CCAPI_LOGGER_TRACE("ws handshake fail"); - this->onFail(wsConnectionPtr); - return; - } - CCAPI_LOGGER_TRACE("ws handshaked"); + CCAPI_LOGGER_TRACE("ws async_handshake callback start"); + if (ec) { + CCAPI_LOGGER_TRACE("ws handshake fail"); + this->onFail(wsConnectionPtr); + return; + } + CCAPI_LOGGER_TRACE("ws handshaked"); - // Finalize connection setup - this->onOpen(wsConnectionPtr); - this->wsConnectionPtrByIdMap.insert({wsConnectionPtr->id, wsConnectionPtr}); - CCAPI_LOGGER_TRACE("about to start read"); + // Finalize connection setup + this->onOpen(wsConnectionPtr); + this->wsConnectionPtrByIdMap.insert({wsConnectionPtr->id, wsConnectionPtr}); + CCAPI_LOGGER_TRACE("about to start read"); - // Start reading messages - this->startReadWs(wsConnectionPtr); + // Start reading messages + this->startReadWs(wsConnectionPtr); - // Setup control callback (ping/pong/close) - std::visit([&](auto& streamPtr) { - streamPtr->control_callback( - [wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { - that->onControlCallback(wsConnectionPtr, kind, payload); - } - ); - }, wsConnectionPtr->streamPtr); + // Setup control callback (ping/pong/close) + std::visit( + [&](auto& streamPtr) { + streamPtr->control_callback( + [wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { + that->onControlCallback(wsConnectionPtr, kind, payload); + }); + }, + wsConnectionPtr->streamPtr); } void startReadWs(std::shared_ptr wsConnectionPtr) { - CCAPI_LOGGER_TRACE("before async_read"); - auto& readMessageBuffer = wsConnectionPtr->readMessageBuffer; + CCAPI_LOGGER_TRACE("before async_read"); + auto& readMessageBuffer = wsConnectionPtr->readMessageBuffer; - std::visit([&](auto& streamPtr) { - streamPtr->async_read( - readMessageBuffer, - beast::bind_front_handler(&Service::onReadWs, shared_from_this(), wsConnectionPtr) - ); - }, wsConnectionPtr->streamPtr); + std::visit( + [&](auto& streamPtr) { streamPtr->async_read(readMessageBuffer, beast::bind_front_handler(&Service::onReadWs, shared_from_this(), wsConnectionPtr)); }, + wsConnectionPtr->streamPtr); - CCAPI_LOGGER_TRACE("after async_read"); + CCAPI_LOGGER_TRACE("after async_read"); } void onReadWs(std::shared_ptr wsConnectionPtr, const ErrorCode& ec, std::size_t n) { @@ -1234,21 +1224,20 @@ void close(std::shared_ptr wsConnectionPtr, beast::websocket::clos CCAPI_LOGGER_TRACE("writeMessageBufferBoundary = " + toString(writeMessageBufferBoundary)); } -void startWriteWs(std::shared_ptr wsConnectionPtr, const char* data, size_t numBytesToWrite) { - CCAPI_LOGGER_TRACE("before async_write"); - CCAPI_LOGGER_TRACE("numBytesToWrite = " + toString(numBytesToWrite)); + void startWriteWs(std::shared_ptr wsConnectionPtr, const char* data, size_t numBytesToWrite) { + CCAPI_LOGGER_TRACE("before async_write"); + CCAPI_LOGGER_TRACE("numBytesToWrite = " + toString(numBytesToWrite)); - std::visit([&](auto& streamPtr) { - auto& stream = *streamPtr; // dereference shared_ptr - stream.binary(false); - stream.async_write( - net::buffer(data, numBytesToWrite), - beast::bind_front_handler(&Service::onWriteWs, shared_from_this(), wsConnectionPtr) - ); - }, wsConnectionPtr->streamPtr); + std::visit( + [&](auto& streamPtr) { + auto& stream = *streamPtr; // dereference shared_ptr + stream.binary(false); + stream.async_write(net::buffer(data, numBytesToWrite), beast::bind_front_handler(&Service::onWriteWs, shared_from_this(), wsConnectionPtr)); + }, + wsConnectionPtr->streamPtr); - CCAPI_LOGGER_TRACE("after async_write"); -} + CCAPI_LOGGER_TRACE("after async_write"); + } void onWriteWs(std::shared_ptr wsConnectionPtr, const ErrorCode& ec, std::size_t n) { CCAPI_LOGGER_FUNCTION_ENTER; @@ -1326,11 +1315,11 @@ void startWriteWs(std::shared_ptr wsConnectionPtr, const char* dat } void setWsConnectionStream(std::shared_ptr wsConnectionPtr) { - if (wsConnectionPtr->isSecure){ - wsConnectionPtr->streamPtr = std::make_shared>>( - *this->serviceContextPtr->ioContextPtr, *this->serviceContextPtr->sslContextPtr); + if (wsConnectionPtr->isSecure) { + wsConnectionPtr->streamPtr = std::make_shared>>(*this->serviceContextPtr->ioContextPtr, + *this->serviceContextPtr->sslContextPtr); } else { - wsConnectionPtr->streamPtr = std::make_shared>(*this->serviceContextPtr->ioContextPtr); + wsConnectionPtr->streamPtr = std::make_shared>(*this->serviceContextPtr->ioContextPtr); } } @@ -1405,67 +1394,64 @@ void startWriteWs(std::shared_ptr wsConnectionPtr, const char* dat } void onMessage(std::shared_ptr wsConnectionPtr, const char* data, size_t dataSize) { - auto now = UtilTime::now(); - CCAPI_LOGGER_DEBUG("received a message from connection " + toString(*wsConnectionPtr)); - if (wsConnectionPtr->status != WsConnection::Status::OPEN && - !this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id]) { - CCAPI_LOGGER_WARN("should not process remaining message on closing"); - return; - } + auto now = UtilTime::now(); + CCAPI_LOGGER_DEBUG("received a message from connection " + toString(*wsConnectionPtr)); + if (wsConnectionPtr->status != WsConnection::Status::OPEN && !this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id]) { + CCAPI_LOGGER_WARN("should not process remaining message on closing"); + return; + } - std::visit([&](auto& streamPtr) { - auto& stream = *streamPtr; // dereference shared_ptr + std::visit( + [&](auto& streamPtr) { + auto& stream = *streamPtr; // dereference shared_ptr - if (stream.got_text()) { + if (stream.got_text()) { boost::beast::string_view textMessage(data, dataSize); CCAPI_LOGGER_DEBUG("received a text message: " + std::string(textMessage)); try { - this->onTextMessage(wsConnectionPtr, textMessage, now); + this->onTextMessage(wsConnectionPtr, textMessage, now); } catch (const std::exception& e) { - CCAPI_LOGGER_ERROR("textMessage = " + std::string(textMessage)); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); + CCAPI_LOGGER_ERROR("textMessage = " + std::string(textMessage)); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); } - } else if (stream.got_binary()) { - CCAPI_LOGGER_DEBUG("received a binary message: " + - UtilAlgorithm::stringToHex(std::string(data, dataSize))); + } else if (stream.got_binary()) { + CCAPI_LOGGER_DEBUG("received a binary message: " + UtilAlgorithm::stringToHex(std::string(data, dataSize))); - #if defined(CCAPI_ENABLE_SERVICE_MARKET_DATA) && \ - (defined(CCAPI_ENABLE_EXCHANGE_HUOBI) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP)) || \ - defined(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT) && \ - (defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_BITMART)) +#if defined(CCAPI_ENABLE_SERVICE_MARKET_DATA) && \ + (defined(CCAPI_ENABLE_EXCHANGE_HUOBI) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP)) || \ + defined(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT) && \ + (defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_BITMART)) if (this->needDecompressWebsocketMessage) { - std::string decompressed; - boost::beast::string_view payload(data, dataSize); - try { - ErrorCode ec = this->inflater.decompress(reinterpret_cast(&payload[0]), - payload.size(), decompressed); + std::string decompressed; + boost::beast::string_view payload(data, dataSize); + try { + ErrorCode ec = this->inflater.decompress(reinterpret_cast(&payload[0]), payload.size(), decompressed); if (ec) { - CCAPI_LOGGER_FATAL(ec.message()); + CCAPI_LOGGER_FATAL(ec.message()); } CCAPI_LOGGER_DEBUG("decompressed = " + decompressed); this->onTextMessage(wsConnectionPtr, decompressed, now); - } catch (const std::exception& e) { + } catch (const std::exception& e) { std::stringstream ss; ss << std::hex << std::setfill('0'); for (int i = 0; i < payload.size(); ++i) { - ss << std::setw(2) << static_cast(reinterpret_cast(&payload[0])[i]); + ss << std::setw(2) << static_cast(reinterpret_cast(&payload[0])[i]); } CCAPI_LOGGER_ERROR("binaryMessage = " + ss.str()); this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); - } + } - ErrorCode ec = this->inflater.inflate_reset(); - if (ec) { + ErrorCode ec = this->inflater.inflate_reset(); + if (ec) { this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "decompress"); + } } - } - - #endif // decompress block - } - - }, wsConnectionPtr->streamPtr); // <-- std::variant +#endif // decompress block + } + }, + wsConnectionPtr->streamPtr); // <-- std::variant } void onControlCallback(std::shared_ptr wsConnectionPtr, boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { @@ -1503,18 +1489,18 @@ void startWriteWs(std::shared_ptr wsConnectionPtr, const char* dat this->writeMessage(wsConnectionPtr, payload.data(), payload.length()); } -void ping(std::shared_ptr wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) { - if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) { - std::visit([&](auto& streamPtr) { - streamPtr->async_ping( - boost::beast::websocket::ping_data(payload), - [that = this, wsConnectionPtr](ErrorCode const& ec) { - that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; - }); - this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true; - }, wsConnectionPtr->streamPtr); + void ping(std::shared_ptr wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) { + if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) { + std::visit( + [&](auto& streamPtr) { + streamPtr->async_ping(boost::beast::websocket::ping_data(payload), [that = this, wsConnectionPtr](ErrorCode const& ec) { + that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; + }); + this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true; + }, + wsConnectionPtr->streamPtr); + } } -} virtual void pingOnApplicationLevel(std::shared_ptr wsConnectionPtr, ErrorCode& ec) {} From 08fbf731c62962cd9f2297d26ab162eab2f9f4ee Mon Sep 17 00:00:00 2001 From: ubuntu <> Date: Thu, 3 Jul 2025 03:21:26 +0000 Subject: [PATCH 3/6] add function clear to WsConnection --- include/ccapi_cpp/ccapi_ws_connection.h | 12 ++++++++++++ include/ccapi_cpp/service/ccapi_service.h | 2 ++ 2 files changed, 14 insertions(+) diff --git a/include/ccapi_cpp/ccapi_ws_connection.h b/include/ccapi_cpp/ccapi_ws_connection.h index 75fbc7f9..23f032b2 100644 --- a/include/ccapi_cpp/ccapi_ws_connection.h +++ b/include/ccapi_cpp/ccapi_ws_connection.h @@ -136,6 +136,18 @@ class WsConnection { this->setUrlParts(); } + void clear() { + this->status = Status::UNKNOWN; + this->headers.clear(); + this->streamPtr = {}; // Reset the variant (holds std::monostate if uninitialized) + this->remoteCloseCode = {}; + this->remoteCloseReason = {}; + this->readMessageBuffer.clear(); + this->writeMessageBuffer.fill(0); // Zero out the write buffer + this->writeMessageBufferWrittenLength = 0; + this->writeMessageBufferBoundary.clear(); + } + std::string longId; std::string id; std::string url; diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index 732f8d92..cfc0debb 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -1283,6 +1283,7 @@ class Service : public std::enable_shared_from_this { "connection " + toString(*wsConnectionPtr) + " has failed before opening", wsConnectionPtr->correlationIdList); std::string wsConnectionId = wsConnectionPtr->id; std::string wsConnectionUrl = wsConnectionPtr->url; + wsConnectionPtr->clear(); this->wsConnectionPtrByIdMap.erase(wsConnectionId); auto urlBase = UtilString::split(wsConnectionUrl, "?").at(0); long seconds = std::round(UtilAlgorithm::exponentialBackoff(1, 1, 2, std::min(this->connectNumRetryOnFailByConnectionUrlMap[urlBase], 6))); @@ -1386,6 +1387,7 @@ class Service : public std::enable_shared_from_this { CCAPI_LOGGER_INFO("connection " + toString(*wsConnectionPtr) + " is closed"); this->clearStates(wsConnectionPtr); this->setWsConnectionStream(wsConnectionPtr); + wsConnectionPtr->clear(); this->wsConnectionPtrByIdMap.erase(wsConnectionPtr->id); if (this->shouldContinue.load()) { this->prepareConnect(wsConnectionPtr); From ebbf54da57b6b34abf5e8d6777406797fcc7cef3 Mon Sep 17 00:00:00 2001 From: tk <> Date: Thu, 3 Jul 2025 04:04:06 +0000 Subject: [PATCH 4/6] remove clear function in WsConnection --- include/ccapi_cpp/ccapi_ws_connection.h | 12 ------------ include/ccapi_cpp/service/ccapi_service.h | 2 -- 2 files changed, 14 deletions(-) diff --git a/include/ccapi_cpp/ccapi_ws_connection.h b/include/ccapi_cpp/ccapi_ws_connection.h index 23f032b2..75fbc7f9 100644 --- a/include/ccapi_cpp/ccapi_ws_connection.h +++ b/include/ccapi_cpp/ccapi_ws_connection.h @@ -136,18 +136,6 @@ class WsConnection { this->setUrlParts(); } - void clear() { - this->status = Status::UNKNOWN; - this->headers.clear(); - this->streamPtr = {}; // Reset the variant (holds std::monostate if uninitialized) - this->remoteCloseCode = {}; - this->remoteCloseReason = {}; - this->readMessageBuffer.clear(); - this->writeMessageBuffer.fill(0); // Zero out the write buffer - this->writeMessageBufferWrittenLength = 0; - this->writeMessageBufferBoundary.clear(); - } - std::string longId; std::string id; std::string url; diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index cfc0debb..732f8d92 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -1283,7 +1283,6 @@ class Service : public std::enable_shared_from_this { "connection " + toString(*wsConnectionPtr) + " has failed before opening", wsConnectionPtr->correlationIdList); std::string wsConnectionId = wsConnectionPtr->id; std::string wsConnectionUrl = wsConnectionPtr->url; - wsConnectionPtr->clear(); this->wsConnectionPtrByIdMap.erase(wsConnectionId); auto urlBase = UtilString::split(wsConnectionUrl, "?").at(0); long seconds = std::round(UtilAlgorithm::exponentialBackoff(1, 1, 2, std::min(this->connectNumRetryOnFailByConnectionUrlMap[urlBase], 6))); @@ -1387,7 +1386,6 @@ class Service : public std::enable_shared_from_this { CCAPI_LOGGER_INFO("connection " + toString(*wsConnectionPtr) + " is closed"); this->clearStates(wsConnectionPtr); this->setWsConnectionStream(wsConnectionPtr); - wsConnectionPtr->clear(); this->wsConnectionPtrByIdMap.erase(wsConnectionPtr->id); if (this->shouldContinue.load()) { this->prepareConnect(wsConnectionPtr); From ec47d146d5413bbd29daed36a5e6c3ec154fa637 Mon Sep 17 00:00:00 2001 From: tk <> Date: Thu, 3 Jul 2025 17:59:30 +0000 Subject: [PATCH 5/6] tmp --- include/ccapi_cpp/service/ccapi_service.h | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index 732f8d92..003d59c8 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -22,9 +22,19 @@ #ifndef RAPIDJSON_PARSE_ERROR_NORETURN #define RAPIDJSON_PARSE_ERROR_NORETURN(parseErrorCode, offset) throw std::runtime_error(#parseErrorCode) #endif + #ifndef CCAPI_WEBSOCKET_WRITE_BUFFER_SIZE #define CCAPI_WEBSOCKET_WRITE_BUFFER_SIZE (1 << 20) #endif + +#ifndef CCAPI_SOCKET_SEND_BUFFER_SIZE +#define CCAPI_SOCKET_SEND_BUFFER_SIZE (1 << 20) +#endif + +#ifndef CCAPI_SOCKET_RECEIVE_BUFFER_SIZE +#define CCAPI_SOCKET_RECEIVE_BUFFER_SIZE (1 << 20) +#endif + #include #include "boost/asio/strand.hpp" @@ -1038,8 +1048,14 @@ class Service : public std::enable_shared_from_this { [&](auto& streamPtr) { using StreamType = std::decay_t; - // Set TCP_NODELAY - beast::get_lowest_layer(*streamPtr).socket().set_option(tcp::no_delay(true)); + auto& lowestLayer = beast::get_lowest_layer(*streamPtr).socket(); + + // Disable Nagle + lowestLayer.set_option(tcp::no_delay(true)); + + // Set buffer sizes (example: 1 MB) + lowestLayer.set_option(boost::asio::socket_base::send_buffer_size(CCAPI_SOCKET_SEND_BUFFER_SIZE)); + lowestLayer.set_option(boost::asio::socket_base::receive_buffer_size(CCAPI_SOCKET_RECEIVE_BUFFER_SIZE)); if constexpr (std::is_same_v>>) { CCAPI_LOGGER_TRACE("before ssl async_handshake"); From 4201dd881627074e09a9f5d69df31a5af5dc55b2 Mon Sep 17 00:00:00 2001 From: ubuntu <> Date: Thu, 10 Jul 2025 20:21:23 +0000 Subject: [PATCH 6/6] remove CCAPI_SOCKET_SEND_BUFFER_SIZE and CCAPI_SOCKET_RECEIVE_BUFFER_SIZE --- include/ccapi_cpp/service/ccapi_service.h | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index 003d59c8..cdd07a30 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -27,14 +27,6 @@ #define CCAPI_WEBSOCKET_WRITE_BUFFER_SIZE (1 << 20) #endif -#ifndef CCAPI_SOCKET_SEND_BUFFER_SIZE -#define CCAPI_SOCKET_SEND_BUFFER_SIZE (1 << 20) -#endif - -#ifndef CCAPI_SOCKET_RECEIVE_BUFFER_SIZE -#define CCAPI_SOCKET_RECEIVE_BUFFER_SIZE (1 << 20) -#endif - #include #include "boost/asio/strand.hpp" @@ -1050,12 +1042,8 @@ class Service : public std::enable_shared_from_this { auto& lowestLayer = beast::get_lowest_layer(*streamPtr).socket(); - // Disable Nagle - lowestLayer.set_option(tcp::no_delay(true)); - - // Set buffer sizes (example: 1 MB) - lowestLayer.set_option(boost::asio::socket_base::send_buffer_size(CCAPI_SOCKET_SEND_BUFFER_SIZE)); - lowestLayer.set_option(boost::asio::socket_base::receive_buffer_size(CCAPI_SOCKET_RECEIVE_BUFFER_SIZE)); + // Disable Nagle + lowestLayer.set_option(tcp::no_delay(true)); if constexpr (std::is_same_v>>) { CCAPI_LOGGER_TRACE("before ssl async_handshake");