Skip to content

Commit 8958802

Browse files
awalker4claude
andcommitted
Fix concurrency issues in SplitPdfHook
Critical fixes: - Fix missing await in sync after_success (would have caused bugs) - Move instance-level settings (allow_failed, cache_tmp_data_feature, cache_tmp_data_dir) to per-operation dicts - This prevents race conditions where concurrent requests would overwrite each other's settings - Add safety check for tempdir existence before accessing in call_api_partial Per-operation settings storage: - Each operation_id now has its own isolated settings - Settings are properly cleaned up in _clear_operation - Default values are used when operation_id not found Tests added: - test_per_request_settings_isolation: Validates settings don't interfere between operations - test_per_request_settings_cleanup: Validates proper cleanup - test_concurrent_async_operations_isolation: Simulates real concurrent async operations - test_await_elements_uses_operation_settings: Validates _await_elements uses correct settings - test_default_values_used_when_operation_not_found: Validates fallback to defaults This fixes the race conditions that caused inconsistent behavior and "event loop is closed" warnings when making concurrent partition_async requests. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent fd56c65 commit 8958802

File tree

2 files changed

+234
-55
lines changed

2 files changed

+234
-55
lines changed

_test_unstructured_client/unit/test_split_pdf_hook.py

Lines changed: 186 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,4 +525,189 @@ def test_before_request_raises_pdf_validation_error_when_pdf_check_fails():
525525
# Verify that the mocked functions were called as expected
526526
mock_get_fields.assert_called_once_with(mock_request)
527527
mock_read_pdf.assert_called_once_with(mock_pdf_file)
528-
mock_check_pdf.assert_called_once_with(mock_pdf_reader)
528+
mock_check_pdf.assert_called_once_with(mock_pdf_reader)
529+
530+
531+
def test_per_request_settings_isolation():
532+
"""Test that multiple concurrent requests have isolated settings.
533+
534+
This validates the fix for race conditions where instance-level settings
535+
would be shared between concurrent requests, causing one request to use
536+
another's configuration.
537+
"""
538+
hook = SplitPdfHook()
539+
540+
# Simulate two different operations with different settings
541+
operation_id_1 = "op-1"
542+
operation_id_2 = "op-2"
543+
544+
# Set different settings for each operation
545+
hook.allow_failed[operation_id_1] = True
546+
hook.cache_tmp_data_feature[operation_id_1] = True
547+
hook.cache_tmp_data_dir[operation_id_1] = "/tmp/op1"
548+
hook.concurrency_level[operation_id_1] = 5
549+
550+
hook.allow_failed[operation_id_2] = False
551+
hook.cache_tmp_data_feature[operation_id_2] = False
552+
hook.cache_tmp_data_dir[operation_id_2] = "/tmp/op2"
553+
hook.concurrency_level[operation_id_2] = 10
554+
555+
# Verify that each operation has its own isolated settings
556+
assert hook.allow_failed[operation_id_1] is True
557+
assert hook.allow_failed[operation_id_2] is False
558+
559+
assert hook.cache_tmp_data_feature[operation_id_1] is True
560+
assert hook.cache_tmp_data_feature[operation_id_2] is False
561+
562+
assert hook.cache_tmp_data_dir[operation_id_1] == "/tmp/op1"
563+
assert hook.cache_tmp_data_dir[operation_id_2] == "/tmp/op2"
564+
565+
assert hook.concurrency_level[operation_id_1] == 5
566+
assert hook.concurrency_level[operation_id_2] == 10
567+
568+
569+
def test_per_request_settings_cleanup():
570+
"""Test that per-request settings are properly cleaned up after operation completes."""
571+
hook = SplitPdfHook()
572+
573+
operation_id = "test-op"
574+
575+
# Set up operation data
576+
hook.allow_failed[operation_id] = True
577+
hook.cache_tmp_data_feature[operation_id] = True
578+
hook.cache_tmp_data_dir[operation_id] = "/tmp/test"
579+
hook.concurrency_level[operation_id] = 8
580+
hook.coroutines_to_execute[operation_id] = []
581+
hook.api_successful_responses[operation_id] = []
582+
hook.api_failed_responses[operation_id] = []
583+
584+
# Verify data exists
585+
assert operation_id in hook.allow_failed
586+
assert operation_id in hook.cache_tmp_data_feature
587+
assert operation_id in hook.cache_tmp_data_dir
588+
assert operation_id in hook.concurrency_level
589+
590+
# Clear the operation
591+
hook._clear_operation(operation_id)
592+
593+
# Verify all data is cleaned up
594+
assert operation_id not in hook.allow_failed
595+
assert operation_id not in hook.cache_tmp_data_feature
596+
assert operation_id not in hook.cache_tmp_data_dir
597+
assert operation_id not in hook.concurrency_level
598+
assert operation_id not in hook.coroutines_to_execute
599+
assert operation_id not in hook.api_successful_responses
600+
assert operation_id not in hook.api_failed_responses
601+
602+
603+
@pytest.mark.asyncio
604+
async def test_concurrent_async_operations_isolation():
605+
"""Test that concurrent async operations maintain isolated settings.
606+
607+
This simulates the real-world scenario where multiple partition_async
608+
calls are made concurrently and ensures they don't interfere with each other.
609+
"""
610+
hook = SplitPdfHook()
611+
612+
# Track which settings each operation saw
613+
operation_settings = {}
614+
615+
async def simulate_operation(op_id: str, allow_failed: bool, cache_enabled: bool):
616+
"""Simulate an operation that sets and reads its own settings."""
617+
# Set operation-specific settings
618+
hook.allow_failed[op_id] = allow_failed
619+
hook.cache_tmp_data_feature[op_id] = cache_enabled
620+
hook.concurrency_level[op_id] = 5
621+
622+
# Simulate some async work
623+
await asyncio.sleep(0.01)
624+
625+
# Read back settings and verify they haven't changed
626+
operation_settings[op_id] = {
627+
'allow_failed': hook.allow_failed.get(op_id),
628+
'cache_enabled': hook.cache_tmp_data_feature.get(op_id),
629+
'concurrency_level': hook.concurrency_level.get(op_id),
630+
}
631+
632+
# Clean up
633+
hook._clear_operation(op_id)
634+
635+
# Run multiple operations concurrently with different settings
636+
tasks = [
637+
simulate_operation("op-1", True, True),
638+
simulate_operation("op-2", False, False),
639+
simulate_operation("op-3", True, False),
640+
simulate_operation("op-4", False, True),
641+
]
642+
643+
await asyncio.gather(*tasks)
644+
645+
# Verify each operation saw its own settings correctly
646+
assert operation_settings["op-1"] == {'allow_failed': True, 'cache_enabled': True, 'concurrency_level': 5}
647+
assert operation_settings["op-2"] == {'allow_failed': False, 'cache_enabled': False, 'concurrency_level': 5}
648+
assert operation_settings["op-3"] == {'allow_failed': True, 'cache_enabled': False, 'concurrency_level': 5}
649+
assert operation_settings["op-4"] == {'allow_failed': False, 'cache_enabled': True, 'concurrency_level': 5}
650+
651+
652+
@pytest.mark.asyncio
653+
async def test_await_elements_uses_operation_settings():
654+
"""Test that _await_elements correctly uses per-operation settings."""
655+
hook = SplitPdfHook()
656+
657+
operation_id = "test-op"
658+
659+
# Set operation-specific settings
660+
hook.allow_failed[operation_id] = True
661+
hook.cache_tmp_data_feature[operation_id] = False
662+
hook.concurrency_level[operation_id] = 3
663+
664+
# Mock the coroutines to execute
665+
async def mock_coroutine(async_client, limiter):
666+
"""Mock coroutine that returns a successful response."""
667+
response = httpx.Response(
668+
status_code=200,
669+
json=[{"element": "test"}],
670+
)
671+
return response
672+
673+
hook.coroutines_to_execute[operation_id] = [
674+
partial(mock_coroutine)
675+
]
676+
677+
# Mock run_tasks to verify it receives the correct settings
678+
with patch("unstructured_client._hooks.custom.split_pdf_hook.run_tasks") as mock_run_tasks:
679+
mock_run_tasks.return_value = [(1, httpx.Response(
680+
status_code=200,
681+
content=b'[{"element": "test"}]',
682+
))]
683+
684+
await hook._await_elements(operation_id)
685+
686+
# Verify run_tasks was called with the operation-specific settings
687+
mock_run_tasks.assert_called_once()
688+
call_args = mock_run_tasks.call_args
689+
690+
# Check that allow_failed matches what we set
691+
assert call_args.kwargs['allow_failed'] is True
692+
assert call_args.kwargs['concurrency_level'] == 3
693+
694+
695+
def test_default_values_used_when_operation_not_found():
696+
"""Test that default values are used when operation_id is not in the settings dicts."""
697+
hook = SplitPdfHook()
698+
699+
# Don't set any values for this operation
700+
operation_id = "missing-op"
701+
702+
# Access settings with .get() should return defaults
703+
from unstructured_client._hooks.custom.split_pdf_hook import (
704+
DEFAULT_ALLOW_FAILED,
705+
DEFAULT_CACHE_TMP_DATA,
706+
DEFAULT_CACHE_TMP_DATA_DIR,
707+
DEFAULT_CONCURRENCY_LEVEL,
708+
)
709+
710+
assert hook.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) == DEFAULT_ALLOW_FAILED
711+
assert hook.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA) == DEFAULT_CACHE_TMP_DATA
712+
assert hook.cache_tmp_data_dir.get(operation_id, DEFAULT_CACHE_TMP_DATA_DIR) == DEFAULT_CACHE_TMP_DATA_DIR
713+
assert hook.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL) == DEFAULT_CONCURRENCY_LEVEL

0 commit comments

Comments
 (0)