Skip to content

Commit 1da3d01

Browse files
committed
feat(hft): finalize dual-threaded engine & alignment verification
1 parent 076b37a commit 1da3d01

3 files changed

Lines changed: 180 additions & 0 deletions

File tree

benchmark_hft_engine.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
2+
#include "quanux/MarketTick.hpp"
3+
#include "quanux/SPSCQueue.hpp"
4+
#include <atomic>
5+
#include <chrono>
6+
#include <iostream>
7+
#include <thread>
8+
#include <vector>
9+
10+
using namespace quanux;
11+
12+
// Signal simplified for benchmark
13+
/// Message handed from the producer loop to the consumer thread through the
/// SPSC queue.
///
/// Every member carries a default initializer so that a default-constructed
/// Signal (for example the buffer handed to pop()) never holds indeterminate
/// values. The type remains an aggregate, so positional initialization in
/// declaration order -- Signal{ts, id, z, vol} -- keeps working.
struct Signal {
  uint64_t tick_ts{0};       ///< Producer-side timestamp in nanoseconds.
  uint32_t instrument_id{0}; ///< Numeric instrument identifier.
  double z_score{0.0};       ///< Z-score value carried by the signal.
  double volatility{0.0};    ///< Volatility value carried by the signal.
};
19+
20+
int main() {
21+
SPSCQueue<Signal> queue(4096);
22+
std::atomic<bool> running{true};
23+
std::vector<uint64_t> latencies;
24+
latencies.reserve(1000000);
25+
26+
// Consumer Thread (Execution)
27+
std::thread consumer([&]() {
28+
Signal sig;
29+
while (running) {
30+
if (queue.pop(sig)) {
31+
auto now = std::chrono::steady_clock::now();
32+
auto tick_time = std::chrono::steady_clock::time_point(
33+
std::chrono::nanoseconds(sig.tick_ts));
34+
auto latency = std::chrono::duration_cast<std::chrono::nanoseconds>(
35+
now - tick_time)
36+
.count();
37+
latencies.push_back(latency);
38+
}
39+
}
40+
});
41+
42+
// Producer Loop (Simulation)
43+
std::cout << "Running Micro-Benchmark (1M messages)..." << std::endl;
44+
auto start = std::chrono::steady_clock::now();
45+
46+
for (int i = 0; i < 1000000; ++i) {
47+
auto now = std::chrono::steady_clock::now();
48+
uint64_t ts = std::chrono::duration_cast<std::chrono::nanoseconds>(
49+
now.time_since_epoch())
50+
.count();
51+
Signal sig{ts, 1, 3.0, 0.5}; // Always trigger
52+
while (!queue.push(sig)) {
53+
// spin
54+
}
55+
}
56+
57+
// Wait for drain
58+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
59+
running = false;
60+
consumer.join();
61+
auto end = std::chrono::steady_clock::now();
62+
63+
std::cout << "Done." << std::endl;
64+
65+
// Calculate Stats
66+
uint64_t sum = 0;
67+
uint64_t min_lat = -1;
68+
uint64_t max_lat = 0;
69+
for (auto lat : latencies) {
70+
sum += lat;
71+
if (lat < min_lat)
72+
min_lat = lat;
73+
if (lat > max_lat)
74+
max_lat = lat;
75+
}
76+
77+
if (!latencies.empty()) {
78+
std::cout << "Avg Latency: " << (sum / latencies.size()) << " ns"
79+
<< std::endl;
80+
std::cout << "Min Latency: " << min_lat << " ns" << std::endl;
81+
std::cout << "Max Latency: " << max_lat << " ns" << std::endl;
82+
std::cout << "Throughput: "
83+
<< (1000000.0 /
84+
std::chrono::duration_cast<std::chrono::milliseconds>(end -
85+
start)
86+
.count() *
87+
1000)
88+
<< " msg/sec" << std::endl;
89+
}
90+
91+
return 0;
92+
}

check_struct.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#include "quanux/MarketTick.hpp"
2+
#include <cstddef>
3+
#include <iostream>
4+
5+
int main() {
6+
std::cout << "Sizeof MarketTick: " << sizeof(quanux::MarketTick) << std::endl;
7+
std::cout << "Alignof MarketTick: " << alignof(quanux::MarketTick)
8+
<< std::endl;
9+
std::cout << "Offset internal_arrival_ts: "
10+
<< offsetof(quanux::MarketTick, internal_arrival_ts) << std::endl;
11+
std::cout << "Offset _pad: " << offsetof(quanux::MarketTick, _pad)
12+
<< std::endl;
13+
14+
if (sizeof(quanux::MarketTick) == 64) {
15+
std::cout << "SUCCESS: 64 bytes" << std::endl;
16+
return 0;
17+
} else {
18+
std::cout << "FAILURE: " << sizeof(quanux::MarketTick) << " bytes"
19+
<< std::endl;
20+
return 1;
21+
}
22+
}

verify_hft_engine.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
2+
import asyncio
3+
import nats
4+
import struct
5+
import time
6+
import random
7+
8+
# MarketTick Structure (64-byte aligned)
9+
# uint64_t local_rec_ts; // 8
10+
# uint64_t exchange_ts; // 8
11+
# double price; // 8
12+
# uint32_t size; // 4
13+
# uint32_t flags; // 4
14+
# uint32_t instrument_id; // 4
15+
# uint64_t internal_arrival_ts; // 8
16+
# uint64_t processing_start_ts; // 8
17+
# uint8_t _pad[8]; // 8 (Explicit) + 4 (Implicit after instrument_id)
18+
# Total: 64 bytes
19+
20+
# Q=8, d=8, I=4.
21+
# Layout: Q Q d I I I [4x implicit] Q Q [8x explicit]
22+
STRUCT_FMT = "QQdIII4xQQ8x"

# The format uses native sizes/alignment, so its packed size is platform
# dependent. Fail fast if it ever stops matching the 64-byte MarketTick layout
# described above instead of silently publishing malformed ticks.
MARKET_TICK_SIZE = 64
if struct.calcsize(STRUCT_FMT) != MARKET_TICK_SIZE:
    raise RuntimeError(
        f"STRUCT_FMT packs to {struct.calcsize(STRUCT_FMT)} bytes, "
        f"expected {MARKET_TICK_SIZE}"
    )
23+
24+
async def run(num_ticks=100):
    """Publish a burst of binary MarketTick messages to NATS.

    Connects to the NATS server at nats://localhost:4222, packs ``num_ticks``
    ticks with ``STRUCT_FMT`` (price follows a small random walk starting at
    150.0) and publishes each one on the ``MARKET.BIN`` subject, then prints
    how long the burst took.

    Args:
        num_ticks: Number of ticks to publish. Defaults to 100.
    """
    nc = await nats.connect("nats://localhost:4222")

    # try/finally so the connection is closed even if packing or publishing
    # raises part-way through the burst.
    try:
        print("Connected to NATS. Sending binary ticks to MARKET.BIN...")

        instrument_id = 101  # Example ID for "AAPL"
        price = 150.0

        # perf_counter is monotonic, unlike time.time(), so the measured
        # interval cannot be distorted by wall-clock adjustments.
        start_time = time.perf_counter()

        for i in range(num_ticks):
            price += random.uniform(-0.05, 0.05)
            now_ns = time.time_ns()

            # Pack the struct
            # We leave internal timestamps 0, engine will fill them
            payload = struct.pack(
                STRUCT_FMT,
                now_ns,         # local_rec_ts
                now_ns,         # exchange_ts
                price,          # price
                100,            # size
                0,              # flags
                instrument_id,  # instrument_id
                0,              # internal_arrival_ts (placeholder)
                0,              # processing_start_ts (placeholder)
            )

            await nc.publish("MARKET.BIN", payload)

            if i % 10 == 0:
                print(f"Sent tick {i}: Price {price:.2f}")

            # Burst simulation
            # await asyncio.sleep(0.0001)

        # Wait for the server to acknowledge everything published so far, so
        # the elapsed time covers delivery rather than just client buffering.
        await nc.flush()

        # Determine throughput
        elapsed = time.perf_counter() - start_time
        print(f"Sent {num_ticks} ticks in {elapsed:.4f}s")
    finally:
        await nc.close()
64+
65+
# Script entry point: drive the async publisher to completion.
if __name__ == "__main__":
    asyncio.run(run())

0 commit comments

Comments
 (0)