diff --git a/include/ccapi_cpp/ccapi_fix_connection.h b/include/ccapi_cpp/ccapi_fix_connection.h index e81de28c..bf510846 100644 --- a/include/ccapi_cpp/ccapi_fix_connection.h +++ b/include/ccapi_cpp/ccapi_fix_connection.h @@ -69,7 +69,7 @@ class FixConnection { std::string url; Subscription subscription; Status status{Status::UNKNOWN}; - std::shared_ptr streamPtr; + std::shared_ptr streamPtr{nullptr}; }; } /* namespace ccapi */ diff --git a/include/ccapi_cpp/ccapi_http_connection.h b/include/ccapi_cpp/ccapi_http_connection.h index 566213b5..ffc395f1 100644 --- a/include/ccapi_cpp/ccapi_http_connection.h +++ b/include/ccapi_cpp/ccapi_http_connection.h @@ -30,7 +30,7 @@ class HttpConnection { std::string host; std::string port; - std::shared_ptr> streamPtr; + std::shared_ptr> streamPtr{nullptr}; TimePoint lastReceiveDataTp{std::chrono::seconds{0}}; boost::beast::flat_buffer buffer; diff --git a/include/ccapi_cpp/ccapi_ws_connection.h b/include/ccapi_cpp/ccapi_ws_connection.h index 161a554f..75fbc7f9 100644 --- a/include/ccapi_cpp/ccapi_ws_connection.h +++ b/include/ccapi_cpp/ccapi_ws_connection.h @@ -2,6 +2,7 @@ #define INCLUDE_CCAPI_CPP_CCAPI_WS_CONNECTION_H_ #include +#include #include "ccapi_cpp/ccapi_logger.h" #include "ccapi_cpp/ccapi_subscription.h" @@ -16,9 +17,9 @@ class WsConnection { WsConnection(const WsConnection&) = delete; WsConnection& operator=(const WsConnection&) = delete; - WsConnection(std::string url, std::string group, std::vector subscriptionList, std::map credential, - std::shared_ptr>> streamPtr) - : url(url), group(group), subscriptionList(subscriptionList), credential(credential), streamPtr(streamPtr) { + WsConnection(const std::string& url, const std::string& group, const std::vector& subscriptionList, + const std::map& credential) + : url(url), group(group), subscriptionList(subscriptionList), credential(credential) { std::map shortCredential; for (const auto& x : credential) { shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH))); @@ -39,7 +40,15 @@ class WsConnection { shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH))); } std::ostringstream oss; - oss << streamPtr; + std::visit( + [&oss](auto&& streamSharedPtr) { + if (streamSharedPtr) { + oss << streamSharedPtr.get(); + } else { + oss << "nullptr"; + } + }, + streamPtr); std::string output = "WsConnection [longId = " + longId + ", id = " + id + ", url = " + url + ", group = " + group + ", subscriptionList = " + ccapi::toString(subscriptionList) + ", credential = " + ccapi::toString(shortCredential) + ", status = " + statusToString(status) + ", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() + @@ -116,6 +125,9 @@ class WsConnection { this->port = CCAPI_HTTP_PORT_DEFAULT; } } + if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") { + this->isSecure = true; + } } } @@ -133,7 +145,8 @@ class WsConnection { Status status{Status::UNKNOWN}; std::map headers; std::map credential; - std::shared_ptr>> streamPtr; + std::variant>>, std::shared_ptr>> + streamPtr; beast::websocket::close_code remoteCloseCode{}; beast::websocket::close_reason remoteCloseReason{}; std::string hostHttpHeaderValue; @@ -145,6 +158,7 @@ class WsConnection { std::array writeMessageBuffer; size_t writeMessageBufferWrittenLength{}; std::vector writeMessageBufferBoundary; + bool isSecure{}; }; } /* namespace ccapi */ diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service.h b/include/ccapi_cpp/service/ccapi_execution_management_service.h index 51188643..174786ce 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service.h @@ -61,21 +61,15 @@ class ExecutionManagementService : public Service { credential = that->credentialDefault; } - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()}); - return; - } const auto& fieldSet = subscription.getFieldSet(); if (fieldSet.find(CCAPI_EM_WEBSOCKET_ORDER_ENTRY) != fieldSet.end()) { - std::shared_ptr wsConnectionPtr(new WsConnection(that->baseUrlWsOrderEntry, "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWsOrderEntry, "", std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } else { - std::shared_ptr wsConnectionPtr(new WsConnection(that->baseUrlWs, "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs, "", std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h b/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h index b01a05fa..abc3cd63 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service_ascendex.h @@ -363,16 +363,9 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService { } const auto& accountGroup = mapGetWithDefault(credential, that->apiAccountGroupName); - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()}); - return; - } - std::shared_ptr wsConnectionPtr( - new WsConnection(that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "", + std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); }); diff --git a/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h b/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h index e88bb36f..8eebdaf5 100644 --- a/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h +++ b/include/ccapi_cpp/service/ccapi_execution_management_service_gateio_perpetual_futures.h @@ -101,15 +101,8 @@ class ExecutionManagementServiceGateioPerpetualFutures : public ExecutionManagem credential = that->credentialDefault; } - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()}); - return; - } - std::shared_ptr wsConnectionPtr(new WsConnection(that->baseUrlWs + settle, "", {subscription}, credential, streamPtr)); + auto wsConnectionPtr = std::make_shared(that->baseUrlWs + settle, "", std::vector{subscription}, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } diff --git a/include/ccapi_cpp/service/ccapi_market_data_service.h b/include/ccapi_cpp/service/ccapi_market_data_service.h index 2130e0f9..42655f23 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service.h @@ -92,19 +92,9 @@ class MarketDataService : public Service { if (credential.empty()) { credential = that->credentialDefault; } - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - CCAPI_LOGGER_TRACE("fail"); - std::vector correlationIdList; - correlationIdList.reserve(subscriptionListGivenInstrumentGroup.size()); - std::transform(subscriptionListGivenInstrumentGroup.cbegin(), subscriptionListGivenInstrumentGroup.cend(), std::back_inserter(correlationIdList), - [](Subscription subscription) { return subscription.getCorrelationId(); }); - that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", correlationIdList); - return; - } - std::shared_ptr wsConnectionPtr(new WsConnection(url, instrumentGroup, subscriptionListGivenInstrumentGroup, credential, streamPtr)); + + auto wsConnectionPtr = std::make_shared(url, instrumentGroup, subscriptionListGivenInstrumentGroup, credential); + that->setWsConnectionStream(wsConnectionPtr); CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr)); that->prepareConnect(wsConnectionPtr); } diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index 5da4897f..cdd07a30 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -22,9 +22,11 @@ #ifndef RAPIDJSON_PARSE_ERROR_NORETURN #define RAPIDJSON_PARSE_ERROR_NORETURN(parseErrorCode, offset) throw std::runtime_error(#parseErrorCode) #endif + #ifndef CCAPI_WEBSOCKET_WRITE_BUFFER_SIZE #define CCAPI_WEBSOCKET_WRITE_BUFFER_SIZE (1 << 20) #endif + #include #include "boost/asio/strand.hpp" @@ -525,10 +527,6 @@ class Service : public std::enable_shared_from_this { return streamPtr; } - std::shared_ptr>> createWsStream(net::io_context* iocPtr, net::ssl::context* ctxPtr) { - return std::make_shared>>(*iocPtr, *ctxPtr); - } - void performRequestWithNewHttpConnection(std::shared_ptr httpConnectionPtr, const Request& request, http::request& req, const HttpRetry& retry, Queue* eventQueuePtr) { CCAPI_LOGGER_FUNCTION_ENTER; @@ -956,7 +954,9 @@ class Service : public std::enable_shared_from_this { wsConnectionPtr->status = WsConnection::Status::CLOSING; wsConnectionPtr->remoteCloseCode = code; wsConnectionPtr->remoteCloseReason = reason; - wsConnectionPtr->streamPtr->async_close(code, beast::bind_front_handler(&Service::onClose, shared_from_this(), wsConnectionPtr)); + + std::visit([&](auto& streamPtr) { streamPtr->async_close(code, beast::bind_front_handler(&Service::onClose, shared_from_this(), wsConnectionPtr)); }, + wsConnectionPtr->streamPtr); } virtual void prepareConnect(std::shared_ptr wsConnectionPtr) { this->connect(wsConnectionPtr); } @@ -991,21 +991,32 @@ class Service : public std::enable_shared_from_this { } void startConnectWs(std::shared_ptr wsConnectionPtr, long timeoutMilliseconds, tcp::resolver::results_type tcpResolverResults) { - beast::websocket::stream>& stream = *wsConnectionPtr->streamPtr; - if (timeoutMilliseconds > 0) { - beast::get_lowest_layer(stream).expires_after(std::chrono::milliseconds(timeoutMilliseconds)); - } - // Set SNI Hostname (many hosts need this to handshake successfully) - CCAPI_LOGGER_TRACE("wsConnectionPtr->host = " + wsConnectionPtr->host) - if (!SSL_set_tlsext_host_name(stream.next_layer().native_handle(), wsConnectionPtr->host.c_str())) { - beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; - CCAPI_LOGGER_DEBUG("error SSL_set_tlsext_host_name: " + ec.message()); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "set SNI Hostname", wsConnectionPtr->correlationIdList); - return; - } - CCAPI_LOGGER_TRACE("before async_connect"); - beast::get_lowest_layer(stream).async_connect(tcpResolverResults, beast::bind_front_handler(&Service::onConnectWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after async_connect"); + std::visit( + [&](auto& streamPtr) { + using StreamType = std::decay_t; + + if (timeoutMilliseconds > 0) { + beast::get_lowest_layer(*streamPtr).expires_after(std::chrono::milliseconds(timeoutMilliseconds)); + } + + if constexpr (std::is_same_v>>) { + // Set SNI hostname (only for WSS) + if (!SSL_set_tlsext_host_name(streamPtr->next_layer().native_handle(), wsConnectionPtr->host.c_str())) { + beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; + CCAPI_LOGGER_DEBUG("error SSL_set_tlsext_host_name: " + ec.message()); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "set SNI Hostname", wsConnectionPtr->correlationIdList); + return; + } + } + + CCAPI_LOGGER_TRACE("before async_connect"); + + beast::get_lowest_layer(*streamPtr) + .async_connect(tcpResolverResults, beast::bind_front_handler(&Service::onConnectWs, shared_from_this(), wsConnectionPtr)); + + CCAPI_LOGGER_TRACE("after async_connect"); + }, + wsConnectionPtr->streamPtr); } void onConnectWs(std::shared_ptr wsConnectionPtr, beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) { @@ -1015,16 +1026,38 @@ class Service : public std::enable_shared_from_this { this->onFail(wsConnectionPtr); return; } + CCAPI_LOGGER_TRACE("connected"); CCAPI_LOGGER_TRACE("ep.port() = " + std::to_string(ep.port())); + wsConnectionPtr->hostHttpHeaderValue = this->hostHttpHeaderValueIgnorePort ? wsConnectionPtr->host : wsConnectionPtr->host + ':' + std::to_string(ep.port()); + CCAPI_LOGGER_TRACE("wsConnectionPtr->hostHttpHeaderValue = " + wsConnectionPtr->hostHttpHeaderValue); - beast::websocket::stream>& stream = *wsConnectionPtr->streamPtr; - beast::get_lowest_layer(stream).socket().set_option(tcp::no_delay(true)); - CCAPI_LOGGER_TRACE("before ssl async_handshake"); - stream.next_layer().async_handshake(ssl::stream_base::client, beast::bind_front_handler(&Service::onSslHandshakeWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after ssl async_handshake"); + + // Use std::visit to access the concrete stream + std::visit( + [&](auto& streamPtr) { + using StreamType = std::decay_t; + + auto& lowestLayer = beast::get_lowest_layer(*streamPtr).socket(); + + // Disable Nagle + lowestLayer.set_option(tcp::no_delay(true)); + + if constexpr (std::is_same_v>>) { + CCAPI_LOGGER_TRACE("before ssl async_handshake"); + + streamPtr->next_layer().async_handshake(ssl::stream_base::client, + beast::bind_front_handler(&Service::onSslHandshakeWs, shared_from_this(), wsConnectionPtr)); + + CCAPI_LOGGER_TRACE("after ssl async_handshake"); + } else { + // Non-SSL streams skip SSL handshake and go straight to WebSocket handshake + this->onSslHandshakeWs(wsConnectionPtr, {}); + } + }, + wsConnectionPtr->streamPtr); } void onSslHandshakeWs(std::shared_ptr wsConnectionPtr, beast::error_code ec) { @@ -1035,22 +1068,29 @@ class Service : public std::enable_shared_from_this { return; } CCAPI_LOGGER_TRACE("ssl handshaked"); - beast::websocket::stream>& stream = *wsConnectionPtr->streamPtr; - beast::get_lowest_layer(stream).expires_never(); - beast::websocket::stream_base::timeout opt{std::chrono::milliseconds(this->sessionOptions.websocketConnectTimeoutMilliseconds), - std::chrono::milliseconds(this->sessionOptions.pongWebsocketProtocolLevelTimeoutMilliseconds), true}; - - stream.set_option(opt); - stream.set_option(beast::websocket::stream_base::decorator([wsConnectionPtr](beast::websocket::request_type& req) { - req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING)); - for (const auto& kv : wsConnectionPtr->headers) { - req.set(kv.first, kv.second); - } - })); - CCAPI_LOGGER_TRACE("before ws async_handshake"); - stream.async_handshake(wsConnectionPtr->hostHttpHeaderValue, wsConnectionPtr->path, - beast::bind_front_handler(&Service::onWsHandshakeWs, shared_from_this(), wsConnectionPtr)); - CCAPI_LOGGER_TRACE("after ws async_handshake"); + + std::visit( + [&](auto& streamPtr) { + auto& stream = *streamPtr; + beast::get_lowest_layer(stream).expires_never(); + + beast::websocket::stream_base::timeout opt{std::chrono::milliseconds(this->sessionOptions.websocketConnectTimeoutMilliseconds), + std::chrono::milliseconds(this->sessionOptions.pongWebsocketProtocolLevelTimeoutMilliseconds), true}; + + stream.set_option(opt); + stream.set_option(beast::websocket::stream_base::decorator([wsConnectionPtr](beast::websocket::request_type& req) { + req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); + for (const auto& kv : wsConnectionPtr->headers) { + req.set(kv.first, kv.second); + } + })); + + CCAPI_LOGGER_TRACE("before ws async_handshake"); + stream.async_handshake(wsConnectionPtr->hostHttpHeaderValue, wsConnectionPtr->path, + beast::bind_front_handler(&Service::onWsHandshakeWs, shared_from_this(), wsConnectionPtr)); + CCAPI_LOGGER_TRACE("after ws async_handshake"); + }, + wsConnectionPtr->streamPtr); } void onWsHandshakeWs(std::shared_ptr wsConnectionPtr, beast::error_code ec) { @@ -1061,21 +1101,34 @@ class Service : public std::enable_shared_from_this { return; } CCAPI_LOGGER_TRACE("ws handshaked"); + + // Finalize connection setup this->onOpen(wsConnectionPtr); - this->wsConnectionPtrByIdMap.insert(std::make_pair(wsConnectionPtr->id, wsConnectionPtr)); + this->wsConnectionPtrByIdMap.insert({wsConnectionPtr->id, wsConnectionPtr}); CCAPI_LOGGER_TRACE("about to start read"); + + // Start reading messages this->startReadWs(wsConnectionPtr); - auto& stream = *wsConnectionPtr->streamPtr; - stream.control_callback([wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { - that->onControlCallback(wsConnectionPtr, kind, payload); - }); + + // Setup control callback (ping/pong/close) + std::visit( + [&](auto& streamPtr) { + streamPtr->control_callback( + [wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { + that->onControlCallback(wsConnectionPtr, kind, payload); + }); + }, + wsConnectionPtr->streamPtr); } void startReadWs(std::shared_ptr wsConnectionPtr) { - auto& stream = *wsConnectionPtr->streamPtr; CCAPI_LOGGER_TRACE("before async_read"); auto& readMessageBuffer = wsConnectionPtr->readMessageBuffer; - stream.async_read(readMessageBuffer, beast::bind_front_handler(&Service::onReadWs, shared_from_this(), wsConnectionPtr)); + + std::visit( + [&](auto& streamPtr) { streamPtr->async_read(readMessageBuffer, beast::bind_front_handler(&Service::onReadWs, shared_from_this(), wsConnectionPtr)); }, + wsConnectionPtr->streamPtr); + CCAPI_LOGGER_TRACE("after async_read"); } @@ -1176,11 +1229,17 @@ class Service : public std::enable_shared_from_this { } void startWriteWs(std::shared_ptr wsConnectionPtr, const char* data, size_t numBytesToWrite) { - auto& stream = *wsConnectionPtr->streamPtr; CCAPI_LOGGER_TRACE("before async_write"); CCAPI_LOGGER_TRACE("numBytesToWrite = " + toString(numBytesToWrite)); - stream.binary(false); - stream.async_write(net::buffer(data, numBytesToWrite), beast::bind_front_handler(&Service::onWriteWs, shared_from_this(), wsConnectionPtr)); + + std::visit( + [&](auto& streamPtr) { + auto& stream = *streamPtr; // dereference shared_ptr + stream.binary(false); + stream.async_write(net::buffer(data, numBytesToWrite), beast::bind_front_handler(&Service::onWriteWs, shared_from_this(), wsConnectionPtr)); + }, + wsConnectionPtr->streamPtr); + CCAPI_LOGGER_TRACE("after async_write"); } @@ -1245,8 +1304,8 @@ class Service : public std::enable_shared_from_this { } else { CCAPI_LOGGER_INFO("about to retry"); try { - auto thatWsConnectionPtr = that->createWsConnectionPtr(wsConnectionPtr); - that->prepareConnect(thatWsConnectionPtr); + that->setWsConnectionStream(wsConnectionPtr); + that->prepareConnect(wsConnectionPtr); that->connectNumRetryOnFailByConnectionUrlMap[urlBase] += 1; } catch (const beast::error_code& ec) { CCAPI_LOGGER_TRACE("fail"); @@ -1259,16 +1318,13 @@ class Service : public std::enable_shared_from_this { this->connectRetryOnFailTimerByConnectionIdMap[wsConnectionId] = timerPtr; } - std::shared_ptr createWsConnectionPtr(std::shared_ptr wsConnectionPtr) { - std::shared_ptr thatWsConnectionPtr = wsConnectionPtr; - std::shared_ptr>> streamPtr(nullptr); - try { - streamPtr = this->createWsStream(this->serviceContextPtr->ioContextPtr, this->serviceContextPtr->sslContextPtr); - } catch (const beast::error_code& ec) { - throw ec; + void setWsConnectionStream(std::shared_ptr wsConnectionPtr) { + if (wsConnectionPtr->isSecure) { + wsConnectionPtr->streamPtr = std::make_shared>>(*this->serviceContextPtr->ioContextPtr, + *this->serviceContextPtr->sslContextPtr); + } else { + wsConnectionPtr->streamPtr = std::make_shared>(*this->serviceContextPtr->ioContextPtr); } - thatWsConnectionPtr->streamPtr = streamPtr; - return thatWsConnectionPtr; } virtual void onFail(std::shared_ptr wsConnectionPtr) { @@ -1333,10 +1389,10 @@ class Service : public std::enable_shared_from_this { this->eventHandler(event, nullptr); CCAPI_LOGGER_INFO("connection " + toString(*wsConnectionPtr) + " is closed"); this->clearStates(wsConnectionPtr); - auto thisWsConnectionPtr = this->createWsConnectionPtr(wsConnectionPtr); + this->setWsConnectionStream(wsConnectionPtr); this->wsConnectionPtrByIdMap.erase(wsConnectionPtr->id); if (this->shouldContinue.load()) { - this->prepareConnect(thisWsConnectionPtr); + this->prepareConnect(wsConnectionPtr); } CCAPI_LOGGER_FUNCTION_EXIT; } @@ -1348,48 +1404,58 @@ class Service : public std::enable_shared_from_this { CCAPI_LOGGER_WARN("should not process remaining message on closing"); return; } - auto& stream = *wsConnectionPtr->streamPtr; - if (stream.got_text()) { - boost::beast::string_view textMessage(data, dataSize); - CCAPI_LOGGER_DEBUG(std::string("received a text message: ") + std::string(textMessage)); - try { - this->onTextMessage(wsConnectionPtr, textMessage, now); - } catch (const std::exception& e) { - CCAPI_LOGGER_ERROR(std::string("textMessage = ") + std::string(textMessage)); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); - } - } else if (stream.got_binary()) { - CCAPI_LOGGER_DEBUG(std::string("received a binary message: ") + UtilAlgorithm::stringToHex(std::string(data, dataSize))); + + std::visit( + [&](auto& streamPtr) { + auto& stream = *streamPtr; // dereference shared_ptr + + if (stream.got_text()) { + boost::beast::string_view textMessage(data, dataSize); + CCAPI_LOGGER_DEBUG("received a text message: " + std::string(textMessage)); + try { + this->onTextMessage(wsConnectionPtr, textMessage, now); + } catch (const std::exception& e) { + CCAPI_LOGGER_ERROR("textMessage = " + std::string(textMessage)); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); + } + } else if (stream.got_binary()) { + CCAPI_LOGGER_DEBUG("received a binary message: " + UtilAlgorithm::stringToHex(std::string(data, dataSize))); + #if defined(CCAPI_ENABLE_SERVICE_MARKET_DATA) && \ (defined(CCAPI_ENABLE_EXCHANGE_HUOBI) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP)) || \ defined(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT) && \ (defined(CCAPI_ENABLE_EXCHANGE_HUOBI_USDT_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_HUOBI_COIN_SWAP) || defined(CCAPI_ENABLE_EXCHANGE_BITMART)) - if (this->needDecompressWebsocketMessage) { - std::string decompressed; - boost::beast::string_view payload(data, dataSize); - try { - ErrorCode ec = this->inflater.decompress(reinterpret_cast(&payload[0]), payload.size(), decompressed); - if (ec) { - CCAPI_LOGGER_FATAL(ec.message()); - } - CCAPI_LOGGER_DEBUG("decompressed = " + decompressed); - this->onTextMessage(wsConnectionPtr, decompressed, now); - } catch (const std::exception& e) { - std::stringstream ss; - ss << std::hex << std::setfill('0'); - for (int i = 0; i < payload.size(); ++i) { - ss << std::setw(2) << static_cast(reinterpret_cast(&payload[0])[i]); + + if (this->needDecompressWebsocketMessage) { + std::string decompressed; + boost::beast::string_view payload(data, dataSize); + try { + ErrorCode ec = this->inflater.decompress(reinterpret_cast(&payload[0]), payload.size(), decompressed); + if (ec) { + CCAPI_LOGGER_FATAL(ec.message()); + } + CCAPI_LOGGER_DEBUG("decompressed = " + decompressed); + this->onTextMessage(wsConnectionPtr, decompressed, now); + } catch (const std::exception& e) { + std::stringstream ss; + ss << std::hex << std::setfill('0'); + for (int i = 0; i < payload.size(); ++i) { + ss << std::setw(2) << static_cast(reinterpret_cast(&payload[0])[i]); + } + CCAPI_LOGGER_ERROR("binaryMessage = " + ss.str()); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); + } + + ErrorCode ec = this->inflater.inflate_reset(); + if (ec) { + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "decompress"); + } + } + +#endif // decompress block } - CCAPI_LOGGER_ERROR("binaryMessage = " + ss.str()); - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, e); - } - ErrorCode ec = this->inflater.inflate_reset(); - if (ec) { - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "decompress"); - } - } -#endif - } + }, + wsConnectionPtr->streamPtr); // <-- std::variant } void onControlCallback(std::shared_ptr wsConnectionPtr, boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { @@ -1429,10 +1495,14 @@ class Service : public std::enable_shared_from_this { void ping(std::shared_ptr wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) { if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) { - auto& stream = *wsConnectionPtr->streamPtr; - stream.async_ping( - "", [that = this, wsConnectionPtr](ErrorCode const& ec) { that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; }); - this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true; + std::visit( + [&](auto& streamPtr) { + streamPtr->async_ping(boost::beast::websocket::ping_data(payload), [that = this, wsConnectionPtr](ErrorCode const& ec) { + that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; + }); + this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true; + }, + wsConnectionPtr->streamPtr); } }