@@ -21,6 +21,13 @@ namespace databento {
2121// Forward declaration
2222class ILogReceiver ;
2323class LiveBuilder ;
24+
25+ // Timeouts for the Live client's connection and authentication phases.
26+ struct TimeoutConf {
27+ std::chrono::seconds connect{10 };
28+ std::chrono::seconds auth{30 };
29+ };
30+
2431class LiveThreaded ;
2532
2633// A client for interfacing with Databento's real-time and intraday replay
@@ -48,6 +55,8 @@ class LiveBlocking {
4855 std::optional<databento::SlowReaderBehavior> SlowReaderBehavior () const {
4956 return slow_reader_behavior_;
5057 }
58+ const databento::TimeoutConf& TimeoutConf () const { return timeout_conf_; }
59+ std::uint64_t SessionId () const { return session_id_; }
5160 const std::vector<LiveSubscription>& Subscriptions () const { return subscriptions_; }
5261 std::vector<LiveSubscription>& Subscriptions () { return subscriptions_; }
5362
@@ -81,6 +90,26 @@ class LiveBlocking {
8190 //
8291 // This method should only be called after `Start`.
8392 const Record* NextRecord (std::chrono::milliseconds timeout);
93+ // Returns the next record from the internal buffer without performing any
94+ // I/O. Returns `nullptr` if no complete record is buffered. The returned
95+ // pointer is valid until the next call to `TryNextRecord`, `NextRecord`,
96+ // or `FillBuffer`.
97+ //
98+ // This method should only be called after `Start`.
99+ const Record* TryNextRecord ();
100+ // Reads available data from the connection into the internal buffer using
101+ // the heartbeat timeout. Returns the number of bytes read and the status.
102+ // A `read_size` of 0 with `Status::Closed` indicates the connection was
103+ // closed by the gateway.
104+ //
105+ // This method should only be called after `Start`.
106+ IReadable::Result FillBuffer ();
107+ // Reads available data from the connection into the internal buffer.
108+ // Returns the number of bytes read and the status. A `read_size` of 0 with
109+ // `Status::Closed` indicates the connection was closed by the gateway.
110+ //
111+ // This method should only be called after `Start`.
112+ IReadable::Result FillBuffer (std::chrono::milliseconds timeout);
84113 // Stops the session with the gateway. Once stopped, the session cannot be
85114 // restarted.
86115 void Stop ();
@@ -99,25 +128,27 @@ class LiveBlocking {
99128 std::optional<std::chrono::seconds> heartbeat_interval,
100129 std::size_t buffer_size, std::string user_agent_ext,
101130 databento::Compression compression,
102- std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
131+ std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
132+ databento::TimeoutConf timeout_conf);
103133 LiveBlocking (ILogReceiver* log_receiver, std::string key, std::string dataset,
104134 std::string gateway, std::uint16_t port, bool send_ts_out,
105135 VersionUpgradePolicy upgrade_policy,
106136 std::optional<std::chrono::seconds> heartbeat_interval,
107137 std::size_t buffer_size, std::string user_agent_ext,
108138 databento::Compression compression,
109- std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
139+ std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
140+ databento::TimeoutConf timeout_conf);
110141
111142 std::string DetermineGateway () const ;
112143 std::uint64_t Authenticate ();
113- std::string DecodeChallenge ();
144+ std::string DecodeChallenge (std::chrono::milliseconds timeout );
114145 std::string GenerateCramReply (std::string_view challenge_key);
115146 std::string EncodeAuthReq (std::string_view auth);
116- std::uint64_t DecodeAuthResp ();
147+ std::uint64_t DecodeAuthResp (std::chrono::milliseconds timeout );
117148 void IncrementSubCounter ();
118149 void Subscribe (std::string_view sub_msg, const std::vector<std::string>& symbols,
119150 bool use_snapshot);
120- IReadable::Result FillBuffer (std::chrono::milliseconds timeout );
151+ const Record* ConsumeBufferedRecord ( );
121152 RecordHeader* BufferRecordHeader ();
122153 std::chrono::milliseconds HeartbeatTimeout () const ;
123154 void CheckHeartbeatTimeout () const ;
@@ -136,6 +167,7 @@ class LiveBlocking {
136167 const std::optional<std::chrono::seconds> heartbeat_interval_;
137168 const databento::Compression compression_;
138169 const std::optional<databento::SlowReaderBehavior> slow_reader_behavior_;
170+ const databento::TimeoutConf timeout_conf_;
139171 detail::LiveConnection connection_;
140172 std::uint32_t sub_counter_{};
141173 std::vector<LiveSubscription> subscriptions_;
0 commit comments