Skip to content

Commit a8b95ad

Browse files
committed
MOD: Update protocol for slow reader differences
1 parent 2868ef8 commit a8b95ad

12 files changed

Lines changed: 54 additions & 52 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
## 0.47.1 - TBD
44

55
### Enhancements
6-
- Added `SlowReadBehavior` enum and `LiveBuilder::SetSlowReadBehavior()` to configure
6+
- Added `SlowReaderBehavior` enum and `LiveBuilder::SetSlowReaderBehavior()` to configure
77
gateway behavior when client falls behind
8-
- Added `SlowReadBehavior()` getter to `LiveBlocking` and `LiveThreaded`
8+
- Added `SlowReaderBehavior()` getter to `LiveBlocking` and `LiveThreaded`
99

1010
### Bug fixes
1111
- Added conversion for missing schemas for function `RTypeFromSchema`

include/databento/enums.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ enum class DatasetCondition : std::uint8_t {
5050

5151
// Live session parameter which controls gateway behavior when the client
5252
// falls behind real time.
53-
enum class SlowReadBehavior : std::uint8_t {
53+
enum class SlowReaderBehavior : std::uint8_t {
5454
// Send a warning but continue reading.
5555
Warn = 0,
5656
// Skip records to catch up.
@@ -671,7 +671,7 @@ const char* ToString(SplitDuration duration_interval);
671671
const char* ToString(Delivery delivery);
672672
const char* ToString(JobState state);
673673
const char* ToString(DatasetCondition condition);
674-
const char* ToString(SlowReadBehavior slow_read_behavior);
674+
const char* ToString(SlowReaderBehavior slow_reader_behavior);
675675
const char* ToString(RType r_type);
676676
const char* ToString(Side side);
677677
const char* ToString(Action action);
@@ -698,7 +698,7 @@ std::ostream& operator<<(std::ostream& out, SplitDuration duration_interval);
698698
std::ostream& operator<<(std::ostream& out, Delivery delivery);
699699
std::ostream& operator<<(std::ostream& out, JobState state);
700700
std::ostream& operator<<(std::ostream& out, DatasetCondition condition);
701-
std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior);
701+
std::ostream& operator<<(std::ostream& out, SlowReaderBehavior slow_reader_behavior);
702702
std::ostream& operator<<(std::ostream& out, RType r_type);
703703
std::ostream& operator<<(std::ostream& out, Side side);
704704
std::ostream& operator<<(std::ostream& out, Action action);

include/databento/live.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class LiveBuilder {
5656
// Sets the compression mode for the read stream.
5757
LiveBuilder& SetCompression(Compression compression);
5858
// Sets the behavior of the gateway when the client falls behind real time.
59-
LiveBuilder& SetSlowReadBehavior(SlowReadBehavior slow_read_behavior);
59+
LiveBuilder& SetSlowReaderBehavior(SlowReaderBehavior slow_reader_behavior);
6060

6161
/*
6262
* Build a live client instance
@@ -84,6 +84,6 @@ class LiveBuilder {
8484
std::size_t buffer_size_;
8585
std::string user_agent_ext_;
8686
Compression compression_{Compression::None};
87-
std::optional<SlowReadBehavior> slow_read_behavior_{};
87+
std::optional<SlowReaderBehavior> slow_reader_behavior_{};
8888
};
8989
} // namespace databento

include/databento/live_blocking.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ class LiveBlocking {
4545
return heartbeat_interval_;
4646
}
4747
databento::Compression Compression() const { return compression_; }
48-
std::optional<databento::SlowReadBehavior> SlowReadBehavior() const {
49-
return slow_read_behavior_;
48+
std::optional<databento::SlowReaderBehavior> SlowReaderBehavior() const {
49+
return slow_reader_behavior_;
5050
}
5151
const std::vector<LiveSubscription>& Subscriptions() const { return subscriptions_; }
5252
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }
@@ -99,14 +99,14 @@ class LiveBlocking {
9999
std::optional<std::chrono::seconds> heartbeat_interval,
100100
std::size_t buffer_size, std::string user_agent_ext,
101101
databento::Compression compression,
102-
std::optional<databento::SlowReadBehavior> slow_read_behavior);
102+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
103103
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
104104
std::string gateway, std::uint16_t port, bool send_ts_out,
105105
VersionUpgradePolicy upgrade_policy,
106106
std::optional<std::chrono::seconds> heartbeat_interval,
107107
std::size_t buffer_size, std::string user_agent_ext,
108108
databento::Compression compression,
109-
std::optional<databento::SlowReadBehavior> slow_read_behavior);
109+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
110110

111111
std::string DetermineGateway() const;
112112
std::uint64_t Authenticate();
@@ -133,7 +133,7 @@ class LiveBlocking {
133133
const VersionUpgradePolicy upgrade_policy_;
134134
const std::optional<std::chrono::seconds> heartbeat_interval_;
135135
const databento::Compression compression_;
136-
const std::optional<databento::SlowReadBehavior> slow_read_behavior_;
136+
const std::optional<databento::SlowReaderBehavior> slow_reader_behavior_;
137137
detail::LiveConnection connection_;
138138
std::uint32_t sub_counter_{};
139139
std::vector<LiveSubscription> subscriptions_;

include/databento/live_threaded.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class LiveThreaded {
5555
VersionUpgradePolicy UpgradePolicy() const;
5656
std::optional<std::chrono::seconds> HeartbeatInterval() const;
5757
databento::Compression Compression() const;
58-
std::optional<databento::SlowReadBehavior> SlowReadBehavior() const;
58+
std::optional<databento::SlowReaderBehavior> SlowReaderBehavior() const;
5959
const std::vector<LiveSubscription>& Subscriptions() const;
6060
std::vector<LiveSubscription>& Subscriptions();
6161

@@ -111,14 +111,14 @@ class LiveThreaded {
111111
std::optional<std::chrono::seconds> heartbeat_interval,
112112
std::size_t buffer_size, std::string user_agent_ext,
113113
databento::Compression compression,
114-
std::optional<databento::SlowReadBehavior> slow_read_behavior);
114+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
115115
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
116116
std::string gateway, std::uint16_t port, bool send_ts_out,
117117
VersionUpgradePolicy upgrade_policy,
118118
std::optional<std::chrono::seconds> heartbeat_interval,
119119
std::size_t buffer_size, std::string user_agent_ext,
120120
databento::Compression compression,
121-
std::optional<databento::SlowReadBehavior> slow_read_behavior);
121+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
122122

123123
// unique_ptr to be movable
124124
std::unique_ptr<Impl> impl_;

src/enums.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,13 @@ const char* ToString(DatasetCondition condition) {
108108
}
109109
}
110110

111-
const char* ToString(SlowReadBehavior slow_read_behavior) {
112-
switch (slow_read_behavior) {
113-
case SlowReadBehavior::Warn: {
111+
const char* ToString(SlowReaderBehavior slow_reader_behavior) {
112+
switch (slow_reader_behavior) {
113+
case SlowReaderBehavior::Warn: {
114114
return "warn";
115115
}
116-
case SlowReadBehavior::Skip: {
117-
return "skip";
116+
case SlowReaderBehavior::Skip: {
117+
return "drop";
118118
}
119119
default: {
120120
return "Unknown";
@@ -861,8 +861,8 @@ std::ostream& operator<<(std::ostream& out, DatasetCondition condition) {
861861
return out;
862862
}
863863

864-
std::ostream& operator<<(std::ostream& out, SlowReadBehavior slow_read_behavior) {
865-
out << ToString(slow_read_behavior);
864+
std::ostream& operator<<(std::ostream& out, SlowReaderBehavior slow_reader_behavior) {
865+
out << ToString(slow_reader_behavior);
866866
return out;
867867
}
868868

src/live.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ LiveBuilder& LiveBuilder::SetCompression(Compression compression) {
8484
return *this;
8585
}
8686

87-
LiveBuilder& LiveBuilder::SetSlowReadBehavior(SlowReadBehavior slow_read_behavior) {
88-
slow_read_behavior_ = slow_read_behavior;
87+
LiveBuilder& LiveBuilder::SetSlowReaderBehavior(
88+
SlowReaderBehavior slow_reader_behavior) {
89+
slow_reader_behavior_ = slow_reader_behavior;
8990
return *this;
9091
}
9192

@@ -96,14 +97,14 @@ databento::LiveBlocking LiveBuilder::BuildBlocking() {
9697
dataset_, send_ts_out_,
9798
upgrade_policy_, heartbeat_interval_,
9899
buffer_size_, user_agent_ext_,
99-
compression_, slow_read_behavior_};
100+
compression_, slow_reader_behavior_};
100101
}
101102
return databento::LiveBlocking{log_receiver_, key_,
102103
dataset_, gateway_,
103104
port_, send_ts_out_,
104105
upgrade_policy_, heartbeat_interval_,
105106
buffer_size_, user_agent_ext_,
106-
compression_, slow_read_behavior_};
107+
compression_, slow_reader_behavior_};
107108
}
108109

109110
databento::LiveThreaded LiveBuilder::BuildThreaded() {
@@ -113,14 +114,14 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() {
113114
dataset_, send_ts_out_,
114115
upgrade_policy_, heartbeat_interval_,
115116
buffer_size_, user_agent_ext_,
116-
compression_, slow_read_behavior_};
117+
compression_, slow_reader_behavior_};
117118
}
118119
return databento::LiveThreaded{log_receiver_, key_,
119120
dataset_, gateway_,
120121
port_, send_ts_out_,
121122
upgrade_policy_, heartbeat_interval_,
122123
buffer_size_, user_agent_ext_,
123-
compression_, slow_read_behavior_};
124+
compression_, slow_reader_behavior_};
124125
}
125126

126127
void LiveBuilder::Validate() {

src/live_blocking.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ LiveBlocking::LiveBlocking(
3333
VersionUpgradePolicy upgrade_policy,
3434
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
3535
std::string user_agent_ext, databento::Compression compression,
36-
std::optional<databento::SlowReadBehavior> slow_read_behavior)
36+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior)
3737

3838
: log_receiver_{log_receiver},
3939
key_{std::move(key)},
@@ -45,7 +45,7 @@ LiveBlocking::LiveBlocking(
4545
upgrade_policy_{upgrade_policy},
4646
heartbeat_interval_{heartbeat_interval},
4747
compression_{compression},
48-
slow_read_behavior_{slow_read_behavior},
48+
slow_reader_behavior_{slow_reader_behavior},
4949
connection_{gateway_, port_},
5050
buffer_{buffer_size},
5151
session_id_{this->Authenticate()} {}
@@ -56,7 +56,7 @@ LiveBlocking::LiveBlocking(
5656
VersionUpgradePolicy upgrade_policy,
5757
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
5858
std::string user_agent_ext, databento::Compression compression,
59-
std::optional<databento::SlowReadBehavior> slow_read_behavior)
59+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior)
6060
: log_receiver_{log_receiver},
6161
key_{std::move(key)},
6262
dataset_{std::move(dataset)},
@@ -67,7 +67,7 @@ LiveBlocking::LiveBlocking(
6767
upgrade_policy_{upgrade_policy},
6868
heartbeat_interval_{heartbeat_interval},
6969
compression_{compression},
70-
slow_read_behavior_{slow_read_behavior},
70+
slow_reader_behavior_{slow_reader_behavior},
7171
connection_{gateway_, port_},
7272
buffer_{buffer_size},
7373
session_id_{this->Authenticate()} {}
@@ -343,8 +343,8 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) {
343343
if (heartbeat_interval_.has_value()) {
344344
req_stream << "|heartbeat_interval_s=" << heartbeat_interval_->count();
345345
}
346-
if (slow_read_behavior_.has_value()) {
347-
req_stream << "|slow_read_behavior=" << *slow_read_behavior_;
346+
if (slow_reader_behavior_.has_value()) {
347+
req_stream << "|slow_reader_behavior=" << *slow_reader_behavior_;
348348
}
349349
req_stream << '\n';
350350
return req_stream.str();

src/live_threaded.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,23 @@ LiveThreaded::LiveThreaded(
6262
VersionUpgradePolicy upgrade_policy,
6363
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
6464
std::string user_agent_ext, databento::Compression compression,
65-
std::optional<databento::SlowReadBehavior> slow_read_behavior)
65+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior)
6666
: impl_{std::make_unique<Impl>(log_receiver, std::move(key), std::move(dataset),
6767
send_ts_out, upgrade_policy, heartbeat_interval,
6868
buffer_size, std::move(user_agent_ext), compression,
69-
slow_read_behavior)} {}
69+
slow_reader_behavior)} {}
7070

7171
LiveThreaded::LiveThreaded(
7272
ILogReceiver* log_receiver, std::string key, std::string dataset,
7373
std::string gateway, std::uint16_t port, bool send_ts_out,
7474
VersionUpgradePolicy upgrade_policy,
7575
std::optional<std::chrono::seconds> heartbeat_interval, std::size_t buffer_size,
7676
std::string user_agent_ext, databento::Compression compression,
77-
std::optional<databento::SlowReadBehavior> slow_read_behavior)
77+
std::optional<databento::SlowReaderBehavior> slow_reader_behavior)
7878
: impl_{std::make_unique<Impl>(
7979
log_receiver, std::move(key), std::move(dataset), std::move(gateway), port,
8080
send_ts_out, upgrade_policy, heartbeat_interval, buffer_size,
81-
std::move(user_agent_ext), compression, slow_read_behavior)} {}
81+
std::move(user_agent_ext), compression, slow_reader_behavior)} {}
8282

8383
const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); }
8484

@@ -102,8 +102,8 @@ databento::Compression LiveThreaded::Compression() const {
102102
return impl_->blocking.Compression();
103103
}
104104

105-
std::optional<databento::SlowReadBehavior> LiveThreaded::SlowReadBehavior() const {
106-
return impl_->blocking.SlowReadBehavior();
105+
std::optional<databento::SlowReaderBehavior> LiveThreaded::SlowReaderBehavior() const {
106+
return impl_->blocking.SlowReaderBehavior();
107107
}
108108

109109
const std::vector<databento::LiveSubscription>& LiveThreaded::Subscriptions() const {

tests/include/mock/mock_lsg_server.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class MockLsgServer {
4949
std::function<void(MockLsgServer&)> serve_fn);
5050
MockLsgServer(std::string dataset, bool ts_out, Compression compression,
5151
std::function<void(MockLsgServer&)> serve_fn);
52-
MockLsgServer(std::string dataset, bool ts_out, SlowReadBehavior slow_read_behavior,
52+
MockLsgServer(std::string dataset, bool ts_out,
53+
SlowReaderBehavior slow_reader_behavior,
5354
std::function<void(MockLsgServer&)> serve_fn);
5455

5556
std::uint16_t Port() const { return port_; }
@@ -121,7 +122,7 @@ class MockLsgServer {
121122
bool ts_out_;
122123
std::chrono::seconds heartbeat_interval_;
123124
Compression compression_{Compression::None};
124-
std::optional<SlowReadBehavior> slow_read_behavior_{};
125+
std::optional<SlowReaderBehavior> slow_reader_behavior_{};
125126
std::uint16_t port_{};
126127
detail::ScopedFd socket_{};
127128
detail::ScopedFd conn_fd_{};

0 commit comments

Comments
 (0)