Skip to content

Commit 2805b50

Browse files
committed
Refactoring operations to be within a thread to scale up backtesting sweeps
1 parent 3a01f41 commit 2805b50

15 files changed

Lines changed: 366 additions & 94 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ execute_process(
1111
project(BacktestingEngine)
1212

1313
# Set the C++ standard
14-
set(CMAKE_CXX_STANDARD 20)
14+
set(CMAKE_CXX_STANDARD 23)
1515
set(CMAKE_CXX_STANDARD_REQUIRED ON)
1616

1717
# Configure libpqxx build

backtesting-engine-cpp.xcodeproj/project.pbxproj

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@
141141
94829FC32FCC1D1200710E6E /* env.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = env.hpp; sourceTree = "<group>"; };
142142
94829FC42FCC1D1A00710E6E /* env.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = env.cpp; sourceTree = "<group>"; };
143143
948A9CCD2C906A5600E23669 /* CONVENTIONS.md */ = {isa = PBXFileReference; lastKnownFileType = net.daringfireball.markdown; path = CONVENTIONS.md; sourceTree = "<group>"; };
144+
94B4F02A2FD618B300B08FB4 /* backtestLog.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = backtestLog.hpp; sourceTree = "<group>"; };
145+
94B4F02B2FD618B300B08FB4 /* threadPool.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = threadPool.hpp; sourceTree = "<group>"; };
144146
94BBA4512D2EA2640010E04D /* build.sh */ = {isa = PBXFileReference; lastKnownFileType = text.script.sh; path = build.sh; sourceTree = "<group>"; };
145147
94C331A02FA899A8006BD690 /* decimal_json.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = decimal_json.hpp; sourceTree = "<group>"; };
146148
94CD832A2D2D22C900041BBA /* config.yml */ = {isa = PBXFileReference; lastKnownFileType = text.yaml; path = config.yml; sourceTree = "<group>"; };
@@ -1505,6 +1507,8 @@
15051507
94B8C7932D3D770800E17EB6 /* utilities */ = {
15061508
isa = PBXGroup;
15071509
children = (
1510+
94B4F02A2FD618B300B08FB4 /* backtestLog.hpp */,
1511+
94B4F02B2FD618B300B08FB4 /* threadPool.hpp */,
15081512
945F475C2FD5607E00D19164 /* queueKeys.hpp */,
15091513
943770432FD4351100317424 /* parameterSweep.hpp */,
15101514
94829FC32FCC1D1200710E6E /* env.hpp */,
@@ -3811,7 +3815,7 @@
38113815
ASSETCATALOG_COMPILER_GENERATE_SWIFT_ASSET_SYMBOL_EXTENSIONS = YES;
38123816
CLANG_ANALYZER_NONNULL = YES;
38133817
CLANG_ANALYZER_NUMBER_OBJECT_CONVERSION = YES_AGGRESSIVE;
3814-
CLANG_CXX_LANGUAGE_STANDARD = "gnu++20";
3818+
CLANG_CXX_LANGUAGE_STANDARD = "gnu++23";
38153819
CLANG_ENABLE_MODULES = YES;
38163820
CLANG_ENABLE_OBJC_ARC = YES;
38173821
CLANG_ENABLE_OBJC_WEAK = YES;
@@ -3872,7 +3876,7 @@
38723876
ASSETCATALOG_COMPILER_GENERATE_SWIFT_ASSET_SYMBOL_EXTENSIONS = YES;
38733877
CLANG_ANALYZER_NONNULL = YES;
38743878
CLANG_ANALYZER_NUMBER_OBJECT_CONVERSION = YES_AGGRESSIVE;
3875-
CLANG_CXX_LANGUAGE_STANDARD = "gnu++20";
3879+
CLANG_CXX_LANGUAGE_STANDARD = "gnu++23";
38763880
CLANG_ENABLE_MODULES = YES;
38773881
CLANG_ENABLE_OBJC_ARC = YES;
38783882
CLANG_ENABLE_OBJC_WEAK = YES;

include/tradingResults.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct TradingResultsStats {
3636
struct TradingResults {
3737
std::string RUN_ID;
3838
std::string timestamp;
39+
double durationSeconds; // wall-clock seconds for this run's backtest
3940
trading_definitions::Configuration config;
4041
TradingResultsStats results;
4142

include/utilities/backtestLog.hpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Backtesting Engine in C++
2+
//
3+
// (c) 2026 Ryan McCaffery | https://mccaffers.com
4+
// This code is licensed under MIT license (see LICENSE.txt for details)
5+
// ---------------------------------------
6+
7+
#ifndef UTILITIES_BACKTEST_LOG_HPP
8+
#define UTILITIES_BACKTEST_LOG_HPP
9+
10+
#include <atomic>
11+
#include <iostream>
12+
#include <mutex>
13+
#include <string_view>
14+
15+
// Cross-cutting logging switches for backtest execution.
16+
//
17+
// When backtests run concurrently (see RedisRunner's thread pool) the chatty
18+
// per-strategy output would both interleave and race on the shared std::cout /
19+
// std::cerr. RedisRunner sets `quiet` so those call sites skip their output
20+
// entirely (no stream access -> no race), while error() serialises the rare
21+
// failure path behind a mutex so genuine problems still surface safely.
22+
namespace backtest_log {
23+
24+
inline std::atomic<bool> quiet{false};
25+
26+
inline std::mutex& errorMutex() {
27+
static std::mutex m;
28+
return m;
29+
}
30+
31+
inline void error(std::string_view message) {
32+
std::scoped_lock lock(errorMutex());
33+
std::cerr << message << std::endl;
34+
}
35+
36+
} // namespace backtest_log
37+
38+
#endif // UTILITIES_BACKTEST_LOG_HPP

include/utilities/threadPool.hpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Backtesting Engine in C++
2+
//
3+
// (c) 2026 Ryan McCaffery | https://mccaffers.com
4+
// This code is licensed under MIT license (see LICENSE.txt for details)
5+
// ---------------------------------------
6+
7+
#ifndef UTILITIES_THREAD_POOL_HPP
8+
#define UTILITIES_THREAD_POOL_HPP
9+
10+
#include <condition_variable>
11+
#include <cstddef>
12+
#include <exception>
13+
#include <functional>
14+
#include <mutex>
15+
#include <queue>
16+
#include <stop_token>
17+
#include <thread>
18+
#include <utility>
19+
#include <vector>
20+
21+
// A small fixed-size pool for CPU-bound work, built on std::jthread.
22+
//
23+
// std::jthread (not std::thread) buys us three things here:
24+
// * Constructor exception safety: if the OS throws while spawning the Nth
25+
// worker, unwinding the vector auto-request_stop()s and joins the workers
26+
// already started. std::thread would hit un-joined threads -> terminate().
27+
// * A trivial destructor: ~std::jthread does request_stop() then join(), so
28+
// there is no manual stop flag, signal, or join loop to maintain.
29+
// * Native cancellation: each worker takes a std::stop_token and waits with
30+
// condition_variable_any, so a stop request wakes the wait directly.
31+
//
32+
// submit() applies backpressure: it blocks once `capacity_` tasks are in flight
33+
// (queued + executing). This bounds memory and paces the producer to the
34+
// workers. wait() blocks until everything has finished and never throws, so it
35+
// is safe to call during stack unwinding (e.g. from a destructor). The first
36+
// exception thrown by any task is captured and surfaced via takeError().
37+
class ThreadPool {
38+
public:
39+
explicit ThreadPool(std::size_t threads) : capacity_(threads * 2) {
40+
workers_.reserve(threads);
41+
for (std::size_t i = 0; i < threads; ++i) {
42+
workers_.emplace_back([this](std::stop_token st) { workerLoop(st); });
43+
}
44+
}
45+
46+
ThreadPool(const ThreadPool&) = delete;
47+
ThreadPool& operator=(const ThreadPool&) = delete;
48+
49+
// Enqueue a task, blocking while `capacity_` tasks are already in flight.
50+
void submit(std::function<void()> task) {
51+
std::unique_lock lock(mutex_);
52+
slotFree_.wait(lock, [this] { return inFlight_ < capacity_; });
53+
tasks_.push(std::move(task));
54+
++inFlight_;
55+
workAvailable_.notify_one();
56+
}
57+
58+
// Block until no task is queued or executing. Never throws.
59+
void wait() {
60+
std::unique_lock lock(mutex_);
61+
idle_.wait(lock, [this] { return inFlight_ == 0; });
62+
}
63+
64+
// Returns and clears the first exception captured from a task, if any.
65+
std::exception_ptr takeError() {
66+
std::scoped_lock lock(mutex_);
67+
return std::exchange(firstError_, nullptr);
68+
}
69+
70+
private:
71+
void workerLoop(std::stop_token st) {
72+
for (;;) {
73+
std::function<void()> task;
74+
{
75+
std::unique_lock lock(mutex_);
76+
workAvailable_.wait(lock, st, [this] { return !tasks_.empty(); });
77+
if (tasks_.empty()) {
78+
return; // woken by a stop request with nothing left to do
79+
}
80+
task = std::move(tasks_.front());
81+
tasks_.pop();
82+
}
83+
84+
std::exception_ptr err;
85+
try {
86+
task();
87+
} catch (...) {
88+
err = std::current_exception();
89+
}
90+
91+
{
92+
std::scoped_lock lock(mutex_);
93+
if (err && !firstError_) {
94+
firstError_ = err;
95+
}
96+
--inFlight_;
97+
if (inFlight_ == 0) {
98+
idle_.notify_all();
99+
}
100+
}
101+
slotFree_.notify_one();
102+
}
103+
}
104+
105+
std::mutex mutex_;
106+
std::condition_variable_any workAvailable_; // workers wait here (stop-aware)
107+
std::condition_variable slotFree_; // submit() waits here
108+
std::condition_variable idle_; // wait() waits here
109+
std::queue<std::function<void()>> tasks_;
110+
std::size_t inFlight_ = 0;
111+
std::size_t capacity_;
112+
std::exception_ptr firstError_;
113+
114+
// Declared last so the jthreads stop and join before the synchronisation
115+
// members they touch above are destroyed.
116+
std::vector<std::jthread> workers_;
117+
};
118+
119+
#endif // UTILITIES_THREAD_POOL_HPP

scripts/build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fi
2727

2828
# 1. Generate build files (Passing your CXX flags directly to CMake instead of configure)
2929
cmake .. \
30-
-DCMAKE_CXX_STANDARD=20 \
30+
-DCMAKE_CXX_STANDARD=23 \
3131
-DCMAKE_BUILD_TYPE=Release \
3232
-DSKIP_BUILD_TEST=ON \
3333
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON

scripts/build_dep.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ SDK_PATH="$(xcrun --show-sdk-path 2>/dev/null || true)"
2626
build_dep() {
2727
local dep_dir="$1"
2828
local -a cmake_args=(
29-
-DCMAKE_CXX_STANDARD=20
29+
-DCMAKE_CXX_STANDARD=23
3030
-DCMAKE_BUILD_TYPE=Release
3131
-DSKIP_BUILD_TEST=ON
3232
-DCMAKE_CXX_FLAGS="-w"

source/commands/loadCommand.cpp

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88

99
#include <array>
1010
#include <charconv>
11-
#include <iostream>
11+
#include <print>
12+
#include <ranges>
1213
#include <stdexcept>
1314
#include <string>
1415
#include <system_error>
@@ -26,7 +27,8 @@
2627
#include "parameterSweep.hpp"
2728
#include "queueKeys.hpp"
2829
#include "redisLoader.hpp"
29-
#include "trading_definitions.hpp"
30+
#include "run_configuration.hpp"
31+
#include "strategy.hpp"
3032

3133
namespace {
3234

@@ -60,8 +62,8 @@ sweep::ParameterGenerator buildRandomStrategySweep() {
6062
sweep::ParameterGenerator generator;
6163
// generator.addRange("OHLC_COUNT", 80, 20, 140); // 80, 100, 120, 140
6264
// generator.addList("OHLC_MINUTES", {1, 3, 5, 8});
63-
generator.addList("STOP_DISTANCE_IN_PIPS", {1.0, 1.5, 2.0});
64-
generator.addList("LIMIT_DISTANCE_IN_PIPS", {1.0, 1.5, 2.0});
65+
generator.addList("STOP_DISTANCE_IN_PIPS", {1.0, 1.5, 10.0});
66+
generator.addList("LIMIT_DISTANCE_IN_PIPS", {1.0, 1.5, 10.0});
6567
return generator;
6668
}
6769

@@ -70,8 +72,8 @@ sweep::ParameterGenerator buildRandomStrategySweep() {
7072
RunConfiguration makeRunConfiguration(const std::string& runId) {
7173
return RunConfiguration{
7274
.RUN_ID = runId,
73-
.SYMBOLS = "EURUSD,AUDUSD",
74-
.LAST_MONTHS = 2,
75+
.SYMBOLS = "EURUSD",
76+
.LAST_MONTHS = 6,
7577
};
7678
}
7779

@@ -82,7 +84,9 @@ Strategy makeStrategy(const sweep::Combination& combo) {
8284
using namespace trading_definitions;
8385

8486
return Strategy{
85-
.UUID = "",
87+
// Each parameter combination gets its own UUID so a single backtest
88+
// result is uniquely identifiable and traceable back to its inputs.
89+
.UUID = boost::uuids::to_string(boost::uuids::random_generator()()),
8690
.TRADING_VARIABLES = TradingVariables{
8791
.STRATEGY = "RandomStrategy",
8892
.STOP_DISTANCE_IN_PIPS = toDecimal(combo.get("STOP_DISTANCE_IN_PIPS")),
@@ -108,36 +112,40 @@ Strategy makeStrategy(const sweep::Combination& combo) {
108112
int LoadCommand::run() {
109113
// One RUN_ID identifies the whole sweep; each combination becomes its own
110114
// queue entry, distinguished by its parameter values.
111-
const std::string runId = boost::uuids::to_string(boost::uuids::random_generator()());
112-
const std::string redisHost = env::getOr("REDIS_HOST", "127.0.0.1");
115+
const auto runId = boost::uuids::to_string(boost::uuids::random_generator()());
116+
const auto redisHost = env::getOr("REDIS_HOST", "127.0.0.1");
113117

114118
// Build random here
115-
const sweep::ParameterGenerator generator = buildRandomStrategySweep();
119+
const auto generator = buildRandomStrategySweep();
116120

117-
const std::vector<sweep::Combination> combinations = generator.generateAllCombinations();
121+
const auto combinations = generator.generateAllCombinations();
118122

119-
std::cout << "LoadCommand: sweeping " << combinations.size()
120-
<< " parameter combination(s) for RUN_ID=" << runId << std::endl;
123+
std::println("LoadCommand: sweeping {} parameter combination(s) for RUN_ID={}",
124+
combinations.size(), runId);
121125

122-
// Serialise every swept strategy first.
123-
std::vector<std::string> strategyPayloads;
124-
strategyPayloads.reserve(combinations.size());
125-
for (const sweep::Combination& combo : combinations) {
126-
const nlohmann::json j = makeStrategy(combo);
127-
strategyPayloads.push_back(j.dump());
128-
}
126+
// Serialise every swept strategy first. combinations is a sized range, so
127+
// std::ranges::to reserves up front (no manual reserve needed). The json type
128+
// is pinned explicitly because makeStrategy returns a Strategy and relies on
129+
// the implicit conversion for .dump().
130+
const auto strategyPayloads =
131+
combinations | std::views::transform([](const sweep::Combination& combo) {
132+
const nlohmann::json j = makeStrategy(combo);
133+
return j.dump();
134+
}) | std::ranges::to<std::vector<std::string>>();
129135

130136
// Push all strategies BEFORE the run descriptor. A worker that sees the run
131137
// immediately drains the strategy list and retires the run when empty, so the
132138
// full set must already be present the moment the run becomes visible.
133-
const std::string strategyKey = queue_keys::strategyKey(runId);
134-
const int strategyStatus = RedisLoader::loadPayloadBatch(
139+
const auto strategyKey = queue_keys::strategyKey(runId);
140+
const auto strategyStatus = RedisLoader::loadPayloadBatch(
135141
redisHost, 6379, strategyKey, strategyPayloads);
136142
if (strategyStatus != 0) {
137143
return strategyStatus;
138144
}
139145

140-
// Now advertise the run so workers can pick it up.
146+
// Now advertise the run so workers can pick it up. runJson is pinned to
147+
// nlohmann::json (not auto) because makeRunConfiguration returns a
148+
// RunConfiguration and relies on the implicit conversion for .dump().
141149
const nlohmann::json runJson = makeRunConfiguration(runId);
142150
return RedisLoader::loadPayload(redisHost, 6379, queue_keys::RUN,
143151
runJson.dump());

source/commands/runCommand.cpp

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,26 @@
66

77
#include "runCommand.hpp"
88

9-
#include <iostream>
109
#include <string>
1110

1211
#include "backtestRunner.hpp"
1312
#include "env.hpp"
1413
#include "jsonParser.hpp"
1514
#include "redisRunner.hpp"
1615

17-
namespace {
18-
19-
int runBacktestFromBase64(const std::string& questdbHost,
20-
const std::string& base64Config) {
21-
auto config = JsonParser::parseConfigurationFromBase64(base64Config);
22-
return runBacktest(questdbHost, config);
23-
}
24-
25-
} // namespace
26-
2716
int RunCommand::run(int argc, const char* argv[]) {
2817
if (argc < 3) {
29-
std::cerr << "Usage: BacktestingEngine run <questdb-host>\n"
30-
<< " BacktestingEngine run <questdb-host> <base64-config>"
31-
<< std::endl;
18+
// C++23 std::println is faster, safer, and cleaner than iostreams
19+
std::println(stderr, "Usage: BacktestingEngine run <questdb-host>\n"
20+
" BacktestingEngine run <questdb-host> <base64-config>");
3221
return 1;
3322
}
23+
3424
if (argc == 3) {
3525
return RedisRunner::run(argv[2], env::getOr("REDIS_HOST", "127.0.0.1"));
3626
}
37-
return runBacktestFromBase64(argv[2], argv[3]);
38-
}
27+
28+
// Else we'll just parse one base64 blob
29+
auto config = JsonParser::parseConfigurationFromBase64(argv[3]);
30+
return runBacktest(argv[2], config);
31+
}

0 commit comments

Comments
 (0)