Skip to content

Commit ed8e054

Browse files
committed
ADD: Add compression support to C++ live clients
1 parent 3a1c672 commit ed8e054

29 files changed

Lines changed: 639 additions & 122 deletions

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# Changelog
22

3+
## 0.47.0 - TBD
4+
5+
### Enhancements
6+
- Added Zstd compression support to live clients which can be enabled with
7+
`LiveBuilder::SetCompression()`. It's disabled by default
8+
- Added `Compression()` getter to `LiveBlocking` and `LiveThreaded`
9+
10+
### Breaking changes
11+
- Added an overload to the `ReadSome` method on `IReadable` for timeout support
12+
313
## 0.46.1 - 2026-01-27
414

515
### Enhancements

cmake/SourcesAndHeaders.cmake

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ set(sources
5454
src/detail/dbn_buffer_decoder.cpp
5555
src/detail/http_client.cpp
5656
src/detail/json_helpers.cpp
57+
src/detail/live_connection.cpp
5758
src/detail/scoped_fd.cpp
5859
src/detail/sha256_hasher.cpp
5960
src/detail/tcp_client.cpp
61+
src/detail/tcp_readable.cpp
6062
src/detail/zstd_stream.cpp
6163
src/enums.cpp
6264
src/exceptions.cpp

examples/live/simple.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ int main() {
2525
.SetSendTsOut(true)
2626
.SetKeyFromEnv()
2727
.SetDataset(db::Dataset::GlbxMdp3)
28+
.SetCompression(db::Compression::Zstd)
2829
.BuildThreaded();
2930

3031
// Set up signal handler for Ctrl+C
3132
std::signal(SIGINT, [](int signal) { gSignal = signal; });
3233

33-
std::vector<std::string> symbols{"ESZ5", "ESZ5 C6200", "ESZ5 P5500"};
34+
std::vector<std::string> symbols{"ESZ6", "ESZ6 C8200", "ESZ6 P7500"};
3435
client.Subscribe(symbols, db::Schema::Definition, db::SType::RawSymbol);
3536
client.Subscribe(symbols, db::Schema::Mbo, db::SType::RawSymbol);
3637

include/databento/detail/buffer.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ class Buffer : public IReadable, public IWritable {
3838
// Will throw if `length > ReadCapacity()`.
3939
void ReadExact(std::byte* buffer, std::size_t length) override;
4040
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
41+
// timeout is ignored
42+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length,
43+
std::chrono::milliseconds timeout) override;
4144

4245
std::byte* ReadBegin() { return read_pos_; }
4346
std::byte* ReadEnd() { return write_pos_; }
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#pragma once
2+
3+
#include <chrono>
4+
#include <cstddef>
5+
#include <cstdint>
6+
#include <optional>
7+
#include <string>
8+
#include <string_view>
9+
10+
#include "databento/detail/tcp_client.hpp"
11+
#include "databento/detail/zstd_stream.hpp"
12+
#include "databento/enums.hpp"
13+
#include "databento/ireadable.hpp"
14+
#include "databento/iwritable.hpp"
15+
16+
namespace databento::detail {
17+
// Manages the TCP connection to the live gateway with optionally compressed reads for
18+
// the DBN data.
19+
class LiveConnection : IWritable {
20+
public:
21+
LiveConnection(const std::string& gateway, std::uint16_t port);
22+
23+
void WriteAll(std::string_view str);
24+
void WriteAll(const std::byte* buffer, std::size_t size);
25+
void ReadExact(std::byte* buffer, std::size_t size);
26+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size);
27+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size,
28+
std::chrono::milliseconds timeout);
29+
// Closes the socket.
30+
void Close();
31+
// Sets compression for subsequent reads.
32+
void SetCompression(Compression compression);
33+
34+
private:
35+
TcpClient client_;
36+
std::optional<ZstdDecodeStream> zstd_stream_;
37+
};
38+
} // namespace databento::detail

include/databento/detail/tcp_client.hpp

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,11 @@
77
#include <string_view>
88

99
#include "databento/detail/scoped_fd.hpp" // ScopedFd
10+
#include "databento/ireadable.hpp"
1011

1112
namespace databento::detail {
1213
class TcpClient {
1314
public:
14-
enum class Status : std::uint8_t {
15-
Ok,
16-
Timeout,
17-
Closed,
18-
};
19-
20-
struct Result {
21-
std::size_t read_size;
22-
Status status;
23-
};
24-
2515
struct RetryConf {
2616
std::uint32_t max_attempts{1};
2717
std::chrono::seconds max_wait{std::chrono::minutes{1}};
@@ -33,11 +23,11 @@ class TcpClient {
3323
void WriteAll(std::string_view str);
3424
void WriteAll(const std::byte* buffer, std::size_t size);
3525
void ReadExact(std::byte* buffer, std::size_t size);
36-
Result ReadSome(std::byte* buffer, std::size_t max_size);
26+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size);
3727
// Passing a timeout of 0 will block until data is available of the socket is
3828
// closed, the same behavior as the Read overload without a timeout.
39-
Result ReadSome(std::byte* buffer, std::size_t max_size,
40-
std::chrono::milliseconds timeout);
29+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size,
30+
std::chrono::milliseconds timeout);
4131
// Closes the socket.
4232
void Close();
4333

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include <chrono>
4+
#include <cstddef>
5+
6+
#include "databento/detail/tcp_client.hpp"
7+
#include "databento/ireadable.hpp"
8+
9+
namespace databento::detail {
10+
// Adapter wrapping TcpClient to implement IReadable interface and be passed
11+
// as a non-owned pointer to ZstdDecodeStream.
12+
class TcpReadable : public IReadable {
13+
public:
14+
explicit TcpReadable(TcpClient* client) : client_{client} {}
15+
16+
void ReadExact(std::byte* buffer, std::size_t length) override;
17+
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
18+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length,
19+
std::chrono::milliseconds timeout) override;
20+
21+
private:
22+
TcpClient* client_;
23+
};
24+
} // namespace databento::detail

include/databento/detail/zstd_stream.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <zstd.h>
44

5+
#include <chrono>
56
#include <cstddef> // size_t
67
#include <memory> // unique_ptr
78
#include <vector>
@@ -19,9 +20,12 @@ class ZstdDecodeStream : public IReadable {
1920

2021
// Read exactly `length` bytes into `buffer`.
2122
void ReadExact(std::byte* buffer, std::size_t length) override;
22-
// Read at most `length` bytes. Returns the number of bytes read. Will only
23+
// Read at most `max_length` bytes. Returns the number of bytes read. Will only
2324
// return 0 if the end of the stream is reached.
2425
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
26+
// Read at most `max_length` bytes with timeout support.
27+
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length,
28+
std::chrono::milliseconds timeout) override;
2529

2630
IReadable* Input() const { return input_.get(); }
2731

@@ -44,6 +48,8 @@ class ZstdCompressStream : public IWritable {
4448
~ZstdCompressStream() override;
4549

4650
void WriteAll(const std::byte* buffer, std::size_t length) override;
51+
// Flush any buffered data without ending the stream
52+
void Flush();
4753

4854
private:
4955
ILogReceiver* log_receiver_;

include/databento/file_stream.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ class InFileStream : public IReadable {
1717
// Read at most `length` bytes. Returns the number of bytes read. Will only
1818
// return 0 if the end of the stream is reached.
1919
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
20+
// timeout is ignored
21+
Result ReadSome(std::byte* buffer, std::size_t max_length,
22+
std::chrono::milliseconds timeout) override;
2023

2124
private:
2225
std::ifstream stream_;

include/databento/ireadable.hpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,37 @@
11
#pragma once
22

3+
#include <chrono> // milliseconds
34
#include <cstddef> // byte, size_t
5+
#include <cstdint> // uint8_t
46

57
namespace databento {
68
// An abstract class for readable objects to allow for runtime polymorphism
79
// around DBN decoding.
810
class IReadable {
911
public:
12+
enum class Status : std::uint8_t {
13+
Ok, // Data read successfully
14+
Timeout, // Timeout reached before any data available
15+
Closed, // Stream is closed/EOF
16+
};
17+
18+
struct Result {
19+
std::size_t read_size; // Number of bytes read
20+
Status status; // Status of the read operation
21+
};
22+
1023
virtual ~IReadable() = default;
1124

1225
// Read exactly `length` bytes into `buffer`.
1326
virtual void ReadExact(std::byte* buffer, std::size_t length) = 0;
14-
// Read at most `length` bytes. Returns the number of bytes read. Will only
15-
// return 0 if the end of the stream is reached.
27+
// Read at most `length` bytes. Returns the number of bytes read. Will only return 0
28+
// if the end of the stream is reached.
1629
virtual std::size_t ReadSome(std::byte* buffer, std::size_t max_length) = 0;
30+
// Read at most `max_length` bytes with timeout support. Returns Result with bytes
31+
// read and status. Status will be Timeout if no data available within timeout period,
32+
// Closed if stream is closed, or Ok if data was read. A timeout of 0 means wait
33+
// indefinitely (same as the no-timeout overload).
34+
virtual Result ReadSome(std::byte* buffer, std::size_t max_length,
35+
std::chrono::milliseconds timeout) = 0;
1736
};
1837
} // namespace databento

0 commit comments

Comments
 (0)