
fix: bridge model.generate() to agenerate() for custom columns in async engine #545

Merged
andreatgretel merged 7 commits into main from
andreatgretel/feat/async-model-bridge-custom-columns
Apr 17, 2026

@andreatgretel
Contributor

📋 Summary

Custom column generators that call model.generate() inside their function body fail under the
async engine because the sync HTTP client is unavailable. This adds a transparent proxy that
bridges to model.agenerate() via run_coroutine_threadsafe, so user code works unchanged
in both engines.

🔗 Related Issue

N/A - discovered via NVIDIA-NeMo/Anonymizer#119 where a model_compat.model_generate() workaround
was needed; this PR moves the fix into DD so all consumers get it for free.

🔄 Changes

  • Add _AsyncBridgedModelFacade proxy class that intercepts the sync-client RuntimeError and
    schedules agenerate() on the engine's persistent event loop
    (custom.py#L25-L72)
  • Wrap facades in _build_models_dict() so the bridge is transparent to user code
  • Include deadlock guard: clear error if called from the event loop instead of hanging
  • Match only the exact HttpModelClient sync-mode error (not substring matching)
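
The bridge and deadlock guard described in the list above can be sketched roughly as follows. This is a minimal illustration, not the code in custom.py: `SyncUnavailable`, `FakeFacade`, and `BridgedFacade` are hypothetical stand-ins, and the loop is passed in explicitly rather than obtained from the engine.

```python
import asyncio
import threading


class SyncUnavailable(RuntimeError):
    """Hypothetical stand-in for the error the sync client raises in async mode."""


class FakeFacade:
    """Hypothetical facade whose sync path is disabled, as under the async engine."""

    def generate(self, prompt, **kwargs):
        raise SyncUnavailable("sync client unavailable")

    async def agenerate(self, prompt, **kwargs):
        return f"async:{prompt}"


class BridgedFacade:
    """Try the sync call first; on the sentinel error, hop to the given loop."""

    def __init__(self, facade, loop, timeout=5.0):
        self._facade = facade
        self._loop = loop
        self._timeout = timeout

    def generate(self, *args, **kwargs):
        try:
            return self._facade.generate(*args, **kwargs)
        except SyncUnavailable:
            pass  # fall through to the async bridge
        # Deadlock guard: blocking on future.result() from a thread that is
        # itself running an event loop would hang, so fail fast instead.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no running loop in this thread, so blocking is safe
        else:
            raise RuntimeError("generate() called from the event loop; use agenerate()")
        future = asyncio.run_coroutine_threadsafe(
            self._facade.agenerate(*args, **kwargs), self._loop
        )
        return future.result(timeout=self._timeout)


# Stand-in for the engine's persistent loop, running on a background thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    proxy = BridgedFacade(FakeFacade(), loop)
    result = proxy.generate("hello")  # called from a non-loop thread
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()
```

Trying the sync call first keeps the sync engine path untouched: the bridge only activates when the sentinel error is raised.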

🧪 Testing

  • make test passes (197 column generator tests)
  • Unit tests added: 4 tests covering sync passthrough, async bridge, error propagation, deadlock guard
  • E2E tests added/updated — N/A, no end-to-end test infrastructure for async engine mode

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated — N/A, internal proxy class

…ync engine

Custom column generators that call model.generate() fail under the async
engine because the sync HTTP client is unavailable. Add an
_AsyncBridgedModelFacade proxy in _build_models_dict() that intercepts the
sync-client RuntimeError and schedules agenerate() on the engine's persistent
event loop via run_coroutine_threadsafe. Includes a deadlock guard for async
custom columns running on the event loop.
@andreatgretel andreatgretel requested a review from a team as a code owner April 14, 2026 16:58
@github-actions
Contributor

Code Review: PR #545 — fix: bridge model.generate() to agenerate() for custom columns in async engine

Summary

This PR adds an _AsyncBridgedModelFacade proxy class that transparently bridges synchronous model.generate() calls to model.agenerate() when a custom column generator runs inside the async engine. Under the async engine, sync custom columns execute via asyncio.to_thread, where the sync HTTP client is unavailable. The proxy intercepts the specific RuntimeError from the async-mode HttpModelClient and schedules agenerate() on the engine's persistent event loop via run_coroutine_threadsafe. A deadlock guard prevents misuse from the event loop thread itself.

Files changed: 2 (1 source, 1 test) — +160 / -3 lines.

Findings

Correctness

  1. Exact error string matching is fragile (Medium)
    custom.py:35 — The bridge triggers only when str(exc) == "Sync methods are not available on an async-mode HttpModelClient.". If the error message is ever reworded upstream (even a punctuation change), the bridge silently stops working and users get an opaque RuntimeError. Consider importing and matching on a specific exception subclass from the HTTP client, or at minimum matching a stable substring/prefix. If the upstream error message is under this project's control, adding a comment cross-referencing the source would help.

  2. kwargs not forwarded in test_bridges_to_agenerate_on_sync_client_error (Low)
    test_custom.py:533 — The test calls proxy.generate("hello", parser=str) and asserts result == ("async_result", ["hello"]), but the fake_agenerate mock (*args, **kwargs) silently drops parser=str from the assertion. The test doesn't verify that keyword arguments survive the bridge. Consider asserting on kwargs too, e.g., returning (args, kwargs) and checking both.

  3. Timeout not tested (Low)
    There is no test for the _SYNC_BRIDGE_TIMEOUT path. If agenerate() hangs, the future.result(timeout=...) will raise concurrent.futures.TimeoutError, which will propagate as-is rather than as a project-canonical error. This differs from the existing pattern in base.py:45-49, which wraps the timeout; the facade does not. Consider whether a TimeoutError leaking from this path is acceptable or should be wrapped.
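
To illustrate Finding 1, the sketch below contrasts exact-string matching with matching on an exception subclass. The class name `SyncClientUnavailable` and the reworded message are assumptions made for the example; only the original message string comes from the review text above.

```python
class SyncClientUnavailable(RuntimeError):
    """Hypothetical typed sentinel. Subclassing RuntimeError keeps
    existing `except RuntimeError` handlers working."""


def get_sync_client():
    # Simulates the upstream message being reworded after the bridge was written.
    raise SyncClientUnavailable("Sync methods are unavailable in async mode.")


def should_bridge_by_string(exc):
    # Fragile: any change to the wording breaks the comparison.
    return str(exc) == "Sync methods are not available on an async-mode HttpModelClient."


def should_bridge_by_type(exc):
    # Robust: independent of the message text.
    return isinstance(exc, SyncClientUnavailable)


try:
    get_sync_client()
except RuntimeError as exc:
    by_string = should_bridge_by_string(exc)
    by_type = should_bridge_by_type(exc)
```

With the reworded message, the string check no longer fires while the type check still does.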

Design

  1. Proxy is always applied, even in sync engine mode (Info)
    custom.py:337 - _build_models_dict() unconditionally wraps every facade in _AsyncBridgedModelFacade. In sync engine mode the proxy's generate() calls facade.generate() which succeeds immediately, so the overhead is one extra try/except per call. This is a reasonable trade-off for simplicity, but worth noting.

  2. __slots__ + __getattr__ proxy pattern is clean (Positive)
    The use of __slots__ with object.__setattr__ / object.__getattribute__ for the internal _facade attribute, combined with __getattr__ forwarding, is a solid proxy pattern that avoids attribute shadowing issues. The __repr__ is helpful for debugging.

  3. Deferred import of ensure_async_engine_loop inside generate() (Info)
    custom.py:62 — The import is deferred to avoid a circular import or heavy import at module load time. This is consistent with the project's lazy-import convention. Fine as-is.
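
For readers unfamiliar with the proxy pattern praised in Design finding 2, here is a generic sketch. `Target` and `Proxy` are hypothetical names and the overridden `generate()` just upper-cases the result; the real class is not reproduced here.

```python
class Target:
    """Hypothetical wrapped object."""

    name = "model-a"

    def generate(self, prompt):
        return f"sync:{prompt}"


class Proxy:
    # __slots__ removes the per-instance __dict__, so the proxy cannot
    # accidentally grow attributes that shadow the wrapped object's.
    __slots__ = ("_wrapped",)

    def __init__(self, wrapped):
        object.__setattr__(self, "_wrapped", wrapped)

    def __getattr__(self, attr):
        # Only invoked when normal lookup fails, so the methods defined on
        # Proxy win and everything else is forwarded to the wrapped object.
        return getattr(object.__getattribute__(self, "_wrapped"), attr)

    def generate(self, prompt):
        inner = object.__getattribute__(self, "_wrapped")
        return inner.generate(prompt).upper()  # intercepted call

    def __repr__(self):
        return f"Proxy({object.__getattribute__(self, '_wrapped')!r})"


proxy = Proxy(Target())
forwarded = proxy.name             # not defined on Proxy, forwarded to Target
intercepted = proxy.generate("hi")  # defined on Proxy, so intercepted
```

Going through `object.__getattribute__` for the internal slot avoids recursing into `__getattr__` if the slot is ever unset.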

Testing

  1. Good coverage of key scenarios (Positive)
    Four tests cover: sync passthrough, async bridge, error propagation for non-matching errors (both different exception type and different RuntimeError message), and the deadlock guard. The deadlock guard test (test_deadlock_guard_on_event_loop) correctly uses asyncio.run() to simulate calling from the event loop.

  2. Test creates a real event loop + thread (Info)
    test_custom.py:523-537 — The async bridge test spins up a real event loop on a background thread, which is the correct approach for testing run_coroutine_threadsafe. The cleanup in finally is proper (stop + join with timeout).
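
A test in the style described above might look like the following sketch. The helper `background_loop` and the shape of `fake_agenerate` are assumptions for illustration, not the contents of test_custom.py; the fake returns both positional and keyword arguments so the assertion can confirm that kwargs survive the bridge.

```python
import asyncio
import contextlib
import threading


@contextlib.contextmanager
def background_loop():
    """Run a real event loop on a background thread for the duration of a test."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        yield loop
    finally:
        # Stop from the loop's own thread, then wait for the thread to exit.
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()


async def fake_agenerate(*args, **kwargs):
    # Echo everything back so the test can check args and kwargs.
    return ("async_result", list(args), dict(kwargs))


def test_kwargs_survive_bridge():
    with background_loop() as loop:
        future = asyncio.run_coroutine_threadsafe(
            fake_agenerate("hello", parser=str), loop
        )
        result = future.result(timeout=5)
    assert result == ("async_result", ["hello"], {"parser": str})
```

The function is written so that pytest would collect it by name; it can also be called directly.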

Style / Conventions

  1. Import of _SYNC_BRIDGE_TIMEOUT from base (Info)
    The constant is reused from base.py rather than duplicated, which is good. The leading underscore signals it's internal, consistent with its use in base.py.

  2. from __future__ import annotations present in source file (Positive)
    Follows the project's structural invariant.

Verdict

Approve with minor suggestions. The implementation is well-structured, follows project conventions, and includes solid test coverage. The proxy pattern is clean, the deadlock guard is a thoughtful addition, and the tests cover the important scenarios.

The main concern is the exact-string error matching (Finding #1), which creates a fragile coupling to an upstream error message. This is acceptable for a short-term fix but should be tracked for improvement — ideally by introducing a typed exception in the HTTP client layer. The missing kwargs assertion in the bridge test (Finding #2) is minor but easy to fix.

No blocking issues found.

@greptile-apps
Contributor

greptile-apps Bot commented Apr 14, 2026

Greptile Summary

Adds _AsyncBridgedModelFacade, a transparent proxy that bridges model.generate() to model.agenerate() when sync custom column generators run inside asyncio.to_thread under the async engine. The fix is accomplished with a new typed sentinel error (SyncClientUnavailableError), a pass-through arm in handle_llm_exceptions, and run_coroutine_threadsafe scheduling on the engine's persistent loop.

Confidence Score: 5/5

Safe to merge — no correctness issues found; the bridge, deadlock guard, error propagation chain, and timeout handling are all correct.

All six changed files are consistent with each other. The error sentinel (SyncClientUnavailableError) flows correctly through catch_llm_exceptions → handle_llm_exceptions (re-raise) → proxy catch. The run_coroutine_threadsafe pattern is appropriate for bridging from a to_thread worker to the engine's persistent loop, the deadlock guard is sound, and all four new tests cover the critical paths. No P0 or P1 findings.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py | Core change: adds _AsyncBridgedModelFacade proxy and wraps facades in _invoke_generator_function; async generator path (_ainvoke_generator_function) intentionally left unwrapped. |
| packages/data-designer-engine/src/data_designer/engine/models/errors.py | Adds case SyncClientUnavailableError(): raise arm so the sentinel error escapes the catch_llm_exceptions boundary and reaches the proxy. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/adapters/http_model_client.py | Narrows _get_sync_client error from generic RuntimeError to typed SyncClientUnavailableError, enabling exact-match interception in the proxy. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/errors.py | Introduces SyncClientUnavailableError(RuntimeError) as a typed sentinel; straightforward addition. |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py | Renames _SYNC_BRIDGE_TIMEOUT → SYNC_BRIDGE_TIMEOUT to export the constant for reuse in custom.py. |
| packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py | Four new focused tests: sync passthrough, async bridge via run_coroutine_threadsafe, non-SyncClientUnavailableError propagation, and deadlock guard. |

Sequence Diagram

sequenceDiagram
    participant EL as AsyncEngine EventLoop
    participant TT as asyncio.to_thread (worker)
    participant P as _AsyncBridgedModelFacade
    participant F as ModelFacade
    participant C as HttpModelClient (async mode)

    EL->>TT: to_thread(generator_fn, row, params, {model: proxy})
    TT->>P: model.generate(prompt, parser=…)
    P->>F: facade.generate(…)
    F->>C: _get_sync_client()
    C-->>F: raise SyncClientUnavailableError
    F-->>P: SyncClientUnavailableError (via catch_llm_exceptions re-raise)
    P->>P: asyncio.get_running_loop() → RuntimeError (safe)
    P->>P: ensure_async_engine_loop() → loop
    P->>EL: run_coroutine_threadsafe(facade.agenerate(…), loop)
    EL->>F: await agenerate(…)
    F->>C: _get_async_client() + _apost(…)
    C-->>F: response
    F-->>EL: result tuple
    EL-->>P: future resolved
    P-->>TT: return result
    TT-->>EL: row dict

Reviews (6): Last reviewed commit: "Merge branch 'main' into andreatgretel/f..."


@johnnygreco johnnygreco left a comment


Nice approach overall — transparent proxy that doesn't require user code changes is the right call. A few things worth looking at before merge.

Move _AsyncBridgedModelFacade wrapping from _build_models_dict() into
_invoke_generator_function() so the async path gets raw facades. The
bridge proxy is only needed for sync custom columns; async columns
already have direct access to model.agenerate().
…gs test

- Introduce SyncClientUnavailableError so the facade catches by type
  instead of matching error strings (review comment #1)
- Add future.cancel() + logger.warning() on timeout to match the
  _run_coroutine_sync pattern in base.py (review comment #2)
- Assert kwargs forwarding in the async bridge test (review comment #4)
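
The timeout handling mentioned in the commit message above (cancel the future, log a warning) could look roughly like this. The helper name `run_with_timeout` and the choice to re-raise a built-in TimeoutError are assumptions; the project's actual error type is not shown in this conversation.

```python
import asyncio
import concurrent.futures
import logging
import threading

logger = logging.getLogger("bridge_sketch")


def run_with_timeout(coro, loop, timeout):
    """Block on a coroutine scheduled on `loop`; cancel it if it overruns."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Without cancel(), the coroutine would keep running on the loop
        # even though nobody is waiting for its result any more.
        future.cancel()
        logger.warning("Bridged call timed out after %.2fs", timeout)
        raise TimeoutError(f"bridged call exceeded {timeout}s") from None


async def slow_agenerate():
    await asyncio.sleep(10)
    return "never returned"


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    try:
        run_with_timeout(slow_agenerate(), loop, timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True
    # Give the loop a turn so it can process the cancellation before stopping.
    asyncio.run_coroutine_threadsafe(asyncio.sleep(0), loop).result(timeout=1)
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
```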
…ptions

The decorator catches all exceptions and wraps them into DataDesignerError,
which prevented the async bridge proxy from ever seeing the original error.
Add an early match case that re-raises SyncClientUnavailableError directly.
@andreatgretel
Contributor Author

follow-up: while addressing the review feedback, Codex flagged that @catch_llm_exceptions on ModelFacade.generate() has a blanket except Exception that would wrap SyncClientUnavailableError into a DataDesignerError before the proxy ever sees it - meaning the bridge would never actually trigger on real code paths (only on the bare Mock() in tests).

fixed in e6b0a81 by adding an early case SyncClientUnavailableError(): raise in handle_llm_exceptions. smoke test confirms the full chain works without mocking the error path: decorator -> propagation -> proxy catch -> async bridge.

andreatgretel and others added 3 commits April 15, 2026 22:14
Drop the underscore prefix since the constant is exported and used
across modules (base.py and custom.py).
Contributor

@johnnygreco johnnygreco left a comment


🛸 – sweet, thanks @andreatgretel!

@andreatgretel andreatgretel merged commit a965bc1 into main Apr 17, 2026
49 checks passed