fix: single-dispatcher SSOT for FPSS streaming (codex H1) #513

@userFRM

Problem

External multi-model audit of v9.0.1 found that the documented "single-dispatcher SSOT" claim for FPSS streaming is false.

ThetaDataDx::start_streaming(callback) currently composes:

  1. Disruptor consumer thread -- crates/thetadatadx/src/fpss/io_loop/mod.rs:113 runs handle_events_with(...) against the LMAX ring; the consumer drains the ring and forwards each event to a crossbeam_channel::bounded(8192) (crates/thetadatadx/src/fpss/dispatcher.rs).
  2. StreamingDispatcher consumer thread -- spawns its own thread that drains the crossbeam channel and invokes the user callback inside std::panic::catch_unwind.

The shim in crates/thetadatadx/src/client.rs:226-238 clones every FpssEvent into the second queue:

move |event: &FpssEvent| {
    producer.send(event.clone());
}

Result: two queues, two consumer threads, and a full event clone per tick on the hot path. The "single dispatcher" wording in the SSOT docs is wrong.

Proposed solution (1-queue, validated by external audit)

A. Wire the user callback as the Disruptor's handle_events_with(...) closure directly. Delete StreamingDispatcher, crates/thetadatadx/src/fpss/dispatcher.rs, and the crossbeam-channel runtime dependency.

B. Wrap each callback invocation in std::panic::catch_unwind inside the new consumer closure -- the Disruptor consumer does not catch panics on its own, and the audit specifically called out that deleting StreamingDispatcher would otherwise eliminate the only panic-isolation path. A user panic must not silently kill the consumer thread.
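A minimal sketch of the panic isolation described in B, using only the standard library. The `FpssEvent` enum here is a hypothetical one-variant stand-in for the crate's real type, and `GuardedCallback` is an illustrative name, not something that exists in the codebase. The idea is that the consumer closure calls `dispatch` for every event, so a panic in user code is caught and counted rather than unwinding through the consumer thread.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical stand-in for the crate's `FpssEvent` (the real type has more variants).
#[derive(Debug, Clone, PartialEq)]
pub enum FpssEvent {
    Tick(u64),
}

/// Illustrative wrapper (not part of the crate): owns the user callback and
/// counts how many times it has panicked.
pub struct GuardedCallback<F: FnMut(&FpssEvent)> {
    callback: F,
    panics: u64,
}

impl<F: FnMut(&FpssEvent)> GuardedCallback<F> {
    pub fn new(callback: F) -> Self {
        Self { callback, panics: 0 }
    }

    /// Invokes the callback for one event. Returns `false` if the callback
    /// panicked; the panic is caught here and does not propagate.
    pub fn dispatch(&mut self, event: &FpssEvent) -> bool {
        let cb = &mut self.callback;
        // AssertUnwindSafe: we accept that state captured by the user callback
        // may be left half-updated if it panics mid-way.
        let ok = catch_unwind(AssertUnwindSafe(|| cb(event))).is_ok();
        if !ok {
            self.panics += 1;
        }
        ok
    }

    /// Number of callback panics caught so far.
    pub fn panic_count(&self) -> u64 {
        self.panics
    }
}
```

Note that `catch_unwind` only helps when the build uses unwinding panics; with `panic = "abort"` the process still terminates.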

C. Preserve the visible semantics the existing consumer guarantees:

  • Pending handshake controls + LoginSuccess are emitted before the main read loop (io_loop/mod.rs:127).
  • FpssEvent::Empty and FpssEvent::RawData are filtered at the consumer boundary (io_loop/mod.rs:115) and never reach the user callback.
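The filtering guarantee in C could be kept with a small predicate at the consumer boundary. This is a sketch under assumptions: the enum below is a hypothetical subset of `FpssEvent`, the `Vec<u8>` payload on `RawData` and the `Quote` variant are invented for illustration, and `is_user_visible` / `consume` are not names from the crate.

```rust
/// Hypothetical subset of the crate's `FpssEvent`; payload shapes are assumed.
#[derive(Debug, Clone, PartialEq)]
pub enum FpssEvent {
    Empty,
    RawData(Vec<u8>),
    LoginSuccess,
    Quote { price: f64 },
}

/// True for events the user callback is allowed to see.
pub fn is_user_visible(event: &FpssEvent) -> bool {
    !matches!(event, FpssEvent::Empty | FpssEvent::RawData(_))
}

/// Consumer-side step: forward only user-visible events to the callback.
pub fn consume<F: FnMut(&FpssEvent)>(event: &FpssEvent, callback: &mut F) {
    if is_user_visible(event) {
        callback(event);
    }
}
```

Keeping the predicate separate from the dispatch step makes the "never reaches the user callback" guarantee easy to unit-test on its own.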

D. Add an opt-in expert-mode inline path (start_streaming_inline) gated behind a Cargo feature flag. Document the forbidden operations explicitly:

  • Cannot call stop_streaming() / reconnect_streaming() from inside the callback (self-join deadlock against FpssClient::Drop joining io_handle from the same thread -- client.rs:470, fpss/mod.rs:871).
  • Cannot block: the same loop owns reads + outbound drain + ping, so a slow callback starves all three.
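To illustrate the self-join hazard in the first bullet: a thread that tries to join its own handle can never complete the join. The issue only proposes documenting the restriction; the guard below is an assumption of how the condition could be detected and turned into an error, and `is_self_join`, `join_guarded`, and `StopError` are hypothetical names.

```rust
use std::thread::{self, JoinHandle, ThreadId};

/// Hypothetical error type for a guarded stop path.
#[derive(Debug, PartialEq)]
pub enum StopError {
    /// The caller is the very thread it is trying to join.
    CalledFromIoThread,
    /// The joined thread had panicked.
    IoThreadPanicked,
}

/// True if joining `target` from the current thread would be a self-join.
pub fn is_self_join(target: ThreadId) -> bool {
    target == thread::current().id()
}

/// Joins `handle` unless the caller is the thread behind it.
/// On the self-join path the handle is dropped, which detaches the thread
/// instead of waiting for it.
pub fn join_guarded(handle: JoinHandle<()>) -> Result<(), StopError> {
    if is_self_join(handle.thread().id()) {
        return Err(StopError::CalledFromIoThread);
    }
    handle.join().map_err(|_| StopError::IoThreadPanicked)
}
```

Whether such a guard belongs in the library, or whether documentation alone is enough, is a design decision this sketch does not settle.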

E. Drop the proposed start_streaming_with_executor(callback, executor) helper. Calling tokio::task::spawn_blocking per tick is a latency and memory trap, and it muddies the synchronous contract of the FPSS path. Document the institutional pattern instead: the user wires their own bounded queue inside the callback.
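One possible shape for the "user wires their own bounded queue" pattern, sketched with the standard library's `sync_channel`. `Tick` is a hypothetical event type and `bounded_handoff` is an illustrative helper, not an API of the crate. The callback never blocks: it uses `try_send`, and when the queue is full (or the worker has gone away) it drops the newest event and bumps a counter.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};
use std::sync::Arc;

/// Hypothetical event type standing in for whatever the callback receives.
#[derive(Debug, Clone, PartialEq)]
pub struct Tick {
    pub price: f64,
}

/// Builds a non-blocking callback plus the receiving end for a worker thread.
/// Returns (callback, receiver, dropped-event counter).
/// `capacity` should be > 0; a zero-capacity sync_channel is a rendezvous
/// channel, so `try_send` would fail unless a receiver is already waiting.
pub fn bounded_handoff(
    capacity: usize,
) -> (impl FnMut(&Tick), Receiver<Tick>, Arc<AtomicU64>) {
    let (tx, rx) = sync_channel::<Tick>(capacity);
    let dropped = Arc::new(AtomicU64::new(0));
    let dropped_in_cb = Arc::clone(&dropped);

    let callback = move |tick: &Tick| {
        match tx.try_send(tick.clone()) {
            Ok(()) => {}
            // Queue full or worker gone: drop the newest event and count it.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                dropped_in_cb.fetch_add(1, Ordering::Relaxed);
            }
        }
    };
    (callback, rx, dropped)
}
```

The clone now happens only when the user opts into buffering, and the slow work runs on a thread the user owns, which keeps the library's hot path free of a second queue.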

F. Document the Python-specific buffering pattern in sdks/python/README.md:

  • collections.deque(maxlen=N) for lowest overhead (GIL-atomic single ops, ring-buffer drop-oldest semantics).
  • queue.Queue(maxsize=N) for a cross-thread blocking get(); pair it with put_nowait() and discard on queue.Full to get drop-newest semantics.

G. Telemetry for dropped events: keep the producer.try_publish failure count as the hard-loss metric, and add lag-streak telemetry derived from the Disruptor producer-vs-consumer cursor delta.
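The issue does not define "lag streak" precisely. The sketch below assumes one plausible reading: the number of consecutive samples in which the consumer cursor trails the producer cursor by at least some threshold. `LagTelemetry` and its fields are hypothetical, and the cursors are modelled as plain monotonically increasing `u64` sequence numbers rather than the Disruptor's own cursor type.

```rust
/// Hypothetical lag tracker fed with periodic cursor samples.
#[derive(Debug, Default)]
pub struct LagTelemetry {
    threshold: u64,
    current_streak: u64,
    longest_streak: u64,
    max_lag: u64,
}

impl LagTelemetry {
    /// `threshold` is the lag (in ring slots) at or above which a sample
    /// counts as "lagging".
    pub fn new(threshold: u64) -> Self {
        Self { threshold, ..Default::default() }
    }

    /// Records one sample of the producer and consumer cursors.
    pub fn observe(&mut self, producer_cursor: u64, consumer_cursor: u64) {
        // saturating_sub guards against a sample where the consumer
        // momentarily reads as ahead of the producer.
        let lag = producer_cursor.saturating_sub(consumer_cursor);
        self.max_lag = self.max_lag.max(lag);
        if lag >= self.threshold {
            self.current_streak += 1;
            self.longest_streak = self.longest_streak.max(self.current_streak);
        } else {
            self.current_streak = 0;
        }
    }

    pub fn current_streak(&self) -> u64 {
        self.current_streak
    }

    pub fn longest_streak(&self) -> u64 {
        self.longest_streak
    }

    pub fn max_lag(&self) -> u64 {
        self.max_lag
    }
}
```

A metric like this complements the hard-loss counter: it shows sustained pressure on the ring before any event is actually dropped.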

H. Soak tests (crates/thetadatadx/tests/):

  • Slow callback under sustained load.
  • Panicking callback (verify consumer thread survives).
  • Callback-triggered stop_streaming() / reconnect_streaming() from the dispatcher path (verify no self-join).
  • Burst overload past ring capacity (verify drop counter increments correctly).

I. Re-run the end-to-end benchmarks and report the measured before/after numbers in CHANGELOG.md and BENCHMARKS.md.

Impact

  • Removes ~388 LOC of dispatcher.rs plus the crossbeam-channel runtime dep.
  • Removes one queue + one full-event clone per tick on the hot path.
  • Makes the SSOT claim true for the first time.
  • Public API stays the same: start_streaming(callback) retains the same signature and same documented "callback must be fast" contract.

Labels: enhancement, rust
