Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions crates/thetadatadx/src/fpss/io_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,28 @@ pub(in crate::fpss) fn evict_stale_pending(map: &mut HashMap<i32, PendingSubEntr
);
}
}

/// Drop the in-flight subscribe correlation(s) for one tracked identity.
///
/// A pending correlation is only an authority to untrack while the tracked
/// entry it created is still live. Once that entry leaves the tracked set — an
/// unsubscribe removes it — the correlation points at nothing, so a later
/// rejection of its (now superseded) `req_id` must not act on the set: the
/// `(kind, contract)` slot may since have been re-subscribed into a new live
/// entry that a value match would wrongly drop. Removing the correlation at the
/// unsubscribe boundary keeps the invariant that at most one resident
/// correlation per identity exists and it always names the current live entry,
/// so an `apply_req_response` rejection can untrack purely by `req_id` lookup.
///
/// The map is keyed by `req_id`, so identity removal is a single retain pass.
/// The caller holds the `pending_subs` lock; this touches only in-memory map
/// state and performs no I/O.
pub(in crate::fpss) fn evict_pending_for_identity(
map: &mut HashMap<i32, PendingSubEntry>,
identity: &super::protocol::PendingSub,
) {
map.retain(|_, entry| &entry.sub != identity);
}
type ActiveFullSubs = Arc<
Mutex<
Vec<(
Expand Down Expand Up @@ -280,6 +302,16 @@ fn host_index(hosts: &[(String, u16)], addr: &str) -> Option<usize> {
/// id from a span longer than the 31-bit counter cycle) leaves the tracked set
/// untouched: with no correlation, untracking would risk dropping a healthy
/// subscription, so the conservative choice is to keep it.
///
/// The `req_id` lookup is a safe untrack because the pending registry holds the
/// invariant that at most one correlation per `(kind, contract)` (or `(kind,
/// sec_type)`) identity is resident, and it always names the current live
/// entry. A duplicate subscribe shares the live entry and registers no second
/// correlation; an unsubscribe removes the entry and evicts its correlation
/// (see [`evict_pending_for_identity`]). So a subscribe that is superseded by an
/// unsubscribe + re-subscribe of the same identity has no resident correlation
/// for its old `req_id`, and a late rejection of that id is a no-op rather than
/// a value match that would drop the re-subscribed live entry.
fn apply_req_response(
req_id: i32,
result: StreamResponseType,
Expand Down
183 changes: 174 additions & 9 deletions crates/thetadatadx/src/fpss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2236,15 +2236,32 @@ impl StreamingClient {
unsubscribe,
"sent full-stream subscription frame"
);
// Track / untrack for reconnection.
// Untrack-on-unsubscribe is terminal: remove the tracked entry, then
// evict the in-flight correlation for this identity for the same reason
// as the per-contract unsubscribe — the removed entry is no longer live,
// so a late rejection of its subscribe must not survive to untrack a
// future re-subscribe of the same `(kind, sec_type)` by value.
if unsubscribe {
self.active_full_subs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.retain(|(k, s)| !(*k == kind_for_track && *s == sec_type));
io_loop::evict_pending_for_identity(
&mut self
.pending_subs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner),
&protocol::PendingSub::Full(kind_for_track, sec_type),
);
return Ok(());
}

// Track for reconnection.
let mut subs = self
.active_full_subs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let newly_tracked = if unsubscribe {
subs.retain(|(k, s)| !(*k == kind_for_track && *s == sec_type));
false
} else if subs
let newly_tracked = if subs
.iter()
.any(|(k, s)| *k == kind_for_track && *s == sec_type)
{
Expand All @@ -2261,10 +2278,9 @@ impl StreamingClient {
// Record the pending full-stream subscribe by `req_id` so a server
// rejection drops exactly this entry from the replay set. Only the
// subscribe that actually added the tracked entry may carry an
// untrack-capable correlation: an unsubscribe removed its entry above
// and has nothing to untrack, and a duplicate subscribe shares the one
// live entry, so letting its rejection untrack by value would drop the
// live subscription.
// untrack-capable correlation: an unsubscribe is handled terminally
// above and a duplicate subscribe shares the one live entry, so letting
// its rejection untrack by value would drop the live subscription.
if newly_tracked {
let mut pending = self
.pending_subs
Expand Down Expand Up @@ -2372,6 +2388,21 @@ impl StreamingClient {
subs.retain(|(k, c)| !(k == &kind && c == contract));
}

// Evict any in-flight correlation for this identity. The removed entry
// is no longer live, so a late rejection of the subscribe that created
// it must not survive to untrack a future re-subscribe of the same
// `(kind, contract)` by value.
{
let mut pending = self
.pending_subs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
io_loop::evict_pending_for_identity(
&mut pending,
&protocol::PendingSub::Contract(kind, contract.clone()),
);
}

tracing::debug!(
req_id,
kind = ?kind,
Expand Down Expand Up @@ -3425,6 +3456,140 @@ mod full_stream_guard_tests {
client.shutdown();
}

/// Subscribe, unsubscribe, then re-subscribe the same contract, all before
/// the first subscribe's `REQ_RESPONSE` lands; a late rejection of that
/// superseded first request must not drop the re-subscribed live entry.
///
/// The first subscribe owns the correlation for its `req_id`; the
/// unsubscribe both removes the tracked entry and evicts that correlation,
/// so when the re-subscribe re-adds the entry it owns a fresh correlation
/// under a new `req_id`. A server rejection of the obsolete first `req_id`
/// then finds no resident correlation and is a no-op, leaving the live
/// re-subscribed entry tracked and in the reconnect-replay set (which is a
/// clone of `active_subs`).
#[test]
fn unsub_resub_then_reject_superseded_keeps_live_sub() {
use super::io_loop::apply_req_response_for_test;
use crate::tdbe::types::enums::StreamResponseType;

let client = StreamingClient::for_self_join_test(
0,
64,
HarnessPublishMode::BlockingPublish,
None,
|_event| {},
);

// req_id counter starts at 1: subscribe -> 1, unsubscribe -> 2,
// re-subscribe -> 3.
let contract = Contract::stock("AAPL");
client
.subscribe(contract.clone().trade())
.expect("first subscribe");
client
.unsubscribe(contract.clone().trade())
.expect("unsubscribe");
client
.subscribe(contract.clone().trade())
.expect("re-subscribe");

// Exactly one resident correlation: the unsubscribe evicted the first
// subscribe's (req_id 1) correlation, and the re-subscribe registered a
// fresh one (req_id 3).
assert_eq!(
client
.pending_subs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.len(),
1,
"unsubscribe must evict the superseded correlation; only the \
re-subscribe's correlation may remain"
);

// The server rejects the obsolete first req_id (1). With its
// correlation already evicted this is a no-op, not a value match that
// would drop the live re-subscribed entry.
apply_req_response_for_test(
&client.pending_subs,
&client.active_subs,
&client.active_full_subs,
1,
StreamResponseType::MaxStreamsReached,
);

let tracked = client.active_subscriptions();
assert_eq!(
tracked.len(),
1,
"rejecting a superseded subscribe must leave the re-subscribed entry \
tracked, got {tracked:?}"
);
assert!(
tracked.iter().any(|(_, c)| *c == contract),
"the re-subscribed contract must remain in active_subscriptions() so \
it is replayed on reconnect, got {tracked:?}"
);

client.shutdown();
}

/// The full-stream analogue: subscribe, unsubscribe, re-subscribe a
/// full-stream `(kind, sec_type)`, then reject the superseded first
/// request — the re-subscribed full-stream entry must stay tracked.
#[test]
fn full_unsub_resub_then_reject_superseded_keeps_live_sub() {
use super::io_loop::apply_req_response_for_test;
use crate::tdbe::types::enums::StreamResponseType;

let client = StreamingClient::for_self_join_test(
0,
64,
HarnessPublishMode::BlockingPublish,
None,
|_event| {},
);

// subscribe -> 1, unsubscribe -> 2, re-subscribe -> 3.
client
.subscribe(SecType::Stock.full_trades())
.expect("first full subscribe");
client
.unsubscribe(SecType::Stock.full_trades())
.expect("full unsubscribe");
client
.subscribe(SecType::Stock.full_trades())
.expect("re-subscribe full");

assert_eq!(
client
.pending_subs
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.len(),
1,
"full-stream unsubscribe must evict the superseded correlation"
);

apply_req_response_for_test(
&client.pending_subs,
&client.active_subs,
&client.active_full_subs,
1,
StreamResponseType::MaxStreamsReached,
);

let tracked = client.active_full_subscriptions();
assert_eq!(
tracked.len(),
1,
"rejecting a superseded full-stream subscribe must leave the \
re-subscribed entry tracked, got {tracked:?}"
);

client.shutdown();
}

/// The command channel is bounded: once it is saturated, a further
/// `try_send` reports `Full` rather than growing without limit. This
/// pins the backpressure contract `send_cmd` relies on to surface a
Expand Down
Loading