Skip to content

Commit da0352c

Browse files
committed
final pass
1 parent 5d43d4a commit da0352c

5 files changed

Lines changed: 177 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
### Fixes
88
* Route split-PDF `partition_async()` result collection through awaited async hook dispatch instead of creating a nested event loop in a worker thread.
9+
Sync-only hooks on the async path now run on a worker thread, so hook code that depends on `contextvars` or thread-local state set on the event-loop thread should pass that state explicitly.
910
* Add cancellation cleanup for in-flight split-PDF chunk tasks and preserve existing sync `partition()` split-PDF behavior with lazy executor creation.
1011

1112
## 0.43.2

_test_unstructured_client/integration/test_split_pdf_async_behavior.py renamed to _test_unstructured_client/unit/test_split_pdf_async_behavior.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def _make_sdk_split_task(
110110

111111
@pytest.mark.asyncio
112112
async def test_partition_async_split_collects_chunks_in_order_without_executor():
113-
operation_id = "integration-happy"
113+
operation_id = "unit-happy"
114114
split_hook = SplitPdfHook()
115115
active_chunks = 0
116116
max_active_chunks = 0
@@ -191,7 +191,7 @@ def _chunk_client_factory(*args, **kwargs):
191191

192192
@pytest.mark.asyncio
193193
async def test_partition_async_cancellation_cleans_split_state_and_tempdir():
194-
operation_id = "integration-cancel"
194+
operation_id = "unit-cancel"
195195
split_hook = SplitPdfHook()
196196
started = asyncio.Event()
197197
cancelled_counter = Counter()
@@ -236,7 +236,7 @@ async def _hanging_chunk(
236236

237237
@pytest.mark.asyncio
238238
async def test_partition_async_strict_failure_drains_sibling_chunks_before_close():
239-
operation_id = "integration-strict-failure"
239+
operation_id = "unit-strict-failure"
240240
split_hook = SplitPdfHook()
241241
sibling_started = asyncio.Event()
242242
sibling_cancelled = asyncio.Event()
@@ -370,7 +370,7 @@ async def _release_sync_hook_from_loop() -> None:
370370

371371
@pytest.mark.asyncio
372372
async def test_partition_async_reassembly_does_not_block_event_loop():
373-
operation_id = "integration-offload"
373+
operation_id = "unit-offload"
374374
split_hook = SplitPdfHook()
375375
reassembly_started = threading.Event()
376376
reassembly_released = threading.Event()

_test_unstructured_client/unit/test_split_pdf_hook.py

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@
44
import io
55
import logging
66
import threading
7-
from asyncio import Task
87
from collections import Counter
98
from concurrent import futures
109
from functools import partial
1110
from pathlib import Path
11+
from typing import Any, Coroutine
1212
from unittest.mock import AsyncMock, MagicMock, patch
1313

1414
import httpx
1515
import pytest
16-
import requests
17-
from requests_toolbelt import MultipartDecoder
16+
import requests # type: ignore[import-untyped]
17+
from requests_toolbelt import MultipartDecoder # type: ignore[import-untyped]
1818

1919
from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils
2020
from unstructured_client._hooks.custom.form_utils import (
@@ -438,7 +438,7 @@ async def _request_mock(
438438
@pytest.mark.asyncio
439439
async def test_unit_disallow_failed_coroutines(
440440
allow_failed: bool,
441-
tasks: list[Task],
441+
tasks: list[partial[Coroutine[Any, Any, httpx.Response]]],
442442
expected_responses: list[str],
443443
):
444444
"""Test disallow failed coroutines method properly sets the flag to False."""
@@ -824,6 +824,70 @@ def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch
824824
assert "Cancellation cleanup cancelled" in caplog.text
825825

826826

827+
@pytest.mark.asyncio
828+
async def test_unit_do_request_async_secondary_cancellation_waits_for_cleanup():
829+
cleanup_started = asyncio.Event()
830+
release_cleanup = asyncio.Event()
831+
cleanup_finished = asyncio.Event()
832+
833+
class PreparedRequestHook:
834+
def before_request(self, hook_ctx, request):
835+
del hook_ctx, request
836+
return httpx.Request(
837+
"GET",
838+
"http://localhost:8888/general/docs",
839+
headers={"operation_id": "secondary-cancel-cleanup"},
840+
extensions={"split_pdf_operation_id": "secondary-cancel-cleanup"},
841+
)
842+
843+
class SlowCleanupHook:
844+
async def after_error_async(self, hook_ctx, response, error):
845+
del hook_ctx, response, error
846+
cleanup_started.set()
847+
await release_cleanup.wait()
848+
cleanup_finished.set()
849+
return None, None
850+
851+
def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch guard
852+
raise AssertionError("async hook should be awaited")
853+
854+
hooks = SDKHooks()
855+
hooks.before_request_hooks = [PreparedRequestHook()] # type: ignore[list-item]
856+
hooks.after_error_hooks = [SlowCleanupHook()] # type: ignore[list-item]
857+
858+
client = _BlockingAsyncClient()
859+
config = SDKConfiguration(
860+
client=None,
861+
client_supplied=False,
862+
async_client=client, # type: ignore[arg-type]
863+
async_client_supplied=True,
864+
debug_logger=logging.getLogger("test"),
865+
)
866+
config.__dict__["_hooks"] = hooks
867+
sdk = BaseSDK(config)
868+
task = asyncio.create_task(
869+
sdk.do_request_async(
870+
_make_sdk_hook_context(),
871+
httpx.Request("POST", "http://localhost:8888/general/v0/general"),
872+
error_status_codes=[],
873+
)
874+
)
875+
876+
await client.started.wait()
877+
task.cancel()
878+
await cleanup_started.wait()
879+
task.cancel()
880+
await asyncio.sleep(0)
881+
882+
assert not task.done()
883+
884+
release_cleanup.set()
885+
with pytest.raises(asyncio.CancelledError):
886+
await task
887+
888+
assert cleanup_finished.is_set()
889+
890+
827891
def test_before_request_returns_dummy_with_timeout_and_operation_id():
828892
hook, mock_hook_ctx, result = _make_hook_with_split_request()
829893
operation_id = result.headers["operation_id"]
@@ -836,6 +900,17 @@ def test_before_request_returns_dummy_with_timeout_and_operation_id():
836900
assert operation_id in hook.pending_operation_ids
837901

838902

903+
def test_before_request_rejects_reused_operation_id():
904+
hook = SplitPdfHook()
905+
hook.coroutines_to_execute["reused-operation-id"] = []
906+
907+
with patch(
908+
"unstructured_client._hooks.custom.split_pdf_hook.uuid.uuid4",
909+
return_value="reused-operation-id",
910+
), pytest.raises(RuntimeError, match="Split PDF operation ID already in use"):
911+
_make_hook_with_split_request(hook=hook)
912+
913+
839914
def test_before_request_logs_split_plan(caplog: pytest.LogCaptureFixture):
840915
caplog.set_level(logging.INFO, logger="unstructured-client")
841916

@@ -1784,15 +1859,16 @@ def test_unit_allow_failed_partial_results(caplog: pytest.LogCaptureFixture):
17841859
hook.concurrency_level[operation_id] = 3
17851860
hook.allow_failed[operation_id] = True
17861861
hook.cache_tmp_data_feature[operation_id] = False
1787-
hook.executors[operation_id] = MagicMock()
1862+
executor = MagicMock()
1863+
hook.executors[operation_id] = executor
17881864

17891865
fake_future = MagicMock()
17901866
fake_future.result.return_value = [
17911867
(1, _httpx_json_response([{"page_number": 1}])),
17921868
(2, _httpx_response("boom", status_code=500)),
17931869
(3, _httpx_json_response([{"page_number": 3}])),
17941870
]
1795-
hook.executors[operation_id].submit.return_value = fake_future
1871+
executor.submit.return_value = fake_future
17961872

17971873
elements = hook._await_elements(operation_id)
17981874

src/unstructured_client/_hooks/custom/split_pdf_hook.py

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import tempfile
1111
import threading
1212
import uuid
13+
from collections import deque
1314
from collections.abc import Awaitable, Iterable
1415
from concurrent import futures
1516
from functools import partial
@@ -72,6 +73,67 @@ def __init__(self, index: int, inner: BaseException):
7273
self.inner = inner
7374

7475

76+
class _AsyncThreadSafeBoundedSemaphore:
77+
"""A cancellable async gate that can be shared across event loops."""
78+
79+
def __init__(self, value: int) -> None:
80+
if value <= 0:
81+
raise ValueError("Semaphore value must be greater than zero")
82+
self._initial_value = value
83+
self._value = value
84+
self._waiters: deque[asyncio.Future[None]] = deque()
85+
self._lock = threading.Lock()
86+
87+
async def acquire(self) -> None:
88+
loop = asyncio.get_running_loop()
89+
waiter: Optional[asyncio.Future[None]] = None
90+
with self._lock:
91+
if self._value > 0:
92+
self._value -= 1
93+
return
94+
waiter = loop.create_future()
95+
self._waiters.append(waiter)
96+
97+
try:
98+
await waiter
99+
except asyncio.CancelledError:
100+
release_transferred_slot = False
101+
with self._lock:
102+
try:
103+
self._waiters.remove(waiter)
104+
except ValueError:
105+
release_transferred_slot = waiter.done() and not waiter.cancelled()
106+
if release_transferred_slot:
107+
self.release()
108+
raise
109+
110+
def release(self) -> None:
111+
while True:
112+
with self._lock:
113+
while self._waiters:
114+
waiter = self._waiters.popleft()
115+
if not waiter.cancelled():
116+
break
117+
else:
118+
if self._value >= self._initial_value:
119+
raise ValueError("Semaphore released too many times")
120+
self._value += 1
121+
return
122+
123+
def _wake_waiter() -> None:
124+
if waiter.cancelled():
125+
self.release()
126+
else:
127+
waiter.set_result(None)
128+
129+
try:
130+
waiter.get_loop().call_soon_threadsafe(_wake_waiter)
131+
return
132+
except RuntimeError:
133+
# The waiting loop closed before it could receive the slot.
134+
continue
135+
136+
75137
def _get_request_timeout_seconds(request: httpx.Request) -> Optional[float]:
76138
timeout = request.extensions.get("timeout")
77139
if timeout is None:
@@ -352,8 +414,8 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorH
352414
max_workers=1,
353415
thread_name_prefix="split-pdf-setup",
354416
)
355-
_split_pdf_setup_gate = threading.BoundedSemaphore(value=1)
356-
_split_pdf_setup_poll_interval_seconds = 0.01
417+
_split_pdf_setup_gate = _AsyncThreadSafeBoundedSemaphore(value=1)
418+
# Thread-local flag, shared by all hook instances, used to verify that pypdfium work runs while the setup lock is held.
357419
_split_pdf_setup_state = threading.local()
358420

359421
def __init__(self) -> None:
@@ -635,6 +697,9 @@ def _before_request_unlocked(
635697
if split_size >= page_count and page_count == len(pdf.pages):
636698
return request
637699

700+
if operation_id in self.coroutines_to_execute:
701+
raise RuntimeError(f"Split PDF operation ID already in use: {operation_id}")
702+
638703
self.allow_failed[operation_id] = allow_failed
639704
self.cache_tmp_data_feature[operation_id] = cache_tmp_data_feature
640705
self.cache_tmp_data_dir[operation_id] = cache_tmp_data_dir
@@ -778,13 +843,17 @@ async def before_request_async(
778843

779844
@classmethod
780845
async def _acquire_split_pdf_setup_slot(cls) -> None:
781-
while not cls._split_pdf_setup_gate.acquire(blocking=False):
782-
await asyncio.sleep(cls._split_pdf_setup_poll_interval_seconds)
846+
await cls._split_pdf_setup_gate.acquire()
783847

784848
async def _finish_cancelled_split_setup(
785849
self,
786850
setup_future: asyncio.Future[Union[httpx.Request, Exception]],
787851
) -> Optional[Union[httpx.Request, Exception]]:
852+
"""Finish setup after caller cancellation so prepared state can be cleaned.
853+
854+
Non-cancellation failures mean setup failed before returning a prepared
855+
request, so there is no operation ID to clear here.
856+
"""
788857
while True:
789858
try:
790859
return await asyncio.shield(setup_future)

src/unstructured_client/basesdk.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222

2323
class _RequestBoundCancelledError(asyncio.CancelledError):
24+
"""Cancellation wrapper that exposes the request to after-error hooks."""
25+
2426
def __init__(self, request: httpx.Request, cancellation: asyncio.CancelledError):
2527
super().__init__(str(cancellation) or "Request cancelled")
2628
self.request = request
@@ -317,16 +319,25 @@ async def cleanup_cancelled_request(
317319
if cleanup_request is None and response is not None:
318320
cleanup_request = response.request
319321
assert cleanup_request is not None
320-
try:
321-
await hooks.after_error_async(
322+
cleanup_task = asyncio.create_task(
323+
hooks.after_error_async(
322324
AfterErrorContext(hook_ctx),
323325
response,
324326
_RequestBoundCancelledError(cleanup_request, cancellation),
325327
)
326-
except asyncio.CancelledError:
327-
logger.debug("Cancellation cleanup cancelled", exc_info=True)
328-
except Exception:
329-
logger.debug("Cancellation cleanup failed", exc_info=True)
328+
)
329+
while True:
330+
try:
331+
await asyncio.shield(cleanup_task)
332+
return
333+
except asyncio.CancelledError:
334+
if cleanup_task.done():
335+
logger.debug("Cancellation cleanup cancelled", exc_info=True)
336+
return
337+
logger.debug("Cancellation cleanup still running after cancellation")
338+
except BaseException:
339+
logger.debug("Cancellation cleanup failed", exc_info=True)
340+
return
330341

331342
async def do():
332343
http_res = None

0 commit comments

Comments
 (0)