-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathsimple.cpp
More file actions
84 lines (77 loc) · 2.8 KB
/
simple.cpp
File metadata and controls
84 lines (77 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <chrono>
#include <csignal>  // sig_atomic_t
#include <cstdint>
#include <databento/constants.hpp>
#include <databento/dbn.hpp>
#include <databento/enums.hpp>
#include <databento/live.hpp>
#include <databento/live_threaded.hpp>
#include <databento/log.hpp>
#include <databento/record.hpp>
#include <databento/symbol_map.hpp>
#include <databento/with_ts_out.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
// Short alias for the databento client library namespace.
namespace db = databento;

// Number of the last signal delivered to the SIGINT handler installed in
// main(); stays 0 until a signal arrives. `volatile std::sig_atomic_t` is the
// object type a signal handler is permitted to write to.
static volatile std::sig_atomic_t gSignal = 0;
// Example entry point: builds a threaded live client, subscribes to
// definition and MBO data for a fixed list of raw symbols, prints every
// record handed to the record callback, and exits with status 0 once SIGINT
// has been received.
int main() {
  // `symbol_mappings` and `log_receiver` are declared before `client` so they
  // are destroyed after it: the record callback holds a reference to the
  // former and the client holds a raw pointer to the latter.
  db::PitSymbolMap symbol_mappings;
  auto log_receiver =
      std::make_unique<db::ConsoleLogReceiver>(db::LogLevel::Debug);

  auto client = db::LiveThreaded::Builder()
                    .SetLogReceiver(log_receiver.get())
                    .SetSendTsOut(true)
                    .SetKeyFromEnv()
                    .SetDataset(db::Dataset::GlbxMdp3)
                    .SetCompression(db::Compression::Zstd)
                    .BuildThreaded();

  // Set up signal handler for Ctrl+C. The handler only records the signal
  // number; the polling loop at the bottom of main() reacts to it.
  std::signal(SIGINT, [](int signum) { gSignal = signum; });

  const std::vector<std::string> symbols{"ESZ6", "ESZ6 C8200", "ESZ6 P7500"};
  client.Subscribe(symbols, db::Schema::Definition, db::SType::RawSymbol);
  client.Subscribe(symbols, db::Schema::Mbo, db::SType::RawSymbol);

  // Prints the stream metadata handed to the callback.
  auto metadata_callback = [](db::Metadata&& metadata) {
    std::cout << metadata << '\n';
  };

  // Dispatches on the record type and prints a line per record. Always asks
  // the client to keep streaming.
  auto record_callback = [&symbol_mappings](const db::Record& rec) {
    using db::RType;
    switch (rec.RType()) {
      case RType::Mbo: {
        // Read as WithTsOut because SetSendTsOut(true) was requested above.
        const auto& mbo = rec.Get<db::WithTsOut<db::MboMsg>>();
        std::cout << "Received tick for "
                  << symbol_mappings[mbo.rec.hd.instrument_id]
                  << " with ts_out " << mbo.ts_out.time_since_epoch().count()
                  << ": " << mbo.rec << '\n';
        break;
      }
      case RType::InstrumentDef: {
        std::cout << "Received definition: "
                  << rec.Get<db::InstrumentDefMsg>() << '\n';
        break;
      }
      case RType::SymbolMapping: {
        // Keep the instrument ID -> symbol map current for the Mbo case.
        const auto& mapping = rec.Get<db::SymbolMappingMsg>();
        symbol_mappings.OnSymbolMapping(mapping);
        break;
      }
      case RType::System: {
        const auto& system_msg = rec.Get<db::SystemMsg>();
        // Heartbeats are skipped so they do not flood the output.
        if (!system_msg.IsHeartbeat()) {
          std::cout << "Received system msg: " << system_msg.Msg() << '\n';
        }
        break;
      }
      case RType::Error: {
        std::cerr << "Received error from gateway: "
                  << rec.Get<db::ErrorMsg>().Err() << '\n';
        break;
      }
      default: {
        // std::hex is sticky: restore the previous flags afterwards so later
        // writes to std::cerr are not silently switched to hexadecimal.
        const auto saved_flags = std::cerr.flags();
        std::cerr << "Received unknown record with rtype " << std::hex
                  << static_cast<std::uint16_t>(rec.RType()) << '\n';
        std::cerr.flags(saved_flags);
        break;
      }
    }
    return db::KeepGoing::Continue;
  };

  client.Start(metadata_callback, record_callback);

  // Poll the signal flag every 100 ms until the SIGINT handler sets it.
  while (::gSignal == 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds{100});
  }
  return 0;
}