chore: async engine readiness - blockers and polish before default #553
- Processor callback failures (pre-batch and post-batch) now raise DatasetGenerationError instead of silently dropping row groups
- Early shutdown and all error paths drain in-flight workers via a finally block in AsyncTaskScheduler.run()
- Pre-batch and post-batch processors that change row count in async mode raise immediately (strict_row_count guard)
- Partial completion logs a warning when actual < target records
- allow_resize=True auto-falls back to sync engine with a deprecation warning instead of raising, using a per-run _use_async flag
- Preview path mirrors the trace check from the full build path; PreviewResults exposes task_traces

Closes #462
- Prevent double-wrapping of DatasetGenerationError in scheduler callbacks
- Fix stacklevel in allow_resize DeprecationWarning to point at user code
- Update stale comment to reflect fail-fast behavior
- Rename misleading test and remove unused caplog fixture
- Add zero-warnings assertion for happy-path case
- Move warnings import to module level
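The double-wrap prevention described in this commit can be sketched roughly as follows. This is a minimal illustration, not the scheduler's actual code: `DatasetGenerationError` is redefined locally as a stand-in, and `run_callback_fail_fast` is a hypothetical helper name.

```python
class DatasetGenerationError(Exception):
    """Local stand-in for the project's error type, so the sketch runs on its own."""


def run_callback_fail_fast(callback, rg_id):
    """Run a processor callback (hypothetical helper); wrap unexpected errors once."""
    try:
        return callback()
    except DatasetGenerationError:
        # Already the expected type: re-raise untouched to avoid double-wrapping.
        raise
    except Exception as exc:
        # Any other failure is wrapped a single time, with the row group id attached.
        raise DatasetGenerationError(
            f"Processor callback failed for row group {rg_id}: {exc}"
        ) from exc
```

Ordering the `except DatasetGenerationError` clause before the generic `except Exception` is what keeps an already-wrapped error from being wrapped again.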
Greptile Summary
This PR hardens the async engine for production by making processor callback failures propagate as
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | try/finally refactor in run() correctly consolidates cleanup across all exit paths; fail-fast propagation of DatasetGenerationError from pre/post-batch callbacks is well-structured with correct semaphore release in finally; _rg_states left intact on failure is intentional and safe given dispatch_error guards the stale-state log. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | _resolve_async_compatibility correctly replaces hard raise with DeprecationWarning + sync fallback; _use_async instance flag properly gates both build() and _run_cell_by_cell_generator(); partial-completion warning added after async build. |
| packages/data-designer/src/data_designer/interface/data_designer.py | except DeprecationWarning: raise guard added to prevent wrapping in DataDesignerGenerationError when warnings-as-errors is active — intentional and correct pattern. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | strict_row_count guard added to run_pre_batch_on_df and run_post_batch; correctly raises DatasetProcessingError on row-count changes; backward-compatible via default=False. |
| packages/data-designer-config/src/data_designer/config/preview_results.py | task_traces: list[Any] |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Tests updated to reflect fail-fast semantics; new tests for post-batch failure propagation and early-shutdown worker drain via active_worker_count property. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py | test_resolve_async_compatibility correctly captures DeprecationWarning via catch_warnings(record=True); new partial-completion test validates actual_num_records accounting. |
| packages/data-designer-engine/tests/engine/models/test_async_engine_switch.py | Switched from module-level patching to instance-flag (_use_async) mutation; correctly tests both async and sync dispatch paths without monkeypatch coupling. |
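
The two warning-related rows above (sync fallback with a DeprecationWarning, and the `except DeprecationWarning: raise` guard) can be illustrated with a small standalone sketch. All names here are assumptions for illustration: `resolve_async_compatibility` and `generate` are simplified stand-ins, `DataDesignerGenerationError` is redefined locally, and the right `stacklevel` in real code depends on how many frames sit between the warning and user code.

```python
import warnings


class DataDesignerGenerationError(Exception):
    """Local stand-in for the interface-level wrapper error."""


def resolve_async_compatibility(async_requested: bool, allow_resize: bool) -> bool:
    """Return True if the async engine should be used for this run (sketch)."""
    if async_requested and allow_resize:
        warnings.warn(
            "allow_resize=True is not supported by the async engine; "
            "falling back to the sync engine.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller, not this helper
        )
        return False
    return async_requested


def generate(async_requested: bool, allow_resize: bool) -> str:
    """Pick an engine; unexpected errors are wrapped, DeprecationWarning is not."""
    try:
        use_async = resolve_async_compatibility(async_requested, allow_resize)
    except DeprecationWarning:
        # With warnings-as-errors the warning is raised as an exception.
        # Let it through instead of hiding it inside the generic wrapper.
        raise
    except Exception as exc:
        raise DataDesignerGenerationError("generation failed") from exc
    return "async" if use_async else "sync"
```

Because `DeprecationWarning` is a subclass of `Exception`, the guard has to come before the generic handler; otherwise a run under `-W error` would surface the wrapper error rather than the deprecation itself.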
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[DataDesigner.generate_dataset / preview] --> B{DATA_DESIGNER_ASYNC_ENGINE?}
B -- Yes --> C[_resolve_async_compatibility]
C -- allow_resize=True --> D[DeprecationWarning\n+ sync fallback\n_use_async = False]
C -- no allow_resize --> E[_use_async = True]
B -- No --> F[_use_async = False]
D --> F
E --> G[AsyncTaskScheduler.run]
F --> H[Sync batch loop]
G --> I[try: _main_dispatch_loop]
I --> J{Exception?}
J -- DatasetGenerationError\nor CancelledError --> K[except BaseException:\ndispatch_error = exc\nraise]
J -- Normal exit --> L[finally block]
K --> L
L --> M[Cancel admission_task]
M --> N[asyncio.shield _cancel_workers]
N --> O{dispatch_error?}
O -- Yes --> P[Exception propagates]
O -- No --> Q[reporter.log_final\nlog incomplete rg_states]
I --> R[_run_seeds_complete_check\npre-batch callback]
R -- Exception --> S{DatasetGenerationError?}
S -- Yes --> T[re-raise as-is]
S -- No --> U[wrap as DatasetGenerationError\nfail-fast]
I --> V[_checkpoint_completed_row_groups\npost-batch callback]
V -- Exception --> W{DatasetGenerationError?}
W -- Yes --> X[re-raise + rg_semaphore.release]
W -- No --> Y[wrap as DatasetGenerationError\nfail-fast]
style D fill:#fff3cd
style K fill:#f8d7da
style P fill:#f8d7da
style U fill:#f8d7da
style Y fill:#f8d7da
Reviews (4): Last reviewed commit: "Merge branch 'main' into andreatgretel/c..."
Code Review: PR #553 - chore: async engine readiness - blockers and polish before default
Summary
This PR hardens the async engine for production readiness before it becomes the default execution path. The changes fall into three categories:
The PR also adds row-count guards, partial-completion warnings,
Files changed: 8 (245 additions, 103 deletions)
Findings
High Severity
None found.
Medium Severity
1. In The original
Low Severity
2. In
3. Semaphore release skipped for remaining row groups on fail-fast In
4. The
Positive Observations
5. Clean The consolidation of admission cancellation and worker drain into a single
6. Double-wrap prevention for The second commit adds
7. Instance flag Replacing the module-level
8. Solid test coverage The new tests (
Verdict
APPROVE: This is a well-structured hardening PR with clear fail-fast semantics, proper cleanup guarantees, and good test coverage. The medium finding (error type wrapping) is a design-level note worth discussing but not blocking. The low-severity items are minor housekeeping suggestions.
johnnygreco
left a comment
Non-blocking polish items on top of an otherwise clean PR. One release-notes flag worth calling out: the switch from "silently drop the row group" to "fail the whole run" on pre-/post-batch callback failure is a user-visible behavior change. Anyone with a flaky processor will now see their run error out instead of producing a smaller dataset, so it is worth a CHANGELOG entry before this becomes the default engine.
Line-specific suggestions below.
# -- Partial completion warning ------------------------------------------------
def test_write_metadata_records_actual_and_target_counts() -> None:
The docstring and inline comment acknowledge this covers the write_metadata building block, not the new partial-completion warning at dataset_builder.py:337-341. A caplog-based test over _build_async (or a thin extraction of the warning block) would directly pin the logger.warning("⚠️ Generated {actual} of {num_records} ...") path — the feature is listed in the PR description but currently has no direct assertion.
Added test_dropped_rows_reduce_actual_record_count — drops all rows in one row group via the scheduler and asserts actual_num_records < num_records. Tests the condition that triggers the warning through the public API rather than mocking into _build_async.
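For comparison, the direct log assertion suggested in the review comment could look roughly like the sketch below. It assumes a thin extraction of the warning block into a hypothetical `warn_if_partial` function with an illustrative message, and it captures records with a plain `logging.Handler` so it runs without pytest; with pytest, the `caplog` fixture would play the role of the handler.

```python
import logging

logger = logging.getLogger("partial_completion_sketch")


def warn_if_partial(actual: int, target: int) -> None:
    """Hypothetical extraction of the partial-completion warning block."""
    if actual < target:
        logger.warning("Generated %d of %d requested records", actual, target)


class _ListHandler(logging.Handler):
    """Collects emitted log records in memory for later assertions."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def capture_partial_warnings(actual: int, target: int) -> list[str]:
    """Run warn_if_partial and return the WARNING messages it logged."""
    handler = _ListHandler()
    logger.addHandler(handler)
    try:
        warn_if_partial(actual, target)
    finally:
        logger.removeHandler(handler)
    return [r.getMessage() for r in handler.records if r.levelno == logging.WARNING]
```

A test built this way pins the log call itself, whereas the public-API test added in the PR pins the condition (actual < target) that leads to it.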
- Extract _is_async_trace_enabled() helper to deduplicate trace check
- Post-batch row-count guard now raises DatasetProcessingError (not DatasetGenerationError) so the scheduler wraps it with rg_id symmetrically with the pre-batch path
- Add test_dropped_rows_reduce_actual_record_count for partial completion path
Good call. I'll add a "Breaking behavior" note to the PR description calling out the fail-fast change. No changelog file in the repo yet, but happy to start one if you think it's worth it.
Nice work on this one, @andreatgretel: the fail-fast pivot is the right call before this becomes the default path.
Summary
Hardens the async engine so processor callback failures surface immediately instead of silently shrinking the dataset. The
Findings
Warnings: Worth addressing
dispatch_error: BaseException | None = None
try:
    await self._main_dispatch_loop(...)
except BaseException as exc:
    dispatch_error = exc
    raise
finally:
    ...  # cleanup as-is
    if self._reporter:
        self._reporter.log_final()
    if self._rg_states and dispatch_error is None:
        ...  # log incomplete row groups
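
A runnable toy version of the same pattern may help show how it behaves on both exit paths. Everything below is an assumption made for illustration: the worker, the `state` dict used to observe results, and the function names are not from the scheduler; only the try/except/finally shape and the use of `asyncio.shield` mirror the snippet above.

```python
import asyncio


async def _worker(started: asyncio.Event) -> None:
    """Stand-in for an in-flight worker that would otherwise run for a long time."""
    started.set()
    await asyncio.sleep(3600)


async def _cancel_workers(workers: list) -> None:
    """Cancel every worker and wait until all of them have finished unwinding."""
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)


async def run(fail: bool, state: dict) -> None:
    """Toy dispatch loop: drain workers on every exit, log only on clean exits."""
    dispatch_error = None
    started = asyncio.Event()
    workers = [asyncio.create_task(_worker(started)) for _ in range(3)]
    state["workers"] = workers
    state["clean_exit_logged"] = False
    try:
        await started.wait()
        if fail:
            raise RuntimeError("dispatch failed")
    except BaseException as exc:
        dispatch_error = exc
        raise
    finally:
        # shield() keeps the drain running even if run() itself is being cancelled.
        await asyncio.shield(_cancel_workers(workers))
        if dispatch_error is None:
            state["clean_exit_logged"] = True  # stands in for the incomplete-RG log
```

Calling `asyncio.run(run(False, state))` and `asyncio.run(run(True, state))` exercises the clean and failing paths; in both cases the workers end up cancelled, and only the clean path sets the flag.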
New tests reach into private attributes
Suggestions — Take it or leave it
What Looks Good
Verdict
Needs changes: The two warnings (DeprecationWarning getting swallowed, misleading incomplete-RG log on failure exits) are worth fixing before merge. Everything else is optional.
This review was generated by an AI assistant.
- DeprecationWarning no longer swallowed by interface error wrapper
- Incomplete-RG log only fires on clean scheduler exits
- Post-batch row-count guard moved into ProcessorRunner (strict_row_count)
- Expose active_worker_count property on AsyncTaskScheduler
- Drop unused monkeypatch fixture and pytest import
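
The strict_row_count guard mentioned in this commit can be pictured with a small sketch. It is an illustration under assumptions, not the ProcessorRunner code: rows are plain lists of dicts rather than DataFrames, `DatasetProcessingError` is redefined locally, and `run_processor` is a hypothetical function name.

```python
class DatasetProcessingError(Exception):
    """Local stand-in for the project's processing error type."""


def run_processor(rows: list, processor, strict_row_count: bool = False) -> list:
    """Apply a processor; optionally reject any change in the number of rows."""
    result = processor(rows)
    if strict_row_count and len(result) != len(rows):
        raise DatasetProcessingError(
            f"Processor changed row count from {len(rows)} to {len(result)}; "
            "row-count changes are not supported in this mode."
        )
    return result
```

Defaulting `strict_row_count` to False keeps existing callers unchanged, which matches the backward-compatibility note in the file overview; only callers that opt in get the check.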
Remove test_write_metadata_records_actual_and_target_counts (poked _actual_num_records directly) and assert metadata counts in test_dropped_rows_reduce_actual_record_count instead, which exercises the same path through the public API.
Thanks for the thorough review! Addressed everything in two commits ( Warnings (all addressed):
Suggestions:
📋 Summary
Hardens the async engine for production readiness before it becomes the default execution path. Processor callback failures now propagate as DatasetGenerationError (fail-fast) instead of silently dropping row groups, allow_resize=True triggers a graceful sync fallback with DeprecationWarning, and try/finally cleanup in run() ensures workers are always drained.
🔗 Related Issue
Closes #462
🔄 Changes
🔧 Changed
- DatasetGenerationError instead of silently dropping row groups
- allow_resize=True triggers graceful sync fallback with DeprecationWarning instead of raising
- try/finally refactor in AsyncTaskScheduler.run() eliminates duplicated cleanup logic and ensures workers are always cancelled via asyncio.shield()
- _run_cell_by_cell_generator now branches on instance flag _use_async instead of module-level DATA_DESIGNER_ASYNC_ENGINE, so the sync fallback decision propagates correctly
✨ Added
- strict_row_count=True) and post-batch callbacks reject unsupported row-count changes
- task_traces field on PreviewResults for async scheduler trace data
🐛 Fixed
- DatasetGenerationError from callbacks no longer gets double-wrapped by the scheduler's generic except Exception handler
- stacklevel in allow_resize DeprecationWarning now points at user code instead of library internals
🔍 Attention Areas
- async_scheduler.py: fail-fast semantics change; pre/post-batch failures now propagate instead of being swallowed
- dataset_builder.py: _resolve_async_compatibility replaces _validate_async_compatibility, row-count guards in callbacks
🧪 Testing
- make test passes
All 48 async-related tests pass. 18 smoke tests pass. Ruff check and format clean on all changed files.
✅ Checklist