diff --git a/crates/thetadatadx/src/fpss/io_loop/mod.rs b/crates/thetadatadx/src/fpss/io_loop/mod.rs index c1c9bdcc6..b79ec6b7f 100644 --- a/crates/thetadatadx/src/fpss/io_loop/mod.rs +++ b/crates/thetadatadx/src/fpss/io_loop/mod.rs @@ -140,6 +140,28 @@ pub(in crate::fpss) fn evict_stale_pending(map: &mut HashMap, + identity: &super::protocol::PendingSub, +) { + map.retain(|_, entry| &entry.sub != identity); +} type ActiveFullSubs = Arc< Mutex< Vec<( @@ -280,6 +302,16 @@ fn host_index(hosts: &[(String, u16)], addr: &str) -> Option { /// id from a span longer than the 31-bit counter cycle) leaves the tracked set /// untouched: with no correlation, untracking would risk dropping a healthy /// subscription, so the conservative choice is to keep it. +/// +/// The `req_id` lookup is a safe untrack because the pending registry holds the +/// invariant that at most one correlation per `(kind, contract)` (or `(kind, +/// sec_type)`) identity is resident, and it always names the current live +/// entry. A duplicate subscribe shares the live entry and registers no second +/// correlation; an unsubscribe removes the entry and evicts its correlation +/// (see [`evict_pending_for_identity`]). So a subscribe that is superseded by an +/// unsubscribe + re-subscribe of the same identity has no resident correlation +/// for its old `req_id`, and a late rejection of that id is a no-op rather than +/// a value match that would drop the re-subscribed live entry. fn apply_req_response( req_id: i32, result: StreamResponseType, diff --git a/crates/thetadatadx/src/fpss/mod.rs b/crates/thetadatadx/src/fpss/mod.rs index 49baeb99e..78ab5d424 100644 --- a/crates/thetadatadx/src/fpss/mod.rs +++ b/crates/thetadatadx/src/fpss/mod.rs @@ -2236,15 +2236,32 @@ impl StreamingClient { unsubscribe, "sent full-stream subscription frame" ); - // Track / untrack for reconnection. + // Untrack-on-unsubscribe is terminal: remove the tracked entry, then + // evict the in-flight correlation for this identity for the same reason + // as the per-contract unsubscribe — the removed entry is no longer live, + // so a late rejection of its subscribe must not survive to untrack a + // future re-subscribe of the same `(kind, sec_type)` by value. + if unsubscribe { + self.active_full_subs + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .retain(|(k, s)| !(*k == kind_for_track && *s == sec_type)); + io_loop::evict_pending_for_identity( + &mut self + .pending_subs + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner), + &protocol::PendingSub::Full(kind_for_track, sec_type), + ); + return Ok(()); + } + + // Track for reconnection. let mut subs = self .active_full_subs .lock() .unwrap_or_else(std::sync::PoisonError::into_inner); - let newly_tracked = if unsubscribe { - subs.retain(|(k, s)| !(*k == kind_for_track && *s == sec_type)); - false - } else if subs + let newly_tracked = if subs .iter() .any(|(k, s)| *k == kind_for_track && *s == sec_type) { @@ -2261,10 +2278,9 @@ impl StreamingClient { // Record the pending full-stream subscribe by `req_id` so a server // rejection drops exactly this entry from the replay set. Only the // subscribe that actually added the tracked entry may carry an - // untrack-capable correlation: an unsubscribe removed its entry above - // and has nothing to untrack, and a duplicate subscribe shares the one - // live entry, so letting its rejection untrack by value would drop the - // live subscription. + // untrack-capable correlation: an unsubscribe is handled terminally + // above and a duplicate subscribe shares the one live entry, so letting + // its rejection untrack by value would drop the live subscription. if newly_tracked { let mut pending = self .pending_subs @@ -2372,6 +2388,21 @@ impl StreamingClient { subs.retain(|(k, c)| !(k == &kind && c == contract)); } + // Evict any in-flight correlation for this identity. The removed entry + // is no longer live, so a late rejection of the subscribe that created + // it must not survive to untrack a future re-subscribe of the same + // `(kind, contract)` by value. + { + let mut pending = self + .pending_subs + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + io_loop::evict_pending_for_identity( + &mut pending, + &protocol::PendingSub::Contract(kind, contract.clone()), + ); + } + tracing::debug!( req_id, kind = ?kind, @@ -3425,6 +3456,140 @@ mod full_stream_guard_tests { client.shutdown(); } + /// Subscribe, unsubscribe, then re-subscribe the same contract, all before + /// the first subscribe's `REQ_RESPONSE` lands; a late rejection of that + /// superseded first request must not drop the re-subscribed live entry. + /// + /// The first subscribe owns the correlation for its `req_id`; the + /// unsubscribe both removes the tracked entry and evicts that correlation, + /// so when the re-subscribe re-adds the entry it owns a fresh correlation + /// under a new `req_id`. A server rejection of the obsolete first `req_id` + /// then finds no resident correlation and is a no-op, leaving the live + /// re-subscribed entry tracked and in the reconnect-replay set (which is a + /// clone of `active_subs`). + #[test] + fn unsub_resub_then_reject_superseded_keeps_live_sub() { + use super::io_loop::apply_req_response_for_test; + use crate::tdbe::types::enums::StreamResponseType; + + let client = StreamingClient::for_self_join_test( + 0, + 64, + HarnessPublishMode::BlockingPublish, + None, + |_event| {}, + ); + + // req_id counter starts at 1: subscribe -> 1, unsubscribe -> 2, + // re-subscribe -> 3. + let contract = Contract::stock("AAPL"); + client + .subscribe(contract.clone().trade()) + .expect("first subscribe"); + client + .unsubscribe(contract.clone().trade()) + .expect("unsubscribe"); + client + .subscribe(contract.clone().trade()) + .expect("re-subscribe"); + + // Exactly one resident correlation: the unsubscribe evicted the first + // subscribe's (req_id 1) correlation, and the re-subscribe registered a + // fresh one (req_id 3). + assert_eq!( + client + .pending_subs + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .len(), + 1, + "unsubscribe must evict the superseded correlation; only the \ + re-subscribe's correlation may remain" + ); + + // The server rejects the obsolete first req_id (1). With its + // correlation already evicted this is a no-op, not a value match that + // would drop the live re-subscribed entry. + apply_req_response_for_test( + &client.pending_subs, + &client.active_subs, + &client.active_full_subs, + 1, + StreamResponseType::MaxStreamsReached, + ); + + let tracked = client.active_subscriptions(); + assert_eq!( + tracked.len(), + 1, + "rejecting a superseded subscribe must leave the re-subscribed entry \ + tracked, got {tracked:?}" + ); + assert!( + tracked.iter().any(|(_, c)| *c == contract), + "the re-subscribed contract must remain in active_subscriptions() so \ + it is replayed on reconnect, got {tracked:?}" + ); + + client.shutdown(); + } + + /// The full-stream analogue: subscribe, unsubscribe, re-subscribe a + /// full-stream `(kind, sec_type)`, then reject the superseded first + /// request — the re-subscribed full-stream entry must stay tracked. + #[test] + fn full_unsub_resub_then_reject_superseded_keeps_live_sub() { + use super::io_loop::apply_req_response_for_test; + use crate::tdbe::types::enums::StreamResponseType; + + let client = StreamingClient::for_self_join_test( + 0, + 64, + HarnessPublishMode::BlockingPublish, + None, + |_event| {}, + ); + + // subscribe -> 1, unsubscribe -> 2, re-subscribe -> 3. + client + .subscribe(SecType::Stock.full_trades()) + .expect("first full subscribe"); + client + .unsubscribe(SecType::Stock.full_trades()) + .expect("full unsubscribe"); + client + .subscribe(SecType::Stock.full_trades()) + .expect("re-subscribe full"); + + assert_eq!( + client + .pending_subs + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .len(), + 1, + "full-stream unsubscribe must evict the superseded correlation" + ); + + apply_req_response_for_test( + &client.pending_subs, + &client.active_subs, + &client.active_full_subs, + 1, + StreamResponseType::MaxStreamsReached, + ); + + let tracked = client.active_full_subscriptions(); + assert_eq!( + tracked.len(), + 1, + "rejecting a superseded full-stream subscribe must leave the \ + re-subscribed entry tracked, got {tracked:?}" + ); + + client.shutdown(); + } + /// The command channel is bounded: once it is saturated, a further /// `try_send` reports `Full` rather than growing without limit. This /// pins the backpressure contract `send_cmd` relies on to surface a