Skip to content

Commit 21ddca3

Browse files
committed
ADD: Add pull-based TimeseriesGetRange overloads
1 parent 7c0d86c commit 21ddca3

33 files changed

+539
-229
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99

1010
### Enhancements
1111
- Added support for `progress` field in `BatchJob` response
12+
- Added blocking (pull-based) `TimeseriesGetRange` overloads that return a `DbnStore`
13+
with `GetMetadata()` and `NextRecord()` methods, unifying the consumption pattern
14+
across historical, live, and file sources
15+
- Generalized `DbnFileStore` into `DbnStore` which accepts any `IReadable` source.
16+
`DbnFileStore` is retained as a type alias for backwards compatibility
17+
- Upgraded default `httplib` version to 0.37.2
1218

1319
## 0.50.0 - 2026-03-03
1420

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_HTTPLIB)
178178
find_package(httplib REQUIRED)
179179
endif()
180180
else()
181-
set(httplib_version 0.30.1)
181+
set(httplib_version 0.37.2)
182182
FetchContent_Declare(
183183
httplib
184184
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v${httplib_version}.tar.gz

README.md

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,19 +138,15 @@ namespace db = databento;
138138
139139
int main() {
140140
auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build();
141-
db::TsSymbolMap symbol_map;
142-
auto decode_symbols = [&symbol_map](const db::Metadata& metadata) {
143-
symbol_map = metadata.CreateSymbolMap();
144-
};
145-
auto print_trades = [&symbol_map](const db::Record& record) {
146-
const auto& trade_msg = record.Get<db::TradeMsg>();
147-
std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg
148-
<< '\n';
149-
return db::KeepGoing::Continue;
150-
};
151-
client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
152-
{"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol,
153-
db::SType::InstrumentId, {}, decode_symbols, print_trades);
141+
auto store = client.TimeseriesGetRange(
142+
"GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
143+
{"ESM2", "NQZ2"}, db::Schema::Trades);
144+
auto symbol_map = store.GetMetadata().CreateSymbolMap();
145+
while (const auto* record = store.NextRecord()) {
146+
const auto& trade_msg = record->Get<db::TradeMsg>();
147+
std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": "
148+
<< trade_msg << '\n';
149+
}
154150
}
155151
```
156152

cmake/SourcesAndHeaders.cmake

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ set(headers
77
include/databento/dbn_decoder.hpp
88
include/databento/dbn_encoder.hpp
99
include/databento/dbn_file_store.hpp
10+
include/databento/dbn_store.hpp
1011
include/databento/detail/buffer.hpp
1112
include/databento/detail/dbn_buffer_decoder.hpp
1213
include/databento/detail/http_client.hpp
@@ -39,7 +40,8 @@ set(headers
3940
include/databento/v2.hpp
4041
include/databento/v3.hpp
4142
include/databento/with_ts_out.hpp
42-
src/stream_op_helper.hpp
43+
src/detail/http_stream_reader.hpp
44+
src/detail/stream_op_helper.hpp
4345
)
4446

4547
set(sources
@@ -49,10 +51,11 @@ set(sources
4951
src/dbn_constants.hpp
5052
src/dbn_decoder.cpp
5153
src/dbn_encoder.cpp
52-
src/dbn_file_store.cpp
54+
src/dbn_store.cpp
5355
src/detail/buffer.cpp
5456
src/detail/dbn_buffer_decoder.cpp
5557
src/detail/http_client.cpp
58+
src/detail/http_stream_reader.cpp
5659
src/detail/json_helpers.cpp
5760
src/detail/live_connection.cpp
5861
src/detail/scoped_fd.cpp

examples/historical/readme.cpp

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,13 @@ namespace db = databento;
99

1010
int main() {
1111
auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build();
12-
db::TsSymbolMap symbol_map;
13-
auto decode_symbols = [&symbol_map](const db::Metadata& metadata) {
14-
symbol_map = metadata.CreateSymbolMap();
15-
};
16-
auto print_trades = [&symbol_map](const db::Record& record) {
17-
const auto& trade_msg = record.Get<db::TradeMsg>();
12+
auto store =
13+
client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
14+
{"ESM2", "NQZ2"}, db::Schema::Trades);
15+
auto symbol_map = store.GetMetadata().CreateSymbolMap();
16+
while (const auto* record = store.NextRecord()) {
17+
const auto& trade_msg = record->Get<db::TradeMsg>();
1818
std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg
1919
<< '\n';
20-
return db::KeepGoing::Continue;
21-
};
22-
client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
23-
{"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol,
24-
db::SType::InstrumentId, {}, decode_symbols, print_trades);
20+
}
2521
}

examples/historical/timeseries_get_range.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,14 @@ namespace db = databento;
99
int main() {
1010
auto client = db::Historical::Builder().SetKeyFromEnv().Build();
1111
const auto limit = 1000;
12-
client.TimeseriesGetRange(
12+
auto store = client.TimeseriesGetRange(
1313
db::dataset::kGlbxMdp3, db::DateTimeRange<std::string>{"2022-10-03"}, {"ESZ2"},
14-
db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit,
15-
[](db::Metadata&& metadata) { std::cout << metadata << '\n'; },
16-
[](const db::Record& record) {
17-
const auto& trade_msg = record.Get<db::TradeMsg>();
18-
std::cout << trade_msg << '\n';
19-
return db::KeepGoing::Continue;
20-
});
14+
db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit);
15+
std::cout << store.GetMetadata() << '\n';
16+
while (const auto* record = store.NextRecord()) {
17+
const auto& trade_msg = record->Get<db::TradeMsg>();
18+
std::cout << trade_msg << '\n';
19+
}
2120

2221
return 0;
2322
}
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#include "databento/constants.hpp"
22
#include "databento/datetime.hpp"
3-
#include "databento/dbn_file_store.hpp"
3+
#include "databento/dbn_store.hpp"
44
#include "databento/enums.hpp"
55
#include "databento/historical.hpp"
66
#include "databento/record.hpp"
@@ -10,14 +10,13 @@ namespace db = databento;
1010
int main() {
1111
auto client = db::Historical::Builder().SetKeyFromEnv().Build();
1212
const auto limit = 1000;
13-
db::DbnFileStore dbn_file_store = client.TimeseriesGetRangeToFile(
13+
db::DbnStore dbn_store = client.TimeseriesGetRangeToFile(
1414
db::dataset::kGlbxMdp3, {"2022-10-03T00:00", "2022-10-04T00:00"}, {"ESZ2"},
1515
db::Schema::Ohlcv1M, db::SType::RawSymbol, db::SType::InstrumentId, limit,
1616
"ESZ2-ohlcv1m-20201003-20201004.dbn.zst");
17-
dbn_file_store.Replay([](const db::Record record) {
18-
const auto& ohlcv_bar = record.Get<db::OhlcvMsg>();
17+
while (const auto* record = dbn_store.NextRecord()) {
18+
const auto& ohlcv_bar = record->Get<db::OhlcvMsg>();
1919
std::cout << ohlcv_bar << '\n';
20-
return db::KeepGoing::Continue;
21-
});
20+
}
2221
return 0;
2322
}
Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,7 @@
11
#pragma once
22

3-
#include <filesystem> // path
4-
5-
#include "databento/dbn.hpp" // DecodeMetadata
6-
#include "databento/dbn_decoder.hpp" // DbnDecoder
7-
#include "databento/enums.hpp" // VersionUpgradePolicy
8-
#include "databento/log.hpp"
9-
#include "databento/record.hpp"
10-
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
3+
#include "databento/dbn_store.hpp"
114

125
namespace databento {
13-
// A reader for DBN files. This class provides both a callback API similar to
14-
// TimeseriesGetRange in historical data and LiveThreaded for live data as well
15-
// as a blocking API similar to that of LiveBlocking. Only one API should be
16-
// used on a given instance.
17-
class DbnFileStore {
18-
public:
19-
explicit DbnFileStore(const std::filesystem::path& file_path);
20-
DbnFileStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path,
21-
VersionUpgradePolicy upgrade_policy);
22-
23-
// Callback API: calling Replay consumes the input.
24-
void Replay(const MetadataCallback& metadata_callback,
25-
const RecordCallback& record_callback);
26-
void Replay(const RecordCallback& record_callback);
27-
28-
// Blocking API
29-
const Metadata& GetMetadata();
30-
// Returns the next record or `nullptr` if there are no remaining records.
31-
const Record* NextRecord();
32-
33-
private:
34-
void MaybeDecodeMetadata();
35-
36-
DbnDecoder decoder_;
37-
Metadata metadata_{};
38-
bool has_decoded_metadata_{false};
39-
};
40-
} // namespace databento
6+
using DbnFileStore = DbnStore;
7+
}

include/databento/dbn_store.hpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#pragma once
2+
3+
#include <filesystem> // path
4+
#include <memory> // unique_ptr
5+
6+
#include "databento/dbn.hpp" // DecodeMetadata
7+
#include "databento/dbn_decoder.hpp" // DbnDecoder
8+
#include "databento/enums.hpp" // VersionUpgradePolicy
9+
#include "databento/ireadable.hpp"
10+
#include "databento/log.hpp"
11+
#include "databento/record.hpp"
12+
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
13+
14+
namespace databento {
15+
// A reader for DBN data from files or streams. This class provides both a callback API
16+
// similar to `LiveThreaded` for live data as well as a blocking API similar to that of
17+
// `LiveBlocking`. Only one API should be used on a given instance.
18+
class DbnStore {
19+
public:
20+
explicit DbnStore(const std::filesystem::path& file_path);
21+
DbnStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path,
22+
VersionUpgradePolicy upgrade_policy);
23+
DbnStore(ILogReceiver* log_receiver, std::unique_ptr<IReadable> input,
24+
VersionUpgradePolicy upgrade_policy);
25+
26+
// Callback API: calling Replay consumes the input.
27+
void Replay(const MetadataCallback& metadata_callback,
28+
const RecordCallback& record_callback);
29+
void Replay(const RecordCallback& record_callback);
30+
31+
// Blocking API
32+
const databento::Metadata& GetMetadata();
33+
// Returns the next record or `nullptr` if there are no remaining records.
34+
const Record* NextRecord();
35+
36+
private:
37+
void MaybeDecodeMetadata();
38+
39+
DbnDecoder decoder_;
40+
databento::Metadata metadata_{};
41+
bool has_decoded_metadata_{false};
42+
};
43+
} // namespace databento

include/databento/detail/http_client.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
#include <nlohmann/json.hpp>
1010

1111
#include <cstdint>
12+
#include <memory> // unique_ptr
1213
#include <string>
1314

1415
namespace databento {
1516
class ILogReceiver;
17+
class IReadable;
1618
namespace detail {
1719
class HttpClient {
1820
public:
@@ -27,6 +29,8 @@ class HttpClient {
2729
const httplib::ContentReceiver& callback);
2830
void PostRawStream(const std::string& path, const httplib::Params& form_params,
2931
const httplib::ContentReceiver& callback);
32+
std::unique_ptr<IReadable> OpenPostStream(const std::string& path,
33+
const httplib::Params& form_params);
3034

3135
private:
3236
static bool IsErrorStatus(int status_code);

0 commit comments

Comments
 (0)