Skip to content

Commit 8e2bc3e

Browse files
authored
feat!: C++ callback wrapper + v8.0.30 release (closes #482) (#494)
* feat!: C++ callback wrapper + v8.0.30 release (closes #482) Migrate the C++ wrapper to the callback C ABI shipped in PR #490. `tdx::FpssClient` gains two header-only methods that wrap `tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback`: void set_callback(std::function<void(const FpssEvent&)> fn); void set_inline_callback(std::function<void(const FpssEvent&)> fn); The Client owns a `unique_ptr<std::function<...>>` -- a stable address survives moves of the owning client and is handed to the C ABI as the `void* ctx`. A free `extern "C"` shim recovers the function from `ctx` and invokes it with `const FpssEvent&`, swallowing any propagating exception so unwinding cannot cross the Rust/C boundary. The destructor's call to `tdx_fpss_shutdown` runs before the function storage is freed, so the dispatcher / reader threads cannot dereference stale state. `fpss_smoke.cpp` is restored on the callback path -- `#error` directive deleted, example rewritten to subscribe, register a queued callback, print events for five seconds, and exit cleanly. The CMake target builds clean. `sdks/cpp/README.md` streaming section now documents callback registration as the only entry point with a dedicated note on the inline opt-in's microsecond-budget contract. Version bump 8.0.29 -> 8.0.30 via `scripts/bump_version.py`. The [Unreleased] CHANGELOG block becomes `## [8.0.30] - 2026-05-06` -- the single coherent #482 release that bundles the dispatcher core (#489), C ABI callback (#490), and C++ wrapper migration. Python (#492) and TypeScript (#493) entries land when those PRs merge. Closes #482. * fix(cpp): address review on PR #494 -- memory safety on set_callback / move-assign set_callback / set_inline_callback now stage the new std::function into a local unique_ptr and only adopt it into callback_ after tdx_fpss_set_callback returns 0. The C ABI rejects subsequent registrations with -1 while keeping the previously installed (callback, ctx) live, so overwriting callback_ before checking the return code dangled the Rust-side ctx into freed storage. operator=(FpssClient&&) now drains the existing handle via tdx_fpss_shutdown before dropping the old callback_. tdx_fpss_shutdown stops the FPSS reader and joins the dispatcher drain thread before returning, so once it completes no thread can still observe the old ctx pointer.
1 parent a462932 commit 8e2bc3e

21 files changed

Lines changed: 305 additions & 67 deletions

File tree

CHANGELOG.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## [Unreleased]
8+
## [8.0.30] - 2026-05-06
9+
10+
This release closes #482: the entire FPSS streaming stack — Rust core,
11+
C ABI, Python, TypeScript, and C++ — moves to a callback-driven
12+
delivery model backed by a single `StreamingDispatcher` SSOT. Bundles
13+
PR #489 (dispatcher core), #490 (C ABI), #492 (Python), #493
14+
(TypeScript), and #494 (C++ wrapper).
915

1016
### Breaking
1117

@@ -86,9 +92,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
8692
and connect are atomic). All `std::sync::mpsc` usage and the
8793
poll-based receive path have been removed from `ffi/src/streaming.rs`.
8894

89-
The C++ wrapper's `next_event` / `FpssEventDeleter` / `FpssEventPtr`
90-
are also gone; the wrapper migrates to the callback API in a
91-
follow-up.
95+
- **C++ wrapper**: the poll-based `tdx::FpssClient::next_event` and the
96+
owning `FpssEventPtr` / `FpssEventDeleter` types are gone. Event
97+
delivery is now exclusively callback-driven through the new
98+
`set_callback` / `set_inline_callback` methods (see Added).
9299

93100
### Changed
94101

@@ -109,6 +116,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
109116
for the dispatcher path) but slow callbacks block the reader and
110117
cause vendor disconnects. Documented contract: callback must
111118
return within microseconds.
119+
- **C++ wrapper callback API.** `tdx::FpssClient::set_callback
120+
(std::function<void(const FpssEvent&)>)` for the default queued
121+
path; `set_inline_callback` for power-user opt-in directly on
122+
the FPSS reader thread. Both wrap the C ABI
123+
`tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback`
124+
shipped in this release. The `fpss_smoke` example is restored on
125+
the callback path.
112126

113127
## [8.0.29] - 2026-05-06
114128

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/thetadatadx/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "thetadatadx"
3-
version = "8.0.29"
3+
version = "8.0.30"
44
edition.workspace = true
55
rust-version.workspace = true
66
authors.workspace = true

docs-site/docs/changelog.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## [Unreleased]
8+
## [8.0.30] - 2026-05-06
9+
10+
This release closes #482: the entire FPSS streaming stack — Rust core,
11+
C ABI, Python, TypeScript, and C++ — moves to a callback-driven
12+
delivery model backed by a single `StreamingDispatcher` SSOT. Bundles
13+
PR #489 (dispatcher core), #490 (C ABI), #492 (Python), #493
14+
(TypeScript), and #494 (C++ wrapper).
915

1016
### Breaking
1117

@@ -86,9 +92,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
8692
and connect are atomic). All `std::sync::mpsc` usage and the
8793
poll-based receive path have been removed from `ffi/src/streaming.rs`.
8894

89-
The C++ wrapper's `next_event` / `FpssEventDeleter` / `FpssEventPtr`
90-
are also gone; the wrapper migrates to the callback API in a
91-
follow-up.
95+
- **C++ wrapper**: the poll-based `tdx::FpssClient::next_event` and the
96+
owning `FpssEventPtr` / `FpssEventDeleter` types are gone. Event
97+
delivery is now exclusively callback-driven through the new
98+
`set_callback` / `set_inline_callback` methods (see Added).
9299

93100
### Changed
94101

@@ -109,6 +116,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
109116
for the dispatcher path) but slow callbacks block the reader and
110117
cause vendor disconnects. Documented contract: callback must
111118
return within microseconds.
119+
- **C++ wrapper callback API.** `tdx::FpssClient::set_callback
120+
(std::function<void(const FpssEvent&)>)` for the default queued
121+
path; `set_inline_callback` for power-user opt-in directly on
122+
the FPSS reader thread. Both wrap the C ABI
123+
`tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback`
124+
shipped in this release. The `fpss_smoke` example is restored on
125+
the callback path.
112126

113127
## [8.0.29] - 2026-05-06
114128

ffi/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "thetadatadx-ffi"
3-
version = "8.0.29"
3+
version = "8.0.30"
44
edition.workspace = true
55
rust-version.workspace = true
66
authors.workspace = true

sdks/cpp/README.md

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -280,21 +280,35 @@ int main() {
280280
// Create a streaming client (separate from the historical Client)
281281
tdx::FpssClient fpss(creds, config);
282282

283-
// Subscribe to real-time quotes
284-
int req_id = fpss.subscribe_quotes("AAPL");
285-
std::cout << "Subscribed (req_id=" << req_id << ")" << std::endl;
283+
// Register a queued callback. The dispatcher drain thread invokes
284+
// `fn` for every event; the FPSS reader thread never blocks on
285+
// user code.
286+
fpss.set_callback([](const tdx::FpssEvent& event) {
287+
if (event.kind == TDX_FPSS_QUOTE) {
288+
std::cout << "quote bid=" << event.quote.bid
289+
<< " ask=" << event.quote.ask << std::endl;
290+
}
291+
});
286292

287-
// The C++ wrapper migrates to the new callback C ABI in a follow-up
288-
// PR (refs #482). Until that lands, drive the streaming connection
289-
// by calling the C symbols `tdx_fpss_set_callback` (queued, default)
290-
// or `tdx_fpss_set_inline_callback` (inline, microsecond-budget)
291-
// directly with an `extern "C" fn(const TdxFpssEvent*, void*)`.
293+
fpss.subscribe_quotes("AAPL");
294+
295+
// ... let the callback run ...
292296

293297
fpss.shutdown();
294298
}
295299
```
296300

297-
All prices in streaming events are `double` (f64) -- decoded during parsing. Access them directly: `event->quote.bid`, `event->trade.price`, etc. No `price_type` decoding needed.
301+
All prices in streaming events are `double` (f64) -- decoded during parsing. Access them directly: `event.quote.bid`, `event.trade.price`, etc. No `price_type` decoding needed.
302+
303+
### Inline callback (power user)
304+
305+
`set_inline_callback` is the opt-in alternative. Instead of routing events through the bounded crossbeam queue and dispatcher drain thread, the callback fires directly from the FPSS reader thread. This skips the queueing overhead but inverts the safety contract: the callback MUST return within microseconds. Any allocation, lock acquisition, syscall, or blocking I/O stalls the reader, fills the kernel TCP receive buffer, and causes the vendor to disconnect. Reach for this path only inside provably wait-free trading loops.
306+
307+
```cpp
308+
fpss.set_inline_callback([](const tdx::FpssEvent& event) {
309+
// wait-free hot loop -- no allocation, no locks, no I/O
310+
});
311+
```
298312

299313
### FpssClient API
300314

@@ -321,7 +335,9 @@ All prices in streaming events are `double` (f64) -- decoded during parsing. Acc
321335
| `contract_lookup(id)` | `optional<string>` | Look up a contract by server-assigned ID |
322336
| `contract_map()` | `map<int32_t, string>` | Get the full contract ID mapping |
323337
| `active_subscriptions()` | `vector<Subscription>` | Get active subscriptions as typed structs |
324-
| (callback registration) | `int` | The C++ wrapper migrates to the new callback C ABI in a follow-up PR (#482); until then call `tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback` from C directly |
338+
| `set_callback(std::function<void(const FpssEvent&)>)` | `void` | Queued path: dispatcher drain thread invokes `fn`, reader never blocks |
339+
| `set_inline_callback(std::function<void(const FpssEvent&)>)` | `void` | Inline path: `fn` fires on the FPSS reader thread (microsecond-budget contract) |
340+
| `dropped_events()` | `uint64_t` | Cumulative count of events the dispatcher dropped on queue overflow (0 for inline path) |
325341
| `reconnect()` | `void` | Reconnect streaming and restore subscriptions |
326342
| `shutdown()` | `void` | Shut down the FPSS client |
327343

sdks/cpp/examples/fpss_smoke.cpp

Lines changed: 116 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,118 @@
1-
// fpss_smoke.cpp -- C++ FPSS smoke test.
1+
// fpss_smoke.cpp -- C++ FPSS smoke test driven by the callback C ABI.
22
//
3-
// The poll-based `tdx::FpssClient::next_event` API was removed in
4-
// issue #482 (PR B) along with the underlying `tdx_fpss_next_event`
5-
// C ABI symbol. This example will be rewritten to drive the new
6-
// callback API (`tdx_fpss_set_callback` / `tdx_fpss_set_inline_callback`)
7-
// when the C++ wrapper migration ships in PR E.
8-
//
9-
// Compiling this file in its current form is intentional only after
10-
// the C++ wrapper migration lands; until then it is a static breakage
11-
// signal so downstream consumers do not silently miss the API change.
3+
// Subscribes to a stock and an option contract, registers a queued
4+
// callback, prints every event for a few seconds, then exits cleanly.
5+
6+
#include <atomic>
7+
#include <chrono>
8+
#include <iostream>
9+
#include <mutex>
10+
#include <stdexcept>
11+
#include <string>
12+
#include <thread>
13+
14+
#include "thetadx.hpp"
15+
16+
namespace {
17+
18+
constexpr const char* kSymbol = "AAPL";
19+
constexpr const char* kOptionSymbol = "SPY";
20+
constexpr const char* kExpiration = "20260417";
21+
constexpr const char* kStrike = "550";
22+
constexpr const char* kRight = "C";
23+
24+
constexpr auto kCollectFor = std::chrono::seconds(5);
25+
constexpr int kMaxEventsPrinted = 25;
26+
27+
const char* event_kind_name(tdx::FpssEventKind kind) {
28+
switch (kind) {
29+
case TDX_FPSS_QUOTE: return "quote";
30+
case TDX_FPSS_TRADE: return "trade";
31+
case TDX_FPSS_OPEN_INTEREST: return "open_interest";
32+
case TDX_FPSS_OHLCVC: return "ohlcvc";
33+
case TDX_FPSS_CONTROL: return "control";
34+
case TDX_FPSS_RAW_DATA: return "raw_data";
35+
}
36+
return "unknown";
37+
}
38+
39+
} // namespace
40+
41+
int main(int argc, char** argv) {
42+
const std::string creds_path = (argc > 1) ? argv[1] : "creds.txt";
43+
try {
44+
auto creds = tdx::Credentials::from_file(creds_path);
45+
auto config = tdx::Config::production();
46+
47+
tdx::FpssClient fpss(creds, config);
48+
49+
std::atomic<int> total_events{0};
50+
std::atomic<int> data_events{0};
51+
std::mutex print_mtx;
52+
53+
fpss.set_callback([&](const tdx::FpssEvent& event) {
54+
const int seq = total_events.fetch_add(1, std::memory_order_relaxed);
55+
if (event.kind != TDX_FPSS_CONTROL && event.kind != TDX_FPSS_RAW_DATA) {
56+
data_events.fetch_add(1, std::memory_order_relaxed);
57+
}
58+
if (seq >= kMaxEventsPrinted) return;
59+
std::lock_guard<std::mutex> guard(print_mtx);
60+
std::cout << "[" << seq << "] kind=" << event_kind_name(event.kind);
61+
switch (event.kind) {
62+
case TDX_FPSS_QUOTE:
63+
std::cout << " contract_id=" << event.quote.contract_id
64+
<< " bid=" << event.quote.bid
65+
<< " ask=" << event.quote.ask;
66+
break;
67+
case TDX_FPSS_TRADE:
68+
std::cout << " contract_id=" << event.trade.contract_id
69+
<< " price=" << event.trade.price
70+
<< " size=" << event.trade.size;
71+
break;
72+
case TDX_FPSS_OPEN_INTEREST:
73+
std::cout << " contract_id=" << event.open_interest.contract_id
74+
<< " open_interest=" << event.open_interest.open_interest;
75+
break;
76+
case TDX_FPSS_OHLCVC:
77+
std::cout << " contract_id=" << event.ohlcvc.contract_id
78+
<< " close=" << event.ohlcvc.close;
79+
break;
80+
case TDX_FPSS_CONTROL:
81+
std::cout << " control_kind=" << event.control.kind;
82+
if (event.control.detail) std::cout << " detail=" << event.control.detail;
83+
break;
84+
case TDX_FPSS_RAW_DATA:
85+
std::cout << " code=" << static_cast<int>(event.raw_data.code)
86+
<< " len=" << event.raw_data.payload_len;
87+
break;
88+
}
89+
std::cout << std::endl;
90+
});
91+
92+
if (fpss.subscribe_quotes(kSymbol) < 0) {
93+
throw std::runtime_error("subscribe_quotes failed");
94+
}
95+
if (fpss.subscribe_trades(kSymbol) < 0) {
96+
throw std::runtime_error("subscribe_trades failed");
97+
}
98+
if (fpss.subscribe_option_quotes(kOptionSymbol, kExpiration, kStrike, kRight) < 0) {
99+
throw std::runtime_error("subscribe_option_quotes failed");
100+
}
101+
102+
std::this_thread::sleep_for(kCollectFor);
103+
104+
const int total = total_events.load(std::memory_order_relaxed);
105+
const int data = data_events.load(std::memory_order_relaxed);
106+
const uint64_t dropped = fpss.dropped_events();
107+
108+
std::cout << "summary: total=" << total
109+
<< " data=" << data
110+
<< " dropped=" << dropped << std::endl;
12111

13-
#error "fpss_smoke.cpp depends on the removed `next_event` poll API. Re-enable when the C++ wrapper migrates to the callback C ABI in PR E (refs #482)."
112+
fpss.shutdown();
113+
return data > 0 ? 0 : 1;
114+
} catch (const std::exception& e) {
115+
std::cerr << "fpss_smoke error: " << e.what() << std::endl;
116+
return 2;
117+
}
118+
}

0 commit comments

Comments
 (0)