Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_fix_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class FixConnection {
std::string url;
Subscription subscription;
Status status{Status::UNKNOWN};
std::shared_ptr<T> streamPtr;
std::shared_ptr<T> streamPtr{nullptr};
};

} /* namespace ccapi */
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class HttpConnection {

std::string host;
std::string port;
std::shared_ptr<beast::ssl_stream<beast::tcp_stream>> streamPtr;
std::shared_ptr<beast::ssl_stream<beast::tcp_stream>> streamPtr{nullptr};
TimePoint lastReceiveDataTp{std::chrono::seconds{0}};

boost::beast::flat_buffer buffer;
Expand Down
24 changes: 19 additions & 5 deletions include/ccapi_cpp/ccapi_ws_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define INCLUDE_CCAPI_CPP_CCAPI_WS_CONNECTION_H_

#include <string>
#include <variant>

#include "ccapi_cpp/ccapi_logger.h"
#include "ccapi_cpp/ccapi_subscription.h"
Expand All @@ -16,9 +17,9 @@ class WsConnection {
WsConnection(const WsConnection&) = delete;
WsConnection& operator=(const WsConnection&) = delete;

WsConnection(std::string url, std::string group, std::vector<Subscription> subscriptionList, std::map<std::string, std::string> credential,
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr)
: url(url), group(group), subscriptionList(subscriptionList), credential(credential), streamPtr(streamPtr) {
WsConnection(const std::string& url, const std::string& group, const std::vector<Subscription>& subscriptionList,
const std::map<std::string, std::string>& credential)
: url(url), group(group), subscriptionList(subscriptionList), credential(credential) {
std::map<std::string, std::string> shortCredential;
for (const auto& x : credential) {
shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH)));
Expand All @@ -39,7 +40,15 @@ class WsConnection {
shortCredential.insert(std::make_pair(x.first, UtilString::firstNCharacter(x.second, CCAPI_CREDENTIAL_DISPLAY_LENGTH)));
}
std::ostringstream oss;
oss << streamPtr;
std::visit(
[&oss](auto&& streamSharedPtr) {
if (streamSharedPtr) {
oss << streamSharedPtr.get();
} else {
oss << "nullptr";
}
},
streamPtr);
std::string output = "WsConnection [longId = " + longId + ", id = " + id + ", url = " + url + ", group = " + group +
", subscriptionList = " + ccapi::toString(subscriptionList) + ", credential = " + ccapi::toString(shortCredential) +
", status = " + statusToString(status) + ", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() +
Expand Down Expand Up @@ -116,6 +125,9 @@ class WsConnection {
this->port = CCAPI_HTTP_PORT_DEFAULT;
}
}
if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") {
this->isSecure = true;
}
}
}

Expand All @@ -133,7 +145,8 @@ class WsConnection {
Status status{Status::UNKNOWN};
std::map<std::string, std::string> headers;
std::map<std::string, std::string> credential;
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr;
std::variant<std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>>, std::shared_ptr<beast::websocket::stream<beast::tcp_stream>>>
streamPtr;
beast::websocket::close_code remoteCloseCode{};
beast::websocket::close_reason remoteCloseReason{};
std::string hostHttpHeaderValue;
Expand All @@ -145,6 +158,7 @@ class WsConnection {
std::array<char, CCAPI_WEBSOCKET_WRITE_BUFFER_SIZE> writeMessageBuffer;
size_t writeMessageBufferWrittenLength{};
std::vector<size_t> writeMessageBufferBoundary;
bool isSecure{};
};

} /* namespace ccapi */
Expand Down
14 changes: 4 additions & 10 deletions include/ccapi_cpp/service/ccapi_execution_management_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,15 @@ class ExecutionManagementService : public Service {
credential = that->credentialDefault;
}

std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()});
return;
}
const auto& fieldSet = subscription.getFieldSet();
if (fieldSet.find(CCAPI_EM_WEBSOCKET_ORDER_ENTRY) != fieldSet.end()) {
std::shared_ptr<WsConnection> wsConnectionPtr(new WsConnection(that->baseUrlWsOrderEntry, "", {subscription}, credential, streamPtr));
auto wsConnectionPtr = std::make_shared<WsConnection>(that->baseUrlWsOrderEntry, "", std::vector<Subscription>{subscription}, credential);
that->setWsConnectionStream(wsConnectionPtr);
CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr));
that->prepareConnect(wsConnectionPtr);
} else {
std::shared_ptr<WsConnection> wsConnectionPtr(new WsConnection(that->baseUrlWs, "", {subscription}, credential, streamPtr));
auto wsConnectionPtr = std::make_shared<WsConnection>(that->baseUrlWs, "", std::vector<Subscription>{subscription}, credential);
that->setWsConnectionStream(wsConnectionPtr);
CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr));
that->prepareConnect(wsConnectionPtr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,16 +363,9 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService {
}
const auto& accountGroup = mapGetWithDefault(credential, that->apiAccountGroupName);

std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()});
return;
}
std::shared_ptr<WsConnection> wsConnectionPtr(
new WsConnection(that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "", {subscription}, credential, streamPtr));
auto wsConnectionPtr = std::make_shared<WsConnection>(that->baseUrlWs + "/" + accountGroup + "/api/pro/v1/stream", "",
std::vector<Subscription>{subscription}, credential);
that->setWsConnectionStream(wsConnectionPtr);
CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr));
that->prepareConnect(wsConnectionPtr);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,8 @@ class ExecutionManagementServiceGateioPerpetualFutures : public ExecutionManagem
credential = that->credentialDefault;
}

std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()});
return;
}
std::shared_ptr<WsConnection> wsConnectionPtr(new WsConnection(that->baseUrlWs + settle, "", {subscription}, credential, streamPtr));
auto wsConnectionPtr = std::make_shared<WsConnection>(that->baseUrlWs + settle, "", std::vector<Subscription>{subscription}, credential);
that->setWsConnectionStream(wsConnectionPtr);
CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr));
that->prepareConnect(wsConnectionPtr);
}
Expand Down
16 changes: 3 additions & 13 deletions include/ccapi_cpp/service/ccapi_market_data_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,9 @@ class MarketDataService : public Service {
if (credential.empty()) {
credential = that->credentialDefault;
}
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
std::vector<std::string> correlationIdList;
correlationIdList.reserve(subscriptionListGivenInstrumentGroup.size());
std::transform(subscriptionListGivenInstrumentGroup.cbegin(), subscriptionListGivenInstrumentGroup.cend(), std::back_inserter(correlationIdList),
[](Subscription subscription) { return subscription.getCorrelationId(); });
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", correlationIdList);
return;
}
std::shared_ptr<WsConnection> wsConnectionPtr(new WsConnection(url, instrumentGroup, subscriptionListGivenInstrumentGroup, credential, streamPtr));

auto wsConnectionPtr = std::make_shared<WsConnection>(url, instrumentGroup, subscriptionListGivenInstrumentGroup, credential);
that->setWsConnectionStream(wsConnectionPtr);
CCAPI_LOGGER_WARN("about to subscribe with new wsConnectionPtr " + toString(*wsConnectionPtr));
that->prepareConnect(wsConnectionPtr);
}
Expand Down
Loading
Loading