-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathlive_blocking.hpp
More file actions
140 lines (127 loc) · 5.4 KB
/
live_blocking.hpp
File metadata and controls
140 lines (127 loc) · 5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#pragma once
#include <array>
#include <chrono> // milliseconds
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
#include "databento/datetime.hpp" // UnixNanos
#include "databento/dbn.hpp" // Metadata
#include "databento/detail/buffer.hpp"
#include "databento/detail/live_connection.hpp" // LiveConnection
#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy, Compression
#include "databento/live_subscription.hpp"
#include "databento/record.hpp" // Record, RecordHeader
namespace databento {
// Forward declaration
class ILogReceiver;
class LiveBuilder;
class LiveThreaded;
// A client for interfacing with Databento's real-time and intraday replay
// market data API. This client provides a blocking API for getting the next
// record. Unlike Historical, each instance of LiveBlocking is associated with a
// particular dataset.
class LiveBlocking {
public:
static LiveBuilder Builder();
/*
* Getters
*/
const std::string& Key() const { return key_; }
const std::string& Dataset() const { return dataset_; }
const std::string& Gateway() const { return gateway_; }
std::uint16_t Port() const { return port_; }
bool SendTsOut() const { return send_ts_out_; }
VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; }
std::optional<std::chrono::seconds> HeartbeatInterval() const {
return heartbeat_interval_;
}
databento::Compression Compression() const { return compression_; }
const std::vector<LiveSubscription>& Subscriptions() const { return subscriptions_; }
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }
/*
* Methods
*/
// Add a new subscription. A single client instance supports multiple
// subscriptions. Note there is no unsubscribe method. Subscriptions end
// when the client disconnects in its destructor.
void Subscribe(const std::vector<std::string>& symbols, Schema schema,
SType stype_in);
void Subscribe(const std::vector<std::string>& symbols, Schema schema, SType stype_in,
UnixNanos start);
void Subscribe(const std::vector<std::string>& symbols, Schema schema, SType stype_in,
const std::string& start);
void SubscribeWithSnapshot(const std::vector<std::string>& symbols, Schema schema,
SType stype_in);
// Notifies the gateway to start sending messages for all subscriptions.
//
// This method should only be called once per instance.
Metadata Start();
// Block on getting the next record. The returned reference is valid until
// this method is called again.
//
// This method should only be called after `Start`.
const Record& NextRecord();
// Block on getting the next record. The returned pointer is valid until
// this method is called again. Will return `nullptr` if the `timeout` is
// reached.
//
// This method should only be called after `Start`.
const Record* NextRecord(std::chrono::milliseconds timeout);
// Stops the session with the gateway. Once stopped, the session cannot be
// restarted.
void Stop();
// Closes the current connection and attempts to reconnect to the gateway.
void Reconnect();
// Resubscribes to all subscriptions, removing the original `start` time, if
// any. Usually performed after a `Reconnect()`.
void Resubscribe();
private:
friend LiveBuilder;
friend LiveThreaded;
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression);
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression);
std::string DetermineGateway() const;
std::uint64_t Authenticate();
std::string DecodeChallenge();
std::string GenerateCramReply(std::string_view challenge_key);
std::string EncodeAuthReq(std::string_view auth);
std::uint64_t DecodeAuthResp();
void IncrementSubCounter();
void Subscribe(std::string_view sub_msg, const std::vector<std::string>& symbols,
bool use_snapshot);
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
RecordHeader* BufferRecordHeader();
static constexpr std::size_t kMaxStrLen = 24L * 1024;
ILogReceiver* log_receiver_;
const std::string key_;
const std::string dataset_;
const std::string gateway_;
const std::string user_agent_ext_;
const std::uint16_t port_;
const bool send_ts_out_;
std::uint8_t version_{};
const VersionUpgradePolicy upgrade_policy_;
const std::optional<std::chrono::seconds> heartbeat_interval_;
const databento::Compression compression_;
detail::LiveConnection connection_;
std::uint32_t sub_counter_{};
std::vector<LiveSubscription> subscriptions_;
detail::Buffer buffer_;
// Must be 8-byte aligned for records
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
std::uint64_t session_id_;
Record current_record_{nullptr};
};
} // namespace databento