Skip to content

Commit 615958a

Browse files
committed
feat: thread pinning, tighter threading, housekeeping
1 parent 3beb2cf commit 615958a

13 files changed

Lines changed: 271 additions & 83 deletions

File tree

README.md

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[![Build](https://github.com/milsanore/trader.cpp/actions/workflows/commits.yml/badge.svg)](https://github.com/milsanore/trader.cpp/actions/workflows/commits.yml)
1+
[![Nightly](https://github.com/milsanore/trader.cpp/actions/workflows/nightly.yml/badge.svg)](https://github.com/milsanore/trader.cpp/actions/workflows/nightly.yml)
22
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=milsanore_trader.cpp&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=milsanore_trader.cpp)
33
[![codecov](https://codecov.io/github/milsanore/trader.cpp/graph/badge.svg?token=C787ZTXBQC)](https://codecov.io/github/milsanore/trader.cpp)
44

@@ -177,6 +177,8 @@ NB: this app uses `make` as a recipe book, but it's not essential:
177177
- FIX
178178
- ✅ debug quickfix to confirm if it's running in it's own thread
179179
- switch to Fix8
180+
- security
181+
- OpenSSF Scorecard
180182
- other
181183
- nix
182184
- decimal type
@@ -189,17 +191,25 @@ NB: this app uses `make` as a recipe book, but it's not essential:
189191
# Design
190192
```mermaid
191193
sequenceDiagram
192-
participant TM as Thread 1<br>(Main + UI)
193-
participant TO as Thread 2<br>(UI Orderbook Worker)
194-
participant TF as Thread 3<br>(FIX Worker)
195-
participant TL as Thread 4<br>(UI Log Worker)
196-
197-
TF-->>TF: subscribe to Binance <br> + push to <queue>
198-
TF->>TO: pull from <queue>
199-
TO-->>TO: build UI
200-
TO->>TM: request render
201-
TL-->>TL: poll log file <br> + build UI
202-
TL->>TM: request render
194+
participant MAIN as Main Thread
195+
participant LOGS as Log UI Thread
196+
participant BOOK as Orderbook Thread
197+
participant TRADES as Trade Thread
198+
participant FIX as FIX Thread
199+
200+
FIX-->>FIX: subscribe to Binance <br> + push to <queue>
201+
202+
FIX->>TRADES: pull from <queue>
203+
TRADES-->>TRADES: build UI
204+
205+
FIX->>BOOK: pull from <queue>
206+
BOOK-->>BOOK: build UI
207+
208+
LOGS-->>LOGS: poll log file <br> + build UI
209+
210+
TRADES->>MAIN: request render
211+
BOOK->>MAIN: request render
212+
LOGS->>MAIN: request render
203213
```
204214

205215
# Credits

src/binance/config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ struct Config {
5858

5959
/// @brief 1 == top level, otherwise 5000 is Binance's maximum depth
6060
static constexpr uint16_t MAX_DEPTH = 100;
61+
62+
static constexpr uint8_t PX_SESSION_CPU_AFFINITY = 0;
63+
static constexpr uint8_t TX_SESSION_CPU_AFFINITY = 1;
6164
};
6265

6366
} // namespace binance

src/binance/fix_app.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ namespace binance {
2626

2727
FixApp::FixApp(const std::vector<std::string>& symbols,
2828
std::unique_ptr<IAuth> auth,
29-
const uint16_t MAX_DEPTH)
30-
: symbols_(symbols), auth_(std::move(auth)), MAX_DEPTH_(MAX_DEPTH) {
31-
// TODO(mils): I think this is a singleton
32-
utils::Threading::set_thread_name(THREAD_NAME_);
33-
spdlog::info("naming FIXs own thread, name [{}], id [{}]", THREAD_NAME_,
34-
utils::Threading::get_os_thread_id());
35-
}
29+
const uint16_t MAX_DEPTH,
30+
const uint8_t PX_SESSION_CPU_AFFINITY,
31+
const uint8_t TX_SESSION_CPU_AFFINITY)
32+
: symbols_(symbols),
33+
auth_(std::move(auth)),
34+
MAX_DEPTH_(MAX_DEPTH),
35+
PX_SESSION_CPU_AFFINITY_(PX_SESSION_CPU_AFFINITY),
36+
TX_SESSION_CPU_AFFINITY_(TX_SESSION_CPU_AFFINITY) {}
3637

3738
void FixApp::subscribe_to_prices(const FIX::SessionID& session_id) const {
3839
spdlog::info("subscribing to depth. qualifier [{}], id [{}]",
@@ -128,11 +129,18 @@ void FixApp::onLogon(const FIX::SessionID& sessionId) {
128129
// TODO: now need to wait for all sessions to be logged on before nullifying keys
129130
// auth_->clear_keys();
130131

131-
if (sessionId.getSessionQualifier() == PRICE_SESSION_QUALIFIER_) {
132+
std::string thread_name = THREAD_NAME_ + "_" + sessionId.getSessionQualifier();
133+
utils::Threading::set_thread_name(thread_name);
134+
spdlog::info("naming FIX session thread, name [{}], id [{}]", thread_name,
135+
utils::Threading::get_os_thread_id());
136+
137+
if (sessionId.getSessionQualifier() == PX_SESSION_QUALIFIER_) {
138+
utils::Threading::set_current_thread_affinity(PX_SESSION_CPU_AFFINITY_);
132139
subscribe_to_prices(sessionId);
133-
} else if (sessionId.getSessionQualifier() == TRADE_SESSION_QUALIFIER_) {
140+
} else if (sessionId.getSessionQualifier() == TX_SESSION_QUALIFIER_) {
141+
utils::Threading::set_current_thread_affinity(TX_SESSION_CPU_AFFINITY_);
134142
subscribe_to_trades(sessionId);
135-
} else if (sessionId.getSessionQualifier() == ORDER_SESSION_QUALIFIER_) {
143+
} else if (sessionId.getSessionQualifier() == OX_SESSION_QUALIFIER_) {
136144
// do nothing for order session
137145
} else {
138146
spdlog::error("unknown session, qualifier [{}], id [{}]",
@@ -210,9 +218,9 @@ void FixApp::onMessage(const FIX44::MarketDataSnapshotFullRefresh& m,
210218
}
211219
void FixApp::onMessage(const FIX44::MarketDataIncrementalRefresh& m,
212220
const FIX::SessionID& sessionID) {
213-
if (sessionID.getSessionQualifier() == PRICE_SESSION_QUALIFIER_) {
221+
if (sessionID.getSessionQualifier() == PX_SESSION_QUALIFIER_) {
214222
order_queue_.enqueue(std::make_shared<const FIX44::MarketDataIncrementalRefresh>(m));
215-
} else if (sessionID.getSessionQualifier() == TRADE_SESSION_QUALIFIER_) {
223+
} else if (sessionID.getSessionQualifier() == TX_SESSION_QUALIFIER_) {
216224
trade_queue_.enqueue(std::make_shared<const FIX44::MarketDataIncrementalRefresh>(m));
217225
} else {
218226
spdlog::error(

src/binance/fix_app.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ class FixApp final : public FIX::Application, public FIX44::MessageCracker {
2020
public:
2121
FixApp(const std::vector<std::string>& symbols,
2222
std::unique_ptr<IAuth> auth,
23-
const uint16_t MAX_DEPTH);
23+
const uint16_t MAX_DEPTH,
24+
const uint8_t PX_SESSION_CPU_AFFINITY,
25+
const uint8_t TX_SESSION_CPU_AFFINITY);
2426
~FixApp() override = default;
2527

2628
// Use the FIX44::MessageCracker to pull in the relevant overloads. This resolves the
@@ -42,13 +44,15 @@ class FixApp final : public FIX::Application, public FIX44::MessageCracker {
4244
// TODO: we will have one of these per instrument
4345

4446
private:
45-
static inline constexpr std::string THREAD_NAME_ = "tradercppFIX2";
46-
static inline constexpr std::string PRICE_SESSION_QUALIFIER_ = "PX";
47-
static inline constexpr std::string TRADE_SESSION_QUALIFIER_ = "TX";
48-
static inline constexpr std::string ORDER_SESSION_QUALIFIER_ = "OX";
47+
static inline constexpr std::string THREAD_NAME_ = "fix_session";
48+
static inline constexpr std::string PX_SESSION_QUALIFIER_ = "PX";
49+
static inline constexpr std::string TX_SESSION_QUALIFIER_ = "TX";
50+
static inline constexpr std::string OX_SESSION_QUALIFIER_ = "OX";
4951
const std::vector<std::string>& symbols_;
5052
const std::unique_ptr<IAuth> auth_;
5153
const uint16_t MAX_DEPTH_;
54+
const uint8_t PX_SESSION_CPU_AFFINITY_;
55+
const uint8_t TX_SESSION_CPU_AFFINITY_;
5256

5357
void onCreate(const FIX::SessionID&) override;
5458
void onLogon(const FIX::SessionID&) override;

src/binance/worker.cpp

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include <quickfix/FileStore.h>
55
#include <quickfix/Session.h>
66
#include <quickfix/SessionSettings.h>
7-
#include <quickfix/SocketInitiator.h>
7+
#include <quickfix/ThreadedSocketInitiator.h>
88

99
#include <memory>
1010

@@ -20,38 +20,25 @@ Worker::Worker(std::unique_ptr<FixApp> app,
2020
std::unique_ptr<FIX::FileStoreFactory> store,
2121
FIX::SessionSettings settings,
2222
std::unique_ptr<FIX::FileLogFactory> log,
23-
std::unique_ptr<FIX::SocketInitiator> initiator,
24-
const std::function<void(std::stop_token)>& task)
23+
std::unique_ptr<FIX::ThreadedSocketInitiator> initiator)
2524
: app_(std::move(app)),
2625
store_(std::move(store)),
2726
settings_(std::move(settings)),
2827
log_(std::move(log)),
29-
initiator_(std::move(initiator)),
30-
worker_task_(task) {
31-
// default behaviour
32-
if (!task) {
33-
worker_task_ = {[this]([[maybe_unused]] const std::stop_token& stoken) {
34-
utils::Threading::set_thread_name(THREAD_NAME_);
35-
spdlog::info("starting FIX wrapper thread. name [{}], id [{}]", THREAD_NAME_,
36-
utils::Threading::get_os_thread_id());
37-
// NB: SocketInitiator::start() is a blocking call, so the stop_token
38-
// cannot cancel the thread. NB: The `stop()` function has to forcibly
39-
// stop it with `initiator_->stop()`.
40-
initiator_->start();
41-
spdlog::info("started FIX initiator");
42-
}};
43-
}
44-
}
28+
initiator_(std::move(initiator)) {}
4529

4630
// static member function
4731
Worker Worker::from_conf(Config& conf) {
4832
std::unique_ptr<IAuth> auth =
4933
std::make_unique<Auth>(conf.api_key, conf.private_key_path);
50-
auto app = std::make_unique<FixApp>(conf.symbols, std::move(auth), conf.MAX_DEPTH);
34+
auto app = std::make_unique<FixApp>(conf.symbols, std::move(auth), conf.MAX_DEPTH,
35+
conf.PX_SESSION_CPU_AFFINITY,
36+
conf.TX_SESSION_CPU_AFFINITY);
5137
auto settings = FIX::SessionSettings{conf.fix_config_path};
5238
auto store = std::make_unique<FIX::FileStoreFactory>(settings);
5339
auto log = std::make_unique<FIX::FileLogFactory>(settings);
54-
auto initiator = std::make_unique<FIX::SocketInitiator>(*app, *store, settings, *log);
40+
auto initiator =
41+
std::make_unique<FIX::ThreadedSocketInitiator>(*app, *store, settings, *log);
5542

5643
// hand over object ownership to the instance being created (by the static
5744
// function)
@@ -60,27 +47,13 @@ Worker Worker::from_conf(Config& conf) {
6047
}
6148

6249
void Worker::start() {
63-
try {
64-
worker_ = std::jthread{worker_task_};
65-
} catch (const std::exception& e) {
66-
spdlog::error("error starting FIX. error [{}]", e.what());
67-
} catch (...) {
68-
spdlog::error("error starting FIX. unknown error");
69-
}
50+
initiator_->start();
51+
spdlog::info("started FIX initiator");
7052
}
7153

7254
void Worker::stop() {
73-
try {
74-
if (initiator_) {
75-
initiator_->stop(); // TODO(mils): does this need a try/catch?
76-
}
77-
spdlog::info("stopped FIX initiator");
78-
} catch (const std::exception& e) {
79-
spdlog::error("error stopping FIX initiator. error [{}]", e.what());
80-
} catch (...) {
81-
spdlog::error("error stopping FIX initiator. unknown error");
82-
}
83-
worker_ = std::jthread{};
55+
initiator_->stop(); // TODO(mils): does this need a try/catch?
56+
spdlog::info("stopped FIX initiator");
8457
}
8558

8659
moodycamel::ConcurrentQueue<std::shared_ptr<const FIX44::Message>>&

src/binance/worker.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include <quickfix/FileLog.h>
44
#include <quickfix/FileStore.h>
55
#include <quickfix/SessionSettings.h>
6-
#include <quickfix/SocketInitiator.h>
6+
#include <quickfix/ThreadedSocketInitiator.h>
77
#include <quickfix/fix44/Message.h>
88

99
#include <memory>
@@ -18,13 +18,11 @@ namespace binance {
1818
/// @brief Binance DI container
1919
class Worker final {
2020
public:
21-
static inline constexpr std::string THREAD_NAME_ = "tradercppFIX1";
2221
Worker(std::unique_ptr<FixApp> app,
2322
std::unique_ptr<FIX::FileStoreFactory> store,
2423
FIX::SessionSettings settings,
2524
std::unique_ptr<FIX::FileLogFactory> log,
26-
std::unique_ptr<FIX::SocketInitiator> initiator,
27-
const std::function<void(std::stop_token)>& task = {});
25+
std::unique_ptr<FIX::ThreadedSocketInitiator> initiator);
2826
/// @brief factory for concrete Binance instances, using config
2927
/// @param conf Binance configuration parameters
3028
/// @return
@@ -44,10 +42,7 @@ class Worker final {
4442
std::unique_ptr<FIX::FileStoreFactory> store_;
4543
FIX::SessionSettings settings_;
4644
std::unique_ptr<FIX::FileLogFactory> log_;
47-
std::unique_ptr<FIX::SocketInitiator> initiator_;
48-
// thread
49-
std::jthread worker_;
50-
std::function<void(std::stop_token)> worker_task_;
45+
std::unique_ptr<FIX::ThreadedSocketInitiator> initiator_;
5146
};
5247

5348
} // namespace binance

src/main.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
#include "utils/threading.h"
1111

1212
int main() {
13-
utils::Threading::set_thread_name("tradercppMAIN");
13+
utils::Threading::set_thread_name("main");
1414

15-
// LOG CONFIG
15+
// log config
1616
spdlog::cfg::load_env_levels("LOG_LEVEL");
1717
const char* log_path = std::getenv("LOG_PATH");
1818
const auto logger = spdlog::basic_logger_mt("basic_logger", log_path);
@@ -21,7 +21,7 @@ int main() {
2121
spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] [%t] %v");
2222
spdlog::flush_every(std::chrono::microseconds(100));
2323
spdlog::info("hello");
24-
utils::Env::log_arch();
24+
utils::Env::log_current_architecture();
2525

2626
// Binance market data connectivity
2727
auto b_conf = binance::Config::from_env();

src/ui/log_box/log_file_watcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class LogFileWatcher : public ILogWatcher, public efsw::FileWatchListener {
1515
using Callback = std::function<void(std::vector<std::string>)>;
1616

1717
public:
18-
static inline constexpr std::string THREAD_NAME_ = "tradercppuiLOG";
18+
static inline constexpr std::string THREAD_NAME_ = "ui_logwatcher";
1919

2020
explicit LogFileWatcher(std::string directory, std::string filename)
2121
: directory_(directory), filename_(filename) {

src/ui/order_book_box.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace ui {
1616

1717
class OrderBookBox {
1818
public:
19-
static inline constexpr std::string THREAD_NAME_ = "tradercppuiBOOK";
19+
static inline constexpr std::string THREAD_NAME_ = "ui_orderbook";
2020
// Constructor: takes a label string
2121
OrderBookBox(IScreen& screen,
2222
moodycamel::ConcurrentQueue<std::shared_ptr<const FIX44::Message>>& queue,

src/ui/trade_box.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace ui {
2424

2525
class TradeBox {
2626
public:
27-
static inline constexpr std::string THREAD_NAME_ = "tradercppuiTX";
27+
static inline constexpr std::string THREAD_NAME_ = "ui_tradebox";
2828
TradeBox(IScreen& screen,
2929
binance::Config& binance_config,
3030
moodycamel::ConcurrentQueue<std::shared_ptr<const FIX44::Message>>& queue,

0 commit comments

Comments
 (0)