fix: bridge model.generate() to agenerate() for custom columns in async engine#545
Conversation
…ync engine Custom column generators that call model.generate() fail under the async engine because the sync HTTP client is unavailable. Add an _AsyncBridgedModelFacade proxy in _build_models_dict() that intercepts the sync-client RuntimeError and schedules agenerate() on the engine's persistent event loop via run_coroutine_threadsafe. Includes a deadlock guard for async custom columns running on the event loop.
Code Review: PR #545 — fix: bridge model.generate() to agenerate() for custom columns in async engineSummaryThis PR adds an Files changed: 2 (1 source, 1 test) — +160 / -3 lines. FindingsCorrectness
Design
Testing
Style / Conventions
VerdictApprove with minor suggestions. The implementation is well-structured, follows project conventions, and includes solid test coverage. The proxy pattern is clean, the deadlock guard is a thoughtful addition, and the tests cover the important scenarios. The main concern is the exact-string error matching (Finding #1), which creates a fragile coupling to an upstream error message. This is acceptable for a short-term fix but should be tracked for improvement — ideally by introducing a typed exception in the HTTP client layer. The missing kwargs assertion in the bridge test (Finding #2) is minor but easy to fix. No blocking issues found. |
Greptile SummaryAdds
|
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py | Core change: adds _AsyncBridgedModelFacade proxy and wraps facades in _invoke_generator_function; async generator path (_ainvoke_generator_function) intentionally left unwrapped. |
| packages/data-designer-engine/src/data_designer/engine/models/errors.py | Adds case SyncClientUnavailableError(): raise arm so the sentinel error escapes the catch_llm_exceptions boundary and reaches the proxy. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/adapters/http_model_client.py | Narrows _get_sync_client error from generic RuntimeError to typed SyncClientUnavailableError, enabling exact-match interception in the proxy. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/errors.py | Introduces SyncClientUnavailableError(RuntimeError) as a typed sentinel; straightforward addition. |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py | Renames _SYNC_BRIDGE_TIMEOUT → SYNC_BRIDGE_TIMEOUT to export the constant for reuse in custom.py. |
| packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py | Four new focused tests: sync passthrough, async bridge via run_coroutine_threadsafe, non-SyncClientUnavailableError propagation, and deadlock guard. |
Sequence Diagram
sequenceDiagram
participant EL as AsyncEngine EventLoop
participant TT as asyncio.to_thread (worker)
participant P as _AsyncBridgedModelFacade
participant F as ModelFacade
participant C as HttpModelClient (async mode)
EL->>TT: to_thread(generator_fn, row, params, {model: proxy})
TT->>P: model.generate(prompt, parser=…)
P->>F: facade.generate(…)
F->>C: _get_sync_client()
C-->>F: raise SyncClientUnavailableError
F-->>P: SyncClientUnavailableError (via catch_llm_exceptions re-raise)
P->>P: asyncio.get_running_loop() → RuntimeError (safe)
P->>P: ensure_async_engine_loop() → loop
P->>EL: run_coroutine_threadsafe(facade.agenerate(…), loop)
EL->>F: await agenerate(…)
F->>C: _get_async_client() + _apost(…)
C-->>F: response
F-->>EL: result tuple
EL-->>P: future resolved
P-->>TT: return result
TT-->>EL: row dict
Reviews (6): Last reviewed commit: "Merge branch 'main' into andreatgretel/f..." | Re-trigger Greptile
johnnygreco
left a comment
There was a problem hiding this comment.
Nice approach overall — transparent proxy that doesn't require user code changes is the right call. A few things worth looking at before merge.
Move _AsyncBridgedModelFacade wrapping from _build_models_dict() into _invoke_generator_function() so the async path gets raw facades. The bridge proxy is only needed for sync custom columns; async columns already have direct access to model.agenerate().
…gs test - Introduce SyncClientUnavailableError so the facade catches by type instead of matching error strings (review comment #1) - Add future.cancel() + logger.warning() on timeout to match the _run_coroutine_sync pattern in base.py (review comment #2) - Assert kwargs forwarding in the async bridge test (review comment #4)
…ptions The decorator catches all exceptions and wraps them into DataDesignerError, which prevented the async bridge proxy from ever seeing the original error. Add an early match case that re-raises SyncClientUnavailableError directly.
|
follow-up: while addressing the review feedback, Codex flagged that fixed in e6b0a81 by adding an early |
Drop the underscore prefix since the constant is exported and used across modules (base.py and custom.py).
johnnygreco
left a comment
There was a problem hiding this comment.
🛸 – sweet, thanks @andreatgretel!
📋 Summary
Custom column generators that call
model.generate()inside their function body fail under theasync engine because the sync HTTP client is unavailable. This adds a transparent proxy that
bridges to
model.agenerate()viarun_coroutine_threadsafe, so user code works unchangedin both engines.
🔗 Related Issue
N/A - discovered via NVIDIA-NeMo/Anonymizer#119 where a
model_compat.model_generate()workaroundwas needed; this PR moves the fix into DD so all consumers get it for free.
🔄 Changes
_AsyncBridgedModelFacadeproxy class that intercepts the sync-clientRuntimeErrorandschedules
agenerate()on the engine's persistent event loop(
custom.py#L25-L72)_build_models_dict()so the bridge is transparent to user codeHttpModelClientsync-mode error (not substring matching)🧪 Testing
make testpasses (197 column generator tests)✅ Checklist