feat(grpc): in-house gRPC transport replacing tonic #524
Merged
Replace tonic with an in-house gRPC client on top of h2, and route MDDS response decoding through a dedicated decoder pool that keeps CPU-bound work off the tokio reactor.

Transport:
- Channel: HTTP/2 client wrapping h2::client::SendRequest<Bytes>
- Codec: length-prefix gRPC framing with prost encode/decode and propagated serialization errors
- Status: parsed from HTTP/2 trailers (grpc-status, grpc-message); trailers-only responses surfaced from the initial HEADERS frame
- ServerStreaming: async Stream over h2 RecvStream with per-call deadline support and GOAWAY classification; per-stream RST_STREAM distinguished from connection-level errors
- ChannelPool: round-robin across N HTTP/2 connections with credit-aware pick on connection-level MAX_CONCURRENT_STREAMS; honours configured concurrent_requests
- TLS via rustls 1.3; :scheme = https on TLS-backed Channels; mdds.max_message_size threaded through Channel -> Codec

Codegen:
- Custom prost-build configurator replacing tonic-build
- Drift guard runs unconditionally during build
- All MDDS endpoints routed through the new Channel

Decoder pool (grpc::DecoderPool):
- Decodes zstd-compressed protobuf responses on dedicated std::threads, keeping CPU-bound work off the tokio reactor
- One LMAX Disruptor SPSC ring per decoder thread with pre-allocated slots; per-thread zstd context and reusable scratch buffer
- Adaptive wait strategy (16 spins + 4 yields + spin-loop hint) tuned for decode-burst cadence
- Auto-sized to num_cpus/2 capped to channel count; ring size configurable, validated as a power of two >= 64
- Message-agnostic via Box<dyn FnOnce() -> DecodeResult + Send>

Configuration:
- mdds.decoder_threads (0 = auto)
- mdds.decoder_ring_size (power of two, >= 64, default 256)
- mdds.max_message_size honoured by the in-house Codec
- Optional mimalloc-allocator feature for downstream binaries

Util:
- Ring-size validation lifted to util::ring; FPSS and gRPC share the validator instead of duplicating it

Bench harness:
- 64-way concurrent burst over ChannelPool with mock h2 server
- THETADATADX_BENCH_ROWS env override (256 / 4096 / 250000)
- Allocator accounting captured only over the timed iter region

Performance (vs main, 64-way concurrent burst):

| rows    | wall time | throughput |
|---------|-----------|------------|
| 256     | -33.9%    | +51%       |
| 4 096   | -45.3%    | +83%       |
| 250 000 | -49.6%    | +99%       |

Drops: tonic, tonic-prost, tonic-prost-build.
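The Codec bullet above names length-prefix gRPC framing. As a rough, std-only sketch of that wire format (one compressed-flag byte, a 4-byte big-endian length, then the payload), the functions below frame and unframe a raw byte payload. `FrameError`, `encode_frame`, and `decode_frame` are hypothetical names chosen for illustration, not the crate's API; the real Codec also runs prost encode/decode, which is left out here.

```rust
/// Hypothetical error type for this sketch; not the crate's CodecError.
#[derive(Debug, PartialEq)]
pub enum FrameError {
    /// Fewer bytes are available than the header or declared length needs.
    Incomplete,
    /// The compressed flag was non-zero; only identity encoding is accepted.
    Compressed(u8),
    /// The payload length exceeds the configured ceiling.
    TooLarge { len: usize, max: usize },
}

/// gRPC frame header: 1 flag byte + 4 length bytes.
const HEADER_LEN: usize = 5;

/// Prefix `payload` with an uncompressed gRPC frame header.
pub fn encode_frame(payload: &[u8], max: usize) -> Result<Vec<u8>, FrameError> {
    if payload.len() > max || payload.len() > u32::MAX as usize {
        return Err(FrameError::TooLarge { len: payload.len(), max });
    }
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(0); // compressed flag: 0 = identity
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    Ok(out)
}

/// Split one complete frame off the front of `buf`.
/// Returns the payload and whatever bytes follow the frame.
pub fn decode_frame(buf: &[u8], max: usize) -> Result<(&[u8], &[u8]), FrameError> {
    if buf.len() < HEADER_LEN {
        return Err(FrameError::Incomplete);
    }
    if buf[0] != 0 {
        return Err(FrameError::Compressed(buf[0]));
    }
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    if len > max {
        return Err(FrameError::TooLarge { len, max });
    }
    let end = HEADER_LEN + len;
    if buf.len() < end {
        return Err(FrameError::Incomplete);
    }
    Ok((&buf[HEADER_LEN..end], &buf[end..]))
}
```

Checking the length against the ceiling before waiting for the body means an oversized frame is rejected as soon as its header arrives, without buffering the payload first.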
userFRM added a commit that referenced this pull request on May 15, 2026
Closes six pre-merge audit findings against the in-house gRPC transport at 0dadc17 on main.

1. Decoder pool panic containment + poison-aware publish loop (decoder_pool.rs). Consumer wraps each work closure in catch_unwind; on panic it flips a pool-wide Arc<AtomicBool> poison flag and drains still-queued ring slots with the transport-level poison reason rather than leaving callers parked on dead oneshots. The publish path itself is poison-aware: instead of disruptor's blocking publish() (which busy-waits until a slot frees and ignores the poison flag), submit_work() drives try_publish() in a bounded retry loop that re-checks the flag between every attempt, so a poison flip propagates to every parked submitter within one back-off window. Submits made after the flag is set return DecoderSubmitError::Poisoned without touching the ring. Tests include panicking_work_poisons_pool, submit_after_poison_fails_fast, pending_in_flight_drains_with_poisoned_after_panic, and poison_flag_unblocks_publishers_on_full_ring (fills the ring with a barrier-parked head item, spawns extra submitters that park in the retry loop, flips the flag, asserts every parked submitter returns Poisoned within 250 ms).

2. grpc-message percent-decoding (status.rs). Parser percent-decodes per RFC 3986 (%HH escapes only, no +-as-space). Malformed escapes fall back to raw header bytes; non-UTF-8 bytes fall back to empty. Never invalidates a parsed grpc-status. Removed StatusParseError::MessageNotUtf8 (added in PR #524, never released). Added percent-encoding v2 direct dependency.

3. h2 error classification on the open path (channel.rs). ready(), send_request(), send_data() errors now route through classify_h2_error so connection-level failures (GOAWAY in either direction, IO failure, peer shutdown, open-phase drops) surface as ChannelError::ConnectionClosed. Per-stream RST_STREAM (any reason code) continues to surface as ChannelError::H2Stream. The ConnectionClosed Display string broadens to "h2 connection closed: ..." to reflect the wider scope. Variant docs on the enum, the module-level docstring, and the CHANGELOG record the contract reshape so downstream callers using ChannelError for retry / recycle policy can remap branches.

4. Picker counts dispatched, not pending (channel.rs). InFlightToken::new now runs (incrementing the in-flight count) before ready_fut.await, so ChannelPool::next() sees every pending open the moment it commits, not just the ones that have cleared the h2 ready barrier. Eliminates head-of-line blocking under burst contention; the picker uses CAS-retry to settle on the lightest-loaded channel even under concurrent commits, and the lease pre-reserves an in-flight slot at pool.next() time so join_all-style dispatch fans out cleanly.

5. v10 changelog (CHANGELOG.md, docs-site changelog). Added an Unreleased block covering the Error::Transport payload change (tonic::transport::Error → String), MddsConfig::decoder_threads / decoder_ring_size, tonic and inhouse-grpc feature removal, MddsClient::stub removal, GrpcStatusKind::from_u32() rename, ChannelError variant reshape, and migration notes for each. The merged feature shipped after the v10.0.0 tag, so the Unreleased bucket is the correct landing site; the version bump itself is intentionally not part of this PR.

6. Codegen drift guard (build_support/grpc). Persisted snapshot at crates/thetadatadx/proto/beta_endpoints.snapshot.rs (git-tracked, normalised to LF line endings via .gitattributes so CRLF-on-checkout platforms still match). check() regenerates into a scratch dir under OUT_DIR and byte-compares against the committed snapshot. Drift fails the build with the byte offset and line number of the first divergence and the regeneration command; THETADATADX_GRPC_REGEN=1 cargo build refreshes the snapshot in-tree. cargo:rerun-if-changed and cargo:rerun-if-env-changed wired so cargo invalidates on snapshot or env-var edits.
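Finding 6 says drift is reported with the byte offset and line number of the first divergence. A minimal sketch of how such a comparison could be computed is shown below; `first_divergence` is a hypothetical helper written for illustration, and the actual check() in build_support/grpc may be structured differently.

```rust
/// Compare a committed snapshot against freshly generated output.
/// Returns `None` when the two are byte-identical, otherwise the
/// 0-based byte offset and 1-based line number of the first difference.
/// Hypothetical helper for illustration only.
pub fn first_divergence(snapshot: &[u8], generated: &[u8]) -> Option<(usize, usize)> {
    let common = snapshot.len().min(generated.len());

    // First differing byte inside the shared prefix, or the end of the
    // shorter input when one is a strict prefix of the other.
    let offset = (0..common)
        .find(|&i| snapshot[i] != generated[i])
        .or_else(|| (snapshot.len() != generated.len()).then_some(common))?;

    // Line number = newlines seen before the offset, plus one.
    let line = 1 + snapshot[..offset].iter().filter(|&&b| b == b'\n').count();
    Some((offset, line))
}
```

Because the snapshot is normalised to LF, counting `\n` bytes is enough to recover the line number; no CRLF handling is attempted in this sketch.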
userFRM added a commit that referenced this pull request on May 15, 2026
* fix(grpc): close pre-merge audit findings
* chore(deps): re-lock sub-project Cargo.lock for percent-encoding
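The audit commit describes grpc-message percent-decoding as %HH escapes only, no +-as-space, with fallbacks for malformed escapes and non-UTF-8 output. The sketch below is a hand-rolled, std-only illustration of those rules. It is not the percent-encoding crate and not the code in status.rs, and it assumes one reading of "fall back to raw header bytes": a malformed escape is passed through literally. `decode_grpc_message` and `hex_val` are hypothetical names.

```rust
/// Value of an ASCII hex digit, or `None` for any other byte.
fn hex_val(b: u8) -> Option<u8> {
    (b as char).to_digit(16).map(|d| d as u8)
}

/// Percent-decode a grpc-message header value.
/// - `%HH` becomes the byte 0xHH.
/// - `+` is left alone (it is not treated as a space).
/// - A malformed escape is copied through unchanged (assumed reading).
/// - If the decoded bytes are not valid UTF-8, an empty string is returned.
pub fn decode_grpc_message(raw: &[u8]) -> String {
    let mut out = Vec::with_capacity(raw.len());
    let mut i = 0;
    while i < raw.len() {
        if raw[i] == b'%' {
            let hi = raw.get(i + 1).copied().and_then(hex_val);
            let lo = raw.get(i + 2).copied().and_then(hex_val);
            if let (Some(h), Some(l)) = (hi, lo) {
                out.push((h << 4) | l);
                i += 3;
                continue;
            }
        }
        // Ordinary byte, or a '%' that does not start a valid escape.
        out.push(raw[i]);
        i += 1;
    }
    String::from_utf8(out).unwrap_or_default()
}
```

The function never returns an error, which matches the stated goal that a bad message must not invalidate an already parsed grpc-status.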
Closes #522.
Summary
In-house gRPC client stack built directly on the h2 crate. The MddsClient and every generated MDDS endpoint now route through this transport; tonic, tonic-prost, and tonic-prost-build are removed from the workspace.

Transport machinery
- Codec<Req, Resp>: phantom-typed wrapper performing prost encode/decode plus the 5-byte gRPC frame header ([1 byte compressed flag][4 bytes BE length][payload]). Rejects any non-zero compressed flag and any oversized frame.
- Status: HTTP/2 trailers parser (grpc-status + grpc-message). Missing or malformed trailers surface as typed errors rather than panics.
- Channel: owns one HTTP/2 connection. connect_tls runs the TLS handshake through tokio-rustls; connect_h2c opens a plaintext connection for sidecars and mock harnesses.
- ServerStreaming: futures_core::Stream adapter over the h2 response body. Reads DATA frames into a BytesMut accumulator, decodes each complete frame through Codec, and on body close awaits trailers to surface either stream-end (OK status) or Err(ChannelError::Rpc) (non-OK status).

Off-reactor decoder pool
- DecoderPool runs every response chunk's zstd decompress + DataTable::decode on dedicated std::threads. Each decoder owns one LMAX Disruptor ring + the existing thread-local zstd state in mdds::decode::transport. Producers are MultiProducer clones so concurrent submissions from N tokio tasks stay lock-free in the steady state.
- DecoderWaitStrategy (16 spins + 4 yields + spin-loop hint) is tuned for MDDS burst cadence: short enough that submit-to-dequeue handoff is single-digit microseconds, cooperative enough that idle decoders do not burn whole cores between bursts.
- MddsClient::connect wires the pool through ChannelPool::from_channels_with_decoders.
- MddsConfig: decoder_threads (0 = auto-size to min(channels, available_parallelism / 2)) and decoder_ring_size (power of two, >= 64, validated via util::ring::check_ring_size).
- util::ring lifted out of fpss::ring so the FPSS read loop and the gRPC decoder pool share one validator.

Hardening
- Per-call deadlines via Channel::server_streaming_with_deadline cover both the open phase (h2 send_request + response head) and the streaming phase (DATA frames + trailers). On elapse the h2 stream is dropped (sending RST_STREAM) and the call surfaces ChannelError::DeadlineExceeded.
- h2::Error::is_go_away / is_remote / is_io surface as ChannelError::ConnectionClosed so consumers can recycle the channel rather than retry on a dead connection.
- ServerStreaming cancels the underlying h2 stream cleanly. Cancelled decode work is elided: the decoder thread checks oneshot::Sender::is_closed before running the work, so cancelled RPCs do not burn CPU on results no one will read.
- grpc-encoding: identity is the only accepted body encoding; zstd-compressed compressed_data payloads inside ResponseData are decompressed by the decoder pool's dedicated threads.

Pooling
- ChannelPool round-robins over N independent Channels so workloads that exceed the per-connection MAX_CONCURRENT_STREAMS limit fan out across multiple h2 connections. The cursor is an AtomicUsize advanced with Relaxed ordering.
- MddsClient::connect builds a 4-channel pool by default (clamped to the subscription-tier concurrent ceiling). Every endpoint call picks the next channel via MddsClient::channel(), which is transparent to downstream callers.

Custom codegen replacing tonic-build

- build_support/grpc/ reads proto/mdds.proto, invokes prost-build for the message types, and installs a custom ServiceGenerator that emits one async function per RPC method into $OUT_DIR/beta_endpoints.rs. Each emitted function targets the method path /BetaEndpoints.BetaThetaTerminal/<MethodCamelCase>.
- The generator rejects unary or client-streaming RPCs at build time so a future proto change cannot silently mis-frame the wire.
- build_support/grpc::check reruns the codegen and confirms the output is byte-identical to the file already on disk. Gated by the THETADATADX_GRPC_CHECK env var so CI can re-run the build with the var set and surface drift as a build failure.

Endpoint migration
- MddsClient::stub() is gone; every generated list_endpoint! / parsed_endpoint! / streaming macro invocation routes through crate::proto::beta_theta_terminal::<method>(client.channel(), request).
- mdds::stream::{collect_stream, for_each_chunk} accept grpc::ServerStreaming<ResponseData> instead of tonic::Streaming<ResponseData>. Each chunk's decode now routes through the stream's attached DecoderHandle.
- error.rs swaps Error::Transport(tonic::transport::Error) for Error::Transport(String) and replaces the From<tonic::Status> impl with From<crate::grpc::Status> + From<crate::grpc::ChannelError>.
- GrpcStatusKind::from_code(tonic::Code) becomes GrpcStatusKind::from_u32(u32), reading the wire code directly.

Tonic removal
- tonic, tonic-prost, and tonic-prost-build are removed from crates/thetadatadx/Cargo.toml.
- The inhouse-grpc Cargo feature flag is dropped (the in-house transport is now the only path).
- cargo tree -i tonic returns "no such package"; cargo tree --workspace | grep tonic is empty.

Test plan
- Vec<Vec<u8>> payloads.
- ChannelError::Rpc, connect to closed port, DeadlineExceeded mid-stream, GOAWAY classified as ConnectionClosed, ChannelPool round-robin distribution.
- stock_list_symbols through the in-house Channel (single chunk and two-chunk merge).
- From<grpc::Status> carries kind + message, unauthenticated kind, deadline routes to Error::Timeout, rpc routes to Error::Grpc.
- cargo fmt --all -- --check
- cargo clippy --workspace --locked -- -D warnings
- cargo clippy --workspace --locked --tests --benches -- -D warnings
- cargo test --workspace --locked
- cargo run -p thetadatadx --features config-file --bin generate_sdk_surfaces --locked -- --check
- python3 scripts/check_docs_consistency.py
- cargo tree -i tonic returns empty

Cross-binding ABI
The Rust transport changes preserve the binding-visible error contract:
- Error::Transport(String) still formats identically; bindings parse the message, not the inner type.
- ChannelError::Rpc { status: Status } keys off status.code(), unchanged.
- CodecError::Encode only surfaces via ChannelError::Codec(_), which maps to Error::Transport at the FFI boundary.
- Channel::connect_*_with_max_message_size are additive; the original connect_h2c / connect_tls keep their signatures.
- Channel::with_decoder is a builder-style attachment; channels constructed without it fall back to the inline decode path.

Bench numbers
grpc_channel (loopback mock, 100-sample p50)
- stock_list_symbols
- stock_history_eod
- option_history_quote

grpc_concurrent_burst (64-way burst on a 4-channel pool, loopback mock)

Flamegraph at 4096 rows confirms prost::Message::decode, decompress_response, and ZSTD_decompressBlock_internal frames sit under the decoder thread group (87.3% of total samples) rather than the tokio worker stack (2.5%).

Public surface impact
- thetadatadx::grpc::{Channel, ChannelError, ChannelPool, Codec, CodecError, ServerStreaming, Status, StatusParseError}: new public surface.
- thetadatadx::grpc::{DecoderPool, DecoderHandle, DecoderPoolError, DecoderWaitStrategy, DecodeResult, default_decoder_thread_count}: new public decoder pool surface.
- thetadatadx::grpc::{stock_list_symbols, endpoints::bench_support}: new public functions.
- ChannelPool::from_channels_with_decoders: production constructor attaching a decoder pool to a channel set.
- Channel::with_decoder: builder-style decoder attachment.
- Channel::connect_h2c_with_max_message_size / Channel::connect_tls_with_max_message_size: codec-ceiling-aware constructors; the existing connect_h2c / connect_tls keep their signatures and delegate to the new ones with DEFAULT_MAX_MESSAGE_SIZE.
- MddsConfig::decoder_threads / MddsConfig::decoder_ring_size: new public fields with 0 / 256 production defaults.
- MddsClient::stub is gone (was crate-private; no downstream impact).
- Error::Transport changes from wrapping tonic::transport::Error to carrying a String; callers that previously matched the inner error must update.
- inhouse-grpc Cargo feature is removed.

Allocator and bench knobs
Optional mimalloc-allocator feature

thetadatadx = { features = ["mimalloc-allocator"] } pulls mimalloc into the dependency graph and re-exports MiMalloc at thetadatadx::mimalloc::MiMalloc so the consuming binary can wire it in with one line.

Default-off: applications that prefer the system allocator pay no compile or link cost. Full opt-in walk-through in docs-site/docs/configuration.md under Performance tuning. Library crates should not install a #[global_allocator] of their own (only one may exist in the whole dependency graph, and that choice belongs to the final binary), so the SDK only provides the re-export.

THETADATADX_BENCH_ROWS env override on grpc_concurrent_burst

The concurrent-burst harness honours THETADATADX_BENCH_ROWS=<n> so the same bench sweeps the small, medium, and large payload regimes without recompiling.
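As an illustration of the env override, here is a minimal sketch of how a bench harness might resolve its row count. Only the variable name THETADATADX_BENCH_ROWS comes from this PR. The helper names, the default of 4096, and the choice to ignore unparsable or zero values are assumptions made for the example, not the harness's actual behaviour.

```rust
use std::env;

/// Assumed default for this sketch; the real harness default is not stated.
const DEFAULT_ROWS: usize = 4096;

/// Parse an optional raw override, falling back to `default` when the
/// value is missing, not a number, or zero. Hypothetical helper.
pub fn parse_rows(raw: Option<&str>, default: usize) -> usize {
    raw.and_then(|s| s.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(default)
}

/// Resolve the row count from the process environment.
pub fn bench_rows() -> usize {
    parse_rows(
        env::var("THETADATADX_BENCH_ROWS").ok().as_deref(),
        DEFAULT_ROWS,
    )
}
```

Keeping the parsing in a pure function separate from the environment read makes the fallback rules easy to test without mutating process-wide state.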