Skip to content

Commit ada2f31

Browse files
committed
ADD: Add TryNextRecord and FillBuffer to C++
1 parent 34c9816 commit ada2f31

File tree

4 files changed

+199
-11
lines changed

4 files changed

+199
-11
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.53.0 - TBD
4+
5+
### Enhancements
6+
- Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained
7+
control around I/O
8+
39
## 0.52.0 - 2026-03-31
410

511
### Enhancements

include/databento/live_blocking.hpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,26 @@ class LiveBlocking {
8181
//
8282
// This method should only be called after `Start`.
8383
const Record* NextRecord(std::chrono::milliseconds timeout);
84+
// Returns the next record from the internal buffer without performing any
85+
// I/O. Returns `nullptr` if no complete record is buffered. The returned
86+
// pointer is valid until the next call to `TryNextRecord`, `NextRecord`,
87+
// or `FillBuffer`.
88+
//
89+
// This method should only be called after `Start`.
90+
const Record* TryNextRecord();
91+
// Reads available data from the connection into the internal buffer using
92+
// the heartbeat timeout. Returns the number of bytes read and the status.
93+
// A `read_size` of 0 with `Status::Closed` indicates the connection was
94+
// closed by the gateway.
95+
//
96+
// This method should only be called after `Start`.
97+
IReadable::Result FillBuffer();
98+
// Reads available data from the connection into the internal buffer.
99+
// Returns the number of bytes read and the status. A `read_size` of 0 with
100+
// `Status::Closed` indicates the connection was closed by the gateway.
101+
//
102+
// This method should only be called after `Start`.
103+
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
84104
// Stops the session with the gateway. Once stopped, the session cannot be
85105
// restarted.
86106
void Stop();
@@ -117,7 +137,7 @@ class LiveBlocking {
117137
void IncrementSubCounter();
118138
void Subscribe(std::string_view sub_msg, const std::vector<std::string>& symbols,
119139
bool use_snapshot);
120-
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
140+
const Record* ConsumeBufferedRecord();
121141
RecordHeader* BufferRecordHeader();
122142
std::chrono::milliseconds HeartbeatTimeout() const;
123143
void CheckHeartbeatTimeout() const;

src/live_blocking.cpp

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,8 @@ const databento::Record& LiveBlocking::NextRecord() {
196196
}
197197

198198
const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds timeout) {
199-
// need some unread_bytes
200-
const auto unread_bytes = buffer_.ReadCapacity();
201-
if (unread_bytes == 0) {
199+
// need at least a header to read the record size
200+
if (buffer_.ReadCapacity() < sizeof(RecordHeader)) {
202201
const auto read_res = FillBuffer(timeout);
203202
if (read_res.status == Status::Timeout) {
204203
CheckHeartbeatTimeout();
@@ -208,7 +207,7 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time
208207
throw LiveApiError{"Gateway closed the session"};
209208
}
210209
}
211-
// check length
210+
// wait for the full record
212211
while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) {
213212
const auto read_res = FillBuffer(timeout);
214213
if (read_res.status == Status::Timeout) {
@@ -219,12 +218,17 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time
219218
throw LiveApiError{"Gateway closed the session"};
220219
}
221220
}
222-
current_record_ = Record{BufferRecordHeader()};
223-
const auto bytes_to_consume = current_record_.Size();
224-
buffer_.ConsumeNoShift(bytes_to_consume);
225-
current_record_ = DbnDecoder::DecodeRecordCompat(
226-
version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_);
227-
return &current_record_;
221+
return ConsumeBufferedRecord();
222+
}
223+
224+
const databento::Record* LiveBlocking::TryNextRecord() {
225+
if (buffer_.ReadCapacity() < sizeof(RecordHeader)) {
226+
return nullptr;
227+
}
228+
if (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) {
229+
return nullptr;
230+
}
231+
return ConsumeBufferedRecord();
228232
}
229233

230234
void LiveBlocking::Stop() { connection_.Close(); }
@@ -447,6 +451,10 @@ void LiveBlocking::IncrementSubCounter() {
447451
}
448452
}
449453

454+
databento::IReadable::Result LiveBlocking::FillBuffer() {
455+
return FillBuffer(HeartbeatTimeout());
456+
}
457+
450458
databento::IReadable::Result LiveBlocking::FillBuffer(
451459
std::chrono::milliseconds timeout) {
452460
buffer_.Shift();
@@ -459,6 +467,14 @@ databento::IReadable::Result LiveBlocking::FillBuffer(
459467
return read_res;
460468
}
461469

470+
const databento::Record* LiveBlocking::ConsumeBufferedRecord() {
471+
current_record_ = Record{BufferRecordHeader()};
472+
buffer_.ConsumeNoShift(current_record_.Size());
473+
current_record_ = DbnDecoder::DecodeRecordCompat(
474+
version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_);
475+
return &current_record_;
476+
}
477+
462478
databento::RecordHeader* LiveBlocking::BufferRecordHeader() {
463479
return reinterpret_cast<RecordHeader*>(buffer_.ReadBegin());
464480
}

tests/src/live_blocking_tests.cpp

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "databento/datetime.hpp"
1616
#include "databento/enums.hpp" // Schema, SType
1717
#include "databento/exceptions.hpp"
18+
#include "databento/ireadable.hpp"
1819
#include "databento/live.hpp"
1920
#include "databento/live_blocking.hpp"
2021
#include "databento/live_subscription.hpp"
@@ -765,4 +766,149 @@ TEST_F(LiveBlockingTests, TestHeartbeatTimeoutOnNextRecordWithTimeout) {
765766
EXPECT_TRUE(got_timeout_exception) << "Expected heartbeat timeout exception";
766767
}
767768

769+
TEST_F(LiveBlockingTests, TestTryNextRecordEmptyBuffer) {
770+
constexpr auto kTsOut = false;
771+
const mock::MockLsgServer mock_server{
772+
dataset::kXnasItch, kTsOut, [](mock::MockLsgServer& self) {
773+
self.Accept();
774+
self.Authenticate();
775+
std::this_thread::sleep_for(std::chrono::milliseconds{200});
776+
}};
777+
778+
LiveBlocking target = builder_.SetDataset(dataset::kXnasItch)
779+
.SetSendTsOut(kTsOut)
780+
.SetAddress(kLocalhost, mock_server.Port())
781+
.BuildBlocking();
782+
// Buffer is empty, no I/O should be performed
783+
const auto* rec = target.TryNextRecord();
784+
EXPECT_EQ(rec, nullptr);
785+
}
786+
787+
TEST_F(LiveBlockingTests, TestTryNextRecordAfterFillBuffer) {
788+
constexpr auto kTsOut = false;
789+
constexpr OhlcvMsg kRec{DummyHeader<OhlcvMsg>(RType::Ohlcv1M), 1, 2, 3, 4, 5};
790+
const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut,
791+
[kRec](mock::MockLsgServer& self) {
792+
self.Accept();
793+
self.Authenticate();
794+
self.SendRecord(kRec);
795+
}};
796+
797+
LiveBlocking target = builder_.SetDataset(dataset::kXnasItch)
798+
.SetSendTsOut(kTsOut)
799+
.SetAddress(kLocalhost, mock_server.Port())
800+
.BuildBlocking();
801+
const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000});
802+
ASSERT_EQ(fill_res.status, IReadable::Status::Ok);
803+
ASSERT_GT(fill_res.read_size, 0);
804+
const auto* rec = target.TryNextRecord();
805+
ASSERT_NE(rec, nullptr);
806+
ASSERT_TRUE(rec->Holds<OhlcvMsg>());
807+
EXPECT_EQ(rec->Get<OhlcvMsg>(), kRec);
808+
// Buffer drained
809+
EXPECT_EQ(target.TryNextRecord(), nullptr);
810+
}
811+
812+
TEST_F(LiveBlockingTests, TestFillBufferReturnsClosed) {
813+
constexpr auto kTsOut = false;
814+
const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut,
815+
[](mock::MockLsgServer& self) {
816+
self.Accept();
817+
self.Authenticate();
818+
self.Close();
819+
}};
820+
821+
LiveBlocking target = builder_.SetDataset(dataset::kXnasItch)
822+
.SetSendTsOut(kTsOut)
823+
.SetAddress(kLocalhost, mock_server.Port())
824+
.BuildBlocking();
825+
const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000});
826+
EXPECT_EQ(fill_res.status, IReadable::Status::Closed);
827+
EXPECT_EQ(fill_res.read_size, 0);
828+
}
829+
830+
TEST_F(LiveBlockingTests, TestTryNextRecordPollLoop) {
831+
constexpr auto kTsOut = false;
832+
constexpr auto kRecCount = 5;
833+
constexpr OhlcvMsg kRec{DummyHeader<OhlcvMsg>(RType::Ohlcv1M), 1, 2, 3, 4, 5};
834+
const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut,
835+
[kRec](mock::MockLsgServer& self) {
836+
self.Accept();
837+
self.Authenticate();
838+
for (size_t i = 0; i < kRecCount; ++i) {
839+
self.SendRecord(kRec);
840+
}
841+
self.Close();
842+
}};
843+
844+
LiveBlocking target = builder_.SetDataset(dataset::kXnasItch)
845+
.SetSendTsOut(kTsOut)
846+
.SetAddress(kLocalhost, mock_server.Port())
847+
.BuildBlocking();
848+
int record_count = 0;
849+
while (true) {
850+
while (const auto* rec = target.TryNextRecord()) {
851+
ASSERT_TRUE(rec->Holds<OhlcvMsg>());
852+
EXPECT_EQ(rec->Get<OhlcvMsg>(), kRec);
853+
++record_count;
854+
}
855+
const auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000});
856+
if (fill_res.status == IReadable::Status::Closed) {
857+
break;
858+
}
859+
}
860+
EXPECT_EQ(record_count, kRecCount);
861+
}
862+
863+
TEST_F(LiveBlockingTests, TestTryNextRecordPartialRecord) {
864+
constexpr auto kTsOut = false;
865+
constexpr MboMsg kRec{DummyHeader<MboMsg>(RType::Mbo),
866+
1,
867+
2,
868+
3,
869+
{},
870+
4,
871+
Action::Add,
872+
Side::Bid,
873+
UnixNanos{},
874+
TimeDeltaNanos{},
875+
100};
876+
877+
bool send_remaining{};
878+
std::mutex send_remaining_mutex;
879+
std::condition_variable send_remaining_cv;
880+
const mock::MockLsgServer mock_server{
881+
dataset::kGlbxMdp3, kTsOut,
882+
[kRec, &send_remaining, &send_remaining_mutex,
883+
&send_remaining_cv](mock::MockLsgServer& self) {
884+
self.Accept();
885+
self.Authenticate();
886+
self.SplitSendRecord(kRec, send_remaining, send_remaining_mutex,
887+
send_remaining_cv);
888+
}};
889+
890+
LiveBlocking target = builder_.SetDataset(dataset::kGlbxMdp3)
891+
.SetSendTsOut(kTsOut)
892+
.SetAddress(kLocalhost, mock_server.Port())
893+
.BuildBlocking();
894+
// Read partial record (just header)
895+
auto fill_res = target.FillBuffer(std::chrono::milliseconds{1000});
896+
ASSERT_EQ(fill_res.status, IReadable::Status::Ok);
897+
// Record is incomplete
898+
EXPECT_EQ(target.TryNextRecord(), nullptr);
899+
// Signal server to send remaining bytes
900+
{
901+
const std::lock_guard<std::mutex> lock{send_remaining_mutex};
902+
send_remaining = true;
903+
send_remaining_cv.notify_one();
904+
}
905+
// Read the rest
906+
fill_res = target.FillBuffer(std::chrono::milliseconds{1000});
907+
ASSERT_EQ(fill_res.status, IReadable::Status::Ok);
908+
const auto* rec = target.TryNextRecord();
909+
ASSERT_NE(rec, nullptr);
910+
ASSERT_TRUE(rec->Holds<MboMsg>());
911+
EXPECT_EQ(rec->Get<MboMsg>(), kRec);
912+
}
913+
768914
} // namespace databento::tests

0 commit comments

Comments
 (0)