
chore: async engine readiness - blockers and polish before default #553

Open
andreatgretel wants to merge 5 commits into main from andreatgretel/chore/async-engine-readiness

Conversation

@andreatgretel
Contributor

📋 Summary

Hardens the async engine for production readiness before it becomes the default execution path. Processor callback failures now propagate as DatasetGenerationError (fail-fast) instead of silently dropping row groups, allow_resize=True triggers a graceful sync fallback with DeprecationWarning, and try/finally cleanup in run() ensures workers are always drained.

🔗 Related Issue

Closes #462

🔄 Changes

🔧 Changed

  • Processor callback failures (pre-batch and post-batch) now propagate as DatasetGenerationError instead of silently dropping row groups
  • allow_resize=True triggers graceful sync fallback with DeprecationWarning instead of raising
  • try/finally refactor in AsyncTaskScheduler.run() eliminates duplicated cleanup logic and ensures workers are always cancelled via asyncio.shield()
  • _run_cell_by_cell_generator now branches on instance flag _use_async instead of module-level DATA_DESIGNER_ASYNC_ENGINE, so the sync fallback decision propagates correctly

✨ Added

  • Row-count guards in pre-batch (strict_row_count=True) and post-batch callbacks reject unsupported row-count changes
  • Partial-completion warning when actual records fall short of target
  • task_traces field on PreviewResults for async scheduler trace data
  • Tests: post-batch failure propagation, early shutdown worker drain, checkpoint correctness, execution graph integration
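The row-count guard reduces to a before/after length check; the sketch below uses plain lists and a stand-in exception class, not the real `ProcessorRunner` signature:

```python
class DatasetProcessingError(Exception):
    """Illustrative stand-in for the engine's processing error type."""

def run_post_batch(rows, processor, strict_row_count=False):
    """Sketch of the strict_row_count guard (names are illustrative).

    The async engine checkpoints fixed-size row groups, so a processor
    that adds or drops rows would corrupt the bookkeeping; with the
    guard on, that is rejected immediately instead of failing later."""
    before = len(rows)
    out = processor(rows)
    if strict_row_count and len(out) != before:
        raise DatasetProcessingError(
            f"processor changed row count from {before} to {len(out)}; "
            "row-count changes are unsupported in async mode"
        )
    return out

rows = [{"x": 1}, {"x": 2}, {"x": 3}]
assert run_post_batch(rows, lambda r: r, strict_row_count=True) == rows
try:
    run_post_batch(rows, lambda r: r[:1], strict_row_count=True)  # drops rows
except DatasetProcessingError as exc:
    print(exc)
```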

🐛 Fixed

  • DatasetGenerationError from callbacks no longer gets double-wrapped by the scheduler's generic except Exception handler
  • stacklevel in allow_resize DeprecationWarning now points at user code instead of library internals
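The double-wrap fix follows the standard re-raise-before-wrap pattern; a minimal sketch with stand-in names (not the scheduler's actual code):

```python
class DatasetGenerationError(Exception):
    """Illustrative stand-in for the engine's generation error type."""

def run_callback(callback, rg_id):
    """Sketch of the re-raise-before-wrap pattern (names are illustrative).

    A callback that already raises DatasetGenerationError passes through
    untouched; any other exception is wrapped exactly once, with the
    original preserved as __cause__ for introspection."""
    try:
        callback()
    except DatasetGenerationError:
        raise  # already the canonical type: do not double-wrap
    except Exception as exc:
        raise DatasetGenerationError(f"row group {rg_id} failed") from exc

def raises_canonical():
    raise DatasetGenerationError("guard tripped")

try:
    run_callback(raises_canonical, rg_id=7)
except DatasetGenerationError as exc:
    # Not re-wrapped: no synthetic "row group 7 failed" layer on top.
    print(str(exc), exc.__cause__)
```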

🔍 Attention Areas

⚠️ Reviewers: Please pay special attention to the following:

  • async_scheduler.py — fail-fast semantics change: pre/post-batch failures now propagate instead of being swallowed
  • dataset_builder.py — _resolve_async_compatibility replaces _validate_async_compatibility; row-count guards in callbacks

🧪 Testing

  • make test passes
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

All 48 async-related tests pass. 18 smoke tests pass. Ruff check and format clean on all changed files.

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (if applicable)

- Processor callback failures (pre-batch and post-batch) now raise
  DatasetGenerationError instead of silently dropping row groups
- Early shutdown and all error paths drain in-flight workers via a
  finally block in AsyncTaskScheduler.run()
- Pre-batch and post-batch processors that change row count in async
  mode raise immediately (strict_row_count guard)
- Partial completion logs a warning when actual < target records
- allow_resize=True auto-falls back to sync engine with a deprecation
  warning instead of raising, using a per-run _use_async flag
- Preview path mirrors the trace check from the full build path;
  PreviewResults exposes task_traces

Closes #462
- Prevent double-wrapping of DatasetGenerationError in scheduler callbacks
- Fix stacklevel in allow_resize DeprecationWarning to point at user code
- Update stale comment to reflect fail-fast behavior
- Rename misleading test and remove unused caplog fixture
- Add zero-warnings assertion for happy-path case
- Move warnings import to module level
@greptile-apps
Contributor

greptile-apps Bot commented Apr 16, 2026

Greptile Summary

This PR hardens the async engine for production readiness by changing pre/post-batch processor failures from silent row-group drops to fail-fast DatasetGenerationError propagation, replacing allow_resize=True error-raising with a graceful sync fallback + DeprecationWarning, and consolidating cleanup into a try/finally block so workers are always drained. An instance-level _use_async flag on DatasetBuilder replaces the module-level DATA_DESIGNER_ASYNC_ENGINE check in _run_cell_by_cell_generator, correctly propagating the sync-fallback decision across the execution path.

Confidence Score: 5/5

Safe to merge — behavioral changes are well-tested and the only finding is a dead-code style nit.

All 48 async tests pass and the changed logic is clearly exercised by new/updated tests. The single finding (dispatch_error is None being dead code) is P2 style and has no runtime impact.

async_scheduler.py — the dispatch_error variable and its check on line 293 can be simplified.

Important Files Changed

Filename — Overview

  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py — Key behavioral changes: pre/post-batch failures now propagate as DatasetGenerationError instead of silently dropping rows; try/finally refactor ensures workers are always drained; active_worker_count property added. One P2: dispatch_error is None check in the post-finally block is dead code since that branch is only reachable on success.
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py — _validate_async_compatibility replaced by _resolve_async_compatibility; allow_resize now triggers sync fallback + DeprecationWarning instead of raising; _use_async instance flag replaces module-level DATA_DESIGNER_ASYNC_ENGINE checks; partial-completion warning added; trace helper extracted. Clean.
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py — strict_row_count parameter added to run_pre_batch_on_df and run_post_batch; raises DatasetProcessingError on row-count change when enabled. Straightforward and correct.
  • packages/data-designer/src/data_designer/interface/data_designer.py — except DeprecationWarning: raise guards added before the generic except Exception handlers so DeprecationWarning isn't swallowed when warnings-as-errors is active; task_traces wired into PreviewResults.
  • packages/data-designer-config/src/data_designer/config/preview_results.py — task_traces: list[Any] field added.
  • packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py — Tests updated to reflect fail-fast semantics; new tests for post-batch failure propagation and early-shutdown worker drain. Good coverage of changed behavior.
  • packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py — test_validate_async_compatibility updated to test_resolve_async_compatibility; now asserts DeprecationWarning + sync fallback instead of raised error; new partial-completion test added.
  • packages/data-designer-engine/tests/engine/models/test_async_engine_switch.py — Patching module-level DATA_DESIGNER_ASYNC_ENGINE replaced by setting builder._use_async directly, matching the new instance-flag approach. Simpler and more robust.

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[build / build_preview] --> B[_resolve_async_compatibility]
    B -->|allow_resize detected| C[DeprecationWarning\n+ sync fallback\n_use_async = False]
    B -->|no allow_resize| D[_use_async = True]
    C --> E[Sync path\n_fan_out_with_threads]
    D --> F[Async path\n_fan_out_with_async]
    F --> G[AsyncTaskScheduler.run]
    G --> H[_main_dispatch_loop]
    H -->|exception| I[except BaseException\ndispatch_error = exc\nraise]
    H -->|success| J[finally: cancel admission\n+ asyncio.shield cancel_workers]
    I --> J
    J -->|exception path| K[exception propagates to caller]
    J -->|success path| L[reporter.log_final\ncheck _rg_states]
    H --> M{_run_seeds_complete_check}
    M -->|on_seeds_complete raises| N[wrap as DatasetGenerationError\nfail-fast]
    M -->|ok| O{_checkpoint_completed_row_groups}
    O -->|on_before_checkpoint raises| P[wrap as DatasetGenerationError\nfail-fast]
    O -->|ok| Q[del _rg_states rg_id\non_finalize_row_group]
```
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 293-298

Comment:
**`dispatch_error is None` guard is unreachable dead code**

The condition `dispatch_error is None` is always `True` when line 293 is evaluated. If `dispatch_error` is set (i.e., `_main_dispatch_loop` threw), the `raise` on line 279 propagates the exception through the `finally` block and past lines 290–298 entirely — those lines are only reachable on a clean exit. The variable and its check can be simplified:

```suggestion
            if self._rg_states:
                incomplete = list(self._rg_states)
                logger.error(
                    f"Scheduler exited with {len(self._rg_states)} unfinished row group(s): {incomplete}. "
                    "These row groups were not checkpointed."
                )
```

And the `dispatch_error: BaseException | None = None` declaration and its assignment in the `except` branch can be removed entirely.


Reviews (3). Last reviewed commit: "test: fold metadata-count test into drop..."

@github-actions
Contributor

Code Review: PR #553 — chore: async engine readiness - blockers and polish before default

Summary

This PR hardens the async engine for production readiness before it becomes the default execution path. The changes fall into three categories:

  1. Fail-fast error semantics — Processor callback failures (pre-batch and post-batch) now propagate as DatasetGenerationError instead of silently dropping row groups.
  2. Graceful allow_resize fallback — allow_resize=True triggers a sync fallback with DeprecationWarning instead of raising an error.
  3. Cleanup reliability — try/finally refactor in AsyncTaskScheduler.run() ensures workers are always drained, eliminating duplicated cleanup logic.

The PR also adds row-count guards, partial-completion warnings, task_traces on PreviewResults, and comprehensive tests for the new behaviors.

Files changed: 8 (245 additions, 103 deletions)

Findings

High Severity

None found.

Medium Severity

1. DatasetProcessingError from strict_row_count gets wrapped into DatasetGenerationError — intentional but may lose specificity

In processor_runner.py:87, the strict row-count check raises DatasetProcessingError. Since DatasetProcessingError and DatasetGenerationError are sibling classes (both extend DataDesignerError), the scheduler's except Exception as exc handler in _run_seeds_complete_check will wrap it into a DatasetGenerationError. This means callers cannot distinguish a row-count violation from other generation failures by exception type.

The original DatasetProcessingError is preserved via __cause__, so it can be introspected, but the top-level type changes. If this is intentional (normalizing errors at the scheduler boundary), a brief comment explaining this wrapping policy would help future maintainers.

  • async_scheduler.py:541-543 — wrapping site
  • processor_runner.py:87-90 — originating raise
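Since the original error survives as `__cause__`, callers who need to distinguish a row-count violation today can inspect the exception chain. A hedged sketch with stand-in exception classes (not the real hierarchy):

```python
# Illustrative stand-ins for the engine's exception hierarchy.
class DataDesignerError(Exception): ...
class DatasetGenerationError(DataDesignerError): ...
class DatasetProcessingError(DataDesignerError): ...

def classify(exc: DatasetGenerationError) -> str:
    """Distinguish a wrapped row-count violation from other generation
    failures by inspecting the exception chain, since the top-level
    type is normalized at the scheduler boundary."""
    if isinstance(exc.__cause__, DatasetProcessingError):
        return "row-count violation"
    return "other generation failure"

wrapped = DatasetGenerationError("seeds-complete callback failed")
wrapped.__cause__ = DatasetProcessingError("row count changed")
print(classify(wrapped))  # row-count violation
```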

Low Severity

2. task_traces typed as list[Any] in config package

In preview_results.py:26, task_traces uses list[Any] rather than a more specific type. This is understandable since importing TaskTrace from the engine package would violate the dependency direction (config cannot depend on engine). Consider adding a brief comment noting this is intentional (e.g., # Typed as Any to avoid config -> engine import), or define a protocol/TypeAlias in the config package if the trace shape stabilizes.

  • packages/data-designer-config/src/data_designer/config/preview_results.py:26

3. Semaphore release skipped for remaining row groups on fail-fast

In _checkpoint_completed_row_groups (line 498-522), when DatasetGenerationError propagates out of the for rg_id, rg_size in completed: loop, the semaphore is released for the failing row group (via finally), but remaining row groups in the completed list won't have their semaphores released. Since the scheduler is shutting down and no one reuses it, this has no practical impact. However, if the scheduler were ever made reusable, this could become a semaphore leak. No action needed now — just documenting for awareness.

  • async_scheduler.py:498-522

4. stacklevel=4 in DeprecationWarning is fragile

The stacklevel=4 in _resolve_async_compatibility (line 279) correctly points at user code through the chain: _resolve_async_compatibility → build()/build_preview() → DataDesigner.build()/preview() → user code. However, if an intermediate layer is added (e.g., a decorator or wrapper method), the warning will point at the wrong frame. Consider adding a comment documenting the expected call chain so future editors know to update the stacklevel.

  • dataset_builder.py:279
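How stacklevel selects the attributed frame, shown on a made-up three-layer chain (not the actual data-designer call stack):

```python
import warnings

def _resolve_compat():
    # stacklevel counts frames upward from the warn() call:
    # 1 = this function, 2 = its caller, 3 = the caller's caller, ...
    # Inserting a wrapper layer between here and user code would
    # require bumping this number -- hence the fragility noted above.
    warnings.warn("allow_resize is deprecated", DeprecationWarning, stacklevel=3)

def build():
    _resolve_compat()

def user_code():
    build()  # stacklevel=3 attributes the warning to this line

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    user_code()

print(caught[0].category.__name__, "attributed to line", caught[0].lineno)
```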

Positive Observations

5. Clean try/finally refactoring in run()

The consolidation of admission cancellation and worker drain into a single finally block (lines 272-280) is well done. It eliminates the duplicated cleanup between the happy path and CancelledError path, and using asyncio.shield to protect the worker cancellation is correct practice.

6. Double-wrap prevention for DatasetGenerationError

The second commit adds except DatasetGenerationError: raise before the generic except Exception handler in both _checkpoint_completed_row_groups (line 503-504) and _run_seeds_complete_check (line 539-540). This ensures that when the on_before_checkpoint callback raises DatasetGenerationError directly (e.g., the row-count guard in dataset_builder.py:403-407), it isn't re-wrapped. Good defensive fix.

7. Instance flag _use_async instead of re-reading module global

Replacing the module-level DATA_DESIGNER_ASYNC_ENGINE check in _run_cell_by_cell_generator with the instance flag _use_async (line 537) ensures the sync fallback decision from _resolve_async_compatibility propagates correctly to all downstream code paths. The tests were also cleanly updated to test the instance flag directly rather than patching the module global.

8. Solid test coverage

The new tests (test_scheduler_post_batch_failure_raises, test_early_shutdown_drains_workers, test_scheduler_pre_batch_failure_propagates_across_row_groups) cover the critical behavioral changes well. The existing tests were correctly updated from the old "drop and continue" assertions to "fail-fast" assertions.

Verdict

APPROVE — This is a well-structured hardening PR with clear fail-fast semantics, proper cleanup guarantees, and good test coverage. The medium finding (error type wrapping) is a design-level note worth discussing but not blocking. The low-severity items are minor housekeeping suggestions.

Contributor

@johnnygreco left a comment


Non-blocking polish items on top of an otherwise clean PR. One release-notes flag worth calling out: the switch from silently drop the row group to fail the whole run on pre-/post-batch callback failure is a user-visible behavior change. Anyone with a flaky processor will now see their run error out instead of producing a smaller dataset — worth a CHANGELOG entry before this becomes the default engine.

Line-specific suggestions below.

```python
# -- Partial completion warning ------------------------------------------------


def test_write_metadata_records_actual_and_target_counts() -> None:
```
Contributor


The docstring and inline comment acknowledge this covers the write_metadata building block, not the new partial-completion warning at dataset_builder.py:337-341. A caplog-based test over _build_async (or a thin extraction of the warning block) would directly pin the logger.warning("⚠️ Generated {actual} of {num_records} ...") path — the feature is listed in the PR description but currently has no direct assertion.

Contributor Author


Added test_dropped_rows_reduce_actual_record_count — drops all rows in one row group via the scheduler and asserts actual_num_records < num_records. Tests the condition that triggers the warning through the public API rather than mocking into _build_async.

- Extract _is_async_trace_enabled() helper to deduplicate trace check
- Post-batch row-count guard now raises DatasetProcessingError (not
  DatasetGenerationError) so the scheduler wraps it with rg_id
  symmetrically with the pre-batch path
- Add test_dropped_rows_reduce_actual_record_count for partial
  completion path
@andreatgretel
Contributor Author

Good call — I'll add a "Breaking behavior" note to the PR description calling out the fail-fast change. No changelog file in the repo yet, but happy to start one if you think it's worth it.

@nabinchha
Contributor

Nice work on this one, @andreatgretel — the fail-fast pivot is the right call before this becomes the default path.

Summary

Hardens the async engine so processor callback failures surface immediately instead of silently shrinking the dataset. The try/finally consolidation in run(), the allow_resize sync-fallback, and the row-count guards all land cleanly. Implementation matches the PR description.

Findings

Warnings — Worth addressing

dataset_builder.py:270-286 — DeprecationWarning can become a wrapped DataDesignerGenerationError

  • What: _resolve_async_compatibility emits a DeprecationWarning when allow_resize=True is detected. This method is called inside build() and build_preview(), both of which catch except Exception as e and re-wrap it as DataDesignerGenerationError. Users who run with warnings.filterwarnings("error") (standard in many test suites) will trigger this: the warning becomes an exception, gets caught by the outer handler, and surfaces as a cryptic generation error with no mention of allow_resize.
  • Why: The whole point of the deprecation message is to guide the user toward migration — wrapping it hides that guidance.
  • Suggestion: Move the _resolve_async_compatibility call (and its warning) before the try block in both build() and build_preview(). The sync fallback still works, but the warning can no longer be swallowed. Alternatively, catch DeprecationWarning explicitly in the outer handler and let it propagate unwrapped.
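The "catch DeprecationWarning explicitly" alternative can be sketched with a stand-in wrapper error (illustrative names, not the actual interface code):

```python
import warnings

class DataDesignerGenerationError(Exception):
    """Illustrative stand-in for the interface-layer wrapper error."""

def build(resolve):
    """Sketch of the suggested guard: when warnings-as-errors is active,
    a DeprecationWarning raised inside the try must reach the user
    unwrapped so the migration guidance is not hidden."""
    try:
        return resolve()
    except DeprecationWarning:
        raise  # propagate the guidance unwrapped
    except Exception as exc:
        raise DataDesignerGenerationError("generation failed") from exc

def resolve_under_warnings_as_errors():
    with warnings.catch_warnings():
        warnings.simplefilter("error")  # e.g. a strict test suite
        warnings.warn("allow_resize is deprecated", DeprecationWarning)

try:
    build(resolve_under_warnings_as_errors)
except DeprecationWarning as exc:
    print("surfaced as:", type(exc).__name__)  # not the generic wrapper
```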

async_scheduler.py:282-290 — Incomplete-RG log fires misleadingly after processor failures

  • What: After the try/finally block in run(), the if self._rg_states: check logs an error about unfinished row groups. When a DatasetGenerationError from a processor callback propagates out of _main_dispatch_loop, the failing row group was never removed from _rg_states (by design — del happens after the callback succeeds). So the log fires, then the DatasetGenerationError propagates. The user sees both an "unfinished row group(s)" error log and the actual exception.
  • Why: The log implies something went wrong in addition to the reported failure — it's noise that makes triage harder.
  • Suggestion: Guard the log so it only fires on clean exits. A simple flag works:
```python
dispatch_error: BaseException | None = None
try:
    await self._main_dispatch_loop(...)
except BaseException as exc:
    dispatch_error = exc
    raise
finally:
    ...  # cleanup as-is

if self._reporter:
    self._reporter.log_final()

if self._rg_states and dispatch_error is None:
    ...  # log incomplete row groups
```

dataset_builder.py:407-415 — Post-batch row-count guard lives inline instead of in ProcessorRunner

  • What: The pre-batch row-count guard is encapsulated inside ProcessorRunner.run_pre_batch_on_df via a strict_row_count parameter — any caller gets the protection automatically. The equivalent post-batch guard is ad-hoc in the on_before_checkpoint closure inside _prepare_async_run. Both check the same invariant (async engine doesn't support row-count changes) but at different levels of abstraction.
  • Why: If another caller of run_post_batch needs the same protection in the future, they'd have to re-implement the check. The pre-batch side won't have that problem.
  • Suggestion: Add a strict_row_count parameter to run_post_batch (mirroring run_pre_batch_on_df) and move the check there. The closure then becomes a one-liner:
```python
def on_before_checkpoint(rg_id: int, rg_size: int) -> None:
    df = buffer_manager.get_dataframe(rg_id)
    df = self._processor_runner.run_post_batch(df, current_batch_number=rg_id, strict_row_count=True)
    buffer_manager.replace_dataframe(rg_id, df)
```

New tests reach into private attributes

  • What: test_early_shutdown_drains_workers asserts on scheduler._worker_tasks, and test_write_metadata_records_actual_and_target_counts sets buffer_manager._actual_num_records directly to stage its precondition. Both access _-prefixed internals, which goes against the project's testing guidelines ("Test public APIs only").
  • Why: These tests verify important invariants, but coupling to private attributes makes them fragile if internals are refactored.
  • Suggestion: For _worker_tasks — consider exposing a read-only active_worker_count property on AsyncTaskScheduler so the test can assert scheduler.active_worker_count == 0. For _actual_num_records — the test could run a real scheduler that checkpoints some rows and drops others (similar to test_dropped_rows_reduce_actual_record_count) instead of poking the internal counter.

Suggestions — Take it or leave it

test_async_engine_switch.py:29 — Unused monkeypatch fixture

  • What: test_async_engine_env_controls_builder_execution_path still declares monkeypatch: pytest.MonkeyPatch but never uses it. The test now sets builder._use_async directly, which is cleaner.
  • Why: Leftover parameter from the old patch.object approach.
  • Suggestion: Drop monkeypatch from the signature.

What Looks Good

  • try/finally consolidation in run() — Collapsing the normal-path cleanup and the CancelledError handler into a single finally block removes a real duplication hazard. The asyncio.shield placement is correct.

  • Fail-fast callback semantics — Switching from silent row-group drops to DatasetGenerationError propagation is the single most impactful change here. Users with a flaky processor will now know immediately instead of discovering a short dataset after the fact. The except DatasetGenerationError: raise / except Exception: wrap pattern prevents double-wrapping cleanly.

  • _resolve_async_compatibility design — Graceful sync fallback with a DeprecationWarning is the right UX for the transition period. The test matrix covers both the fallback and the clean-async paths, including the zero-warnings assertion for the happy case.

Verdict

Needs changes — The two warnings (DeprecationWarning getting swallowed, misleading incomplete-RG log on failure exits) are worth fixing before merge. Everything else is optional.


This review was generated by an AI assistant.

johnnygreco previously approved these changes Apr 22, 2026
- DeprecationWarning no longer swallowed by interface error wrapper
- Incomplete-RG log only fires on clean scheduler exits
- Post-batch row-count guard moved into ProcessorRunner (strict_row_count)
- Expose active_worker_count property on AsyncTaskScheduler
- Drop unused monkeypatch fixture and pytest import
Remove test_write_metadata_records_actual_and_target_counts (poked
_actual_num_records directly) and assert metadata counts in
test_dropped_rows_reduce_actual_record_count instead, which exercises
the same path through the public API.
@andreatgretel
Contributor Author

Thanks for the thorough review! Addressed everything in two commits (7846f84, 0e3c2b9):

Warnings (all addressed):

  1. DeprecationWarning swallowed by error wrapper - Added except DeprecationWarning: raise before the except Exception in both create() and preview() at the interface layer. The warning now propagates cleanly when warnings.filterwarnings("error") is active.

  2. Incomplete-RG log on failure exits - Added a dispatch_error flag so the log only fires on clean exits, exactly as you suggested.

  3. Post-batch row-count guard inline vs ProcessorRunner - Added strict_row_count param to run_post_batch mirroring run_pre_batch_on_df. The closure is now a one-liner.

  4. Tests reaching into private attributes - Exposed active_worker_count property on AsyncTaskScheduler and updated the test. For _actual_num_records, folded the metadata assertion into test_dropped_rows_reduce_actual_record_count which exercises the same path through the public API, and deleted the standalone test.

Suggestions:

  1. Unused monkeypatch - Removed, along with the now-unused pytest import.
