Skip to content

Commit 573ea6a

Browse files
committed
fix: address review feedback - typed exception, timeout cleanup, kwargs test
- Introduce SyncClientUnavailableError so the facade catches by type instead of matching error strings (review comment #1) - Add future.cancel() + logger.warning() on timeout to match the _run_coroutine_sync pattern in base.py (review comment #2) - Assert kwargs forwarding in the async bridge test (review comment #4)
1 parent 0729c44 commit 573ea6a

4 files changed

Lines changed: 32 additions & 18 deletions

File tree

packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
import asyncio
9+
import concurrent.futures
910
import inspect
1011
import logging
1112
from typing import TYPE_CHECKING, Any
@@ -26,27 +27,26 @@ class _AsyncBridgedModelFacade:
2627
"""Proxy that bridges ``model.generate()`` to ``model.agenerate()`` in async engine mode.
2728
2829
When a sync custom column runs inside ``asyncio.to_thread`` under the async engine,
29-
the sync HTTP client is unavailable. This proxy intercepts the resulting error and
30-
schedules ``agenerate()`` on the engine's persistent event loop via
31-
``run_coroutine_threadsafe``.
30+
the sync HTTP client is unavailable. This proxy intercepts the resulting
31+
``SyncClientUnavailableError`` and schedules ``agenerate()`` on the engine's persistent
32+
event loop via ``run_coroutine_threadsafe``.
3233
3334
All other attributes are forwarded to the underlying facade unchanged.
3435
"""
3536

36-
_SYNC_CLIENT_ERROR = "Sync methods are not available on an async-mode HttpModelClient."
37-
3837
__slots__ = ("_facade",)
3938

4039
def __init__(self, facade: Any) -> None:
4140
object.__setattr__(self, "_facade", facade)
4241

4342
def generate(self, *args: Any, **kwargs: Any) -> tuple[Any, list]:
43+
from data_designer.engine.models.clients.errors import SyncClientUnavailableError
44+
4445
facade = object.__getattribute__(self, "_facade")
4546
try:
4647
return facade.generate(*args, **kwargs)
47-
except RuntimeError as exc:
48-
if str(exc) != self._SYNC_CLIENT_ERROR:
49-
raise
48+
except SyncClientUnavailableError:
49+
pass # Fall through to async bridge
5050

5151
# We're in a worker thread (asyncio.to_thread) with no running loop.
5252
# Guard against accidental use from the event loop itself (would deadlock).
@@ -64,7 +64,12 @@ def generate(self, *args: Any, **kwargs: Any) -> tuple[Any, list]:
6464

6565
loop = ensure_async_engine_loop()
6666
future = asyncio.run_coroutine_threadsafe(facade.agenerate(*args, **kwargs), loop)
67-
return future.result(timeout=_SYNC_BRIDGE_TIMEOUT)
67+
try:
68+
return future.result(timeout=_SYNC_BRIDGE_TIMEOUT)
69+
except concurrent.futures.TimeoutError as exc:
70+
future.cancel()
71+
logger.warning("Async model bridge timed out after %ss; coroutine cancelled", _SYNC_BRIDGE_TIMEOUT)
72+
raise TimeoutError(f"model.generate() bridge timed out after {_SYNC_BRIDGE_TIMEOUT}s") from exc
6873

6974
def __getattr__(self, name: str) -> Any:
7075
return getattr(object.__getattribute__(self, "_facade"), name)

packages/data-designer-engine/src/data_designer/engine/models/clients/adapters/http_model_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
resolve_timeout,
1515
wrap_transport_error,
1616
)
17-
from data_designer.engine.models.clients.errors import map_http_error_to_provider_error
17+
from data_designer.engine.models.clients.errors import SyncClientUnavailableError, map_http_error_to_provider_error
1818
from data_designer.engine.models.clients.retry import RetryConfig, RetryTransport, create_retry_transport
1919

2020
if TYPE_CHECKING:
@@ -96,7 +96,7 @@ def _build_headers(self, extra_headers: dict[str, str]) -> dict[str, str]:
9696

9797
def _get_sync_client(self) -> httpx.Client:
9898
if self._mode != ClientConcurrencyMode.SYNC:
99-
raise RuntimeError("Sync methods are not available on an async-mode HttpModelClient.")
99+
raise SyncClientUnavailableError("Sync methods are not available on an async-mode HttpModelClient.")
100100
with self._init_lock:
101101
if self._closed:
102102
raise RuntimeError("Model client is closed.")

packages/data-designer-engine/src/data_designer/engine/models/clients/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ class ProviderErrorKind(str, Enum):
2929
UNSUPPORTED_CAPABILITY = "unsupported_capability"
3030

3131

32+
class SyncClientUnavailableError(RuntimeError):
33+
"""Raised when sync methods are called on an async-mode HttpModelClient."""
34+
35+
3236
class ProviderError(Exception):
3337
def __init__(
3438
self,

packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -509,12 +509,15 @@ def test_bridges_to_agenerate_on_sync_client_error(self) -> None:
509509
from unittest.mock import patch
510510

511511
from data_designer.engine.column_generators.generators.custom import _AsyncBridgedModelFacade
512+
from data_designer.engine.models.clients.errors import SyncClientUnavailableError
512513

513514
facade = Mock()
514-
facade.generate.side_effect = RuntimeError("Sync methods are not available on an async-mode HttpModelClient.")
515+
facade.generate.side_effect = SyncClientUnavailableError(
516+
"Sync methods are not available on an async-mode HttpModelClient."
517+
)
515518

516519
async def fake_agenerate(*args: Any, **kwargs: Any) -> tuple:
517-
return ("async_result", list(args))
520+
return ("async_result", list(args), kwargs)
518521

519522
facade.agenerate = fake_agenerate
520523
proxy = _AsyncBridgedModelFacade(facade)
@@ -528,15 +531,14 @@ async def fake_agenerate(*args: Any, **kwargs: Any) -> tuple:
528531
"data_designer.engine.dataset_builders.utils.async_concurrency.ensure_async_engine_loop",
529532
return_value=engine_loop,
530533
):
531-
# Positional prompt arg is forwarded to agenerate
532534
result = proxy.generate("hello", parser=str)
533-
assert result == ("async_result", ["hello"])
535+
assert result == ("async_result", ["hello"], {"parser": str})
534536
finally:
535537
engine_loop.call_soon_threadsafe(engine_loop.stop)
536538
engine_thread.join(timeout=5)
537539

538540
def test_non_client_mode_errors_propagate(self) -> None:
539-
"""Only the specific HttpModelClient sync-mode RuntimeError triggers bridging."""
541+
"""Only SyncClientUnavailableError triggers bridging; other errors propagate."""
540542
from data_designer.engine.column_generators.generators.custom import _AsyncBridgedModelFacade
541543

542544
# ValueError - different type entirely
@@ -546,7 +548,7 @@ def test_non_client_mode_errors_propagate(self) -> None:
546548
with pytest.raises(ValueError, match="invalid prompt format"):
547549
proxy.generate(prompt="hello")
548550

549-
# RuntimeError with a different message - same type, not caught
551+
# RuntimeError - same base type as SyncClientUnavailableError, but not caught
550552
facade = Mock()
551553
facade.generate.side_effect = RuntimeError("connection timed out for async request")
552554
proxy = _AsyncBridgedModelFacade(facade)
@@ -558,9 +560,12 @@ def test_deadlock_guard_on_event_loop(self) -> None:
558560
import asyncio
559561

560562
from data_designer.engine.column_generators.generators.custom import _AsyncBridgedModelFacade
563+
from data_designer.engine.models.clients.errors import SyncClientUnavailableError
561564

562565
facade = Mock()
563-
facade.generate.side_effect = RuntimeError("Sync methods are not available on an async-mode HttpModelClient.")
566+
facade.generate.side_effect = SyncClientUnavailableError(
567+
"Sync methods are not available on an async-mode HttpModelClient."
568+
)
564569
proxy = _AsyncBridgedModelFacade(facade)
565570

566571
async def call_from_loop() -> None:

0 commit comments

Comments
 (0)