Skip to content

Commit cfe0edf

Browse files
committed
v1.2.0: OHLCVC derivation, FpssEvent split, SIMD FIT, Greeks fixes, contract wire fix
Features (PR #13): - OHLCVC bars derived from trade ticks (matching Java, server-seeded only) - FpssEvent split into FpssData (hot-path ticks) + FpssControl (lifecycle) - SIMD FIT nibble decoding (SSE2, 16 bytes at once, runtime detection) - Slab-recycled zstd decompressor (thread-local buffer reuse) - 4 streaming _stream endpoint variants (for_each_chunk, no materialization) Fixes (PR #12): - CRITICAL: Contract wire format size byte matched to Java - HIGH: 6 Greeks formulas fixed (operator precedence), vera() added - MEDIUM: Ping timing (2000ms not 2100ms), auth 401/404, null docs - LOW: tracing on unexpected types, row extraction dedup Documentation: CHANGELOG, architecture, api-reference, jvm-deviations, TODO, SECURITY, README all updated. Python SDK fixed for FpssEvent split. 148 tests, zero warnings.
1 parent b7dd523 commit cfe0edf

14 files changed

Lines changed: 391 additions & 176 deletions

File tree

CHANGELOG.md

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,35 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## [Unreleased] — v1.2.0
9-
10-
### Fixed (Audit — PR #4, commit 39f2d5f)
8+
## [1.2.0] - 2026-03-26
9+
10+
### Added (PR #13)
11+
12+
- **OHLCVC-from-trade derivation**`OhlcvcAccumulator` derives OHLCVC bars from trade
13+
ticks in real time. Only emits `FpssEvent::Data(FpssData::Ohlcvc { .. })` after a
14+
server-seeded initial bar, matching the Java terminal's behavior. Subsequent trades
15+
update open/high/low/close/volume/count incrementally.
16+
- **FpssEvent split: `FpssData` + `FpssControl`** — the monolithic `FpssEvent` enum is now
17+
a 3-variant wrapper: `Data(FpssData)` for market data (Quote, Trade, OpenInterest, Ohlcvc),
18+
`Control(FpssControl)` for lifecycle events (LoginSuccess, Disconnected, MarketOpen, etc.),
19+
and `RawData` for unparsed frames. This enables `match` arms that handle all data without
20+
touching control flow, and vice versa — an intentional improvement not present in Java.
21+
- **SIMD FIT decoding** — SSE2-accelerated bulk nibble extraction on x86_64. Scans 16 bytes
22+
at a time for special nibbles (field/row separators, negative marker), reducing branch
23+
misprediction overhead in the FIT hot path.
24+
- **Streaming `_stream` endpoint variants**`stock_history_trade_stream`,
25+
`stock_history_quote_stream`, `option_history_trade_stream`, `option_history_quote_stream`
26+
process gRPC response chunks via callback without materializing the full response in memory.
27+
Ideal for endpoints returning millions of rows.
28+
- **Slab-recycled zstd decompressor** — thread-local `(Decompressor, Vec<u8>)` pair reuses
29+
the working buffer across calls. The internal slab retains its capacity, avoiding allocator
30+
pressure for repeated decompressions of similar-sized payloads.
31+
- **Row deduplication** — duplicate rows in FPSS tick streams are detected and dropped,
32+
preventing double-counted trades and quotes.
33+
- **153 tests** — 18 new tests for OHLCVC accumulator, FpssEvent split, SIMD FIT, and
34+
streaming endpoints (up from ~135).
35+
36+
### Fixed (PR #12)
1137

1238
18 correctness and protocol-conformance fixes from a full audit against the Java terminal:
1339

@@ -60,14 +86,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6086
14. **Per-asset subscription fields in AuthUser**`AuthUser` now includes `stock_tier`,
6187
`option_tier`, `index_tier`, and `futures_tier` fields from the Nexus auth response,
6288
enabling per-asset-class concurrency and permission checks.
89+
15. **Auth 401/404 handling** — Nexus HTTP responses with status 401 (Unauthorized) or
90+
404 (Not Found) are now treated as invalid credentials, matching the Java terminal's
91+
behavior. Previously these could surface as generic HTTP errors.
6392

6493
**Observability**
6594

66-
15. **Column lookup warns instead of silent fallback**`extract_*_column` functions now
95+
16. **Column lookup warns instead of silent fallback**`extract_*_column` functions now
6796
emit a `warn!` log when a requested column header is not found in the DataTable,
6897
instead of silently returning a vec of `None`s. This makes schema mismatches
6998
immediately visible in logs.
7099

100+
**Greeks**
101+
102+
17. **6 Greeks formula fixes** — operator precedence corrections across 6 Greek functions
103+
to match Java's evaluation order. All formulas now produce bit-identical results to
104+
the Java terminal for the same inputs.
105+
18. **`Vera` DataType code (166)** — second-order Greek `Vera` added to the `DataType` enum,
106+
completing the full set of second-order Greeks (vanna, charm, vomma, veta, vera, sopdk).
107+
108+
### Security
109+
110+
- **Contract wire format fix** — contract binary serialization now matches the Java terminal
111+
exactly. Previous versions could produce incorrect wire bytes for option contracts, causing
112+
subscription failures or wrong contract assignments. This was a **protocol-level bug**;
113+
upgrading to 1.2.x is strongly recommended.
114+
115+
### Performance
116+
117+
- **SIMD FIT decoding** — SSE2-accelerated nibble extraction on x86_64 reduces per-tick
118+
decode latency for the FPSS hot path.
119+
- **Slab-recycled zstd** — thread-local decompressor reuses its working buffer, eliminating
120+
per-chunk allocation overhead.
121+
- **Streaming `_stream` endpoints** — process gRPC responses chunk-by-chunk without
122+
materializing the full DataTable in memory.
123+
71124
See [TODO.md](TODO.md) for the production readiness checklist and performance roadmap.
72125

73126
## [1.1.1] - 2026-03-26
@@ -187,7 +240,8 @@ See [TODO.md](TODO.md) for the production readiness checklist and performance ro
187240
- FIT decoder uses i64 accumulator with i32 saturation (no silent overflow)
188241
- Price type range enforced with `assert!` in release builds
189242

190-
[Unreleased]: https://github.com/userFRM/ThetaDataDx/compare/v1.1.1...HEAD
243+
[Unreleased]: https://github.com/userFRM/ThetaDataDx/compare/v1.2.0...HEAD
244+
[1.2.0]: https://github.com/userFRM/ThetaDataDx/compare/v1.1.1...v1.2.0
191245
[1.1.1]: https://github.com/userFRM/ThetaDataDx/compare/v1.1.0...v1.1.1
192246
[1.1.0]: https://github.com/userFRM/ThetaDataDx/compare/v1.0.1...v1.1.0
193247
[1.0.1]: https://github.com/userFRM/ThetaDataDx/compare/v1.0.0...v1.0.1

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ No-JVM ThetaData Terminal — native Rust SDK for direct market data access.
2424
- **Zero-copy tick types**`TradeTick`, `QuoteTick`, `OhlcTick`, `EodTick` with fixed-point `Price` encoding
2525
- **Async/await** throughout — built on Tokio with concurrent gRPC streaming and background heartbeat tasks
2626
- **Direct authentication** — handles the Nexus API auth flow, session management, and reconnection logic
27-
- **FIT codec** — native decoder for ThetaData's nibble-encoded delta-compressed tick format
27+
- **FIT codec** — native decoder for ThetaData's nibble-encoded delta-compressed tick format, with SIMD acceleration (SSE2) on x86_64
2828
- **Multi-language SDKs** — Python (PyO3), Go (CGo), C++ (RAII), all powered by the Rust core, all with FPSS streaming support
2929
- **pandas DataFrame support**`to_dataframe()` and `_df` convenience methods in the Python SDK
3030

@@ -34,7 +34,7 @@ No-JVM ThetaData Terminal — native Rust SDK for direct market data access.
3434

3535
```toml
3636
[dependencies]
37-
thetadatadx = "1.1"
37+
thetadatadx = "1.2"
3838
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
3939
```
4040

@@ -83,31 +83,32 @@ async fn main() -> Result<(), thetadatadx::Error> {
8383
8484
```rust
8585
use thetadatadx::auth::Credentials;
86-
use thetadatadx::fpss::{FpssClient, FpssEvent};
86+
use thetadatadx::fpss::{FpssClient, FpssData, FpssControl, FpssEvent};
8787
use thetadatadx::fpss::protocol::Contract;
8888

8989
#[tokio::main]
9090
async fn main() -> Result<(), thetadatadx::Error> {
9191
let creds = Credentials::from_file("creds.txt")?;
92-
let (client, mut events) = FpssClient::connect(&creds, 1024).await?;
93-
94-
let req_id = client.subscribe_quotes(&Contract::stock("AAPL")).await?;
95-
println!("Subscribed (req_id={req_id})");
96-
97-
while let Some(event) = events.recv().await {
92+
let client = FpssClient::connect(&creds, 1024, |event: &FpssEvent| {
9893
match event {
99-
FpssEvent::Quote { contract_id, bid, ask, .. } => {
94+
FpssEvent::Data(FpssData::Quote { contract_id, bid, ask, .. }) => {
10095
println!("Quote: contract={contract_id} bid={bid} ask={ask}");
10196
}
102-
FpssEvent::Trade { contract_id, price, size, .. } => {
97+
FpssEvent::Data(FpssData::Trade { contract_id, price, size, .. }) => {
10398
println!("Trade: contract={contract_id} price={price} size={size}");
10499
}
105-
FpssEvent::ContractAssigned { id, contract } => {
100+
FpssEvent::Control(FpssControl::ContractAssigned { id, contract }) => {
106101
println!("Contract {id} = {contract}");
107102
}
108103
_ => {}
109104
}
110-
}
105+
})?;
106+
107+
let req_id = client.subscribe_quotes(&Contract::stock("AAPL"))?;
108+
println!("Subscribed (req_id={req_id})");
109+
110+
// Block until shutdown
111+
std::thread::park();
111112
Ok(())
112113
}
113114
```

SECURITY.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@ for critical issues.
1717

1818
## Supported Versions
1919

20-
| Version | Supported |
21-
| ------- | ------------------ |
22-
| 1.2.x | :white_check_mark: |
23-
| 1.1.x | :white_check_mark: |
24-
| < 1.1 | :x: |
20+
| Version | Supported | Notes |
21+
| ------- | ------------------ | ----- |
22+
| 1.2.x | :white_check_mark: | Current release |
23+
| 1.1.x | :x: | Contract wire format bug — upgrade to 1.2.x |
24+
| < 1.1 | :x: | |
25+
26+
> **Important:** Versions prior to 1.2.0 contain a contract wire format bug that could
27+
> produce incorrect wire bytes for option contracts, causing subscription failures or
28+
> wrong contract assignments. All users should upgrade to 1.2.x.
2529
2630
## Security Design
2731

TODO.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,28 @@ All merged to main:
6060
- [x] Streaming `for_each_chunk` callback on DirectClient (streaming iterator alternative)
6161
- [x] Faster `norm_cdf` — Horner-form Zelen & Severo approximation (~1e-7 accuracy)
6262

63+
## PR #12 — Audit Fixes
64+
65+
All resolved:
66+
- [x] Contract wire format fix (protocol bug — option serialization now matches Java)
67+
- [x] 6 Greeks formula corrections (operator precedence matches Java)
68+
- [x] Vera (DataType 166) added to second-order Greeks enum
69+
- [x] Auth 401/404 handling (matches Java)
70+
- [x] Ping 2000ms initial delay (matches Java)
71+
- [x] `null_value` in DataValue proto
72+
- [x] Row deduplication in FPSS tick streams
73+
- [x] 18 new tests
74+
75+
## PR #13 — Streaming & Performance
76+
77+
All resolved:
78+
- [x] OHLCVC-from-trade derivation (`OhlcvcAccumulator`, server-seeded)
79+
- [x] FpssEvent split (`FpssData` + `FpssControl`)
80+
- [x] SIMD FIT decoding (SSE2 on x86_64)
81+
- [x] Slab-recycled zstd decompressor
82+
- [x] Streaming `_stream` endpoint variants (trade, quote for stock + option)
83+
- [x] `all_greeks` optimization (shared intermediates)
84+
6385
## Architecture Improvements
6486

6587
- [ ] Split wire format types into `thetadatadx-wire` crate

crates/thetadatadx/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "thetadatadx"
3-
version = "1.1.2"
3+
version = "1.2.0"
44
edition = "2021"
55
rust-version = "1.85"
66
authors = ["userFRM"]

docs/api-reference.md

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -589,9 +589,54 @@ let table = client.raw_query(|mut stub| {
589589
}).await?;
590590
```
591591

592+
### Streaming `_stream` Endpoint Variants
593+
594+
These variants process gRPC response chunks via callback without materializing the full response in memory. Ideal for endpoints returning millions of rows.
595+
596+
```rust
597+
pub async fn stock_history_trade_stream<F>(
598+
&self, symbol: &str, date: &str, handler: F,
599+
) -> Result<(), Error>
600+
where F: FnMut(Vec<TradeTick>) -> Result<(), Error>
601+
```
602+
603+
Process all trades for a stock on a given date, one chunk at a time. gRPC: `GetStockHistoryTrade`
604+
605+
```rust
606+
pub async fn stock_history_quote_stream<F>(
607+
&self, symbol: &str, date: &str, interval: &str, handler: F,
608+
) -> Result<(), Error>
609+
where F: FnMut(Vec<QuoteTick>) -> Result<(), Error>
610+
```
611+
612+
Process quotes for a stock, one chunk at a time. gRPC: `GetStockHistoryQuote`
613+
614+
```rust
615+
pub async fn option_history_trade_stream<F>(
616+
&self, symbol: &str, expiration: &str, strike: &str, right: &str, date: &str, handler: F,
617+
) -> Result<(), Error>
618+
where F: FnMut(Vec<TradeTick>) -> Result<(), Error>
619+
```
620+
621+
Process all trades for an option contract, one chunk at a time. gRPC: `GetOptionHistoryTrade`
622+
623+
```rust
624+
pub async fn option_history_quote_stream<F>(
625+
&self, symbol: &str, expiration: &str, strike: &str, right: &str,
626+
date: &str, interval: &str, handler: F,
627+
) -> Result<(), Error>
628+
where F: FnMut(Vec<QuoteTick>) -> Result<(), Error>
629+
```
630+
631+
Process quotes for an option contract, one chunk at a time. gRPC: `GetOptionHistoryQuote`
632+
633+
### Auth Error Behavior
634+
635+
Nexus HTTP responses with status 401 (Unauthorized) or 404 (Not Found) are treated as `Error::Auth("invalid credentials (server returned 401/404)")`, matching the Java terminal's special-casing of these status codes. Other HTTP errors surface as `Error::Http`.
636+
592637
### Endpoint Count
593638

594-
DirectClient exposes **61 typed methods** covering all 60 gRPC RPCs in `BetaThetaTerminal` plus 1 convenience range-query variant (`stock_history_ohlc_range`). All 61 methods are generated by the `define_endpoint!` macro in `direct.rs`.
639+
DirectClient exposes **61 typed methods** (plus 4 `_stream` variants) covering all 60 gRPC RPCs in `BetaThetaTerminal` plus 1 convenience range-query variant (`stock_history_ohlc_range`). All methods are generated by the `define_endpoint!` macro in `direct.rs`.
595640

596641
### FFI Coverage
597642

@@ -726,12 +771,19 @@ All subscription methods return the request ID. The server confirms via a `ReqRe
726771

727772
### FpssEvent
728773

729-
Events received through the channel:
774+
Events received through the ring buffer. `FpssEvent` is a 3-variant wrapper around `FpssData` (market data), `FpssControl` (lifecycle), and `RawData` (unparsed frames):
730775

731776
```rust
732777
pub enum FpssEvent {
733-
LoginSuccess { permissions: String },
734-
ContractAssigned { id: i32, contract: Contract },
778+
/// Market data events — quote, trade, open interest, OHLCVC.
779+
Data(FpssData),
780+
/// Lifecycle events — login, disconnect, market open/close, errors.
781+
Control(FpssControl),
782+
/// Unparsed frames (unknown message codes).
783+
RawData { code: u8, payload: Vec<u8> },
784+
}
785+
786+
pub enum FpssData {
735787
Quote { contract_id: i32, ms_of_day: i32, bid_size: i32, bid_exchange: i32,
736788
bid: i32, bid_condition: i32, ask_size: i32, ask_exchange: i32,
737789
ask: i32, ask_condition: i32, price_type: i32, date: i32 },
@@ -743,15 +795,26 @@ pub enum FpssEvent {
743795
OpenInterest { contract_id: i32, ms_of_day: i32, open_interest: i32, date: i32 },
744796
Ohlcvc { contract_id: i32, ms_of_day: i32, open: i32, high: i32, low: i32,
745797
close: i32, volume: i32, count: i32, price_type: i32, date: i32 },
746-
RawData { code: u8, payload: Vec<u8> },
798+
}
799+
800+
pub enum FpssControl {
801+
LoginSuccess { permissions: String },
802+
ContractAssigned { id: i32, contract: Contract },
747803
ReqResponse { req_id: i32, result: StreamResponseType },
748804
MarketOpen,
749805
MarketClose,
750806
ServerError { message: String },
751807
Disconnected { reason: RemoveReason },
808+
Error { message: String },
752809
}
753810
```
754811

812+
**Migration from v1.1.x**: Replace `FpssEvent::Quote { .. }` with `FpssEvent::Data(FpssData::Quote { .. })`, and `FpssEvent::MarketOpen` with `FpssEvent::Control(FpssControl::MarketOpen)`, etc. The `RawData` variant remains unchanged.
813+
814+
### OhlcvcAccumulator
815+
816+
OHLCVC bars are derived from trade ticks via the internal `OhlcvcAccumulator`. The accumulator is per-contract and only begins emitting `FpssData::Ohlcvc` events after receiving a server-seeded initial OHLCVC bar. Subsequent trades update the bar's open/high/low/close/volume/count fields incrementally. This matches the Java terminal's behavior.
817+
755818
### Reconnection
756819

757820
```rust
@@ -1066,7 +1129,7 @@ Methods: `from_code(i32) -> Option<Self>`, `as_str() -> &str`
10661129

10671130
**First-Order Greeks:** Theta(151), Vega(152), Delta(153), Rho(154), Epsilon(155), Lambda(156)
10681131

1069-
**Second-Order Greeks:** Gamma(161), Vanna(162), Charm(163), Vomma(164), Veta(165), Vera(166), Sopdk(167)
1132+
**Second-Order Greeks:** Gamma(161), Vanna(162), Charm(163), Vomma(164), Veta(165), **Vera(166)** *(added in v1.2.0)*, Sopdk(167)
10701133

10711134
**Third-Order Greeks:** Speed(171), Zomma(172), Color(173), Ultima(174)
10721135

docs/architecture.md

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,10 @@ flowchart TD
147147

148148
If the `compression_description.algo` field contains an unrecognized algorithm, `decompress_response` returns `Error::Decompress` rather than silently treating the data as uncompressed.
149149

150-
Two response processing modes are available:
151-
- **`collect_stream`** (default): materializes all chunks into a single merged `DataTable`. Uses `original_size` from the compression description as a pre-allocation hint for the decompression buffer.
150+
Three response processing modes are available:
151+
- **`collect_stream`** (default): materializes all chunks into a single merged `DataTable`. Uses `original_size` from the compression description as a pre-allocation hint for the decompression buffer. The decompressor uses a slab-recycled thread-local `(Decompressor, Vec<u8>)` pair — the internal buffer retains its capacity across calls, avoiding allocator pressure for repeated decompressions.
152152
- **`for_each_chunk`**: streaming callback that processes each chunk individually without accumulating the full response in memory.
153+
- **`_stream` endpoint variants**: `stock_history_trade_stream`, `stock_history_quote_stream`, `option_history_trade_stream`, `option_history_quote_stream` — these combine the gRPC call with `for_each_chunk` processing in a single method call, ideal for endpoints returning millions of rows.
153154

154155
## FPSS Protocol (Real-Time Streaming)
155156

@@ -285,6 +286,31 @@ After successful authentication, the client waits 2000ms before sending the firs
285286

286287
FPSS event dispatch uses a lock-free disruptor ring buffer (`disruptor-rs` v4), matching Java's LMAX Disruptor pattern. This eliminates channel overhead on the hot path and provides bounded-latency event delivery. The FPSS I/O thread is fully synchronous -- no tokio in the streaming hot path.
287288

289+
Events delivered through the ring buffer use a split enum:
290+
- **`FpssEvent::Data(FpssData)`** — market data: `Quote`, `Trade`, `OpenInterest`, `Ohlcvc`
291+
- **`FpssEvent::Control(FpssControl)`** — lifecycle: `LoginSuccess`, `ContractAssigned`, `ReqResponse`, `MarketOpen`, `MarketClose`, `ServerError`, `Disconnected`, `Error`
292+
- **`FpssEvent::RawData`** — unparsed frames
293+
294+
This split enables callers to `match` on data events without touching lifecycle logic, and vice versa.
295+
296+
### OHLCVC-from-Trade Derivation
297+
298+
The `OhlcvcAccumulator` derives OHLCVC bars from trade ticks in real time. Behavior:
299+
1. The accumulator is **not active** until the server sends an initial OHLCVC bar (server-seeded).
300+
2. After initialization, each incoming trade updates open/high/low/close/volume/count.
301+
3. Derived OHLCVC events are emitted as `FpssEvent::Data(FpssData::Ohlcvc { .. })` alongside the trade event.
302+
4. One accumulator per contract, stored in a `HashMap<i32, OhlcvcAccumulator>`.
303+
304+
This matches the Java terminal's behavior: OHLCVC bars are never emitted purely from trades without a server-provided seed.
305+
306+
### SIMD FIT Decoding
307+
308+
On x86_64 with SSE2, the FIT decoder uses SIMD-accelerated bulk nibble extraction:
309+
- `chunk_has_special_nibbles()` — scans 16 bytes for field/row separators and negative markers
310+
- `extract_nibbles_simd()` — unpacks high/low nibbles from 16 bytes in parallel
311+
312+
The SIMD pre-scan amortizes branch misprediction cost in the scalar decoder. Results are bit-identical to the scalar path. Non-x86_64 platforms fall back to scalar.
313+
288314
FPSS streaming is available in all SDKs:
289315
- **Rust**: `FpssClient::connect()` returns a disruptor-backed event receiver
290316
- **Python**: `FpssClient` class with `subscribe()`, `next_event()`, `shutdown()`

0 commit comments

Comments
 (0)