Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## 0.53.0 - 2026-04-08

### Enhancements
- Added `TryNextRecord` and `FillBuffer` to `LiveBlocking` for more fine-grained
control around I/O
- Added `TimeoutConf` struct and `SetTimeoutConf()` builder method for configuring connect
and auth timeouts on the Live client (defaults to 10s and 30s)
- Added `SessionId()` and `Timeouts()` getters to `LiveBlocking` and `LiveThreaded`

## 0.52.0 - 2026-03-31

### Enhancements
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2)

project(
databento
VERSION 0.52.0
VERSION 0.53.0
LANGUAGES CXX
DESCRIPTION "Official Databento client library"
)
Expand Down
2 changes: 2 additions & 0 deletions include/databento/detail/live_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class LiveConnection : IWritable {
public:
LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
std::uint16_t port);
LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
std::uint16_t port, TcpClient::RetryConf retry_conf);

void WriteAll(std::string_view str);
void WriteAll(const std::byte* buffer, std::size_t size);
Expand Down
1 change: 1 addition & 0 deletions include/databento/detail/tcp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class TcpClient {
struct RetryConf {
std::uint32_t max_attempts{1};
std::chrono::seconds max_wait{std::chrono::minutes{1}};
std::chrono::seconds connect_timeout{std::chrono::seconds{10}};
};

TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port);
Expand Down
4 changes: 4 additions & 0 deletions include/databento/live.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class LiveBuilder {
LiveBuilder& SetCompression(Compression compression);
// Sets the behavior of the gateway when the client falls behind real time.
LiveBuilder& SetSlowReaderBehavior(SlowReaderBehavior slow_reader_behavior);
// Sets the timeouts for connecting and authenticating with the gateway.
// Defaults to 10 seconds for connect and 30 seconds for auth.
LiveBuilder& SetTimeoutConf(TimeoutConf timeout_conf);

/*
* Build a live client instance
Expand Down Expand Up @@ -85,5 +88,6 @@ class LiveBuilder {
std::string user_agent_ext_;
Compression compression_{Compression::None};
std::optional<SlowReaderBehavior> slow_reader_behavior_{};
TimeoutConf timeout_conf_{};
};
} // namespace databento
42 changes: 37 additions & 5 deletions include/databento/live_blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ namespace databento {
// Forward declaration
class ILogReceiver;
class LiveBuilder;

// Timeouts for the Live client's connection and authentication phases.
struct TimeoutConf {
std::chrono::seconds connect{10};
std::chrono::seconds auth{30};
};

class LiveThreaded;

// A client for interfacing with Databento's real-time and intraday replay
Expand Down Expand Up @@ -48,6 +55,8 @@ class LiveBlocking {
std::optional<databento::SlowReaderBehavior> SlowReaderBehavior() const {
return slow_reader_behavior_;
}
const databento::TimeoutConf& TimeoutConf() const { return timeout_conf_; }
std::uint64_t SessionId() const { return session_id_; }
const std::vector<LiveSubscription>& Subscriptions() const { return subscriptions_; }
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }

Expand Down Expand Up @@ -81,6 +90,26 @@ class LiveBlocking {
//
// This method should only be called after `Start`.
const Record* NextRecord(std::chrono::milliseconds timeout);
// Returns the next record from the internal buffer without performing any
// I/O. Returns `nullptr` if no complete record is buffered. The returned
// pointer is valid until the next call to `TryNextRecord`, `NextRecord`,
// or `FillBuffer`.
//
// This method should only be called after `Start`.
const Record* TryNextRecord();
// Reads available data from the connection into the internal buffer using
// the heartbeat timeout. Returns the number of bytes read and the status.
// A `read_size` of 0 with `Status::Closed` indicates the connection was
// closed by the gateway.
//
// This method should only be called after `Start`.
IReadable::Result FillBuffer();
// Reads available data from the connection into the internal buffer.
// Returns the number of bytes read and the status. A `read_size` of 0 with
// `Status::Closed` indicates the connection was closed by the gateway.
//
// This method should only be called after `Start`.
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
// Stops the session with the gateway. Once stopped, the session cannot be
// restarted.
void Stop();
Expand All @@ -99,25 +128,27 @@ class LiveBlocking {
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression,
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
databento::TimeoutConf timeout_conf);
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression,
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
databento::TimeoutConf timeout_conf);

std::string DetermineGateway() const;
std::uint64_t Authenticate();
std::string DecodeChallenge();
std::string DecodeChallenge(std::chrono::milliseconds timeout);
std::string GenerateCramReply(std::string_view challenge_key);
std::string EncodeAuthReq(std::string_view auth);
std::uint64_t DecodeAuthResp();
std::uint64_t DecodeAuthResp(std::chrono::milliseconds timeout);
void IncrementSubCounter();
void Subscribe(std::string_view sub_msg, const std::vector<std::string>& symbols,
bool use_snapshot);
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
const Record* ConsumeBufferedRecord();
RecordHeader* BufferRecordHeader();
std::chrono::milliseconds HeartbeatTimeout() const;
void CheckHeartbeatTimeout() const;
Expand All @@ -136,6 +167,7 @@ class LiveBlocking {
const std::optional<std::chrono::seconds> heartbeat_interval_;
const databento::Compression compression_;
const std::optional<databento::SlowReaderBehavior> slow_reader_behavior_;
const databento::TimeoutConf timeout_conf_;
detail::LiveConnection connection_;
std::uint32_t sub_counter_{};
std::vector<LiveSubscription> subscriptions_;
Expand Down
9 changes: 7 additions & 2 deletions include/databento/live_threaded.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "databento/datetime.hpp" // UnixNanos
#include "databento/detail/scoped_thread.hpp" // ScopedThread
#include "databento/enums.hpp" // Schema, SType
#include "databento/live_blocking.hpp" // TimeoutConf
#include "databento/live_subscription.hpp"
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback

Expand Down Expand Up @@ -56,6 +57,8 @@ class LiveThreaded {
std::optional<std::chrono::seconds> HeartbeatInterval() const;
databento::Compression Compression() const;
std::optional<databento::SlowReaderBehavior> SlowReaderBehavior() const;
const databento::TimeoutConf& TimeoutConf() const;
std::uint64_t SessionId() const;
const std::vector<LiveSubscription>& Subscriptions() const;
std::vector<LiveSubscription>& Subscriptions();

Expand Down Expand Up @@ -111,14 +114,16 @@ class LiveThreaded {
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression,
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
databento::TimeoutConf timeout_conf);
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression,
std::optional<databento::SlowReaderBehavior> slow_reader_behavior);
std::optional<databento::SlowReaderBehavior> slow_reader_behavior,
databento::TimeoutConf timeout_conf);

// unique_ptr to be movable
std::unique_ptr<Impl> impl_;
Expand Down
10 changes: 5 additions & 5 deletions include/databento/publishers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ enum class Dataset : std::uint16_t {
XcisBbo = 13,
// NYSE National Trades
XcisTrades = 14,
// MEMX Memoir Depth
// MEMX MEMOIR Depth
MemxMemoir = 15,
// MIAX Pearl Depth
EprlDom = 16,
Expand Down Expand Up @@ -195,7 +195,7 @@ enum class Dataset : std::uint16_t {
XeurEobi = 38,
// European Energy Exchange EOBI
XeeeEobi = 39,
// Cboe Futures Exchange PITCH
// CFE Depth
XcbfPitch = 40,
// Blue Ocean ATS MEMOIR Depth
OceaMemoir = 41,
Expand Down Expand Up @@ -231,7 +231,7 @@ enum class Publisher : std::uint16_t {
XcisBboXcis = 13,
// NYSE National Trades
XcisTradesXcis = 14,
// MEMX Memoir Depth
// MEMX MEMOIR Depth
MemxMemoirMemx = 15,
// MIAX Pearl Depth
EprlDomEprl = 16,
Expand Down Expand Up @@ -411,9 +411,9 @@ enum class Publisher : std::uint16_t {
XeurEobiXoff = 103,
// European Energy Exchange EOBI - Off-Market Trades
XeeeEobiXoff = 104,
// Cboe Futures Exchange
// Cboe Futures Exchange (CFE)
XcbfPitchXcbf = 105,
// Cboe Futures Exchange - Off-Market Trades
// Cboe Futures Exchange (CFE) - Off-Market Trades
XcbfPitchXoff = 106,
// Blue Ocean ATS MEMOIR
OceaMemoirOcea = 107,
Expand Down
2 changes: 1 addition & 1 deletion pkg/PKGBUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Maintainer: Databento <support@databento.com>
_pkgname=databento-cpp
pkgname=databento-cpp-git
pkgver=0.52.0
pkgver=0.53.0
pkgrel=1
pkgdesc="Official C++ client for Databento"
arch=('any')
Expand Down
4 changes: 4 additions & 0 deletions src/detail/live_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& ga
std::uint16_t port)
: client_{log_receiver, gateway, port} {}

LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
std::uint16_t port, TcpClient::RetryConf retry_conf)
: client_{log_receiver, gateway, port, retry_conf} {}

void LiveConnection::WriteAll(std::string_view str) { client_.WriteAll(str); }

void LiveConnection::WriteAll(const std::byte* buffer, std::size_t size) {
Expand Down
95 changes: 85 additions & 10 deletions src/detail/tcp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
#ifdef _WIN32
#include <winsock2.h> // closesocket, recv, send, socket
#else
#include <fcntl.h> // fcntl, F_GETFL, F_SETFL, O_NONBLOCK
#include <netdb.h> // addrinfo, gai_strerror, getaddrinfo, freeaddrinfo
#include <netinet/in.h> // htons, IPPROTO_TCP
#include <sys/poll.h> // pollfd
#include <sys/socket.h> // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM
#include <unistd.h> // close, ssize_t
#include <sys/socket.h> // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM, getsockopt, SO_ERROR, SOL_SOCKET
#include <unistd.h> // close, ssize_t

#include <cerrno> // errno
#endif
Expand All @@ -31,6 +32,65 @@ int GetErrNo() {
return errno;
#endif
}

int Poll(::pollfd* fds, std::uint32_t nfds, int timeout_ms) {
#ifdef _WIN32
return ::WSAPoll(fds, nfds, timeout_ms);
#else
return ::poll(fds, static_cast<::nfds_t>(nfds), timeout_ms);
#endif
}

int GetSockOpt(databento::detail::Socket fd, int level, int optname, int* optval) {
#ifdef _WIN32
int len = sizeof(*optval);
return ::getsockopt(fd, level, optname, reinterpret_cast<char*>(optval), &len);
#else
socklen_t len = sizeof(*optval);
return ::getsockopt(fd, level, optname, optval, &len);
#endif
}

#ifdef _WIN32
constexpr int kConnectInProgress = WSAEWOULDBLOCK;
#else
constexpr int kConnectInProgress = EINPROGRESS;
#endif

// Saves the current blocking state, sets non-blocking, and returns a RAII guard
// that restores the original state on destruction.
struct BlockingGuard {
databento::detail::Socket fd;
#ifdef _WIN32
// No state to save on Windows
#else
int original_flags;
#endif

explicit BlockingGuard(databento::detail::Socket fd) : fd{fd} {
#ifdef _WIN32
unsigned long mode = 1;
::ioctlsocket(fd, FIONBIO, &mode);
#else
original_flags = ::fcntl(fd, F_GETFL, 0);
::fcntl(fd, F_SETFL, original_flags | O_NONBLOCK);
#endif
}

~BlockingGuard() {
#ifdef _WIN32
unsigned long mode = 0;
::ioctlsocket(fd, FIONBIO, &mode);
#else
::fcntl(fd, F_SETFL, original_flags);
#endif
}

BlockingGuard(const BlockingGuard&) = delete;
BlockingGuard& operator=(const BlockingGuard&) = delete;
BlockingGuard(BlockingGuard&&) = delete;
BlockingGuard& operator=(BlockingGuard&&) = delete;
};
} // namespace

TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway,
Expand Down Expand Up @@ -83,12 +143,7 @@ databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer,
// having no timeout
const auto timeout_ms = timeout.count() ? static_cast<int>(timeout.count()) : -1;
while (true) {
const int poll_status =
#ifdef _WIN32
::WSAPoll(&fds, 1, timeout_ms);
#else
::poll(&fds, 1, timeout_ms);
#endif
const int poll_status = Poll(&fds, 1, timeout_ms);
if (poll_status > 0) {
return ReadSome(buffer, max_size);
}
Expand Down Expand Up @@ -130,13 +185,33 @@ databento::detail::ScopedFd TcpClient::InitSocket(ILogReceiver* log_receiver,
}
std::unique_ptr<addrinfo, decltype(&::freeaddrinfo)> res{out, &::freeaddrinfo};
const auto max_attempts = std::max<std::uint32_t>(retry_conf.max_attempts, 1);
const auto timeout_ms = static_cast<int>(
std::chrono::duration_cast<std::chrono::milliseconds>(retry_conf.connect_timeout)
.count());
std::chrono::seconds backoff{1};
for (std::uint32_t attempt = 0; attempt < max_attempts; ++attempt) {
if (::connect(scoped_fd.Get(), res->ai_addr, res->ai_addrlen) == 0) {
BlockingGuard guard{scoped_fd.Get()};

const int connect_ret = ::connect(scoped_fd.Get(), res->ai_addr, res->ai_addrlen);
bool connected = (connect_ret == 0);
if (!connected && ::GetErrNo() == kConnectInProgress) {
pollfd pfd{scoped_fd.Get(), POLLOUT, {}};
const int poll_ret = Poll(&pfd, 1, timeout_ms);
if (poll_ret > 0) {
int so_error = 0;
GetSockOpt(scoped_fd.Get(), SOL_SOCKET, SO_ERROR, &so_error);
connected = (so_error == 0);
if (!connected) {
errno = so_error;
}
}
}

if (connected) {
break;
} else if (attempt + 1 == max_attempts) {
std::ostringstream err_msg;
err_msg << "Socket failed to connect after " << max_attempts << " attempts";
err_msg << "Socket failed to connect after " << max_attempts << " attempt(s)";
throw TcpError{::GetErrNo(), err_msg.str()};
}
std::ostringstream log_msg;
Expand Down
Loading
Loading