chore: async engine readiness - blockers and polish before default #553
andreatgretel wants to merge 5 commits into main
Conversation
- Processor callback failures (pre-batch and post-batch) now raise DatasetGenerationError instead of silently dropping row groups
- Early shutdown and all error paths drain in-flight workers via a finally block in AsyncTaskScheduler.run()
- Pre-batch and post-batch processors that change row count in async mode raise immediately (strict_row_count guard)
- Partial completion logs a warning when actual < target records
- allow_resize=True auto-falls back to sync engine with a deprecation warning instead of raising, using a per-run _use_async flag
- Preview path mirrors the trace check from the full build path; PreviewResults exposes task_traces

Closes #462
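The strict row-count guard listed above can be sketched as follows — `run_pre_batch_on_df`, `strict_row_count`, and `DatasetProcessingError` are names taken from this PR, but the body is an assumed minimal version that operates on plain row lists rather than DataFrames:

```python
class DatasetProcessingError(Exception):
    """Minimal stand-in for the engine's processing error type."""

def run_pre_batch_on_df(rows, processor, strict_row_count=False):
    """Apply a pre-batch processor; optionally reject row-count changes."""
    out = processor(rows)
    if strict_row_count and len(out) != len(rows):
        raise DatasetProcessingError(
            f"pre-batch processor changed row count from {len(rows)} to {len(out)}; "
            "row-count changes are not supported by the async engine"
        )
    return out
```

With `strict_row_count=False` the old behavior (processors may shrink or grow the batch) is preserved; with `True` any size change fails the run immediately instead of quietly shrinking the dataset.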
- Prevent double-wrapping of DatasetGenerationError in scheduler callbacks
- Fix stacklevel in allow_resize DeprecationWarning to point at user code
- Update stale comment to reflect fail-fast behavior
- Rename misleading test and remove unused caplog fixture
- Add zero-warnings assertion for happy-path case
- Move warnings import to module level
Greptile Summary

This PR hardens the async engine for production readiness by changing pre/post-batch processor failures from silent row-group drops to fail-fast DatasetGenerationError propagation.
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Key behavioral changes: pre/post-batch failures now propagate as DatasetGenerationError instead of silently dropping rows; try/finally refactor ensures workers are always drained; active_worker_count property added. One P2: dispatch_error is None check in the post-finally block is dead code since that branch is only reachable on success. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | _validate_async_compatibility replaced by _resolve_async_compatibility; allow_resize now triggers sync fallback + DeprecationWarning instead of raising; _use_async instance flag replaces module-level DATA_DESIGNER_ASYNC_ENGINE checks; partial-completion warning added; trace helper extracted. Clean. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | strict_row_count parameter added to run_pre_batch_on_df and run_post_batch; raises DatasetProcessingError on row-count change when enabled. Straightforward and correct. |
| packages/data-designer/src/data_designer/interface/data_designer.py | except DeprecationWarning: raise guards added before the generic except Exception handlers so DeprecationWarning isn't swallowed when warnings-as-errors is active; task_traces wired into PreviewResults. |
| packages/data-designer-config/src/data_designer/config/preview_results.py | task_traces: list[Any] |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Tests updated to reflect fail-fast semantics; new tests for post-batch failure propagation and early-shutdown worker drain. Good coverage of changed behavior. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py | test_validate_async_compatibility updated to test_resolve_async_compatibility; now asserts DeprecationWarning + sync fallback instead of raised error; new partial-completion test added. |
| packages/data-designer-engine/tests/engine/models/test_async_engine_switch.py | Patching module-level DATA_DESIGNER_ASYNC_ENGINE replaced by setting builder._use_async directly, matching the new instance-flag approach. Simpler and more robust. |
Flowchart
```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[build / build_preview] --> B[_resolve_async_compatibility]
    B -->|allow_resize detected| C[DeprecationWarning\n+ sync fallback\n_use_async = False]
    B -->|no allow_resize| D[_use_async = True]
    C --> E[Sync path\n_fan_out_with_threads]
    D --> F[Async path\n_fan_out_with_async]
    F --> G[AsyncTaskScheduler.run]
    G --> H[_main_dispatch_loop]
    H -->|exception| I[except BaseException\ndispatch_error = exc\nraise]
    H -->|success| J[finally: cancel admission\n+ asyncio.shield cancel_workers]
    I --> J
    J -->|exception path| K[exception propagates to caller]
    J -->|success path| L[reporter.log_final\ncheck _rg_states]
    H --> M{_run_seeds_complete_check}
    M -->|on_seeds_complete raises| N[wrap as DatasetGenerationError\nfail-fast]
    M -->|ok| O{_checkpoint_completed_row_groups}
    O -->|on_before_checkpoint raises| P[wrap as DatasetGenerationError\nfail-fast]
    O -->|ok| Q[del _rg_states rg_id\non_finalize_row_group]
```
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 293-298
Comment:
**`dispatch_error is None` guard is unreachable dead code**
The condition `dispatch_error is None` is always `True` when line 293 is evaluated. If `dispatch_error` is set (i.e., `_main_dispatch_loop` threw), the `raise` on line 279 propagates the exception through the `finally` block and past lines 290–298 entirely — those lines are only reachable on a clean exit. The variable and its check can be simplified:
```suggestion
if self._rg_states:
incomplete = list(self._rg_states)
logger.error(
f"Scheduler exited with {len(self._rg_states)} unfinished row group(s): {incomplete}. "
"These row groups were not checkpointed."
)
```
And the `dispatch_error: BaseException | None = None` declaration and its assignment in the `except` branch can be removed entirely.
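The reachability argument can be checked with a minimal sketch of the same `try`/`except`/`finally` shape (a hypothetical stand-in, not the scheduler's actual code):

```python
events: list[str] = []

def run(fail: bool) -> None:
    """Mirror the scheduler's control flow: re-raise in except, clean up in finally."""
    dispatch_error: BaseException | None = None
    try:
        if fail:
            raise RuntimeError("dispatch failed")
    except BaseException as exc:
        dispatch_error = exc
        raise  # propagates through the finally block below
    finally:
        events.append("drained")  # cleanup always runs
    # Only reachable on a clean exit, so dispatch_error is always None here
    events.append("post-finally")
```

On the failure path the function exits from the `finally` block with the exception still in flight, so the post-`finally` lines never observe `dispatch_error` set — exactly the dead-code claim above.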
How can I resolve this? If you propose a fix, please make it concise.

Reviews (3): Last reviewed commit: "test: fold metadata-count test into drop..."
Code Review: PR #553 — chore: async engine readiness - blockers and polish before default

Summary

This PR hardens the async engine for production readiness before it becomes the default execution path. The changes fall into three categories:
The PR also adds row-count guards, partial-completion warnings, …

Files changed: 8 (245 additions, 103 deletions)

Findings

High Severity

None found.

Medium Severity

1. In … The original …

Low Severity

2. In …

3. Semaphore release skipped for remaining row groups on fail-fast — In …

4. The …

Positive Observations

5. Clean consolidation — The consolidation of admission cancellation and worker drain into a single …

6. Double-wrap prevention — The second commit adds …

7. Instance flag — Replacing the module-level …

8. Solid test coverage — The new tests (…

Verdict

APPROVE — This is a well-structured hardening PR with clear fail-fast semantics, proper cleanup guarantees, and good test coverage. The medium finding (error type wrapping) is a design-level note worth discussing but not blocking. The low-severity items are minor housekeeping suggestions.
johnnygreco
left a comment
Non-blocking polish items on top of an otherwise clean PR. One release-notes flag worth calling out: the switch from silently dropping the row group to failing the whole run on pre-/post-batch callback failure is a user-visible behavior change. Anyone with a flaky processor will now see their run error out instead of producing a smaller dataset — worth a CHANGELOG entry before this becomes the default engine.
Line-specific suggestions below.
```python
# -- Partial completion warning ------------------------------------------------

def test_write_metadata_records_actual_and_target_counts() -> None:
```
The docstring and inline comment acknowledge this covers the write_metadata building block, not the new partial-completion warning at dataset_builder.py:337-341. A caplog-based test over _build_async (or a thin extraction of the warning block) would directly pin the logger.warning("⚠️ Generated {actual} of {num_records} ...") path — the feature is listed in the PR description but currently has no direct assertion.
Added test_dropped_rows_reduce_actual_record_count — drops all rows in one row group via the scheduler and asserts actual_num_records < num_records. Tests the condition that triggers the warning through the public API rather than mocking into _build_async.
- Extract _is_async_trace_enabled() helper to deduplicate trace check
- Post-batch row-count guard now raises DatasetProcessingError (not DatasetGenerationError) so the scheduler wraps it with rg_id symmetrically with the pre-batch path
- Add test_dropped_rows_reduce_actual_record_count for partial completion path
Good call — I'll add a "Breaking behavior" note to the PR description calling out the fail-fast change. No changelog file in the repo yet, but happy to start one if you think it's worth it.
Nice work on this one, @andreatgretel — the fail-fast pivot is the right call before this becomes the default path.

Summary

Hardens the async engine so processor callback failures surface immediately instead of silently shrinking the dataset. The …

Findings

Warnings — Worth addressing
```python
dispatch_error: BaseException | None = None
try:
    await self._main_dispatch_loop(...)
except BaseException as exc:
    dispatch_error = exc
    raise
finally:
    ...  # cleanup as-is
    if self._reporter:
        self._reporter.log_final()
    if self._rg_states and dispatch_error is None:
        ...  # log incomplete row groups
```
New tests reach into private attributes
Suggestions — Take it or leave it
What Looks Good
Verdict

Needs changes — The two warnings (DeprecationWarning getting swallowed, misleading incomplete-RG log on failure exits) are worth fixing before merge. Everything else is optional.

This review was generated by an AI assistant.
- DeprecationWarning no longer swallowed by interface error wrapper
- Incomplete-RG log only fires on clean scheduler exits
- Post-batch row-count guard moved into ProcessorRunner (strict_row_count)
- Expose active_worker_count property on AsyncTaskScheduler
- Drop unused monkeypatch fixture and pytest import
Remove test_write_metadata_records_actual_and_target_counts (poked _actual_num_records directly) and assert metadata counts in test_dropped_rows_reduce_actual_record_count instead, which exercises the same path through the public API.
Thanks for the thorough review! Addressed everything in two commits (…).

Warnings (all addressed):
Suggestions:
📋 Summary
Hardens the async engine for production readiness before it becomes the default execution path. Processor callback failures now propagate as `DatasetGenerationError` (fail-fast) instead of silently dropping row groups, `allow_resize=True` triggers a graceful sync fallback with `DeprecationWarning`, and `try/finally` cleanup in `run()` ensures workers are always drained.

🔗 Related Issue
Closes #462
🔄 Changes
🔧 Changed
- Processor callback failures raise `DatasetGenerationError` instead of silently dropping row groups
- `allow_resize=True` triggers graceful sync fallback with `DeprecationWarning` instead of raising
- `try/finally` refactor in `AsyncTaskScheduler.run()` eliminates duplicated cleanup logic and ensures workers are always cancelled via `asyncio.shield()`
- `_run_cell_by_cell_generator` now branches on instance flag `_use_async` instead of module-level `DATA_DESIGNER_ASYNC_ENGINE`, so the sync fallback decision propagates correctly

✨ Added
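The drain-in-`finally` pattern from the changes above can be sketched in isolation — `cancel_workers` here is an assumed helper, not the scheduler's real implementation:

```python
import asyncio

async def cancel_workers(tasks: list) -> None:
    """Cancel in-flight workers and wait until they have actually finished."""
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

async def run(fail: bool, spawned: list) -> None:
    """Sketch of the run() shape: workers are drained on every exit path."""
    async def worker() -> None:
        await asyncio.sleep(10)  # stands in for real row-group work

    tasks = [asyncio.create_task(worker()) for _ in range(3)]
    spawned.extend(tasks)
    try:
        if fail:
            raise RuntimeError("dispatch failed")
    finally:
        # asyncio.shield keeps an outer cancellation from interrupting the drain itself
        await asyncio.shield(cancel_workers(tasks))
```

Whether the dispatch loop succeeds or raises, every worker task ends up cancelled and awaited before `run()` returns or re-raises.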
- Pre-batch (`strict_row_count=True`) and post-batch callbacks reject unsupported row-count changes
- `task_traces` field on `PreviewResults` for async scheduler trace data

🐛 Fixed
- `DatasetGenerationError` from callbacks no longer gets double-wrapped by the scheduler's generic `except Exception` handler
- `stacklevel` in `allow_resize` `DeprecationWarning` now points at user code instead of library internals

🔍 Attention Areas
- `async_scheduler.py` — fail-fast semantics change: pre/post-batch failures now propagate instead of being swallowed
- `dataset_builder.py` — `_resolve_async_compatibility` replaces `_validate_async_compatibility`, row-count guards in callbacks

🧪 Testing
- `make test` passes
- All 48 async-related tests pass. 18 smoke tests pass. Ruff check and format clean on all changed files.
✅ Checklist