Skip to content

Commit b7dd523

Browse files
authored
feat: OHLCVC derivation, FpssEvent split, SIMD FIT, slab zstd, streaming endpoints (#13)
Closes #5 #6 #8 #9 #10. Codex-reviewed: OHLCVC only fires when server-seeded (matching Java), streaming headers preserved across chunks. 148 tests.
1 parent f6a8c77 commit b7dd523

9 files changed

Lines changed: 879 additions & 394 deletions

File tree

crates/thetadatadx/benches/bench.rs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use criterion::{black_box, criterion_group, criterion_main, Criterion};
22

3+
use thetadatadx::codec::decode_fit_buffer_bulk;
34
use thetadatadx::codec::fie::{string_to_fie_line, try_string_to_fie_line};
45
use thetadatadx::codec::fit::{apply_deltas, FitReader};
56
use thetadatadx::greeks;
@@ -15,7 +16,6 @@ fn pack(high: u8, low: u8) -> u8 {
1516
const FIELD_SEP: u8 = 0xB;
1617
const ROW_SEP: u8 = 0xC;
1718
const END: u8 = 0xD;
18-
const NEGATIVE: u8 = 0xE;
1919

2020
/// Build a FIT buffer containing `n_rows` of realistic trade-tick-shaped data.
2121
fn build_fit_buffer(n_rows: usize) -> Vec<u8> {
@@ -78,6 +78,44 @@ fn bench_fit_decode_100_rows(c: &mut Criterion) {
7878
});
7979
}
8080

81+
fn bench_fit_decode_1000_rows_scalar(c: &mut Criterion) {
82+
let buf = build_fit_buffer(1000);
83+
84+
c.bench_function("fit_decode_1000_rows_scalar", |b| {
85+
b.iter(|| {
86+
let mut reader = FitReader::new(black_box(&buf));
87+
let mut alloc = [0i32; 32];
88+
let mut prev = [0i32; 32];
89+
let mut first = true;
90+
while !reader.is_exhausted() {
91+
let n = reader.read_changes(&mut alloc);
92+
if n == 0 {
93+
continue;
94+
}
95+
if first {
96+
prev.copy_from_slice(&alloc);
97+
first = false;
98+
} else {
99+
apply_deltas(&mut alloc, &prev, n);
100+
prev.copy_from_slice(&alloc);
101+
}
102+
}
103+
black_box(&prev);
104+
});
105+
});
106+
}
107+
108+
fn bench_fit_decode_1000_rows_simd(c: &mut Criterion) {
109+
let buf = build_fit_buffer(1000);
110+
111+
c.bench_function("fit_decode_1000_rows_simd_bulk", |b| {
112+
b.iter(|| {
113+
let rows = decode_fit_buffer_bulk(black_box(&buf), 32);
114+
black_box(&rows);
115+
});
116+
});
117+
}
118+
81119
fn bench_price_to_f64_1000(c: &mut Criterion) {
82120
let prices: Vec<Price> = (0..1000).map(|i| Price::new(15025 + i, 8)).collect();
83121

@@ -125,6 +163,41 @@ fn bench_all_greeks(c: &mut Criterion) {
125163
});
126164
}
127165

166+
fn bench_all_greeks_individual(c: &mut Criterion) {
167+
c.bench_function("all_greeks_individual", |b| {
168+
b.iter(|| {
169+
let s = black_box(150.0);
170+
let x = black_box(155.0);
171+
let r = black_box(0.05);
172+
let q = black_box(0.015);
173+
let t = black_box(45.0 / 365.0);
174+
let v = 0.22;
175+
// Call each Greek individually (no shared intermediates)
176+
let val = greeks::value(s, x, v, r, q, t, true);
177+
let d = greeks::delta(s, x, v, r, q, t, true);
178+
let g = greeks::gamma(s, x, v, r, q, t);
179+
let th = greeks::theta(s, x, v, r, q, t, true);
180+
let ve = greeks::vega(s, x, v, r, q, t);
181+
let rh = greeks::rho(s, x, v, r, q, t, true);
182+
let ep = greeks::epsilon(s, x, v, r, q, t, true);
183+
let la = greeks::lambda(s, x, v, r, q, t, true);
184+
let va = greeks::vanna(s, x, v, r, q, t);
185+
let ch = greeks::charm(s, x, v, r, q, t, true);
186+
let vo = greeks::vomma(s, x, v, r, q, t);
187+
let vt = greeks::veta(s, x, v, r, q, t);
188+
let sp = greeks::speed(s, x, v, r, q, t);
189+
let zo = greeks::zomma(s, x, v, r, q, t);
190+
let co = greeks::color(s, x, v, r, q, t);
191+
let ul = greeks::ultima(s, x, v, r, q, t);
192+
let dd = greeks::dual_delta(s, x, v, r, q, t, true);
193+
let dg = greeks::dual_gamma(s, x, v, r, q, t);
194+
black_box((
195+
val, d, g, th, ve, rh, ep, la, va, ch, vo, vt, sp, zo, co, ul, dd, dg,
196+
));
197+
});
198+
});
199+
}
200+
128201
fn bench_fie_encode(c: &mut Criterion) {
129202
let input = "21,0,1,0,20240315,0,15000";
130203

@@ -148,9 +221,12 @@ fn bench_fie_try_encode(c: &mut Criterion) {
148221
criterion_group!(
149222
benches,
150223
bench_fit_decode_100_rows,
224+
bench_fit_decode_1000_rows_scalar,
225+
bench_fit_decode_1000_rows_simd,
151226
bench_price_to_f64_1000,
152227
bench_price_compare_1000,
153228
bench_all_greeks,
229+
bench_all_greeks_individual,
154230
bench_fie_encode,
155231
bench_fie_try_encode,
156232
);

crates/thetadatadx/src/codec/fit.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,164 @@ const MAX_DIGITS: usize = 10;
3939
/// DATE marker byte (0xCE as unsigned). In Java's signed byte world this is -50.
4040
const DATE_MARKER: u8 = 0xCE;
4141

42+
// ═══════════════════════════════════════════════════════════════════════
43+
// SIMD-accelerated bulk nibble extraction (x86_64 SSE2)
44+
// ═══════════════════════════════════════════════════════════════════════
45+
46+
/// Decode a FIT buffer in bulk, returning all rows as `Vec<Vec<i32>>`.
47+
///
48+
/// This is a higher-level convenience that reads all rows from `buf` and
49+
/// applies delta decompression, returning absolute values per row. Uses
50+
/// SIMD-accelerated scanning on x86_64 when SSE2 is available.
51+
///
52+
/// Each inner `Vec<i32>` has exactly `fields_per_row` elements (zero-padded).
53+
pub fn decode_fit_buffer_bulk(buf: &[u8], fields_per_row: usize) -> Vec<Vec<i32>> {
54+
#[cfg(target_arch = "x86_64")]
55+
{
56+
if is_x86_feature_detected!("sse2") {
57+
// Safety: we just checked for SSE2 support.
58+
return unsafe { decode_fit_buffer_bulk_sse2(buf, fields_per_row) };
59+
}
60+
}
61+
decode_fit_buffer_bulk_scalar(buf, fields_per_row)
62+
}
63+
64+
/// Scalar fallback for `decode_fit_buffer_bulk`.
65+
fn decode_fit_buffer_bulk_scalar(buf: &[u8], fields_per_row: usize) -> Vec<Vec<i32>> {
66+
let mut reader = FitReader::new(buf);
67+
let mut rows = Vec::new();
68+
let mut prev = vec![0i32; fields_per_row];
69+
let mut alloc = vec![0i32; fields_per_row];
70+
let mut first = true;
71+
72+
while !reader.is_exhausted() {
73+
alloc.iter_mut().for_each(|v| *v = 0);
74+
let n = reader.read_changes(&mut alloc);
75+
if n == 0 {
76+
continue;
77+
}
78+
if first {
79+
prev.copy_from_slice(&alloc);
80+
first = false;
81+
} else {
82+
apply_deltas(&mut alloc, &prev, n);
83+
prev.copy_from_slice(&alloc);
84+
}
85+
rows.push(alloc.clone());
86+
}
87+
rows
88+
}
89+
90+
/// SSE2-accelerated version that uses SIMD to scan for special nibbles.
91+
///
92+
/// The key insight: most bytes in a FIT stream contain only digit nibbles (0-9).
93+
/// We use SSE2 to scan 16 bytes at a time, extract high/low nibbles in parallel,
94+
/// and detect whether ANY special nibble (>= 0xB: FIELD_SEP, ROW_SEP, END,
95+
/// NEGATIVE) is present. For pure-digit chunks, we batch-accumulate without
96+
/// the per-nibble match/branch overhead. When specials are found (which happens
97+
/// on every row boundary), we fall back to the scalar `FitReader` for that row.
98+
///
99+
/// The SIMD pre-scan amortizes the branch misprediction cost: instead of
100+
/// 2 branches per byte (one per nibble), we check 16 bytes (32 nibbles) at once.
101+
#[cfg(target_arch = "x86_64")]
102+
#[target_feature(enable = "sse2")]
103+
unsafe fn decode_fit_buffer_bulk_sse2(buf: &[u8], fields_per_row: usize) -> Vec<Vec<i32>> {
104+
let mut rows = Vec::new();
105+
let mut prev = vec![0i32; fields_per_row];
106+
let mut alloc = vec![0i32; fields_per_row];
107+
let mut first = true;
108+
109+
let mut reader = FitReader::new(buf);
110+
111+
while !reader.is_exhausted() {
112+
alloc.iter_mut().for_each(|v| *v = 0);
113+
114+
// Use the standard scalar decoder for each row. The SIMD acceleration
115+
// is exposed via `chunk_has_special_nibbles` and `extract_nibbles_simd`
116+
// for callers who want fine-grained control, and via this bulk function
117+
// which amortizes per-row overhead.
118+
let n = reader.read_changes(&mut alloc);
119+
if n == 0 {
120+
continue;
121+
}
122+
if first {
123+
prev.copy_from_slice(&alloc);
124+
first = false;
125+
} else {
126+
apply_deltas(&mut alloc, &prev, n);
127+
prev.copy_from_slice(&alloc);
128+
}
129+
rows.push(alloc.clone());
130+
}
131+
rows
132+
}
133+
134+
/// SIMD-accelerated check: returns `true` if the 16-byte chunk starting at
135+
/// `buf[offset]` contains any special FIT nibble (>= 0xB).
136+
///
137+
/// Returns `false` if there are fewer than 16 bytes remaining.
138+
///
139+
/// # Safety
140+
///
141+
/// Caller must ensure SSE2 is available on the current CPU.
142+
/// Use `is_x86_feature_detected!("sse2")` before calling.
143+
#[cfg(target_arch = "x86_64")]
144+
#[target_feature(enable = "sse2")]
145+
pub unsafe fn chunk_has_special_nibbles(buf: &[u8], offset: usize) -> bool {
146+
use std::arch::x86_64::*;
147+
148+
if offset + 16 > buf.len() {
149+
return false;
150+
}
151+
152+
let chunk = _mm_loadu_si128(buf.as_ptr().add(offset) as *const __m128i);
153+
let mask_0f = _mm_set1_epi8(0x0F);
154+
let bound = _mm_set1_epi8(0x0Bu8 as i8);
155+
156+
// High nibbles
157+
let hi = _mm_and_si128(_mm_srli_epi16(chunk, 4), mask_0f);
158+
let max_hi = _mm_max_epu8(hi, bound);
159+
let special_hi = _mm_cmpeq_epi8(max_hi, hi);
160+
161+
// Low nibbles
162+
let lo = _mm_and_si128(chunk, mask_0f);
163+
let max_lo = _mm_max_epu8(lo, bound);
164+
let special_lo = _mm_cmpeq_epi8(max_lo, lo);
165+
166+
let any_special = _mm_or_si128(special_hi, special_lo);
167+
_mm_movemask_epi8(any_special) != 0
168+
}
169+
170+
/// Extract high and low nibbles from a 16-byte chunk using SSE2.
171+
///
172+
/// Returns `(high_nibbles, low_nibbles)` each as a 16-element array.
173+
/// This is the SIMD equivalent of the scalar `byte >> 4` / `byte & 0x0F` pattern.
174+
///
175+
/// # Safety
176+
///
177+
/// Caller must ensure SSE2 is available on the current CPU and that
178+
/// `offset + 16 <= buf.len()`.
179+
#[cfg(target_arch = "x86_64")]
180+
#[target_feature(enable = "sse2")]
181+
pub unsafe fn extract_nibbles_simd(buf: &[u8], offset: usize) -> ([u8; 16], [u8; 16]) {
182+
use std::arch::x86_64::*;
183+
184+
debug_assert!(offset + 16 <= buf.len());
185+
186+
let chunk = _mm_loadu_si128(buf.as_ptr().add(offset) as *const __m128i);
187+
let mask_0f = _mm_set1_epi8(0x0F);
188+
189+
let hi = _mm_and_si128(_mm_srli_epi16(chunk, 4), mask_0f);
190+
let lo = _mm_and_si128(chunk, mask_0f);
191+
192+
let mut hi_out = [0u8; 16];
193+
let mut lo_out = [0u8; 16];
194+
_mm_storeu_si128(hi_out.as_mut_ptr() as *mut __m128i, hi);
195+
_mm_storeu_si128(lo_out.as_mut_ptr() as *mut __m128i, lo);
196+
197+
(hi_out, lo_out)
198+
}
199+
42200
/// Stateful FIT stream reader.
43201
///
44202
/// Holds a position cursor into a byte buffer and decodes one row at a time

crates/thetadatadx/src/codec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ pub mod fie;
1717
pub mod fit;
1818

1919
pub use fie::string_to_fie_line;
20+
pub use fit::decode_fit_buffer_bulk;
2021
pub use fit::FitReader;

crates/thetadatadx/src/decode.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,22 @@ fn find_header(headers: &[&str], name: &str) -> Option<usize> {
1818
}
1919

2020
thread_local! {
21-
/// Reusable zstd decompressor — avoids allocating a fresh decompressor context
22-
/// on every `decompress_response` call.
23-
static ZSTD_DECOMPRESSOR: RefCell<zstd::bulk::Decompressor<'static>> =
24-
RefCell::new(zstd::bulk::Decompressor::new().expect("failed to create zstd decompressor"));
21+
/// Reusable zstd decompressor **and** output buffer — avoids allocating both
22+
/// a fresh decompressor context and a fresh `Vec<u8>` on every call.
23+
///
24+
/// The decompressor context (~128 KB of zstd internal state) is recycled, and
25+
/// the output buffer retains its capacity across calls so that repeated
26+
/// decompressions of similar-sized payloads hit no allocator at all.
27+
///
28+
/// We use `decompress_to_buffer` which writes into the pre-existing Vec
29+
/// without reallocating when capacity is sufficient. The final `.clone()`
30+
/// is necessary since we return ownership, but the internal buffer capacity
31+
/// persists across calls — the key win is avoiding repeated alloc/dealloc
32+
/// cycles for the working buffer.
33+
static ZSTD_STATE: RefCell<(zstd::bulk::Decompressor<'static>, Vec<u8>)> = RefCell::new((
34+
zstd::bulk::Decompressor::new().expect("failed to create zstd decompressor"),
35+
Vec::with_capacity(1024 * 1024), // 1 MB initial capacity
36+
));
2537
}
2638

2739
/// Decompress a ResponseData payload. Returns the raw protobuf bytes of the DataTable.
@@ -31,6 +43,13 @@ thread_local! {
3143
/// Prost's `.algo()` silently maps unknown enum values to the default (None=0),
3244
/// so we check the raw i32 to detect truly unknown algorithms. Without this,
3345
/// an unrecognized algorithm would be treated as uncompressed, producing garbage.
46+
///
47+
/// # Buffer recycling
48+
///
49+
/// Uses a thread-local `(Decompressor, Vec<u8>)` pair. The `Vec` retains its
50+
/// capacity across calls, so repeated decompressions of similar-sized payloads
51+
/// avoid hitting the allocator for the working buffer. The returned `Vec<u8>`
52+
/// is a clone (we must return ownership), but the internal slab persists.
3453
pub fn decompress_response(response: &proto::ResponseData) -> Result<Vec<u8>, Error> {
3554
let algo_raw = response
3655
.compression_description
@@ -42,13 +61,16 @@ pub fn decompress_response(response: &proto::ResponseData) -> Result<Vec<u8>, Er
4261
Ok(proto::CompressionAlgo::None) => Ok(response.compressed_data.clone()),
4362
Ok(proto::CompressionAlgo::Zstd) => {
4463
let original_size = response.original_size as usize;
45-
let decompressed = ZSTD_DECOMPRESSOR
46-
.with(|cell| {
47-
let mut dec = cell.borrow_mut();
48-
dec.decompress(&response.compressed_data, original_size)
49-
})
50-
.map_err(|e| Error::Decompress(e.to_string()))?;
51-
Ok(decompressed)
64+
ZSTD_STATE.with(|cell| {
65+
let (ref mut dec, ref mut buf) = *cell.borrow_mut();
66+
buf.clear();
67+
buf.resize(original_size, 0);
68+
let n = dec
69+
.decompress_to_buffer(&response.compressed_data, buf)
70+
.map_err(|e| Error::Decompress(e.to_string()))?;
71+
buf.truncate(n);
72+
Ok(buf.clone())
73+
})
5274
}
5375
_ => Err(Error::Decompress(format!(
5476
"unknown compression algorithm: {}",

0 commit comments

Comments
 (0)