Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ jobs:
-DHAVE_RULES=ON
cmake --build build --config Release --parallel
sudo cmake --install build --prefix /usr
- name: Install gtest
uses: MarkusJx/googletest-installer@v1.1
- name: CMake configure
run: |
cmake -S . -B build \
-GNinja \
-DCMAKE_CXX_COMPILER=${{ matrix.compiler }} \
-DDATABENTO_USE_EXTERNAL_GTEST=0 \
-DDATABENTO_ENABLE_UNIT_TESTING=1 \
-DDATABENTO_ENABLE_EXAMPLES=1 \
-DDATABENTO_ENABLE_CLANG_TIDY=1 \
Expand Down
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
# Changelog

## 0.38.0 - 2025-06-10

### Enhancements
- Made the buffer size used by the live clients when reading from the TCP socket
configurable through the `LiveBuilder::SetBufferSize()` method
- Added log level prefix to `ConsoleLogReceiver` output
- Added `iomanip` compatibility: fill, precision, and width to `pretty::Px` (formerly
`FixPx`)
- Added new `pretty::Ts` helper type for human-readable formatting of `UnixNanos`

### Breaking changes
- Live client instances can only be created through the `LiveBuilder` class
- Changed `HeartbeatInterval()` getters on `LiveBlocking` and `LiveThreaded` to return
an `std::optional`
- Added new optional `ShouldLog` virtual method to `ILogReceiver` to
filter the levels of log messages that will be sent to the receiver

### Deprecations
- Deprecated `FixPx` in favor of `pretty::Px` which has consistent naming with the API
and the Python and Rust client libraries

### Bug fixes
- Ensure `CPPHTTPLIB_OPENSSL_SUPPORT` is defined at all locations where `cpp-httplib`
is included

## 0.37.1 - 2025-06-03

### Bug fixes
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0)

project(
databento
VERSION 0.37.1
VERSION 0.38.0
LANGUAGES CXX
DESCRIPTION "Official Databento client library"
)
Expand Down
3 changes: 2 additions & 1 deletion cmake/SourcesAndHeaders.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(headers
include/databento/live_threaded.hpp
include/databento/log.hpp
include/databento/metadata.hpp
include/databento/pretty.hpp
include/databento/publishers.hpp
include/databento/record.hpp
include/databento/symbol_map.hpp
Expand Down Expand Up @@ -58,14 +59,14 @@ set(sources
src/enums.cpp
src/exceptions.cpp
src/file_stream.cpp
src/fixed_price.cpp
src/flag_set.cpp
src/historical.cpp
src/live.cpp
src/live_blocking.cpp
src/live_threaded.cpp
src/log.cpp
src/metadata.cpp
src/pretty.cpp
src/publishers.cpp
src/record.cpp
src/symbol_map.cpp
Expand Down
8 changes: 5 additions & 3 deletions include/databento/detail/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
namespace databento::detail {
class Buffer : public IReadable, public IWritable {
public:
Buffer() : Buffer(64 * std::size_t{1 << 10}) {}
static constexpr std::size_t kDefaultBufSize = 64 * std::size_t{1 << 10};

Buffer() : Buffer(kDefaultBufSize) {}
explicit Buffer(std::size_t init_capacity)
: buf_{AlignedNew(init_capacity), AlignedDelete},
end_{buf_.get() + init_capacity},
Expand Down Expand Up @@ -80,7 +82,7 @@ class Buffer : public IReadable, public IWritable {

UniqueBufPtr buf_;
std::byte* end_;
std::byte* read_pos_{};
std::byte* write_pos_{};
std::byte* read_pos_;
std::byte* write_pos_;
};
} // namespace databento::detail
3 changes: 3 additions & 0 deletions include/databento/exceptions.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#pragma once

#ifndef CPPHTTPLIB_OPENSSL_SUPPORT
#define CPPHTTPLIB_OPENSSL_SUPPORT
#endif
#include <httplib.h> // Error
#include <nlohmann/json.hpp> // json, parse_error

Expand Down
21 changes: 6 additions & 15 deletions include/databento/fixed_price.hpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
#pragma once

#include <cstdint>
#include <sstream>
#include <string>

#include "databento/constants.hpp"
#include "databento/pretty.hpp"

namespace databento {
// A fixed-precision price.
struct FixPx {
bool IsUndefined() const { return val == databento::kUndefPrice; }
// Has been renamed to pretty::Px
using FixPx [[deprecated]] = pretty::Px;

std::int64_t val;
};

std::ostream& operator<<(std::ostream& stream, FixPx fix_px);

// Convert a fixed-precision price to a formatted string.
inline std::string PxToString(std::int64_t px) {
std::ostringstream ss;
ss << FixPx{px};
return ss.str();
// Has been moved to the pretty namespace
[[deprecated]] inline std::string PxToString(std::int64_t px) {
return pretty::PxToString(px);
}
} // namespace databento
26 changes: 23 additions & 3 deletions include/databento/live.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <chrono>
#include <cstddef>
#include <string>

#include "databento/enums.hpp" // VersionUpgradePolicy
Expand All @@ -9,22 +10,32 @@
#include "databento/publishers.hpp"

namespace databento {
// Forward declarations
class ILogReceiver;

// A helper class for constructing a Live client, either an instance of
// LiveBlocking or LiveThreaded.
class LiveBuilder {
public:
LiveBuilder() = default;
LiveBuilder();

/*
* Required settters
*/

// Sets `key_` based on the environment variable DATABENTO_API_KEY.
//
// NOTE: This is not thread-safe if `std::setenv` is used elsewhere in the
// program.
LiveBuilder& SetKeyFromEnv();
LiveBuilder& SetKey(std::string key);
LiveBuilder& SetDataset(std::string dataset);
LiveBuilder& SetDataset(Dataset dataset);
LiveBuilder& SetDataset(std::string dataset);

/*
* Optional settters
*/

// Whether to append the gateway send timestamp after each DBN message.
LiveBuilder& SetSendTsOut(bool send_ts_out);
// Set the version upgrade policy for when receiving DBN data from a prior
Expand All @@ -36,6 +47,13 @@ class LiveBuilder {
LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval);
// Overrides the gateway and port. This is an advanced method.
LiveBuilder& SetAddress(std::string gateway, std::uint16_t port);
// Overrides the size of the buffer used for reading data from the TCP socket.
LiveBuilder& SetBufferSize(std::size_t size);

/*
* Build a live client instance
*/

// Attempts to construct an instance of a blocking live client or throws an
// exception.
LiveBlocking BuildBlocking();
Expand All @@ -51,8 +69,10 @@ class LiveBuilder {
std::uint16_t port_{};
std::string key_;
std::string dataset_;

bool send_ts_out_{false};
VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3};
std::chrono::seconds heartbeat_interval_{};
std::optional<std::chrono::seconds> heartbeat_interval_{};
std::size_t buffer_size_;
};
} // namespace databento
32 changes: 21 additions & 11 deletions include/databento/live_blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <chrono> // milliseconds
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <utility> // pair
Expand All @@ -18,21 +19,17 @@
#include "databento/record.hpp" // Record, RecordHeader

namespace databento {
// Forward declaration
class ILogReceiver;
class LiveBuilder;
class LiveThreaded;

// A client for interfacing with Databento's real-time and intraday replay
// market data API. This client provides a blocking API for getting the next
// record. Unlike Historical, each instance of LiveBlocking is associated with a
// particular dataset.
class LiveBlocking {
public:
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::chrono::seconds heartbeat_interval);
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::chrono::seconds heartbeat_interval);
/*
* Getters
*/
Expand All @@ -45,8 +42,8 @@ class LiveBlocking {
VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; }
// The the first member of the pair will be true, when the heartbeat interval
// was overridden.
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const {
return {heartbeat_interval_.count() > 0, heartbeat_interval_};
std::optional<std::chrono::seconds> HeartbeatInterval() const {
return heartbeat_interval_;
}
const std::vector<LiveSubscription>& Subscriptions() const {
return subscriptions_;
Expand Down Expand Up @@ -93,6 +90,19 @@ class LiveBlocking {
void Resubscribe();

private:
friend LiveBuilder;
friend LiveThreaded;

LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size);
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size);

std::string DetermineGateway() const;
std::uint64_t Authenticate();
std::string DecodeChallenge();
Expand All @@ -115,11 +125,11 @@ class LiveBlocking {
bool send_ts_out_;
std::uint8_t version_{};
VersionUpgradePolicy upgrade_policy_;
std::chrono::seconds heartbeat_interval_;
std::optional<std::chrono::seconds> heartbeat_interval_;
detail::TcpClient client_;
std::uint32_t sub_counter_{};
std::vector<LiveSubscription> subscriptions_;
detail::Buffer buffer_{};
detail::Buffer buffer_;
// Must be 8-byte aligned for records
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
std::uint64_t session_id_;
Expand Down
32 changes: 21 additions & 11 deletions include/databento/live_threaded.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <functional> // function
#include <memory> // unique_ptr
#include <optional>
#include <string>
#include <string_view>
#include <utility> // pair
#include <vector>

Expand All @@ -14,15 +17,17 @@
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback

namespace databento {
// Forward declaration
class ILogReceiver;
class LiveBuilder;

// A client for interfacing with Databento's real-time and intraday replay
// market data API. This client provides a threaded event-driven API for
// receiving the next record. Unlike Historical, each instance of LiveThreaded
// is associated with a particular dataset.
class LiveThreaded {
public:
enum class ExceptionAction {
enum class ExceptionAction : std::uint8_t {
// Start a new session. Return this instead of calling `Start`, which would
// cause a deadlock.
Restart,
Expand All @@ -32,13 +37,6 @@ class LiveThreaded {
using ExceptionCallback =
std::function<ExceptionAction(const std::exception&)>;

LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::chrono::seconds heartbeat_interval);
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::chrono::seconds heartbeat_interval);
LiveThreaded(const LiveThreaded&) = delete;
LiveThreaded& operator=(const LiveThreaded&) = delete;
LiveThreaded(LiveThreaded&& other) noexcept;
Expand All @@ -57,7 +55,7 @@ class LiveThreaded {
VersionUpgradePolicy UpgradePolicy() const;
// The the first member of the pair will be true, when the heartbeat interval
// was overridden.
std::pair<bool, std::chrono::seconds> HeartbeatInterval() const;
std::optional<std::chrono::seconds> HeartbeatInterval() const;
const std::vector<LiveSubscription>& Subscriptions() const;
std::vector<LiveSubscription>& Subscriptions();

Expand Down Expand Up @@ -96,15 +94,27 @@ class LiveThreaded {
KeepGoing BlockForStop(std::chrono::milliseconds timeout);

private:
friend LiveBuilder;

struct Impl;

static void ProcessingThread(Impl* impl, MetadataCallback&& metadata_callback,
RecordCallback&& record_callback,
ExceptionCallback&& exception_callback);
static ExceptionAction ExceptionHandler(
Impl* impl, const ExceptionCallback& exception_callback,
const std::exception& exc, const char* pretty_function_name,
const char* message);
const std::exception& exc, std::string_view pretty_function_name,
std::string_view message);

LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size);
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size);

// unique_ptr to be movable
std::unique_ptr<Impl> impl_;
Expand Down
Loading
Loading