Commit 12ba6d6

feat(FS-1988): add min_attempts and absolute_max_elapsed_time_ms to BackoffStrategy
Closes a short-circuit in retry_with_backoff{,_async} where a single slow first attempt can exhaust max_elapsed_time before any retry fires. With the partitioner config (max_elapsed_time=5min, CLIENT_TIMEOUT_MS=30min), any chunk whose first attempt exceeds 5 minutes blew the retry budget on attempt 1 -- zero retries on subsequent transient errors. Documented in two customer-visible regressions in the platform partition path.

New fields on BackoffStrategy:

* min_attempts (default 0) -- minimum number of retry attempts that must fire before max_elapsed_time is honored. Counts retries (not the initial attempt). Default 0 preserves existing behavior.
* absolute_max_elapsed_time_ms (default None) -- cap on when a new retry can START. Does NOT interrupt an in-flight func() call. Worst-case wall-clock under this cap is absolute_max_elapsed_time_ms + per_attempt_timeout. Default None preserves existing behavior.

Loop changes in both retry_with_backoff and retry_with_backoff_async:

* Post-attempt cap check honors min_attempts as a floor on the soft cap and treats the hard cap as unconditional.
* Pre-sleep hard-cap check refuses to sleep into a doomed retry whose projected start would exceed the hard cap.
* Post-sleep verification (belt-and-suspenders against late wakeups and rounding drift in the projection).
* Helper extraction (_cap_hit_after_attempt, _raise_or_return_after_cap) dedupes logic between sync and async paths.

Validation rejects min_attempts < 0, absolute_max_elapsed_time_ms <= 0, and absolute_max_elapsed_time_ms below max_elapsed_time.

.genignore: ignore src/unstructured_client/utils/retries.py to preserve these fields across Speakeasy regens. Documented merge procedure for future template updates. Pushing these fields upstream to Speakeasy templates is filed as a follow-up.

Tests:

* T1-T14 in unit/test_retries.py with a fake-clock harness that monkeypatches time.time / time.sleep / asyncio.sleep / random.uniform. Covers the v1 reproducer (slow first attempt + min_attempts floor), floor-is-not-a-ceiling semantics, hard cap overrides floor, sleep truncation, TemporaryError early-return through both caps, and PermanentError short-circuit immunity.
* Validation tests for the new __init__ guards.
* New integration test test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe pins the body-replay invariant: chunks built from open file objects (cache_tmp_data=True) must produce replay-safe httpx requests so SDK retries actually deliver the original multipart payload. Iterates request.stream twice directly to bypass request.read() caching.
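
To make the loop changes described above concrete, here is a minimal sketch of the soft/hard cap decisions driven by a fake clock. It is not the code in retries.py: `FakeClock`, `count_attempts`, the fixed `backoff_ms`, and the strict `>` comparisons are assumptions made for illustration. Every attempt is assumed to fail with a retryable error, and the post-sleep verification step is omitted.

```python
from typing import Optional, Sequence


class FakeClock:
    """Manually advanced clock (hypothetical stand-in for time.time/time.sleep)."""

    def __init__(self) -> None:
        self.now_ms = 0

    def advance(self, ms: int) -> None:
        self.now_ms += ms


def count_attempts(
    attempt_durations_ms: Sequence[int],
    max_elapsed_time_ms: int,
    min_attempts: int = 0,
    absolute_max_elapsed_time_ms: Optional[int] = None,
    backoff_ms: int = 1_000,  # assumption: constant backoff, no jitter
) -> int:
    """Return how many attempts run when every attempt fails retryably."""
    clock = FakeClock()
    attempts = 0
    for duration_ms in attempt_durations_ms:
        clock.advance(duration_ms)  # the attempt runs to completion and fails
        attempts += 1
        retries_fired = attempts - 1  # the initial attempt is not a retry
        elapsed_ms = clock.now_ms
        hard_cap = absolute_max_elapsed_time_ms

        # Hard cap: unconditional, ignores the min_attempts floor.
        if hard_cap is not None and elapsed_ms > hard_cap:
            break
        # Soft cap: only honored once min_attempts retries have fired.
        if retries_fired >= min_attempts and elapsed_ms > max_elapsed_time_ms:
            break
        # Pre-sleep check: do not sleep into a retry that would start too late.
        if hard_cap is not None and elapsed_ms + backoff_ms > hard_cap:
            break
        clock.advance(backoff_ms)  # "sleep" before the next retry
    return attempts


# Slow first attempt (6 min) against a 5 min soft cap, then fast attempts.
durations = [360_000, 1_000, 1_000, 1_000, 1_000]
only_soft_cap = count_attempts(durations, 300_000)
with_floor = count_attempts(durations, 300_000, min_attempts=2)
floor_and_hard_cap = count_attempts(
    durations, 300_000, min_attempts=2, absolute_max_elapsed_time_ms=361_500
)
```

Under these assumptions `only_soft_cap` is 1 (no retry fires), `with_floor` is 3 (two retries fire before the soft cap is honored), and `floor_and_hard_cap` is 2 (the hard cap ends the loop before the floor is reached).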
1 parent 3577cd6 commit 12ba6d6

5 files changed

Lines changed: 836 additions & 25 deletions

.genignore

Lines changed: 13 additions & 0 deletions
@@ -19,3 +19,16 @@ src/unstructured_client/users.py
 # - Adjust the custom url snippets in the file
 # - Bring back the ignore line and commit
 src/unstructured_client/general.py
+
+# Ignore retries.py so we can carry the `min_attempts` and
+# `absolute_max_elapsed_time_ms` fields on BackoffStrategy plus the
+# soft/hard cap logic in retry_with_backoff{,_async}. Filed in FS-1988.
+# Speakeasy regen would otherwise wipe these fields. To pick up upstream
+# fixes to the generated retry runtime:
+# - Comment out this ignore line
+# - Generate locally
+# - Manually re-merge the FS-1988 additions
+# - Re-add the ignore line and commit
+# Long-term: push these fields upstream to Speakeasy templates so this
+# ignore can be removed.
+src/unstructured_client/utils/retries.py

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,9 @@
 ### Breaking changes
 * Removed deprecated connector config models from the SDK (e.g. `S3SourceConnectorConfig`, `AzureDestinationConnectorConfig`). Pass connector configs as plain dicts with arbitrary fields. The SDK is no longer coupled to backend connector schemas — new fields work without an SDK upgrade.
 
+### Features
+* Add `min_attempts` and `absolute_max_elapsed_time_ms` fields to `BackoffStrategy`. `min_attempts` is the minimum number of retry attempts that must fire before `max_elapsed_time` is honored; defaults to `0` (preserves existing behavior). `absolute_max_elapsed_time_ms` caps when a new retry can start (does not interrupt in-flight requests); defaults to `None`. Together these close a short-circuit where a single slow first attempt could exhaust the retry budget before any retry fired. See FS-1988.
+
 ## 0.43.4
 
 ### Enhancements
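
The commit message lists the validation rules for the two new fields and gives the worst-case wall-clock bound under the hard cap. A minimal sketch of both follows. `validate_caps` and `worst_case_wall_clock_ms` are hypothetical names, not functions in the SDK, and the sketch assumes `max_elapsed_time` is expressed in milliseconds like the hard cap.

```python
from typing import Optional


def validate_caps(
    max_elapsed_time_ms: int,
    min_attempts: int = 0,
    absolute_max_elapsed_time_ms: Optional[int] = None,
) -> None:
    """Raise ValueError for the combinations the commit message says are rejected."""
    if min_attempts < 0:
        raise ValueError("min_attempts must be >= 0")
    if absolute_max_elapsed_time_ms is None:
        return  # default: no hard cap, existing behavior
    if absolute_max_elapsed_time_ms <= 0:
        raise ValueError("absolute_max_elapsed_time_ms must be > 0")
    if absolute_max_elapsed_time_ms < max_elapsed_time_ms:
        raise ValueError(
            "absolute_max_elapsed_time_ms must not be below max_elapsed_time"
        )


def worst_case_wall_clock_ms(
    absolute_max_elapsed_time_ms: int, per_attempt_timeout_ms: int
) -> int:
    """The hard cap only gates when a retry may START, so an attempt that
    starts just before the cap can still run for a full per-attempt timeout."""
    return absolute_max_elapsed_time_ms + per_attempt_timeout_ms


# Partitioner numbers from the commit message: 5 min soft cap, 30 min client
# timeout. The 20 min hard cap is an assumed value chosen for this example.
validate_caps(max_elapsed_time_ms=300_000, min_attempts=2,
              absolute_max_elapsed_time_ms=1_200_000)
bound_ms = worst_case_wall_clock_ms(1_200_000, 1_800_000)
```

With the assumed 20 minute hard cap, `bound_ms` is 3,000,000 ms (50 minutes): the cap bounds when the last retry can begin, not when it ends.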

_test_unstructured_client/integration/test_decorators.py

Lines changed: 76 additions & 0 deletions
@@ -813,3 +813,79 @@ async def mock_send(_, request: httpx.Request, **kwargs):
     assert number_of_transport_failures == 0
     assert mock_endpoint_called
     assert res.status_code == 200
+
+
+def test_split_pdf_cache_tmp_data_chunk_request_stream_is_replay_safe(tmp_path):
+    """Regression test for FS-1988: when split_pdf_cache_tmp_data=True
+    is set, chunks are loaded as opened file objects (not BytesIO) and
+    handed to `create_pdf_chunk_request`. The resulting `httpx.Request`
+    must be replay-safe — i.e. its body stream can be iterated more
+    than once, returning identical bytes each time.
+
+    Body replay is what makes SDK-level retries on transient errors
+    (ReadTimeout, ConnectError, etc.) actually deliver the original
+    multipart payload to the server. A future Speakeasy template
+    change or refactor of `serialize_request_body` that produced a
+    single-consumption stream (e.g. an `Iterable[bytes]` over an open
+    file handle) would silently break retries: the server would see
+    an empty body on the second attempt.
+
+    The invariant is pinned by iterating `request.stream` twice
+    directly — NOT via `request.read()`, which caches into
+    `Request._content` and would paper over a non-replayable stream
+    after the first call. This is the actual transport-level path
+    httpcore uses when `AsyncClient.send` retries a request.
+    """
+    from unstructured_client._hooks.custom.request_utils import (
+        create_pdf_chunk_request,
+    )
+
+    # Drive `create_pdf_chunk_request` directly with a file-object
+    # chunk — the cache_tmp_data=True branch in `_get_pdf_chunk_files`.
+    chunk_path = tmp_path / "chunk.pdf"
+    src_bytes = Path("_sample_docs/layout-parser-paper.pdf").read_bytes()
+    chunk_path.write_bytes(src_bytes)
+
+    pdf_chunk_file = open(chunk_path, "rb")  # noqa: SIM115 -- closed manually
+    try:
+        form_data = {
+            "files": (chunk_path.name, src_bytes, "application/pdf"),
+            "strategy": "fast",
+        }
+        original_request = httpx.Request(
+            method="POST",
+            url="http://localhost:8000/general/v0/general",
+            headers={
+                "Content-Type": "multipart/form-data; boundary=test",
+                "User-Agent": "test",
+            },
+            content=b"",
+        )
+
+        chunk_request = create_pdf_chunk_request(
+            form_data=form_data,
+            pdf_chunk=(pdf_chunk_file, 1),
+            original_request=original_request,
+            filename=chunk_path.name,
+        )
+
+        # Iterate the body stream twice. If the underlying stream is
+        # not replayable (e.g. an open file handle that's exhausted
+        # after the first pass), the second iteration yields empty
+        # or partial bytes and the assert below fails.
+        first_pass = b"".join(chunk_request.stream)
+        second_pass = b"".join(chunk_request.stream)
+
+        assert len(first_pass) > 1000, (
+            f"First iteration produced too-small body ({len(first_pass)} "
+            "bytes); a real PDF chunk multipart envelope should be at "
+            "least kilobytes."
+        )
+        assert first_pass == second_pass, (
+            "Body stream not replay-safe: second iteration of "
+            "chunk_request.stream returned different bytes than the "
+            "first. SDK retries on transient errors would silently "
+            "send a truncated or empty body to the server."
+        )
+    finally:
+        pdf_chunk_file.close()
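
The difference between a replay-safe body and a single-consumption body can be shown without httpx. The sketch below uses only the standard library; `BytesBackedStream`, `HandleBackedStream`, and `is_replay_safe` are hypothetical names made up for illustration and are not httpx or SDK classes. It applies the same "iterate twice and compare" check the test uses.

```python
import io
from typing import BinaryIO, Iterable, Iterator


class BytesBackedStream:
    """Re-slices an in-memory buffer on every iteration, so it can be replayed."""

    def __init__(self, data: bytes, chunk_size: int = 8) -> None:
        self._data = data
        self._chunk_size = chunk_size

    def __iter__(self) -> Iterator[bytes]:
        for start in range(0, len(self._data), self._chunk_size):
            yield self._data[start:start + self._chunk_size]


class HandleBackedStream:
    """Reads from an open handle without rewinding, so a second pass is empty."""

    def __init__(self, handle: BinaryIO, chunk_size: int = 8) -> None:
        self._handle = handle
        self._chunk_size = chunk_size

    def __iter__(self) -> Iterator[bytes]:
        chunk = self._handle.read(self._chunk_size)
        while chunk:
            yield chunk
            chunk = self._handle.read(self._chunk_size)


def is_replay_safe(stream: Iterable[bytes]) -> bool:
    """Iterate the stream twice and require identical, non-empty bytes."""
    first_pass = b"".join(stream)
    second_pass = b"".join(stream)
    return bool(first_pass) and first_pass == second_pass


payload = b"hypothetical multipart payload for a PDF chunk"
bytes_backed_ok = is_replay_safe(BytesBackedStream(payload))
handle_backed_ok = is_replay_safe(HandleBackedStream(io.BytesIO(payload)))
```

Here `bytes_backed_ok` is `True` and `handle_backed_ok` is `False`: the handle sits at end-of-file after the first pass, which is the failure mode the regression test is meant to catch.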
