Skip to content

Commit 474d2a4

Browse files
userFRMclaude
andauthored
fix(streaming): harden FPSS subscription tracking, ring defaults, and credential validation (#809)
* fix(streaming): harden FPSS subscription tracking, ring defaults, and credential validation Oversized streaming credentials now return a typed invalid-parameter configuration error before the connection opens instead of panicking. The credentials payload builder validates the total length against the 255-byte frame limit up front and names both the limit and the actual size; every connect and auth call site propagates the result. Active subscriptions are de-duplicated by contract and by full-stream security type, so a repeated subscribe no longer accumulates duplicate tracked entries that replay multiple times on reconnect. Unsubscribe keeps its remove-once semantics. The command channel to the I/O worker is now bounded: the heartbeat takes natural backpressure with a blocking send and the public subscribe and unsubscribe methods return a typed queue-full error rather than dropping commands or growing the queue without limit. The direct streaming builder now defaults its buffer to the production streaming buffer size, so a builder-constructed client gets production-grade headroom for large streams by default; callers can still set a smaller size explicitly. The streaming backpressure documentation is corrected to state that newly arriving events are dropped when the buffer is full, not the oldest buffered events. Closes #803 Closes #804 Closes #805 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * fix(bench): consume the now-fallible build_credentials_payload result build_credentials_payload returns Result since it validates the credential lengths against the wire frame bounds, so discarding its value trips unused-must-use under -D warnings on the benches target. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 687fffe commit 474d2a4

13 files changed

Lines changed: 318 additions & 38 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5858
### Changed
5959

6060
- The `tdbe` time-and-calendar crate is folded into `thetadatadx` as a private internal module, so the workspace builds and publishes a single `thetadatadx` artifact. The curated public surface is unchanged (`TradeTick`, `greeks::all_greeks`, `SecType`, `utils::conditions`, and the calendar types are reached at `thetadatadx::*` paths); items that were previously foreign-public — `PriceError`, `MAX_PRICE_TYPE`, `greeks::Error` — are re-exported at public crate paths, and the data-layer internals (DST epoch math, canonical-JSON helpers, the FIT / FIE codecs) stay behind the existing `__internal` feature.
61+
- The direct `StreamingClientBuilder` now defaults its streaming buffer to the same size as the production streaming configuration, so a builder-constructed client gets production-grade overflow headroom by default for large streams (10k-15k option contracts plus full trade streams) instead of a much smaller buffer that drops events under market bursts; set `.ring_size(..)` to choose a smaller footprint. The streaming backpressure documentation is also corrected to state that newly arriving events are dropped when the buffer is full, not the oldest buffered events.
6162

6263
### Fixed
6364

@@ -70,6 +71,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7071
- Python — the standalone `StreamingClient` streaming connect, reconnect, and subscribe / unsubscribe paths release the GIL across their blocking I/O (the TLS connect and handshake, and the per-subscription wire write), so other Python threads keep running while a connect or subscribe is in flight; the typed exception raised on failure is unchanged.
7172
- The standalone Python and TypeScript streaming clients now forward the full streaming and reconnect config, so every tuning knob — including the wait strategy and consumer-core affinity, host selection, watchdog and keepalive cadences, and the reconnect backoff and replay pacing — is honored, matching the unified client and the C ABI.
7273
- The bundled WebSocket server acknowledges stream requests with ThetaData's stream-verification values: a successful subscribe, unsubscribe, or stop now returns `SUBSCRIBED` instead of `OK`, and a subscribe that arrives before streaming has started returns `ERROR` with a descriptive message instead of a false-positive `OK` that claimed success while installing nothing.
74+
- Oversized streaming credentials now fail with a typed invalid-parameter configuration error before the connection is opened instead of panicking the caller. The credentials payload is validated against the 255-byte protocol frame limit up front, and the error names both the limit and the actual size so a too-long email or password is reported as a normal recoverable failure.
75+
- Active streaming subscriptions are de-duplicated by contract and by full-stream security type, so a repeated subscribe no longer accumulates duplicate tracked entries that get replayed multiple times after a reconnect; unsubscribe still removes the tracked entry. The control channel that carries subscribe / unsubscribe / heartbeat commands to the I/O worker is now bounded, so a control-plane burst cannot grow it without limit: the heartbeat takes natural backpressure and the public subscribe / unsubscribe methods return a typed queue-full error rather than dropping a command or accumulating unbounded memory.
7376

7477
### Security
7578

crates/thetadatadx/benches/bench_protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ fn bench_contract_roundtrip(c: &mut Criterion) {
7474
fn bench_build_credentials_payload(c: &mut Criterion) {
7575
c.bench_function("build_credentials_payload", |b| {
7676
b.iter(|| {
77-
black_box(build_credentials_payload(
77+
let _ = black_box(build_credentials_payload(
7878
black_box("trader@example.com"),
7979
black_box("s3cret_p4ssw0rd!"),
8080
));

crates/thetadatadx/build_support_bin/sdk_surface/typescript.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ fn ts_streaming_method(method: &MethodSpec) -> String {
4747
the stream.\n\
4848
\n\
4949
Backpressure: a slow callback causes incoming events\n\
50-
to queue and, once the buffer is full, the oldest\n\
50+
to queue and, once the buffer is full, newly arriving\n\
5151
events are dropped. The dropped count is observable\n\
5252
via `droppedEventCount()`. The receive path is never\n\
5353
blocked by a slow callback, so the upstream connection\n\

crates/thetadatadx/src/flatfiles/session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub(crate) async fn login(
100100
creds: &Credentials,
101101
) -> Result<String, Error> {
102102
// CREDENTIALS frame.
103-
let creds_payload = build_credentials_payload(&creds.email, creds.password());
103+
let creds_payload = build_credentials_payload(&creds.email, creds.password())?;
104104
write_frame(stream, msg::CREDENTIALS, -1, &creds_payload).await?;
105105

106106
// VERSION frame.
@@ -208,7 +208,7 @@ mod tests {
208208
#[test]
209209
fn credentials_payload_layout_is_stable() {
210210
// Verifies leading byte is 0x00 and userlen is BE u16.
211-
let p = build_credentials_payload("a@b.c", "pw");
211+
let p = build_credentials_payload("a@b.c", "pw").expect("valid creds");
212212
assert_eq!(p[0], 0x00);
213213
assert_eq!(&p[1..3], &5u16.to_be_bytes());
214214
assert_eq!(&p[3..8], b"a@b.c");

crates/thetadatadx/src/fpss/io_loop/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,16 @@ where
810810
};
811811

812812
// Re-authenticate on the new stream.
813-
let cred_payload = build_credentials_payload(&creds.email, &creds.password);
813+
let cred_payload = match build_credentials_payload(&creds.email, &creds.password) {
814+
Ok(p) => p,
815+
Err(e) => {
816+
// Oversized credentials are a fatal configuration error, not a
817+
// transient I/O fault: retrying cannot make them fit. Surface
818+
// it and abandon the reconnect loop rather than spinning.
819+
tracing::error!(error = %e, "credentials payload invalid; aborting reconnect");
820+
break 'session;
821+
}
822+
};
814823
let frame = Frame::new(StreamMsgType::Credentials, cred_payload);
815824
if let Err(e) = write_frame(&mut new_stream, &frame) {
816825
tracing::warn!(error = %e, "failed to send credentials on reconnect");

crates/thetadatadx/src/fpss/io_loop/ping.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use super::super::protocol::build_ping_payload;
2828
// Reason: all parameters are moved into this function from a spawned thread closure.
2929
#[allow(clippy::needless_pass_by_value)]
3030
pub(in crate::fpss) fn ping_loop(
31-
cmd_tx: std_mpsc::Sender<IoCommand>,
31+
cmd_tx: std_mpsc::SyncSender<IoCommand>,
3232
shutdown: Arc<AtomicBool>,
3333
authenticated: Arc<AtomicBool>,
3434
interval: Duration,
@@ -55,6 +55,9 @@ pub(in crate::fpss) fn ping_loop(
5555
code: StreamMsgType::Ping,
5656
payload: ping_payload.clone(),
5757
};
58+
// Blocking send on the bounded channel: the heartbeat takes natural
59+
// backpressure if the I/O thread is momentarily behind, rather than
60+
// dropping a ping. `send` only errors once the receiver hangs up.
5861
if cmd_tx.send(cmd).is_err() {
5962
// I/O thread has exited
6063
break;
@@ -88,7 +91,8 @@ mod tests {
8891
/// -> Sleep cadence visible on `cmd_rx`.
8992
#[test]
9093
fn ping_loop_honors_configured_interval() {
91-
let (cmd_tx, cmd_rx) = std_mpsc::channel::<IoCommand>();
94+
let (cmd_tx, cmd_rx) =
95+
std_mpsc::sync_channel::<IoCommand>(crate::fpss::CMD_CHANNEL_CAPACITY);
9296
let shutdown = Arc::new(AtomicBool::new(false));
9397
let authenticated = Arc::new(AtomicBool::new(true));
9498
let interval = Duration::from_millis(30);
@@ -143,7 +147,8 @@ mod tests {
143147
/// produces noticeably-spaced pings rather than the 100 ms default.
144148
#[test]
145149
fn ping_loop_with_longer_interval_paces_slower_than_default() {
146-
let (cmd_tx, cmd_rx) = std_mpsc::channel::<IoCommand>();
150+
let (cmd_tx, cmd_rx) =
151+
std_mpsc::sync_channel::<IoCommand>(crate::fpss::CMD_CHANNEL_CAPACITY);
147152
let shutdown = Arc::new(AtomicBool::new(false));
148153
let authenticated = Arc::new(AtomicBool::new(true));
149154
let shutdown_clone = Arc::clone(&shutdown);

0 commit comments

Comments
 (0)