Problem
An external multi-model audit of v9.0.1 found that the documented "single-dispatcher SSOT" claim for FPSS streaming is false.
ThetaDataDx::start_streaming(callback) currently composes two consumer threads:
- Disruptor consumer thread -- crates/thetadatadx/src/fpss/io_loop/mod.rs:113 runs handle_events_with(...) against the LMAX ring; the consumer drains the ring and forwards each event to a crossbeam_channel::bounded(8192) (crates/thetadatadx/src/fpss/dispatcher.rs).
- StreamingDispatcher consumer thread -- StreamingDispatcher spawns its own thread that drains the crossbeam channel and invokes the user callback inside std::panic::catch_unwind.
The shim in crates/thetadatadx/src/client.rs:226-238 clones every FpssEvent into the second queue:
```rust
move |event: &FpssEvent| {
    producer.send(event.clone());
}
```
Result: two queues, two consumer threads, full event clone per tick on the hot path. The "single dispatcher" wording in the SSOT docs is wrong.
Proposed solution (1-queue, validated by external audit)
A. Wire the user callback as the Disruptor's handle_events_with(...) closure directly. Delete StreamingDispatcher, crates/thetadatadx/src/fpss/dispatcher.rs, and the crossbeam-channel runtime dependency.
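A minimal sketch of what A changes on the hot path. It uses only std and a simplified, hypothetical Event enum in place of FpssEvent; the real closure signature is whatever the Disruptor crate's handle_events_with requires, and this models only the data flow: each ring slot is lent to the user callback by reference, with no second channel and no clone.

```rust
/// Hypothetical, simplified stand-in for FpssEvent (the real enum has more variants).
#[derive(Debug, PartialEq)]
pub enum Event {
    Quote { bid: f64, ask: f64 },
    Trade { price: f64, size: u32 },
}

/// Models the single-queue path: the consumer walks the ring slots it has
/// been handed and lends each one to the user callback by reference.
/// Nothing is cloned and nothing is forwarded to a second queue.
pub fn drain_ring<F>(ring: &[Event], mut callback: F)
where
    F: FnMut(&Event),
{
    for slot in ring {
        callback(slot);
    }
}
```

Compared with the current shim, the per-tick `event.clone()` and the `producer.send(...)` hop both disappear; the callback sees the slot in place.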
B. Wrap each callback invocation in std::panic::catch_unwind inside the new consumer closure -- the Disruptor consumer does not catch panics on its own, and the audit specifically called out that deleting StreamingDispatcher would otherwise eliminate the only panic-isolation path. A user panic must not silently kill the consumer thread.
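A sketch of the panic isolation B asks for, assuming the consumer closure owns the user callback. GuardedCallback and panic_count are hypothetical names. Note that catch_unwind only catches unwinding panics: if the build sets panic = "abort", the process aborts regardless.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical wrapper for the consumer closure: every user callback call
/// runs under catch_unwind, so a panic is counted instead of unwinding out
/// of (and terminating) the consumer thread.
pub struct GuardedCallback<F> {
    callback: F,
    panics: u64,
}

impl<F> GuardedCallback<F> {
    pub fn new(callback: F) -> Self {
        Self { callback, panics: 0 }
    }

    /// Invokes the callback for one event. Returns false if it panicked.
    pub fn invoke<E>(&mut self, event: &E) -> bool
    where
        F: FnMut(&E),
    {
        let callback = &mut self.callback;
        // AssertUnwindSafe: after a panic the consumer only bumps a counter and
        // moves on; user state the callback left half-updated is the
        // callback's own responsibility.
        match panic::catch_unwind(AssertUnwindSafe(|| callback(event))) {
            Ok(()) => true,
            Err(_payload) => {
                self.panics += 1;
                false
            }
        }
    }

    /// Total panics caught so far (a candidate telemetry counter).
    pub fn panic_count(&self) -> u64 {
        self.panics
    }
}
```

The default panic hook still prints the panic message to stderr; only the unwinding is stopped at the consumer boundary.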
C. Preserve the visible semantics the existing consumer guarantees:
- Pending handshake controls and LoginSuccess are emitted before the main read loop (io_loop/mod.rs:127).
- FpssEvent::Empty and FpssEvent::RawData are filtered at the consumer boundary (io_loop/mod.rs:115) and never reach the user callback.
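The filtering rule in C can be expressed as a single predicate checked before the callback runs. FpssEventModel is a hypothetical simplification: the Empty, RawData and LoginSuccess variant names come from this issue, while the Quote variant and its fields are invented for illustration.

```rust
/// Hypothetical, simplified stand-in for FpssEvent.
#[derive(Debug, Clone, PartialEq)]
pub enum FpssEventModel {
    Empty,
    RawData(Vec<u8>),
    LoginSuccess,
    Quote { contract_id: u32, bid: f64, ask: f64 },
}

/// True if the event may reach user code. Empty and RawData never do.
pub fn is_user_visible(event: &FpssEventModel) -> bool {
    !matches!(event, FpssEventModel::Empty | FpssEventModel::RawData(_))
}

/// Consumer-side dispatch: apply the boundary filter, then call the user callback.
pub fn dispatch<F: FnMut(&FpssEventModel)>(event: &FpssEventModel, callback: &mut F) {
    if is_user_visible(event) {
        callback(event);
    }
}
```

Keeping the filter inside the consumer closure, rather than in user code, is what lets the single-queue path keep the same visible event stream as today.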
D. Add an opt-in expert-mode inline path (start_streaming_inline) gated behind a Cargo feature flag. Document the forbidden operations explicitly:
- Cannot call stop_streaming() / reconnect_streaming() from inside the callback (self-join deadlock against FpssClient::Drop joining io_handle from the same thread -- client.rs:470, fpss/mod.rs:871).
- Cannot block: the same loop owns reads + outbound drain + ping, so a slow callback starves all three.
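To make the first forbidden operation concrete: in inline mode the callback runs on the I/O thread, so a stop that joins that thread from inside the callback is a thread joining itself, which std documents as able to deadlock or, on some platforms, panic. A hypothetical guard could turn that into an error by comparing thread IDs before joining. InlineStream, guard_join and StopError are illustrative names, not existing thetadatadx API, and this does nothing for the second rule: a blocking callback still starves reads, outbound drain and ping.

```rust
use std::thread::{self, JoinHandle, ThreadId};

#[derive(Debug, PartialEq)]
pub enum StopError {
    /// Stop was requested on the I/O thread itself (i.e. from inside the
    /// inline callback); joining would be a self-join.
    CalledFromIoThread,
}

/// Returns an error instead of letting the caller join its own thread.
pub fn guard_join(io_thread: ThreadId) -> Result<(), StopError> {
    if thread::current().id() == io_thread {
        Err(StopError::CalledFromIoThread)
    } else {
        Ok(())
    }
}

/// Hypothetical owner of the inline I/O loop thread (stands in for the
/// io_handle that FpssClient::Drop joins).
pub struct InlineStream {
    io_handle: Option<JoinHandle<()>>,
    io_thread: ThreadId,
}

impl InlineStream {
    pub fn spawn<F: FnOnce() + Send + 'static>(io_loop: F) -> Self {
        let handle = thread::spawn(io_loop);
        let io_thread = handle.thread().id();
        Self { io_handle: Some(handle), io_thread }
    }

    pub fn stop(&mut self) -> Result<(), StopError> {
        guard_join(self.io_thread)?;
        if let Some(handle) = self.io_handle.take() {
            // A panicked I/O thread is ignored here for brevity.
            let _ = handle.join();
        }
        Ok(())
    }
}
```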
E. Drop the proposed start_streaming_with_executor(callback, executor) helper. tokio::spawn_blocking per tick is a latency / memory trap and muddies the synchronous FPSS path contract. Document the institutional pattern instead: the user wires their own bounded queue inside the callback.
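A minimal sketch of the institutional pattern in E, using std::sync::mpsc::sync_channel as the user's bounded queue. BoundedHandoff and offer are hypothetical names. The callback does only a non-blocking try_send; when the queue is full the newest event is dropped and counted, and the user's own worker thread drains the Receiver at its own pace.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical user-side adapter owned by the streaming callback.
pub struct BoundedHandoff<T> {
    tx: SyncSender<T>,
    dropped: AtomicU64,
}

impl<T> BoundedHandoff<T> {
    /// Returns the handoff (for the callback) and the receiver (for the user's worker).
    pub fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: AtomicU64::new(0) }, rx)
    }

    /// Call from inside the streaming callback. Never blocks: if the queue is
    /// full (or the worker is gone) the item is dropped and counted.
    pub fn offer(&self, item: T) {
        match self.tx.try_send(item) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Events the user-side queue had to discard.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
```

The callback passed to start_streaming would call offer(...) with whatever owned data it needs; any clone then happens in user code, by choice, rather than unconditionally in the SDK hot path.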
F. Document the Python-specific buffering pattern in sdks/python/README.md:
- collections.deque(maxlen=N) for lowest overhead (GIL-atomic single ops, ring-buffer drop-oldest semantics).
- queue.Queue(maxsize=N) for cross-thread blocking get(); with put_nowait(), a full queue raises queue.Full, giving drop-newest semantics.
G. Drop telemetry: keep producer.try_publish failure count as the hard-loss metric, plus add lag-streak telemetry derived from the Disruptor producer-vs-consumer cursor delta.
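A sketch of the two metrics in G under stated assumptions: cursors are modelled as plain u64 sequence numbers (the Disruptor crate's own sequence type may differ), a lag "streak" is taken to mean consecutive samples whose producer-minus-consumer delta exceeds a configurable threshold, and StreamTelemetry is a hypothetical name.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical drop/lag telemetry for the single-queue path.
pub struct StreamTelemetry {
    /// Hard-loss metric: failed try_publish calls on the producer side.
    publish_failures: AtomicU64,
    lag_threshold: u64,
    current_streak: AtomicU64,
    longest_streak: AtomicU64,
}

impl StreamTelemetry {
    pub fn new(lag_threshold: u64) -> Self {
        Self {
            publish_failures: AtomicU64::new(0),
            lag_threshold,
            current_streak: AtomicU64::new(0),
            longest_streak: AtomicU64::new(0),
        }
    }

    /// Called by the producer whenever try_publish fails.
    pub fn record_publish_failure(&self) {
        self.publish_failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Samples the cursor delta and returns the current lag. Assumes a single
    /// sampling thread: the fetch_add/store pair is not atomic as a unit.
    pub fn sample(&self, producer_cursor: u64, consumer_cursor: u64) -> u64 {
        let lag = producer_cursor.saturating_sub(consumer_cursor);
        if lag > self.lag_threshold {
            let streak = self.current_streak.fetch_add(1, Ordering::Relaxed) + 1;
            self.longest_streak.fetch_max(streak, Ordering::Relaxed);
        } else {
            self.current_streak.store(0, Ordering::Relaxed);
        }
        lag
    }

    pub fn publish_failures(&self) -> u64 {
        self.publish_failures.load(Ordering::Relaxed)
    }

    pub fn current_streak(&self) -> u64 {
        self.current_streak.load(Ordering::Relaxed)
    }

    pub fn longest_streak(&self) -> u64 {
        self.longest_streak.load(Ordering::Relaxed)
    }
}
```

The split keeps the two signals distinct: publish failures are events actually lost, while a lag streak is an early warning that the callback is falling behind before any loss occurs.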
H. Soak tests (crates/thetadatadx/tests/):
- Slow callback under sustained load.
- Panicking callback (verify consumer thread survives).
- Callback-triggered stop_streaming() / reconnect_streaming() from the dispatcher path (verify no self-join).
- Burst overload past ring capacity (verify drop counter increments correctly).
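The burst-overload expectation in H can be pinned down deterministically before running it against the real ring. The sketch below uses a hypothetical TestRing built on VecDeque as a stand-in for the Disruptor ring, with the consumer stalled: with capacity C and a burst of N events, the drop counter should read N - C when N > C and 0 otherwise.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-capacity stand-in for the Disruptor ring.
pub struct TestRing<T> {
    slots: VecDeque<T>,
    capacity: usize,
    dropped: u64,
}

impl<T> TestRing<T> {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { slots: VecDeque::with_capacity(capacity), capacity, dropped: 0 }
    }

    /// Fails (and bumps the drop counter) when the consumer is a full ring behind.
    /// The rejected item is handed back to the caller.
    pub fn try_publish(&mut self, item: T) -> Result<(), T> {
        if self.slots.len() == self.capacity {
            self.dropped += 1;
            Err(item)
        } else {
            self.slots.push_back(item);
            Ok(())
        }
    }

    pub fn consume_one(&mut self) -> Option<T> {
        self.slots.pop_front()
    }

    pub fn dropped(&self) -> u64 {
        self.dropped
    }
}

/// Burst scenario: `burst` events arrive while the consumer is stalled.
pub fn burst_overload(capacity: usize, burst: usize) -> u64 {
    let mut ring = TestRing::with_capacity(capacity);
    for i in 0..burst {
        let _ = ring.try_publish(i);
    }
    ring.dropped()
}
```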
I. Re-bench end-to-end honestly and record before/after numbers in CHANGELOG.md and BENCHMARKS.md.
Impact
- Removes ~388 LOC of dispatcher.rs plus the crossbeam-channel runtime dep.
- Removes one queue + one full-event clone per tick on the hot path.
- Makes the SSOT claim true for the first time.
- Public API stays the same: start_streaming(callback) retains the same signature and the same documented "callback must be fast" contract.