Skip to content

Commit 1166f07

Browse files
committed
upgrade slick-net dependency, and refactor WebSocketClient management so that WebsocketClient can reconnect without creating a new object
1 parent 74fb53b commit 1166f07

5 files changed

Lines changed: 54 additions & 48 deletions

File tree

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212

1313
### Changed
1414
- Change log file opening mode to append for data logging
15+
- Upgraded slick-net dependency from v2.0.0 to v2.1.0
16+
- Changed `market_data_websocket_` and `user_data_websocket_` members from `shared_ptr` to `unique_ptr`
17+
- Refactored WebSocketClient destructor to call `reset_callbacks()` before closing sockets, eliminating busy-wait polling loops on disconnect
18+
- Removed atomic `pending_md_socket_close_` and `pending_user_socket_close_` counters
19+
- `stop()` no longer resets websocket pointers; connection state is preserved for reconnect
20+
- Disconnect callbacks no longer reset websocket pointers
21+
- `subscribe()` now checks socket status to reopen a disconnected (but existing) connection instead of only creating on null
22+
- Heartbeat subscription is now sent immediately after `open()` on user data socket creation
23+
24+
### Fixed
25+
- `unsubscribe()` now holds a reference to the `unique_ptr` instead of copying it
1526

1627
## [0.2.2] - 2026-03-05
1728

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ find_package(nlohmann_json CONFIG REQUIRED)
1818
find_package(OpenSSL CONFIG REQUIRED)
1919
find_package(jwt-cpp CONFIG REQUIRED)
2020

21-
find_package(slick-net 2.0.0 CONFIG QUIET)
21+
find_package(slick-net 2.1.0 CONFIG QUIET)
2222
if (NOT slick-net_FOUND)
2323
message(STATUS "fetching slick-net...")
2424
include(FetchContent)
2525
FetchContent_Declare(
2626
slick-net
2727
GIT_REPOSITORY https://github.com/SlickQuant/slick-net.git
28-
GIT_TAG v2.0.0
28+
GIT_TAG v2.1.0
2929
)
3030
FetchContent_MakeAvailable(slick-net)
3131
else()

include/coinbase/websocket.hpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ class WebSocketClient {
182182
DataHandler* data_handler_ = nullptr;
183183
std::string market_data_url_;
184184
std::string user_data_url_;
185-
std::shared_ptr<Websocket> market_data_websocket_;
186-
std::shared_ptr<Websocket> user_data_websocket_;
185+
std::unique_ptr<Websocket> market_data_websocket_;
186+
std::unique_ptr<Websocket> user_data_websocket_;
187187
std::array<std::unordered_set<std::string>, static_cast<uint8_t>(WebSocketChannel::__COUNT__)> product_ids_;
188188
std::vector<std::string> pending_subscriptions_;
189189
std::string user_id_;
@@ -192,8 +192,6 @@ class WebSocketClient {
192192
slick::SlickQueue<char> *data_queue_ = nullptr;
193193
std::thread logger_thread_;
194194
std::atomic_bool logger_run_ = false;
195-
std::atomic_int_fast8_t pending_md_socket_close_ = 0;
196-
std::atomic_int_fast8_t pending_user_socket_close_ = 0;
197195
uint64_t data_cursor_ = 0;
198196
static inline constexpr char empty_msg = '\0';
199197
};

src/websocket.cpp

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -160,24 +160,15 @@ WebSocketClient::WebSocketClient(
160160
WebSocketClient::~WebSocketClient() {
161161
if (market_data_websocket_) {
162162
if (market_data_websocket_->status() != Websocket::Status::DISCONNECTED) {
163+
market_data_websocket_->reset_callbacks();
163164
market_data_websocket_->close();
164-
if (pending_md_socket_close_.fetch_add(1, std::memory_order_acq_rel) == 0) {
165-
// waiting for on disconnected callback
166-
auto start = std::chrono::system_clock::now();
167-
while(pending_md_socket_close_.load(std::memory_order_acquire) == 1 &&
168-
(std::chrono::system_clock::now() - start) < std::chrono::seconds(2));
169-
}
170165
}
171166
market_data_websocket_.reset();
172167
}
173168
if (user_data_websocket_) {
174169
if (user_data_websocket_->status() != Websocket::Status::DISCONNECTED) {
170+
user_data_websocket_->reset_callbacks();
175171
user_data_websocket_->close();
176-
if (pending_user_socket_close_.fetch_add(1, std::memory_order_acq_rel) == 0) {
177-
auto start = std::chrono::system_clock::now();
178-
while(pending_user_socket_close_.load(std::memory_order_acquire) == 1 &&
179-
(std::chrono::system_clock::now() - start) < std::chrono::seconds(2));
180-
}
181172
}
182173
user_data_websocket_.reset();
183174
}
@@ -206,55 +197,65 @@ void WebSocketClient::stop() {
206197
if (market_data_websocket_->status() != Websocket::Status::DISCONNECTED) {
207198
market_data_websocket_->close();
208199
}
209-
market_data_websocket_.reset();
210200
}
211201
if (user_data_websocket_) {
212202
if (user_data_websocket_->status() != Websocket::Status::DISCONNECTED) {
213203
user_data_websocket_->close();
214204
}
215-
user_data_websocket_.reset();
216205
}
217206
}
218207

219208
void WebSocketClient::subscribe(const std::vector<std::string> &product_ids, const std::vector<WebSocketChannel> &channels) {
220209
for (auto channel : channels) {
221210
auto products = product_ids_[static_cast<uint8_t>(channel)];
222211
auto subscribe_json = json{{"type", "subscribe"}, {"product_ids", product_ids}, {"channel", to_string(channel)}};
223-
std::shared_ptr<Websocket> websocket;
212+
Websocket* websocket;
224213
if (channel == WebSocketChannel::USER) {
225-
if (!user_data_websocket_ && !user_data_url_.empty()) {
226-
user_data_websocket_ = std::make_shared<Websocket>(
227-
user_data_url_,
228-
[this]() { onUserDataConnected(); },
229-
[this]() { onUserDataDisconnected(); },
230-
[this](const char* data, std::size_t size) { onUserData(data, size); },
231-
[this](std::string err) { onUserDataError(std::move(err)); }
232-
);
214+
if (!user_data_websocket_) {
215+
if (!user_data_url_.empty()) {
216+
user_data_websocket_ = std::make_unique<Websocket>(
217+
user_data_url_,
218+
[this]() { onUserDataConnected(); },
219+
[this]() { onUserDataDisconnected(); },
220+
[this](const char* data, std::size_t size) { onUserData(data, size); },
221+
[this](std::string err) { onUserDataError(std::move(err)); }
222+
);
223+
user_data_websocket_->open();
224+
225+
// subscribe heartbeat to keep user channel alive
226+
auto heartbeat_sub = json{{"type", "subscribe"}, {"channel", "heartbeats"}};
227+
heartbeat_sub["jwt"] = generate_coinbase_jwt(user_data_url_.c_str());
228+
auto subscribe_str = heartbeat_sub.dump();
229+
user_data_websocket_->send(subscribe_str.c_str(), subscribe_str.size());
230+
}
231+
} else if (user_data_websocket_->status() > Websocket::Status::CONNECTED) {
233232
user_data_websocket_->open();
234-
pending_user_socket_close_.store(0, std::memory_order_release);
235233

236234
// subscribe heartbeat to keep user channel alive
237235
auto heartbeat_sub = json{{"type", "subscribe"}, {"channel", "heartbeats"}};
238236
heartbeat_sub["jwt"] = generate_coinbase_jwt(user_data_url_.c_str());
239237
auto subscribe_str = heartbeat_sub.dump();
240238
user_data_websocket_->send(subscribe_str.c_str(), subscribe_str.size());
241239
}
242-
websocket = user_data_websocket_;
240+
websocket = user_data_websocket_.get();
243241
subscribe_json["jwt"] = generate_coinbase_jwt(user_data_url_.c_str());
244242
}
245243
else {
246-
if (!market_data_websocket_ && !market_data_url_.empty()) {
247-
market_data_websocket_ = std::make_shared<Websocket>(
248-
market_data_url_,
249-
[this]() { onMarketDataConnected(); },
250-
[this]() { onMarketDataDisconnected(); },
251-
[this](const char* data, std::size_t size) { onMarketData(data, size); },
252-
[this](std::string err) { onMarketDataError(std::move(err)); }
253-
);
244+
if (!market_data_websocket_) {
245+
if (!market_data_url_.empty()) {
246+
market_data_websocket_ = std::make_unique<Websocket>(
247+
market_data_url_,
248+
[this]() { onMarketDataConnected(); },
249+
[this]() { onMarketDataDisconnected(); },
250+
[this](const char* data, std::size_t size) { onMarketData(data, size); },
251+
[this](std::string err) { onMarketDataError(std::move(err)); }
252+
);
253+
market_data_websocket_->open();
254+
}
255+
} else if (market_data_websocket_->status() > Websocket::Status::CONNECTED) {
254256
market_data_websocket_->open();
255-
pending_md_socket_close_.store(0, std::memory_order_release);
256257
}
257-
websocket = market_data_websocket_;
258+
websocket = market_data_websocket_.get();
258259
}
259260

260261
if (websocket == nullptr) {
@@ -269,7 +270,7 @@ void WebSocketClient::subscribe(const std::vector<std::string> &product_ids, con
269270

270271
void WebSocketClient::unsubscribe(const std::vector<std::string> &product_ids, const std::vector<WebSocketChannel> &channels) {
271272
for (auto channel : channels) {
272-
auto websocket = channel == WebSocketChannel::USER ? user_data_websocket_ : market_data_websocket_;
273+
auto &websocket = channel == WebSocketChannel::USER ? user_data_websocket_ : market_data_websocket_;
273274
auto unsubscribe_json = json{{"type", "unsubscribe"}, {"product_ids", product_ids}, {"channel", to_string(channel)}};
274275
auto unsubscribe_str = unsubscribe_json.dump();
275276
if (websocket == nullptr) {
@@ -369,8 +370,6 @@ void WebSocketClient::onMarketDataDisconnected() {
369370
data_handler_->callbacks_->onMarketDataDisconnected(this);
370371
data_handler_->resetMarketDataSequence(this);
371372
}
372-
pending_md_socket_close_.fetch_add(1, std::memory_order_acq_rel);
373-
market_data_websocket_.reset();
374373
}
375374

376375

@@ -391,8 +390,6 @@ void WebSocketClient::onUserDataDisconnected() {
391390
data_handler_->callbacks_->onUserDataDisconnected(this);
392391
data_handler_->resetUserDataSequence(this);
393392
}
394-
pending_user_socket_close_.fetch_add(1, std::memory_order_acq_rel);
395-
user_data_websocket_.reset();
396393
}
397394

398395
void WebSocketClient::onMarketData(const char* data, std::size_t size) {

tests/websocket_tests.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -439,18 +439,18 @@ namespace coinbase::tests {
439439

440440
TEST_F(WebSocketTests, RepeatedConnectDisconnect) {
441441
constexpr int kIterations = 5;
442+
WebSocketClient client_(this);
442443
for (int i = 0; i < kIterations; ++i) {
443-
client_ = std::make_unique<WebSocketClient>(this);
444444
auto connected_before = md_connected_count_.load(std::memory_order_relaxed);
445445
auto disconnected_before = md_disconnected_count_.load(std::memory_order_relaxed);
446-
client_->subscribe({"BTC-USD"}, {WebSocketChannel::LEVEL2});
446+
client_.subscribe({"BTC-USD"}, {WebSocketChannel::LEVEL2});
447447
auto start = std::chrono::steady_clock::now();
448448
while (md_connected_count_.load(std::memory_order_relaxed) == connected_before &&
449449
(std::chrono::steady_clock::now() - start) < std::chrono::seconds(5)) {
450450
std::this_thread::sleep_for(std::chrono::milliseconds(50));
451451
}
452452
const bool connected = md_connected_count_.load(std::memory_order_relaxed) > connected_before;
453-
client_.reset();
453+
client_.stop();
454454
if (connected) {
455455
start = std::chrono::steady_clock::now();
456456
while (md_disconnected_count_.load(std::memory_order_relaxed) == disconnected_before &&

0 commit comments

Comments
 (0)