Skip to content

Commit e04126a

Browse files
committed
feat: thread pinning
1 parent adbde53 commit e04126a

8 files changed

Lines changed: 231 additions & 19 deletions

File tree

src/binance/config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ struct Config {
5858

5959
/// @brief 1 == top level, otherwise 5000 is Binance's maximum depth
6060
static constexpr uint16_t MAX_DEPTH = 100;
61+
62+
static constexpr uint8_t PX_SESSION_CPU_AFFINITY = 0;
63+
static constexpr uint8_t TX_SESSION_CPU_AFFINITY = 1;
6164
};
6265

6366
} // namespace binance

src/binance/fix_app.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,14 @@ namespace binance {
2626

2727
FixApp::FixApp(const std::vector<std::string>& symbols,
2828
std::unique_ptr<IAuth> auth,
29-
const uint16_t MAX_DEPTH)
30-
: symbols_(symbols), auth_(std::move(auth)), MAX_DEPTH_(MAX_DEPTH) {}
29+
const uint16_t MAX_DEPTH,
30+
const uint8_t PX_SESSION_CPU_AFFINITY,
31+
const uint8_t TX_SESSION_CPU_AFFINITY)
32+
: symbols_(symbols),
33+
auth_(std::move(auth)),
34+
MAX_DEPTH_(MAX_DEPTH),
35+
PX_SESSION_CPU_AFFINITY_(PX_SESSION_CPU_AFFINITY),
36+
TX_SESSION_CPU_AFFINITY_(TX_SESSION_CPU_AFFINITY) {}
3137

3238
void FixApp::subscribe_to_prices(const FIX::SessionID& session_id) const {
3339
spdlog::info("subscribing to depth. qualifier [{}], id [{}]",
@@ -123,21 +129,23 @@ void FixApp::onLogon(const FIX::SessionID& sessionId) {
123129
// TODO: now need to wait for all sessions to be logged on before nullifying keys
124130
// auth_->clear_keys();
125131

126-
if (sessionId.getSessionQualifier() == PRICE_SESSION_QUALIFIER_) {
132+
std::string thread_name = THREAD_NAME_ + "_" + sessionId.getSessionQualifier();
133+
utils::Threading::set_thread_name(thread_name);
134+
spdlog::info("naming FIX session thread, name [{}], id [{}]", thread_name,
135+
utils::Threading::get_os_thread_id());
136+
137+
if (sessionId.getSessionQualifier() == PX_SESSION_QUALIFIER_) {
138+
utils::Threading::set_current_thread_affinity(PX_SESSION_CPU_AFFINITY_);
127139
subscribe_to_prices(sessionId);
128-
} else if (sessionId.getSessionQualifier() == TRADE_SESSION_QUALIFIER_) {
140+
} else if (sessionId.getSessionQualifier() == TX_SESSION_QUALIFIER_) {
141+
utils::Threading::set_current_thread_affinity(TX_SESSION_CPU_AFFINITY_);
129142
subscribe_to_trades(sessionId);
130-
} else if (sessionId.getSessionQualifier() == ORDER_SESSION_QUALIFIER_) {
143+
} else if (sessionId.getSessionQualifier() == OX_SESSION_QUALIFIER_) {
131144
// do nothing for order session
132145
} else {
133146
spdlog::error("unknown session, qualifier [{}], id [{}]",
134147
sessionId.getSessionQualifier(), sessionId.toString());
135148
}
136-
137-
std::string thread_name = THREAD_NAME_ + "_" + sessionId.getSessionQualifier();
138-
utils::Threading::set_thread_name(thread_name);
139-
spdlog::info("naming FIX session thread, name [{}], id [{}]", thread_name,
140-
utils::Threading::get_os_thread_id());
141149
};
142150
void FixApp::onLogout(const FIX::SessionID& sessionId) {
143151
spdlog::info("session logout. qualifier [{}], id [{}]", sessionId.getSessionQualifier(),
@@ -210,9 +218,9 @@ void FixApp::onMessage(const FIX44::MarketDataSnapshotFullRefresh& m,
210218
}
211219
void FixApp::onMessage(const FIX44::MarketDataIncrementalRefresh& m,
212220
const FIX::SessionID& sessionID) {
213-
if (sessionID.getSessionQualifier() == PRICE_SESSION_QUALIFIER_) {
221+
if (sessionID.getSessionQualifier() == PX_SESSION_QUALIFIER_) {
214222
order_queue_.enqueue(std::make_shared<const FIX44::MarketDataIncrementalRefresh>(m));
215-
} else if (sessionID.getSessionQualifier() == TRADE_SESSION_QUALIFIER_) {
223+
} else if (sessionID.getSessionQualifier() == TX_SESSION_QUALIFIER_) {
216224
trade_queue_.enqueue(std::make_shared<const FIX44::MarketDataIncrementalRefresh>(m));
217225
} else {
218226
spdlog::error(

src/binance/fix_app.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ class FixApp final : public FIX::Application, public FIX44::MessageCracker {
2020
public:
2121
FixApp(const std::vector<std::string>& symbols,
2222
std::unique_ptr<IAuth> auth,
23-
const uint16_t MAX_DEPTH);
23+
const uint16_t MAX_DEPTH,
24+
const uint8_t PX_SESSION_CPU_AFFINITY,
25+
const uint8_t TX_SESSION_CPU_AFFINITY);
2426
~FixApp() override = default;
2527

2628
// Use the FIX44::MessageCracker to pull in the relevant overloads. This resolves the
@@ -43,12 +45,14 @@ class FixApp final : public FIX::Application, public FIX44::MessageCracker {
4345

4446
private:
4547
static inline constexpr std::string THREAD_NAME_ = "fix_session";
46-
static inline constexpr std::string PRICE_SESSION_QUALIFIER_ = "PX";
47-
static inline constexpr std::string TRADE_SESSION_QUALIFIER_ = "TX";
48-
static inline constexpr std::string ORDER_SESSION_QUALIFIER_ = "OX";
48+
static inline constexpr std::string PX_SESSION_QUALIFIER_ = "PX";
49+
static inline constexpr std::string TX_SESSION_QUALIFIER_ = "TX";
50+
static inline constexpr std::string OX_SESSION_QUALIFIER_ = "OX";
4951
const std::vector<std::string>& symbols_;
5052
const std::unique_ptr<IAuth> auth_;
5153
const uint16_t MAX_DEPTH_;
54+
const uint8_t PX_SESSION_CPU_AFFINITY_;
55+
const uint8_t TX_SESSION_CPU_AFFINITY_;
5256

5357
void onCreate(const FIX::SessionID&) override;
5458
void onLogon(const FIX::SessionID&) override;

src/binance/worker.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ Worker::Worker(std::unique_ptr<FixApp> app,
3131
Worker Worker::from_conf(Config& conf) {
3232
std::unique_ptr<IAuth> auth =
3333
std::make_unique<Auth>(conf.api_key, conf.private_key_path);
34-
auto app = std::make_unique<FixApp>(conf.symbols, std::move(auth), conf.MAX_DEPTH);
34+
auto app = std::make_unique<FixApp>(conf.symbols, std::move(auth), conf.MAX_DEPTH,
35+
conf.PX_SESSION_CPU_AFFINITY,
36+
conf.TX_SESSION_CPU_AFFINITY);
3537
auto settings = FIX::SessionSettings{conf.fix_config_path};
3638
auto store = std::make_unique<FIX::FileStoreFactory>(settings);
3739
auto log = std::make_unique<FIX::FileLogFactory>(settings);

src/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ int main() {
2121
spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] [%t] %v");
2222
spdlog::flush_every(std::chrono::microseconds(100));
2323
spdlog::info("hello");
24-
utils::Env::log_arch();
24+
utils::Env::log_current_architecture();
2525

2626
// Binance market data connectivity
2727
auto b_conf = binance::Config::from_env();

src/utils/env.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ struct Env {
2424
};
2525

2626
/// @brief log the architecutre in use
27-
static void log_arch() {
27+
static void log_current_architecture() {
2828
// detect architecture
2929
#if defined(__aarch64__) || defined(_M_ARM64)
3030
constexpr const char* ARCH = "ARM64";

src/utils/threading.cpp

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1+
12
#include "threading.h"
23

4+
#include <cstring>
5+
#include <iostream>
6+
#include <optional>
37
#include <string>
8+
#include <system_error>
9+
#include <thread>
410

511
#include "spdlog/spdlog.h"
612

@@ -9,8 +15,12 @@
915
#endif
1016

1117
#if defined(__linux__)
18+
#include <sched.h>
1219
#include <sys/syscall.h>
1320
#include <unistd.h>
21+
#elif defined(__APPLE__)
22+
#include <mach/mach.h>
23+
#include <mach/thread_policy.h>
1424
#elif defined(_WIN32)
1525
#include <windows.h>
1626

@@ -46,6 +56,51 @@ void Threading::set_thread_name(const std::string& name) {
4656
#endif
4757
}
4858

59+
// static function
60+
std::string Threading::get_thread_name() {
61+
#if defined(__linux__)
62+
char name[16] = {0}; // Linux max length for pthread_getname_np
63+
int rc = pthread_getname_np(pthread_self(), name, sizeof(name));
64+
if (rc != 0) {
65+
spdlog::error("failed to get thread name. error [{}]",
66+
std::system_category().message(rc));
67+
return {};
68+
}
69+
return std::string(name);
70+
71+
#elif defined(__APPLE__)
72+
char name[64] = {0}; // macOS max 64 including null terminator
73+
int rc = pthread_getname_np(pthread_self(), name, sizeof(name));
74+
if (rc != 0) {
75+
spdlog::error("failed to get thread name on macOS. error [{}]", rc);
76+
return {};
77+
}
78+
return std::string(name);
79+
80+
#elif defined(_WIN32)
81+
// Windows 10 1607+ has GetThreadDescription
82+
PWSTR wname = nullptr;
83+
HRESULT hr = GetThreadDescription(GetCurrentThread(), &wname);
84+
if (FAILED(hr) || wname == nullptr) {
85+
return {};
86+
}
87+
// Convert UTF-16 to UTF-8
88+
int size_needed =
89+
WideCharToMultiByte(CP_UTF8, 0, wname, -1, nullptr, 0, nullptr, nullptr);
90+
std::string name(size_needed, 0);
91+
WideCharToMultiByte(CP_UTF8, 0, wname, -1, &name[0], size_needed, nullptr, nullptr);
92+
LocalFree(wname); // free memory allocated by GetThreadDescription
93+
// remove trailing null if present
94+
if (!name.empty() && name.back() == '\0') {
95+
name.pop_back();
96+
}
97+
return name;
98+
99+
#else
100+
return {}; // Unsupported platform
101+
#endif
102+
}
103+
49104
// static function
50105
uint64_t Threading::get_os_thread_id() {
51106
#if defined(__linux__)
@@ -66,4 +121,99 @@ uint64_t Threading::get_os_thread_id() {
66121
#endif
67122
}
68123

124+
// static function
125+
unsigned int Threading::get_cpu_count() {
126+
#if defined(__linux__) || defined(__APPLE__)
127+
long n = ::sysconf(_SC_NPROCESSORS_ONLN);
128+
if (n > 0) {
129+
return static_cast<unsigned int>(n);
130+
}
131+
// Fallback if sysconf fails
132+
unsigned int std_n = std::thread::hardware_concurrency();
133+
return std_n > 0 ? std_n : 1;
134+
135+
#elif defined(_WIN32)
136+
SYSTEM_INFO sysinfo;
137+
GetSystemInfo(&sysinfo);
138+
return sysinfo.dwNumberOfProcessors;
139+
140+
#else
141+
unsigned int std_n = std::thread::hardware_concurrency();
142+
return std_n > 0 ? std_n : 1;
143+
#endif
144+
}
145+
146+
// static function
147+
bool Threading::set_native_thread_affinity(std::thread::native_handle_type handle,
148+
unsigned int cpu_id) {
149+
#if defined(__linux__)
150+
cpu_set_t cpuset;
151+
CPU_ZERO(&cpuset);
152+
CPU_SET(cpu_id, &cpuset);
153+
unsigned int num_cpus = get_cpu_count();
154+
if (cpu_id >= num_cpus) {
155+
spdlog::error("cpu_id exceeds available CPUs, cpu_id [{}] available [{}]", cpu_id,
156+
num_cpus);
157+
return false;
158+
}
159+
int rc = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuset);
160+
if (rc != 0) {
161+
spdlog::error("linux: failed to set thread affinity. error [{}]",
162+
std::system_category().message(rc));
163+
return false;
164+
}
165+
spdlog::info("linux: successfully set thread affinity. thread_name [{}] cpu [{}]",
166+
get_thread_name(), cpu_id);
167+
return true;
168+
169+
#elif defined(__APPLE__)
170+
thread_port_t mach_thread = pthread_mach_thread_np(handle);
171+
thread_affinity_policy_data_t policy{.affinity_tag = static_cast<int>(cpu_id + 1)};
172+
kern_return_t kr = thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
173+
reinterpret_cast<thread_policy_t>(&policy),
174+
THREAD_AFFINITY_POLICY_COUNT);
175+
if (kr != KERN_SUCCESS) {
176+
spdlog::error("macOS: failed to set thread affinity. kern_return_t [{}]", kr);
177+
return false;
178+
}
179+
spdlog::info("macOS: successfully set thread affinity. thread_name [{}] cpu [{}]",
180+
get_thread_name(), cpu_id);
181+
return true;
182+
183+
#elif defined(_WIN32)
184+
DWORD_PTR mask = static_cast<DWORD_PTR>(1) << cpu_id;
185+
DWORD_PTR result = SetThreadAffinityMask(handle, mask);
186+
if (result == 0) {
187+
spdlog::error("windows: failed to set thread affinity. error [{}]", GetLastError());
188+
return false;
189+
}
190+
spdlog::info("windows: successfully set thread affinity. thread_name [{}] cpu [{}]",
191+
get_thread_name(), cpu_id);
192+
return true;
193+
194+
#else
195+
(void)handle;
196+
(void)cpu_id;
197+
spdlog::error("thread affinity not supported on this platform");
198+
return false;
199+
#endif
200+
}
201+
202+
// static function
203+
bool Threading::set_thread_affinity(std::thread& t, unsigned int cpu_id) {
204+
return set_native_thread_affinity(t.native_handle(), cpu_id);
205+
}
206+
207+
// static function
208+
bool Threading::set_current_thread_affinity(unsigned int cpu_id) {
209+
#if defined(__linux__) || defined(__APPLE__)
210+
return set_native_thread_affinity(pthread_self(), cpu_id);
211+
#elif defined(_WIN32)
212+
return set_native_thread_affinity(GetCurrentThread(), cpu_id);
213+
#else
214+
spdlog::error("thread affinity not supported on this platform");
215+
return false;
216+
#endif
217+
}
218+
69219
} // namespace utils

src/utils/threading.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
#include <cstdint>
44
#include <string>
5+
#include <thread>
56

67
namespace utils {
78

89
class Threading {
910
public:
11+
/// @brief Returns the current thread's name. If unsupported, returns empty string
12+
static std::string get_thread_name();
13+
1014
/// @brief Sets the current thread's name (visible in debuggers and some profilers).
1115
/// This is useful for debugging and monitoring multi-threaded applications.
1216
/// The maximum allowed name length depends on the platform:
@@ -23,6 +27,47 @@ class Threading {
2327
/// @return A platform-specific thread identifier as an unsigned 64-bit integer.
2428
static uint64_t get_os_thread_id();
2529
static inline constexpr uint64_t ERROR_THREAD_ID = 9999999;
30+
31+
/// @brief Retrieves the number of logical CPU cores currently available to the process.
32+
/// This function returns the count of online logical processors (hardware threads)
33+
/// that the operating system makes available to the calling process.
34+
/// On Linux and macOS, it queries `sysconf(_SC_NPROCESSORS_ONLN)` for an accurate,
35+
/// real-time count that respects CPU hotplug events and container (cgroup) limits.
36+
/// On Windows, it uses `GetSystemInfo()`. If platform-specific calls fail, the function
37+
/// falls back to `std::thread::hardware_concurrency()`.
38+
/// The returned value reflects the number of CPUs available to user-space threads,
39+
/// not necessarily the total number of physical cores. Hyperthreaded CPUs are counted
40+
/// as multiple logical processors.
41+
/// @return The number of available logical CPUs (>= 1). If detection fails, returns 1.
42+
/// @note CPU indices are zero-based, ranging from 0 to (get_cpu_count() - 1).
43+
/// @see set_thread_affinity(), set_native_thread_affinity()
44+
static unsigned int get_cpu_count();
45+
46+
/// @brief Sets the CPU affinity for the calling (current) thread.
47+
/// Attempts to pin the current thread to a specific logical CPU core, preventing
48+
/// the operating system scheduler from migrating it to other cores.
49+
/// CPU affinity can improve performance and reduce latency by keeping execution
50+
/// on a fixed CPU, preserving cache locality and minimizing context switches.
51+
/// On Linux, this uses `pthread_setaffinity_np()` to bind the thread to the target CPU.
52+
/// On macOS, it applies a best-effort affinity hint via `thread_policy_set()`, since
53+
/// strict CPU pinning is not supported by the kernel. On Windows, it uses
54+
/// `SetThreadAffinityMask()` to achieve true CPU binding.
55+
/// @note On macOS, affinity is advisory only and may not guarantee the thread remains
56+
/// on the specified CPU.
57+
/// @note This function only affects the calling thread. To set affinity for another
58+
/// thread, use @ref set_thread_affinity(std::thread&, unsigned int).
59+
/// @see get_cpu_count(), set_thread_affinity(), set_native_thread_affinity()
60+
/// @param cpu_id The zero-based index of the target logical CPU.
61+
/// Valid range is `[0, get_cpu_count() - 1]`.
62+
/// @return `true` if the affinity was successfully applied (or hinted); `false`
63+
/// otherwise.
64+
static bool set_current_thread_affinity(unsigned int cpu_id);
65+
66+
private:
67+
static bool set_thread_affinity(std::thread& t, unsigned int cpu_id);
68+
// platform-specific affinity for native thread handle
69+
static bool set_native_thread_affinity(std::thread::native_handle_type handle,
70+
unsigned int cpu_id);
2671
};
2772

2873
} // namespace utils

0 commit comments

Comments
 (0)