Skip to content

Commit 49cf530

Browse files
committed
ADD: Add live heartbeat monitoring
1 parent dad2c0f commit 49cf530

File tree

14 files changed

+203
-31
lines changed

14 files changed

+203
-31
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@
33
## 0.52.0 - TBD
44

55
### Enhancements
6+
- Added client-side heartbeat timeout detection: `NextRecord` throws `HeartbeatTimeoutError`
7+
if no data is received for `heartbeat_interval` + 5 seconds (defaults to 35 seconds)
68
- Changed `SlowReaderBehavior::Skip` to send "skip" instead of "drop" to the gateway
79

10+
### Breaking changes
11+
- `NextRecord` now throws `LiveApiError` instead of `DbnResponseError` when the gateway
12+
closes the session. Code catching `DbnResponseError` for this case should be updated
13+
814
## 0.51.0 - 2026-03-17
915

1016
### Enhancements

CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,16 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_DATE)
202202
find_package(date REQUIRED)
203203
endif()
204204
else()
205-
set(date_version 3.0.3)
205+
set(date_version 3.0.4)
206+
set(CMAKE_WARN_DEPRECATED OFF CACHE BOOL "" FORCE)
206207
FetchContent_Declare(
207208
date_src
208209
URL https://github.com/HowardHinnant/date/archive/refs/tags/v${date_version}.tar.gz
209210
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
210211
)
212+
# Suppress deprecation warning from date's old cmake_minimum_required
211213
FetchContent_MakeAvailable(date_src)
214+
set(CMAKE_WARN_DEPRECATED ON CACHE BOOL "" FORCE)
212215
# Ignore compiler warnings in headers
213216
add_system_include_property(date)
214217
endif()

cmake/Findzstd.cmake

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Try config mode first. If the system provides a zstd CMake config (e.g. Homebrew),
2+
# use it directly so all targets are defined consistently.
3+
find_package(zstd CONFIG QUIET)
4+
if(zstd_FOUND)
5+
if(NOT TARGET zstd::libzstd)
6+
if(TARGET zstd::libzstd_shared)
7+
add_library(zstd::libzstd ALIAS zstd::libzstd_shared)
8+
elseif(TARGET zstd::libzstd_static)
9+
add_library(zstd::libzstd ALIAS zstd::libzstd_static)
10+
endif()
11+
endif()
12+
return()
13+
endif()
14+
115
include(FindPackageHandleStandardArgs)
216

317
if(WIN32)

include/databento/detail/live_connection.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@
1313
#include "databento/ireadable.hpp"
1414
#include "databento/iwritable.hpp"
1515

16+
// Forward declare
17+
namespace databento {
18+
class ILogReceiver;
19+
}
20+
1621
namespace databento::detail {
1722
// Manages the TCP connection to the live gateway with optionally compressed reads for
1823
// the DBN data.
1924
class LiveConnection : IWritable {
2025
public:
21-
LiveConnection(const std::string& gateway, std::uint16_t port);
26+
LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
27+
std::uint16_t port);
2228

2329
void WriteAll(std::string_view str);
2430
void WriteAll(const std::byte* buffer, std::size_t size);

include/databento/detail/tcp_client.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99
#include "databento/detail/scoped_fd.hpp" // ScopedFd
1010
#include "databento/ireadable.hpp"
1111

12+
// Forward declare
13+
namespace databento {
14+
class ILogReceiver;
15+
}
16+
1217
namespace databento::detail {
1318
class TcpClient {
1419
public:
@@ -17,8 +22,9 @@ class TcpClient {
1722
std::chrono::seconds max_wait{std::chrono::minutes{1}};
1823
};
1924

20-
TcpClient(const std::string& gateway, std::uint16_t port);
21-
TcpClient(const std::string& gateway, std::uint16_t port, RetryConf retry_conf);
25+
TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port);
26+
TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port,
27+
RetryConf retry_conf);
2228

2329
void WriteAll(std::string_view str);
2430
void WriteAll(const std::byte* buffer, std::size_t size);
@@ -32,8 +38,8 @@ class TcpClient {
3238
void Close();
3339

3440
private:
35-
static ScopedFd InitSocket(const std::string& gateway, std::uint16_t port,
36-
RetryConf retry_conf);
41+
static ScopedFd InitSocket(ILogReceiver* log_receiver, const std::string& gateway,
42+
std::uint16_t port, RetryConf retry_conf);
3743

3844
ScopedFd socket_;
3945
};

include/databento/exceptions.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <httplib.h> // Error
77
#include <nlohmann/json.hpp> // json, parse_error
88

9+
#include <chrono>
910
#include <cstdint>
1011
#include <exception>
1112
#include <string>
@@ -132,6 +133,20 @@ class DbnResponseError : public Exception {
132133
explicit DbnResponseError(std::string message) : Exception{std::move(message)} {}
133134
};
134135

136+
// Exception indicating no data was received within the heartbeat timeout window.
137+
class HeartbeatTimeoutError : public Exception {
138+
public:
139+
explicit HeartbeatTimeoutError(std::chrono::seconds elapsed)
140+
: Exception{BuildMessage(elapsed)}, elapsed_{elapsed} {}
141+
142+
std::chrono::seconds Elapsed() const { return elapsed_; }
143+
144+
private:
145+
static std::string BuildMessage(std::chrono::seconds elapsed);
146+
147+
const std::chrono::seconds elapsed_;
148+
};
149+
135150
// Exception indicating something internal to the live API, but unrelated to TCP
136151
// went wrong.
137152
class LiveApiError : public Exception {

include/databento/live_blocking.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

33
#include <array>
4-
#include <chrono> // milliseconds
4+
#include <chrono> // milliseconds, steady_clock
55
#include <cstddef>
66
#include <cstdint>
77
#include <optional>
@@ -119,6 +119,8 @@ class LiveBlocking {
119119
bool use_snapshot);
120120
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
121121
RecordHeader* BufferRecordHeader();
122+
std::chrono::milliseconds HeartbeatTimeout() const;
123+
void CheckHeartbeatTimeout() const;
122124

123125
static constexpr std::size_t kMaxStrLen = 24L * 1024;
124126

@@ -142,5 +144,7 @@ class LiveBlocking {
142144
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
143145
std::uint64_t session_id_;
144146
Record current_record_{nullptr};
147+
std::chrono::steady_clock::time_point last_read_time_{
148+
std::chrono::steady_clock::now()};
145149
};
146150
} // namespace databento

src/detail/live_connection.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66

77
using databento::detail::LiveConnection;
88

9-
LiveConnection::LiveConnection(const std::string& gateway, std::uint16_t port)
10-
: client_{gateway, port} {}
9+
LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
10+
std::uint16_t port)
11+
: client_{log_receiver, gateway, port} {}
1112

1213
void LiveConnection::WriteAll(std::string_view str) { client_.WriteAll(str); }
1314

src/detail/tcp_client.cpp

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#else
66
#include <netdb.h> // addrinfo, gai_strerror, getaddrinfo, freeaddrinfo
77
#include <netinet/in.h> // htons, IPPROTO_TCP
8-
#include <sys/poll.h> // pollfd, POLLHUP
8+
#include <sys/poll.h> // pollfd
99
#include <sys/socket.h> // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM
1010
#include <unistd.h> // close, ssize_t
1111

@@ -18,6 +18,7 @@
1818
#include <thread>
1919

2020
#include "databento/exceptions.hpp" // TcpError
21+
#include "databento/log.hpp" // ILogReceiver
2122

2223
using databento::detail::TcpClient;
2324
using Status = databento::IReadable::Status;
@@ -32,12 +33,13 @@ int GetErrNo() {
3233
}
3334
} // namespace
3435

35-
TcpClient::TcpClient(const std::string& gateway, std::uint16_t port)
36-
: TcpClient{gateway, port, {}} {}
36+
TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway,
37+
std::uint16_t port)
38+
: TcpClient{log_receiver, gateway, port, {}} {}
3739

38-
TcpClient::TcpClient(const std::string& gateway, std::uint16_t port,
39-
RetryConf retry_conf)
40-
: socket_{InitSocket(gateway, port, retry_conf)} {}
40+
TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway,
41+
std::uint16_t port, RetryConf retry_conf)
42+
: socket_{InitSocket(log_receiver, gateway, port, retry_conf)} {}
4143

4244
void TcpClient::WriteAll(std::string_view str) {
4345
WriteAll(reinterpret_cast<const std::byte*>(str.data()), str.length());
@@ -103,9 +105,12 @@ databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer,
103105

104106
void TcpClient::Close() { socket_.Close(); }
105107

106-
databento::detail::ScopedFd TcpClient::InitSocket(const std::string& gateway,
108+
databento::detail::ScopedFd TcpClient::InitSocket(ILogReceiver* log_receiver,
109+
const std::string& gateway,
107110
std::uint16_t port,
108111
RetryConf retry_conf) {
112+
static constexpr auto kMethod = "TcpClient::TcpClient";
113+
109114
const detail::Socket fd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
110115
if (fd == -1) {
111116
throw TcpError{::GetErrNo(), "Failed to create socket"};
@@ -121,7 +126,7 @@ databento::detail::ScopedFd TcpClient::InitSocket(const std::string& gateway,
121126
const auto ret =
122127
::getaddrinfo(gateway.c_str(), std::to_string(port).c_str(), &hints, &out);
123128
if (ret != 0) {
124-
throw InvalidArgumentError{"TcpClient::TcpClient", "addr", ::gai_strerror(ret)};
129+
throw InvalidArgumentError{kMethod, "addr", ::gai_strerror(ret)};
125130
}
126131
std::unique_ptr<addrinfo, decltype(&::freeaddrinfo)> res{out, &::freeaddrinfo};
127132
const auto max_attempts = std::max<std::uint32_t>(retry_conf.max_attempts, 1);
@@ -134,7 +139,12 @@ databento::detail::ScopedFd TcpClient::InitSocket(const std::string& gateway,
134139
err_msg << "Socket failed to connect after " << max_attempts << " attempts";
135140
throw TcpError{::GetErrNo(), err_msg.str()};
136141
}
137-
// TODO(cg): Log
142+
std::ostringstream log_msg;
143+
log_msg << '[' << kMethod << "] Connection attempt " << (attempt + 1) << " to "
144+
<< gateway << ':' << port << " failed, retrying in " << backoff.count()
145+
<< " seconds";
146+
log_receiver->Receive(LogLevel::Warning, log_msg.str());
147+
138148
std::this_thread::sleep_for(backoff);
139149
backoff = (std::min)(backoff * 2, retry_conf.max_wait);
140150
}

src/exceptions.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ JsonResponseError JsonResponseError::TypeMismatch(std::string_view method_name,
8989
return JsonResponseError{err_msg.str()};
9090
}
9191

92+
using databento::HeartbeatTimeoutError;
93+
94+
std::string HeartbeatTimeoutError::BuildMessage(std::chrono::seconds elapsed) {
95+
std::ostringstream err_msg;
96+
err_msg << "Heartbeat timeout: no data received for " << elapsed.count()
97+
<< " seconds";
98+
return err_msg.str();
99+
}
100+
92101
using databento::LiveApiError;
93102

94103
LiveApiError LiveApiError::UnexpectedMsg(std::string_view message,

0 commit comments

Comments
 (0)