24 changes: 24 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [8.0.25] - 2026-05-05

### Fixed

- **Windows `ERROR_IO_PENDING` (os error 997) no longer trips a fatal
FPSS read error.** On Windows the overlapped socket layer surfaces
in-flight reads as `ERROR_IO_PENDING` instead of `WSAEWOULDBLOCK`.
Rust `std` maps raw OS error 997 to `ErrorKind::Uncategorized`, so
the existing `WouldBlock | TimedOut` matches in
`crates/thetadatadx/src/fpss/io_loop.rs::is_read_timeout` and the
two retry arms in `crates/thetadatadx/src/fpss/framing.rs`
(mid-header and mid-payload) treated it as fatal. Python users on
Windows saw `FPSS read error error=IO error: Overlapped I/O
operation is in progress. (os error 997)` spam followed by a
reconnect storm. A new `is_transient_read` helper in `framing.rs`
matches `WouldBlock`, `TimedOut`, and `raw_os_error() == Some(997)`;
all three sites delegate to it so the I/O loop drains queued
commands and retries the way it does on Linux and macOS.
Closes #469.

### Changed

- `tdbe` 0.12.5 → 0.12.6.

## [8.0.24] - 2026-05-04

### Added
8 changes: 4 additions & 4 deletions Cargo.lock

Generated file; diff not rendered.

2 changes: 1 addition & 1 deletion crates/tdbe/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "tdbe"
-version = "0.12.5"
+version = "0.12.6"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
6 changes: 3 additions & 3 deletions crates/thetadatadx/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "thetadatadx"
-version = "8.0.24"
+version = "8.0.25"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
@@ -40,7 +40,7 @@ frames = ["polars", "arrow"]
live-tests = []

[dependencies]
-tdbe = { version = "0.12.5", path = "../tdbe" }
+tdbe = { version = "0.12.6", path = "../tdbe" }

# gRPC + protobuf (tonic 0.14 extracted prost codec into tonic-prost)
tonic = { version = "=0.14.5", features = ["tls-ring", "tls-native-roots", "channel", "transport"] }
@@ -132,7 +132,7 @@ prost-build = "=0.14.3"
regex = "1.12.3"
toml = "1.1.2"
serde = { version = "1.0.228", features = ["derive"] }
-tdbe = { version = "0.12.5", path = "../tdbe" }
+tdbe = { version = "0.12.6", path = "../tdbe" }

[[bench]]
name = "bench_decode"
205 changes: 192 additions & 13 deletions crates/thetadatadx/src/fpss/framing.rs
@@ -31,6 +31,38 @@ use tdbe::types::enums::StreamMsgType;

use super::protocol::READ_TIMEOUT_MS;

/// Windows `ERROR_IO_PENDING` raw OS error code.
///
/// On Windows the overlapped socket layer surfaces in-flight reads as
/// `ERROR_IO_PENDING` (Win32 error 997) instead of `WSAEWOULDBLOCK`. Rust
/// `std` maps 997 to `ErrorKind::Uncategorized`, so a `kind()` match on
/// `WouldBlock | TimedOut` misses it and a benign in-flight read appears as
/// a fatal I/O error. Callers must check the `raw_os_error()` to recognise
/// it as transient.
///
/// Reference: <https://learn.microsoft.com/en-us/windows/win32/debug/system-error-codes--500-999->
pub(crate) const ERROR_IO_PENDING: i32 = 997;

/// Classify a raw `std::io::Error` returned by `read()` as a transient
/// "no data right now, try again" condition.
///
/// Returns `true` for the three cases the FPSS framing and I/O loops must
/// retry / drain on rather than escalate to a fatal disconnect:
///
/// - `ErrorKind::WouldBlock` — Linux, macOS `SO_RCVTIMEO` on a non-blocking
/// socket.
/// - `ErrorKind::TimedOut` — macOS `SO_RCVTIMEO` on a blocking socket.
/// - `raw_os_error() == Some(997)` — Windows `ERROR_IO_PENDING` from the
/// overlapped I/O layer (issue #469). Maps to `ErrorKind::Uncategorized`
/// in `std`, so a `kind()` match alone misses it.
#[must_use]
pub(crate) fn is_transient_read(io_err: &std::io::Error) -> bool {
matches!(
io_err.kind(),
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
) || io_err.raw_os_error() == Some(ERROR_IO_PENDING)
}

/// Maximum payload length (single unsigned byte).
///
/// Source: `PacketStream.java` -- the length field is one byte.
@@ -231,13 +263,7 @@ fn read_header_with_timeout<R: Read>(
})
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
-Err(e)
-    if n > 0
-        && matches!(
-            e.kind(),
-            std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
-        ) =>
-{
+Err(e) if n > 0 && is_transient_read(&e) => {
// Drain-yield: the aggregate wall-clock cap exists so
// the command drain cannot be starved by a trickling
// sender. The partial header bytes are preserved on
@@ -345,12 +371,7 @@ fn read_exact_payload_with_timeout<R: Read>(
state.payload_read = n + k;
}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
-Err(e)
-    if matches!(
-        e.kind(),
-        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
-    ) =>
-{
+Err(e) if is_transient_read(&e) => {
// Drain-yield: the aggregate wall-clock cap exists so
// the command drain cannot be starved by a trickling
// sender. The partial payload bytes are preserved
@@ -1401,4 +1422,162 @@ mod tests {
));
assert!(!is_drain_yield(&io));
}

/// Windows `ERROR_IO_PENDING` (raw OS error 997) must classify as a
/// transient read. Rust `std` maps 997 to `ErrorKind::Uncategorized`,
/// so a plain `kind()` match would miss it and treat the in-flight
/// overlapped read as a fatal disconnect — which is exactly what the
/// Python user reported in issue #469.
#[test]
fn is_transient_read_recognises_windows_error_io_pending() {
let err = std::io::Error::from_raw_os_error(ERROR_IO_PENDING);
// Sanity: confirm the precondition that motivates this fix —
// `std` does not map 997 to a recognisable kind on any platform.
assert_ne!(err.kind(), std::io::ErrorKind::WouldBlock);
assert_ne!(err.kind(), std::io::ErrorKind::TimedOut);
assert_eq!(err.raw_os_error(), Some(997));
assert!(
is_transient_read(&err),
"ERROR_IO_PENDING (os error 997) must be classified as transient"
);

// Other raw OS errors (e.g. ECONNRESET on Linux) must NOT be
// classified as transient — they are real disconnects.
let real_err = std::io::Error::from_raw_os_error(104); // ECONNRESET
assert!(
!is_transient_read(&real_err),
"ECONNRESET must not be classified as transient"
);

// The classic kinds still match.
let wb = std::io::Error::new(std::io::ErrorKind::WouldBlock, "x");
let to = std::io::Error::new(std::io::ErrorKind::TimedOut, "x");
assert!(is_transient_read(&wb));
assert!(is_transient_read(&to));
}

/// Reader that yields a prefix, then `n_stalls` errors of the given
/// raw OS error code, then a suffix. Models a Windows TLS socket
/// surfacing `ERROR_IO_PENDING` (997) between the header and payload.
struct PrefixThenOsErrThenResume {
prefix: Vec<u8>,
suffix: Vec<u8>,
prefix_pos: usize,
suffix_pos: usize,
remaining_stalls: usize,
os_error: i32,
}

impl std::io::Read for PrefixThenOsErrThenResume {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.prefix_pos < self.prefix.len() {
let remaining = &self.prefix[self.prefix_pos..];
let n = remaining.len().min(buf.len());
buf[..n].copy_from_slice(&remaining[..n]);
self.prefix_pos += n;
return Ok(n);
}
if self.remaining_stalls > 0 {
self.remaining_stalls -= 1;
return Err(std::io::Error::from_raw_os_error(self.os_error));
}
if self.suffix_pos < self.suffix.len() {
let remaining = &self.suffix[self.suffix_pos..];
let n = remaining.len().min(buf.len());
buf[..n].copy_from_slice(&remaining[..n]);
self.suffix_pos += n;
return Ok(n);
}
Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"reader exhausted",
))
}
}

/// Reader that always returns a raw OS error after delivering a
/// prefix. Models a Windows socket where the read goes pending and
/// never completes within the test window.
struct AlwaysOsErrAfter {
prefix: Vec<u8>,
pos: usize,
os_error: i32,
}

impl std::io::Read for AlwaysOsErrAfter {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.pos < self.prefix.len() {
let remaining = &self.prefix[self.pos..];
let n = remaining.len().min(buf.len());
buf[..n].copy_from_slice(&remaining[..n]);
self.pos += n;
Ok(n)
} else {
Err(std::io::Error::from_raw_os_error(self.os_error))
}
}
}

/// Pre-header `ERROR_IO_PENDING` (zero bytes delivered) must propagate
/// as `Error::Io` — same path `WouldBlock` takes — so
/// `io_loop::is_read_timeout` can drain queued commands and retry on
/// the next poll instead of escalating to a reconnect storm. Issue
/// #469: this is exactly the case where the Python user on Windows
/// saw `Overlapped I/O operation is in progress. (os error 997)` spam
/// followed by repeated reconnect attempts.
#[test]
fn pre_header_error_io_pending_propagates_as_io() {
let mut reader = AlwaysOsErrAfter {
prefix: Vec::new(),
pos: 0,
os_error: ERROR_IO_PENDING,
};
let err = read_frame(&mut reader).unwrap_err();
match err {
crate::error::Error::Io(e) => {
assert_eq!(e.raw_os_error(), Some(ERROR_IO_PENDING));
}
other => panic!("expected Error::Io(ERROR_IO_PENDING), got {other:?}"),
}
}

/// Mid-header `ERROR_IO_PENDING` (one byte delivered, second stalls
/// briefly with os error 997) must retry within the per-stall
/// deadline and return the complete frame. Without the fix this
/// arm fell through to `Err(e) => Err(e.into())` and surfaced as a
/// fatal `FPSS read error` to the user.
#[test]
fn mid_header_error_io_pending_retries_and_recovers() {
let mut reader = PrefixThenOsErrThenResume {
prefix: vec![0x01],
suffix: vec![StreamMsgType::Ping as u8, 0xAA],
prefix_pos: 0,
suffix_pos: 0,
remaining_stalls: 3,
os_error: ERROR_IO_PENDING,
};
let frame = read_frame(&mut reader).unwrap().unwrap();
assert_eq!(frame.code, StreamMsgType::Ping);
assert_eq!(frame.payload, vec![0xAA]);
}

/// Mid-payload `ERROR_IO_PENDING` (header + partial payload, brief
/// stall with os error 997, rest arrives) must retry and complete.
/// This is the most common shape on Windows: a real frame whose
/// payload bytes finish arriving 50–76 ms after the first overlapped
/// pending notification.
#[test]
fn mid_payload_error_io_pending_retries_and_recovers() {
let mut reader = PrefixThenOsErrThenResume {
prefix: vec![0x04, StreamMsgType::Ping as u8, 0x01, 0x02],
suffix: vec![0x03, 0x04],
prefix_pos: 0,
suffix_pos: 0,
remaining_stalls: 3,
os_error: ERROR_IO_PENDING,
};
let frame = read_frame(&mut reader).unwrap().unwrap();
assert_eq!(frame.code, StreamMsgType::Ping);
assert_eq!(frame.payload, vec![0x01, 0x02, 0x03, 0x04]);
}
}
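
The mid-header and mid-payload retry arms in this file can be pictured as a "read exactly N bytes" loop that keeps partial progress and retries transient errors until one stall outlasts a budget. The following is a minimal, self-contained sketch under stated assumptions, not the crate's implementation: `read_exact_retrying`, `Scripted`, `Step`, and the one-second budget are hypothetical names and values chosen for illustration, and `is_transient` is a local stand-in for the crate-private `is_transient_read`.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};
use std::time::{Duration, Instant};

/// Windows `ERROR_IO_PENDING`, the same value the diff defines.
const ERROR_IO_PENDING: i32 = 997;

/// Local stand-in for the crate-private `is_transient_read`.
fn is_transient(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut)
        || err.raw_os_error() == Some(ERROR_IO_PENDING)
}

/// Hypothetical helper: fill `buf` completely, keeping bytes already read and
/// retrying transient errors until a single stall lasts longer than `budget`.
/// A real socket would block inside `read` for its configured timeout; the
/// scripted reader below returns immediately, so this loop simply spins.
fn read_exact_retrying<R: Read>(reader: &mut R, buf: &mut [u8], budget: Duration) -> io::Result<()> {
    let mut filled = 0;
    let mut stall_started: Option<Instant> = None;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            Ok(0) => {
                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "peer closed mid-frame"));
            }
            Ok(n) => {
                filled += n;
                stall_started = None; // progress resets the stall clock
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) if is_transient(&e) => {
                let started = *stall_started.get_or_insert_with(Instant::now);
                if started.elapsed() >= budget {
                    return Err(io::Error::new(io::ErrorKind::TimedOut, "stall budget exhausted"));
                }
            }
            Err(e) => return Err(e), // anything else is a real failure
        }
    }
    Ok(())
}

/// One scripted step of the fake reader.
enum Step {
    Data(Vec<u8>),
    Stall,
}

/// Fake reader that replays a script of data chunks and 997 stalls.
struct Scripted(VecDeque<Step>);

impl Read for Scripted {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.0.pop_front() {
            Some(Step::Data(bytes)) => {
                let n = bytes.len().min(buf.len());
                buf[..n].copy_from_slice(&bytes[..n]);
                if n < bytes.len() {
                    // Keep whatever did not fit for the next call.
                    self.0.push_front(Step::Data(bytes[n..].to_vec()));
                }
                Ok(n)
            }
            Some(Step::Stall) => Err(io::Error::from_raw_os_error(ERROR_IO_PENDING)),
            None => Ok(0),
        }
    }
}

fn main() {
    let script = VecDeque::from(vec![
        Step::Data(vec![1, 2]),
        Step::Stall,
        Step::Stall,
        Step::Data(vec![3, 4]),
    ]);
    let mut reader = Scripted(script);
    let mut buf = [0u8; 4];
    read_exact_retrying(&mut reader, &mut buf, Duration::from_secs(1))
        .expect("transient stalls should be retried");
    assert_eq!(buf, [1, 2, 3, 4]);
}
```

Resetting the stall clock on every successful read mirrors the per-stall deadline that the test comments describe. The aggregate wall-clock cap mentioned in the drain-yield comments is deliberately left out of this sketch.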
14 changes: 9 additions & 5 deletions crates/thetadatadx/src/fpss/io_loop.rs
@@ -684,13 +684,17 @@ pub(super) fn io_loop<F>(
tracing::debug!("fpss-io thread exiting");
}

-/// Check if an error is a read timeout (`WouldBlock` or `TimedOut`).
+/// Check if an error is a transient read condition that should drain
+/// commands and retry rather than tear the connection down.
+///
+/// Delegates to [`super::framing::is_transient_read`] for the kind
+/// classification so all three FPSS read sites (this loop, mid-header,
+/// mid-payload) share one definition. Recognises `WouldBlock`,
+/// `TimedOut`, and Windows `ERROR_IO_PENDING` (raw OS 997, surfaced as
+/// `ErrorKind::Uncategorized` by `std`); see issue #469.
 fn is_read_timeout(e: &Error) -> bool {
     match e {
-        Error::Io(io_err) => matches!(
-            io_err.kind(),
-            std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
-        ),
+        Error::Io(io_err) => super::framing::is_transient_read(io_err),
         _ => false,
     }
 }
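
To illustrate why the I/O loop cares about this classification, here is a rough sketch of a loop that treats a transient read error as a cue to service one queued command and poll again, while any other error ends the loop. Everything in it is an assumption made for illustration: `pump`, `StallThenData`, and the string commands are hypothetical and do not reflect the actual `io_loop` signature or command type.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Windows `ERROR_IO_PENDING`, the same value the diff defines.
const ERROR_IO_PENDING: i32 = 997;

/// Local stand-in for the crate-private `is_transient_read`.
fn is_transient(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut)
        || err.raw_os_error() == Some(ERROR_IO_PENDING)
}

/// Hypothetical reader: reports 997 `stalls` times, then yields one byte per
/// call, then signals end of stream with `Ok(0)`.
struct StallThenData {
    stalls: usize,
    data: VecDeque<u8>,
}

impl Read for StallThenData {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.stalls > 0 {
            self.stalls -= 1;
            return Err(io::Error::from_raw_os_error(ERROR_IO_PENDING));
        }
        if buf.is_empty() {
            return Ok(0);
        }
        match self.data.pop_front() {
            Some(byte) => {
                buf[0] = byte;
                Ok(1)
            }
            None => Ok(0),
        }
    }
}

/// Hypothetical loop: read until end of stream, handling one queued command on
/// every transient stall. Returns the bytes received and the commands handled.
fn pump<R: Read>(
    reader: &mut R,
    commands: &mut VecDeque<&'static str>,
) -> io::Result<(Vec<u8>, Vec<&'static str>)> {
    let mut received = Vec::new();
    let mut handled = Vec::new();
    let mut buf = [0u8; 16];
    loop {
        match reader.read(&mut buf) {
            Ok(0) => return Ok((received, handled)),
            Ok(n) => received.extend_from_slice(&buf[..n]),
            Err(e) if is_transient(&e) => {
                // No data right now: use the gap to drain a command, then retry.
                if let Some(cmd) = commands.pop_front() {
                    handled.push(cmd);
                }
            }
            Err(e) => return Err(e), // real disconnects still propagate
        }
    }
}

fn main() {
    let mut reader = StallThenData {
        stalls: 2,
        data: VecDeque::from(vec![1, 2, 3]),
    };
    let mut commands = VecDeque::from(vec!["subscribe", "ping"]);
    let (bytes, handled) =
        pump(&mut reader, &mut commands).expect("transient stalls should not be fatal");
    println!("{bytes:?} {handled:?}");
}
```

If `is_transient` only matched on `kind()`, the first 997 would fall through to the final arm and end the loop, which is the failure mode the changelog entry describes.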
24 changes: 24 additions & 0 deletions docs-site/docs/changelog.md
@@ -7,6 +7,30 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [8.0.25] - 2026-05-05

### Fixed

- **Windows `ERROR_IO_PENDING` (os error 997) no longer trips a fatal
FPSS read error.** On Windows the overlapped socket layer surfaces
in-flight reads as `ERROR_IO_PENDING` instead of `WSAEWOULDBLOCK`.
Rust `std` maps raw OS error 997 to `ErrorKind::Uncategorized`, so
the existing `WouldBlock | TimedOut` matches in
`crates/thetadatadx/src/fpss/io_loop.rs::is_read_timeout` and the
two retry arms in `crates/thetadatadx/src/fpss/framing.rs`
(mid-header and mid-payload) treated it as fatal. Python users on
Windows saw `FPSS read error error=IO error: Overlapped I/O
operation is in progress. (os error 997)` spam followed by a
reconnect storm. A new `is_transient_read` helper in `framing.rs`
matches `WouldBlock`, `TimedOut`, and `raw_os_error() == Some(997)`;
all three sites delegate to it so the I/O loop drains queued
commands and retries the way it does on Linux and macOS.
Closes #469.

### Changed

- `tdbe` 0.12.5 → 0.12.6.

## [8.0.24] - 2026-05-04

### Added