Skip to content

Commit 5d43d4a

Browse files
committed
more thorough review
1 parent 7fd8d55 commit 5d43d4a

4 files changed

Lines changed: 243 additions & 90 deletions

File tree

_test_unstructured_client/unit/test_split_pdf_hook.py

Lines changed: 105 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,25 @@ def test_unit_clear_operation():
6969
assert hook.operation_timeouts.get(operation_id) is None
7070

7171

72+
def test_unit_clear_operation_closes_unconsumed_chunk_files(tmp_path: Path):
73+
hook = SplitPdfHook()
74+
operation_id = "cache-mode-clear"
75+
chunk_path = tmp_path / "chunk.pdf"
76+
chunk_path.write_bytes(b"%PDF")
77+
chunk_file = open(chunk_path, mode="rb") # pylint: disable=consider-using-with
78+
tempdir = MagicMock()
79+
80+
hook.coroutines_to_execute[operation_id] = [
81+
partial(hook.call_api_partial, pdf_chunk_file=chunk_file),
82+
]
83+
hook.tempdirs[operation_id] = tempdir
84+
85+
hook._clear_operation(operation_id)
86+
87+
assert chunk_file.closed
88+
tempdir.cleanup.assert_called_once()
89+
90+
7291
def test_unit_get_request_timeout_seconds_uses_request_timeout_extension():
7392
request = httpx.Request(
7493
"POST",
@@ -878,15 +897,17 @@ async def test_unit_run_tasks_allow_failed_transport_exception():
878897

879898

880899
@pytest.mark.asyncio
881-
async def test_unit_run_tasks_allow_failed_cancelled_error_propagates():
900+
async def test_unit_run_tasks_allow_failed_cancelled_error_becomes_failed_response():
882901
tasks = [
883902
partial(_slow_success_request, content="1"),
884903
partial(_cancelled_request),
885904
partial(_slow_success_request, content="3"),
886905
]
887906

888-
with pytest.raises(asyncio.CancelledError):
889-
await run_tasks(tasks, allow_failed=True)
907+
responses = await run_tasks(tasks, allow_failed=True)
908+
909+
assert [response.status_code for _, response in responses] == [200, 500, 200]
910+
assert isinstance(responses[1][1].extensions["transport_exception"], asyncio.CancelledError)
890911

891912

892913
@pytest.mark.asyncio
@@ -1123,10 +1144,9 @@ def before_request(self, hook_ctx, request):
11231144

11241145

11251146
@pytest.mark.asyncio
1126-
async def test_unit_sdk_hooks_before_request_async_allows_sync_hooks_to_overlap():
1147+
async def test_unit_sdk_hooks_before_request_async_serializes_same_sync_hook_instance():
11271148
release_hooks = threading.Event()
11281149
first_hook_started = threading.Event()
1129-
both_hooks_started = threading.Event()
11301150
active_lock = threading.Lock()
11311151
active_hooks = 0
11321152
max_active_hooks = 0
@@ -1140,8 +1160,6 @@ def before_request(self, hook_ctx, request):
11401160
max_active_hooks = max(max_active_hooks, active_hooks)
11411161
if active_hooks == 1:
11421162
first_hook_started.set()
1143-
if active_hooks == 2:
1144-
both_hooks_started.set()
11451163
try:
11461164
release_hooks.wait(timeout=1)
11471165
request.headers["X-Sync-Before-Hook"] = "called"
@@ -1159,7 +1177,8 @@ def before_request(self, hook_ctx, request):
11591177
first_task = asyncio.create_task(hooks.before_request_async(hook_ctx, first_request))
11601178
await asyncio.to_thread(first_hook_started.wait, 1)
11611179
second_task = asyncio.create_task(hooks.before_request_async(hook_ctx, second_request))
1162-
assert await asyncio.to_thread(both_hooks_started.wait, 1)
1180+
await asyncio.sleep(0.01)
1181+
assert max_active_hooks == 1
11631182
release_hooks.set()
11641183

11651184
returned_first, returned_second = await asyncio.gather(first_task, second_task)
@@ -1168,7 +1187,7 @@ def before_request(self, hook_ctx, request):
11681187
assert returned_second is second_request
11691188
assert first_request.headers["X-Sync-Before-Hook"] == "called"
11701189
assert second_request.headers["X-Sync-Before-Hook"] == "called"
1171-
assert max_active_hooks == 2
1190+
assert max_active_hooks == 1
11721191

11731192

11741193
@pytest.mark.asyncio
@@ -1280,40 +1299,94 @@ def test_unit_pdfium_new_document_closes_when_cached_split_fails(tmp_path: Path)
12801299

12811300

12821301
@pytest.mark.asyncio
1283-
async def test_unit_do_request_async_cancellation_during_before_request_cleans_up_later():
1302+
async def test_unit_split_pdf_before_request_async_cancellation_cleans_prepared_state():
12841303
setup_started = threading.Event()
12851304
release_setup = threading.Event()
1286-
cleanup_called = asyncio.Event()
1287-
cancellation_is_asyncio_cancelled_error = []
12881305
operation_id = "cancelled-during-setup"
1306+
tempdir = MagicMock()
1307+
hook = SplitPdfHook()
1308+
1309+
def slow_setup(hook_ctx, request):
1310+
del hook_ctx, request
1311+
setup_started.set()
1312+
release_setup.wait(timeout=1)
1313+
hook.coroutines_to_execute[operation_id] = []
1314+
hook.pending_operation_ids[operation_id] = operation_id
1315+
hook.tempdirs[operation_id] = tempdir
1316+
return httpx.Request(
1317+
"GET",
1318+
"http://localhost:8888/general/docs",
1319+
headers={"operation_id": operation_id},
1320+
extensions={"split_pdf_operation_id": operation_id},
1321+
)
1322+
1323+
hook_ctx = MagicMock(spec=BeforeRequestContext)
1324+
hook_ctx.operation_id = "partition"
1325+
request = httpx.Request("POST", "http://localhost:8888/general/v0/general")
1326+
1327+
with patch.object(hook, "_before_request_unlocked", side_effect=slow_setup):
1328+
task = asyncio.create_task(hook.before_request_async(hook_ctx, request))
1329+
await asyncio.to_thread(setup_started.wait, 1)
1330+
task.cancel()
1331+
release_setup.set()
1332+
1333+
with pytest.raises(asyncio.CancelledError):
1334+
await asyncio.wait_for(task, timeout=1)
1335+
1336+
assert operation_id not in hook.coroutines_to_execute
1337+
assert operation_id not in hook.pending_operation_ids
1338+
assert operation_id not in hook.tempdirs
1339+
tempdir.cleanup.assert_called_once()
1340+
1341+
with patch.object(hook, "_before_request_unlocked", return_value=request):
1342+
assert await asyncio.wait_for(hook.before_request_async(hook_ctx, request), timeout=1) is request
1343+
1344+
1345+
@pytest.mark.asyncio
1346+
async def test_unit_split_pdf_before_request_async_cancellation_before_admission_does_not_queue():
1347+
setup_started = threading.Event()
1348+
release_setup = threading.Event()
1349+
hook = SplitPdfHook()
1350+
hook_ctx = MagicMock(spec=BeforeRequestContext)
1351+
hook_ctx.operation_id = "partition"
1352+
request = httpx.Request("POST", "http://localhost:8888/general/v0/general")
1353+
1354+
def slow_setup(hook_ctx_arg, request_arg):
1355+
del hook_ctx_arg
1356+
setup_started.set()
1357+
release_setup.wait(timeout=1)
1358+
return request_arg
1359+
1360+
with patch.object(hook, "_before_request_unlocked", side_effect=slow_setup) as mock_setup:
1361+
first_task = asyncio.create_task(hook.before_request_async(hook_ctx, request))
1362+
await asyncio.to_thread(setup_started.wait, 1)
1363+
second_task = asyncio.create_task(hook.before_request_async(hook_ctx, request))
1364+
await asyncio.sleep(0)
1365+
1366+
second_task.cancel()
1367+
with pytest.raises(asyncio.CancelledError):
1368+
await second_task
1369+
1370+
release_setup.set()
1371+
assert await asyncio.wait_for(first_task, timeout=1) is request
1372+
1373+
assert mock_setup.call_count == 1
1374+
1375+
1376+
@pytest.mark.asyncio
1377+
async def test_unit_do_request_async_cancellation_during_before_request_cancels_setup():
1378+
setup_started = threading.Event()
1379+
release_setup = threading.Event()
12891380

12901381
class SlowBeforeRequestHook:
12911382
def before_request(self, hook_ctx, request):
12921383
del hook_ctx, request
12931384
setup_started.set()
12941385
release_setup.wait(timeout=1)
1295-
return httpx.Request(
1296-
"GET",
1297-
"http://localhost:8888/general/docs",
1298-
headers={"operation_id": operation_id},
1299-
extensions={"split_pdf_operation_id": operation_id},
1300-
)
1301-
1302-
class CancellationObserverHook:
1303-
async def after_error_async(self, hook_ctx, response, error):
1304-
del hook_ctx, response
1305-
cancellation_is_asyncio_cancelled_error.append(
1306-
isinstance(error, asyncio.CancelledError)
1307-
)
1308-
cleanup_called.set()
1309-
return None, error
1310-
1311-
def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch guard
1312-
raise AssertionError("async hook should be awaited")
1386+
return httpx.Request("GET", "http://localhost:8888/general/docs")
13131387

13141388
hooks = SDKHooks()
13151389
hooks.before_request_hooks = [SlowBeforeRequestHook()] # type: ignore[list-item]
1316-
hooks.after_error_hooks = [CancellationObserverHook()] # type: ignore[list-item]
13171390

13181391
client = _BlockingAsyncClient()
13191392
config = SDKConfiguration(
@@ -1335,15 +1408,11 @@ def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch
13351408

13361409
await asyncio.to_thread(setup_started.wait, 1)
13371410
task.cancel()
1411+
13381412
with pytest.raises(asyncio.CancelledError):
13391413
await asyncio.wait_for(task, timeout=0.05)
13401414

1341-
assert not cleanup_called.is_set()
1342-
13431415
release_setup.set()
1344-
await asyncio.wait_for(cleanup_called.wait(), timeout=1)
1345-
1346-
assert cancellation_is_asyncio_cancelled_error == [True]
13471416

13481417

13491418
def test_unit_before_request_uses_hook_ctx_timeout_when_request_timeout_missing():

0 commit comments

Comments
 (0)