Skip to content

Commit 79c86bd

Browse files
userFRMclaude
andauthored
fix(fpss): reconcile subscription state against server responses (#876)
* fix(fpss): reconcile subscription state against server responses A subscribe is tracked in the reconnect-replay set the instant the frame is sent, but the server answers asynchronously and may reject it. Until now nothing removed a rejected entry, so a MaxStreamsReached or InvalidPerms contract was re-attempted on every reconnect forever and permanently over-reported by active_subscriptions(). Correlate the REQ_RESPONSE to the subscribe it answers by the req_id allocated at send time: a rejected subscription is untracked, not replayed, and an accepted one stays tracked. The derived OHLCVC accumulator read the trade date only on its first trade. Once initialized, a trade carrying a new date merged onto the prior date's open/high/low and cumulative volume/count while the emitted bar date stayed stale. The derived bar now rolls to a fresh bar on a date change before applying the trade. A write failure during the paced reconnect replay was only warn-logged while the loop continued, leaving a contract silently unsubscribed on the new session until the next disconnect. A replay write failure now re-enters reconnect, matching how every other mid-replay I/O failure escalates; the per-burst flush path is made consistent so it escalates the same way. Full-stream and per-contract subscriptions are tracked independently and the server can broadcast a contract on both feeds; the subscribe surface now documents that overlapping the same kind and security type across both scopes double-delivers, since the client does not de-duplicate across them. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * fix(fpss): untrack rejected subscriptions by request identity, not value A duplicate per-contract or full-stream subscribe shares the one tracked entry the first subscribe created, but the subscribe paths registered an untrack-capable pending correlation on every call. A server rejection of the duplicate (for example MaxStreamsReached, the realistic trigger once the first subscribe has claimed the stream slot) then removed the entry by value, dropping the still-live original from active_subs and from the reconnect-replay set that clones it, silently silencing the stream. Only the subscribe that actually added the tracked entry now carries an untrack-capable correlation, so a duplicate's rejection can no longer touch the live subscription. Bound the pending correlation registry. Entries were removed only by a matching req_id response or the reconnect-time clear, so a session whose response the server suppresses, or that echoes the uncorrelated -1 sentinel a matching remove can never key on, leaked an entry for the life of a session that never reconnects. Each correlation now records its insertion instant; the registry is swept on insert, evicting entries past a generous response deadline and capping resident size with an oldest-first drop and a single warn-level log. Make WakeFd::from_raw_write_fd an unsafe fn. It adopts a caller-supplied raw descriptor and closes it on Drop, so a safe signature let a caller pass a borrowed or aliased fd and invite a double-close or operation on a recycled descriptor. The unsafe marker documents the ownership-transfer contract, matching the FromRawFd convention; the two call sites are wrapped in audited unsafe blocks. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * fix(fpss): gate the subscription-reconcile test helper to test builds only The helper is only called from in-crate unit tests, so cfg(test) is the correct gate. The previous test-helpers feature gate left it compiled but unused in the library target, tripping dead_code under -D warnings. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: preview <noreply@anthropic.com>
1 parent 225a801 commit 79c86bd

6 files changed

Lines changed: 705 additions & 27 deletions

File tree

crates/thetadatadx/src/fpss/accumulator.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,14 @@ impl OhlcvcAccumulator {
7575
price_type: i32,
7676
date: i32,
7777
) {
78-
if self.initialized {
78+
// A trade on a new date opens a fresh bar. Without this, an already-
79+
// initialized accumulator merges the new date's trade onto the prior
80+
// date's open/high/low and cumulative volume/count while the emitted
81+
// `Ohlcvc.date` stays stale — silently corrupting the derived bar
82+
// across a session boundary when no server bar or clear intervenes.
83+
// Falling through to the uninitialized branch re-seeds open/high/low/
84+
// close, volume, count, price_type, and date from this single trade.
85+
if self.initialized && date == self.date {
7986
self.ms_of_day = ms_of_day;
8087
let adjusted_price = change_price_type(price, price_type, self.price_type);
8188
self.volume += i64::from(size);
@@ -222,6 +229,35 @@ mod tests {
222229
assert_eq!(acc.count, 2_286_840_317_i64);
223230
}
224231

232+
/// A trade carrying a new date must roll the accumulator to a fresh bar
233+
/// rather than merging onto the prior date's open/high/low + cumulative
234+
/// volume/count. Without the roll, the emitted `date` stays stale and the
235+
/// new session's first bar inherits the previous day's totals (derive
236+
/// path, no intervening server bar or clear).
237+
#[test]
238+
fn ohlcvc_accumulator_rolls_on_date_change() {
239+
let mut acc = OhlcvcAccumulator::new();
240+
// Day 1: two trades accumulate.
241+
acc.process_trade(57_600_000, 15_025, 100, 8, 20240315);
242+
acc.process_trade(57_600_100, 15_100, 200, 8, 20240315);
243+
assert_eq!(acc.date, 20240315);
244+
assert_eq!(acc.volume, 300);
245+
assert_eq!(acc.count, 2);
246+
247+
// Day 2: the first trade on the new date opens a fresh bar. Open,
248+
// high, low, close all reset to this trade's price; volume and count
249+
// restart from this trade alone; the date advances.
250+
acc.process_trade(34_200_000, 14_950, 50, 8, 20240318);
251+
assert_eq!(acc.date, 20240318);
252+
assert_eq!(acc.open, 14_950);
253+
assert_eq!(acc.high, 14_950);
254+
assert_eq!(acc.low, 14_950);
255+
assert_eq!(acc.close, 14_950);
256+
assert_eq!(acc.volume, 50, "new-date volume must not include day 1");
257+
assert_eq!(acc.count, 1, "new-date count must not include day 1");
258+
assert_eq!(acc.ms_of_day, 34_200_000);
259+
}
260+
225261
#[test]
226262
fn change_price_type_tests() {
227263
assert_eq!(change_price_type(15025, 8, 8), 15025);

0 commit comments

Comments
 (0)