
Commit bfa7a46

chore: async engine readiness - blockers and polish before default (#553)
* chore: async engine readiness blockers (#462)

  - Processor callback failures (pre-batch and post-batch) now raise DatasetGenerationError instead of silently dropping row groups
  - Early shutdown and all error paths drain in-flight workers via a finally block in AsyncTaskScheduler.run()
  - Pre-batch and post-batch processors that change row count in async mode raise immediately (strict_row_count guard)
  - Partial completion logs a warning when actual < target records
  - allow_resize=True auto-falls back to sync engine with a deprecation warning instead of raising, using a per-run _use_async flag
  - Preview path mirrors the trace check from the full build path; PreviewResults exposes task_traces

  Closes #462

* fix: address review findings for async engine readiness

  - Prevent double-wrapping of DatasetGenerationError in scheduler callbacks
  - Fix stacklevel in allow_resize DeprecationWarning to point at user code
  - Update stale comment to reflect fail-fast behavior
  - Rename misleading test and remove unused caplog fixture
  - Add zero-warnings assertion for happy-path case
  - Move warnings import to module level

* fix: address review comments on async engine readiness

  - Extract _is_async_trace_enabled() helper to deduplicate trace check
  - Post-batch row-count guard now raises DatasetProcessingError (not DatasetGenerationError) so the scheduler wraps it with rg_id symmetrically with the pre-batch path
  - Add test_dropped_rows_reduce_actual_record_count for partial completion path

* fix: address second-round review feedback on async engine readiness

  - DeprecationWarning no longer swallowed by interface error wrapper
  - Incomplete-RG log only fires on clean scheduler exits
  - Post-batch row-count guard moved into ProcessorRunner (strict_row_count)
  - Expose active_worker_count property on AsyncTaskScheduler
  - Drop unused monkeypatch fixture and pytest import

* test: fold metadata-count test into dropped-rows test

  Remove test_write_metadata_records_actual_and_target_counts (poked _actual_num_records directly) and assert metadata counts in test_dropped_rows_reduce_actual_record_count instead, which exercises the same path through the public API.
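As an aside, the partial-completion warning described in the first bullet group reduces to a small guard. A self-contained sketch of that check, assuming nothing beyond the standard library (the logger setup and function name are illustrative, not from this repo):

import logging

logging.basicConfig(level=logging.WARNING, format="%(message)s")
logger = logging.getLogger("data_designer")

def warn_if_partial(actual: int, num_records: int) -> None:
    # Mirrors the check added in _build_async: warn when actual < target.
    if actual < num_records:
        pct = actual / num_records * 100 if num_records > 0 else 0
        logger.warning(
            f"Generated {actual} of {num_records} requested records ({pct:.0f}%). "
            "The dataset may be incomplete due to errors or early shutdown."
        )

warn_if_partial(80, 100)  # -> Generated 80 of 100 requested records (80%). ...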
1 parent: 4c6823c · commit: bfa7a46

8 files changed: 308 additions & 111 deletions


packages/data-designer-config/src/data_designer/config/preview_results.py

Lines changed: 4 additions & 1 deletion
@@ -3,7 +3,7 @@

 from __future__ import annotations

-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any

 from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
 from data_designer.config.config_builder import DataDesignerConfigBuilder
@@ -23,6 +23,7 @@ def __init__(
         dataset: pd.DataFrame | None = None,
         analysis: DatasetProfilerResults | None = None,
         processor_artifacts: dict[str, list[dict]] | None = None,
+        task_traces: list[Any] | None = None,
     ):
         """Creates a new instance with results from a Data Designer preview run.

@@ -32,9 +33,11 @@ def __init__(
             dataset: Dataset of the preview run.
             analysis: Analysis of the preview run.
             processor_artifacts: Artifacts generated by the processors.
+            task_traces: Async scheduler task traces (when DATA_DESIGNER_ASYNC_TRACE=1).
         """
         self.dataset: pd.DataFrame | None = dataset
         self.analysis: DatasetProfilerResults | None = analysis
         self.processor_artifacts: dict[str, list[dict]] | None = processor_artifacts
         self.dataset_metadata: DatasetMetadata | None = dataset_metadata
+        self.task_traces: list[Any] | None = task_traces
         self._config_builder = config_builder
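A minimal sketch of the gating behavior behind the new task_traces field: traces flow through only when tracing is enabled. The helper below is a simplified stand-in for the real plumbing, which also consults the run config's async_trace field (see _is_async_trace_enabled in the dataset_builder.py diff below):

import os
from typing import Any

def gated_task_traces(collected: list[Any]) -> list[Any] | None:
    # Stand-in for the builder's trace check; the real helper also
    # consults RunConfig.async_trace.
    if os.environ.get("DATA_DESIGNER_ASYNC_TRACE", "0") == "1":
        return list(collected)
    return None

os.environ["DATA_DESIGNER_ASYNC_TRACE"] = "1"
print(gated_task_traces([{"rg_id": 0, "task": "seed_column"}]))  # traces returned when enabled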

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py

Lines changed: 47 additions & 46 deletions
@@ -15,6 +15,7 @@
 import data_designer.lazy_heavy_imports as lazy
 from data_designer.config.column_configs import GenerationStrategy
 from data_designer.engine.context import current_row_group
+from data_designer.engine.dataset_builders.errors import DatasetGenerationError
 from data_designer.engine.dataset_builders.multi_column_configs import MultiColumnConfig
 from data_designer.engine.dataset_builders.utils.async_progress_reporter import (
     DEFAULT_REPORT_INTERVAL,
@@ -216,6 +217,10 @@ def _setup_async_progress_reporter(
             progress_bar=self._progress_bar,
         )

+    @property
+    def active_worker_count(self) -> int:
+        return sum(1 for t in self._worker_tasks if not t.done())
+
     def _spawn_worker(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task:
         """Create a tracked worker task that auto-removes itself on completion."""
         task = asyncio.create_task(coro)
@@ -265,33 +270,32 @@ async def run(self) -> None:
         # Launch admission as a background task so it interleaves with dispatch.
         admission_task = asyncio.create_task(self._admit_row_groups())

+        dispatch_error: BaseException | None = None
         try:
             # Main dispatch loop
             await self._main_dispatch_loop(seed_cols, has_pre_batch, all_columns)
-
-            # Cancel admission if still running
+        except BaseException as exc:
+            dispatch_error = exc
+            raise
+        finally:
+            # Always cancel admission + drain in-flight workers, regardless
+            # of how the dispatch loop exited (normal, early shutdown,
+            # CancelledError, or processor failure).
             if not admission_task.done():
                 admission_task.cancel()
                 with contextlib.suppress(asyncio.CancelledError):
                     await admission_task
+            await asyncio.shield(self._cancel_workers())

-            if self._reporter:
-                self._reporter.log_final()
-
-            if self._rg_states:
-                incomplete = list(self._rg_states)
-                logger.error(
-                    f"Scheduler exited with {len(self._rg_states)} unfinished row group(s): {incomplete}. "
-                    "These row groups were not checkpointed."
-                )
+            if self._reporter:
+                self._reporter.log_final()

-        except asyncio.CancelledError:
-            if not admission_task.done():
-                admission_task.cancel()
-                with contextlib.suppress(asyncio.CancelledError):
-                    await admission_task
-            await asyncio.shield(self._cancel_workers())
-            raise
+            if self._rg_states and dispatch_error is None:
+                incomplete = list(self._rg_states)
+                logger.error(
+                    f"Scheduler exited with {len(self._rg_states)} unfinished row group(s): {incomplete}. "
+                    "These row groups were not checkpointed."
+                )

     async def _main_dispatch_loop(
         self,
@@ -500,29 +504,26 @@ def _checkpoint_completed_row_groups(self, all_columns: list[str]) -> None:
             if self._tracker.is_row_group_complete(rg_id, state.size, all_columns)
         ]
         for rg_id, rg_size in completed:
-            dropped = False
             try:
-                del self._rg_states[rg_id]
                 if self._on_before_checkpoint:
                     try:
                         self._on_before_checkpoint(rg_id, rg_size)
-                    except Exception:
-                        # Post-batch is mandatory; drop rather than checkpoint unprocessed data.
-                        logger.error(
-                            f"on_before_checkpoint failed for row group {rg_id}, dropping row group.",
-                            exc_info=True,
-                        )
-                        self._drop_row_group(rg_id, rg_size)
-                        if self._buffer_manager:
-                            self._buffer_manager.free_row_group(rg_id)
-                        dropped = True
+                    except DatasetGenerationError:
+                        raise
+                    except Exception as exc:
+                        raise DatasetGenerationError(
+                            f"Post-batch processor failed for row group {rg_id}: {exc}"
+                        ) from exc
+                # Remove from tracking only after the callback succeeds.
+                del self._rg_states[rg_id]
                 # If all rows were dropped (e.g. seed failure), free instead of finalizing
-                if not dropped and all(self._tracker.is_dropped(rg_id, ri) for ri in range(rg_size)):
+                if all(self._tracker.is_dropped(rg_id, ri) for ri in range(rg_size)):
                     if self._buffer_manager:
                         self._buffer_manager.free_row_group(rg_id)
-                    dropped = True
-                if not dropped and self._on_finalize_row_group is not None:
+                elif self._on_finalize_row_group is not None:
                     self._on_finalize_row_group(rg_id)
+            except DatasetGenerationError:
+                raise
             except Exception:
                 logger.error(f"Failed to checkpoint row group {rg_id}.", exc_info=True)
             finally:
@@ -543,19 +544,19 @@ def _run_seeds_complete_check(self, seed_cols: frozenset[str]) -> None:
            if self._on_seeds_complete:
                try:
                    self._on_seeds_complete(rg_id, state.size)
-                    # The callback may drop rows (e.g. pre-batch filtering).
-                    # Record skipped tasks for any newly-dropped rows so
-                    # progress reporting stays accurate.
-                    if self._reporter:
-                        for ri in range(state.size):
-                            if self._tracker.is_dropped(rg_id, ri):
-                                self._record_skipped_tasks_for_row(rg_id, ri)
-                except Exception:
-                    logger.warning(
-                        f"Pre-batch processor failed for row group {rg_id}, skipping.",
-                        exc_info=True,
-                    )
-                    self._drop_row_group(rg_id, state.size)
+                except DatasetGenerationError:
+                    raise
+                except Exception as exc:
+                    raise DatasetGenerationError(
+                        f"Pre-batch processor failed for row group {rg_id}: {exc}"
+                    ) from exc
+                # The callback may drop rows (e.g. pre-batch filtering).
+                # Record skipped tasks for any newly-dropped rows so
+                # progress reporting stays accurate.
+                if self._reporter:
+                    for ri in range(state.size):
+                        if self._tracker.is_dropped(rg_id, ri):
+                            self._record_skipped_tasks_for_row(rg_id, ri)

     def _drop_row(self, row_group: int, row_index: int, *, exclude_columns: set[str] | None = None) -> None:
         if self._tracker.is_dropped(row_group, row_index):
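To see why the single finally block replaces both old cleanup paths, and why the unfinished-row-group report now fires only on clean exits, here is a self-contained toy version of the control flow. The worker and admission coroutines are stand-ins, not the scheduler's real internals:

import asyncio
import contextlib

async def worker(n: int) -> None:
    await asyncio.sleep(0.01 * n)

async def run(fail: bool) -> None:
    admission_task = asyncio.create_task(asyncio.sleep(10))  # stands in for _admit_row_groups()
    workers = [asyncio.create_task(worker(i)) for i in range(3)]
    unfinished = {0, 1}  # stands in for self._rg_states

    dispatch_error: BaseException | None = None
    try:
        if fail:
            raise RuntimeError("processor failure")  # any dispatch-loop error
    except BaseException as exc:
        dispatch_error = exc
        raise
    finally:
        # One drain path for every exit: normal, error, or cancellation.
        if not admission_task.done():
            admission_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await admission_task
        # shield() keeps the drain itself from being cancelled mid-flight.
        await asyncio.shield(asyncio.gather(*workers, return_exceptions=True))
        if unfinished and dispatch_error is None:
            print(f"exited with {len(unfinished)} unfinished row group(s)")

async def main() -> None:
    await run(fail=False)            # clean exit: the unfinished report fires
    with contextlib.suppress(RuntimeError):
        await run(fail=True)         # error exit: report suppressed, workers still drained

asyncio.run(main())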

packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py

Lines changed: 43 additions & 15 deletions
@@ -9,6 +9,7 @@
 import os
 import time
 import uuid
+import warnings
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable

@@ -58,6 +59,7 @@
 if TYPE_CHECKING:
     import pandas as pd

+    from data_designer.config.run_config import RunConfig
     from data_designer.engine.column_generators.generators.base import ColumnGeneratorWithModelRegistry
     from data_designer.engine.dataset_builders.utils.task_model import TaskTrace
     from data_designer.engine.models.usage import ModelUsageStats
@@ -91,6 +93,10 @@
 _CLIENT_VERSION: str = get_library_version()


+def _is_async_trace_enabled(settings: RunConfig) -> bool:
+    return settings.async_trace or os.environ.get("DATA_DESIGNER_ASYNC_TRACE", "0") == "1"
+
+
 class DatasetBuilder:
     def __init__(
         self,
@@ -106,6 +112,7 @@ def __init__(
         self._task_traces: list[TaskTrace] = []
         self._registry = registry or DataDesignerRegistry()
         self._graph: ExecutionGraph | None = None
+        self._use_async: bool = DATA_DESIGNER_ASYNC_ENGINE

         self._data_designer_config = compile_data_designer_config(data_designer_config, resource_provider)
         self._column_configs = compile_dataset_builder_column_configs(self._data_designer_config)
@@ -185,8 +192,8 @@ def build(
         start_time = time.perf_counter()
         buffer_size = self._resource_provider.run_config.buffer_size

-        if DATA_DESIGNER_ASYNC_ENGINE:
-            self._validate_async_compatibility()
+        self._use_async = DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility()
+        if self._use_async:
             self._build_async(generators, num_records, buffer_size, on_batch_complete)
         else:
             group_id = uuid.uuid4().hex
@@ -218,8 +225,8 @@ def build_preview(self, *, num_records: int) -> pd.DataFrame:
         generators, self._graph = self._initialize_generators_and_graph()
         start_time = time.perf_counter()

-        if DATA_DESIGNER_ASYNC_ENGINE:
-            self._validate_async_compatibility()
+        self._use_async = DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility()
+        if self._use_async:
             dataset = self._build_async_preview(generators, num_records)
         else:
             group_id = uuid.uuid4().hex
@@ -236,11 +243,15 @@ def _build_async_preview(self, generators: list[ColumnGenerator], num_records: i
         """Async preview path - single row group, no disk writes, returns in-memory DataFrame."""
         logger.info("⚡ DATA_DESIGNER_ASYNC_ENGINE is enabled - using async task-queue preview")

+        settings = self._resource_provider.run_config
+        trace_enabled = _is_async_trace_enabled(settings)
+
         scheduler, buffer_manager = self._prepare_async_run(
             generators,
             num_records,
             buffer_size=num_records,
             run_post_batch_in_scheduler=False,
+            trace=trace_enabled,
         )

         loop = ensure_async_engine_loop()
@@ -256,15 +267,23 @@ def _build_async_preview(self, generators: list[ColumnGenerator], num_records: i
             buffer_manager.free_row_group(0)
         return dataset

-    def _validate_async_compatibility(self) -> None:
-        """Raise if any column uses allow_resize=True with the async scheduler."""
+    def _resolve_async_compatibility(self) -> bool:
+        """Check if the async engine can be used; auto-fallback to sync if not.
+
+        Returns True if async is usable, False if allow_resize forces sync fallback.
+        """
         offending = [config.name for config in self.single_column_configs if getattr(config, "allow_resize", False)]
         if offending:
-            raise DatasetGenerationError(
-                f"allow_resize=True is not supported with DATA_DESIGNER_ASYNC_ENGINE=1. "
-                f"Offending column(s): {offending}. Either remove allow_resize=True or "
-                f"disable the async scheduler."
+            msg = (
+                f"allow_resize=True detected on column(s) {offending}. "
+                "Falling back to sync engine for this run. "
+                "allow_resize is deprecated and will be removed in a future release; "
+                "use workflow chaining instead (see issue #552)."
             )
+            logger.warning(f"⚠️ {msg}")
+            warnings.warn(msg, DeprecationWarning, stacklevel=4)
+            return False
+        return True

     def _build_async(
         self,
@@ -277,7 +296,7 @@ def _build_async(
         logger.info("⚡ DATA_DESIGNER_ASYNC_ENGINE is enabled - using async task-queue builder")

         settings = self._resource_provider.run_config
-        trace_enabled = settings.async_trace or os.environ.get("DATA_DESIGNER_ASYNC_TRACE", "0") == "1"
+        trace_enabled = _is_async_trace_enabled(settings)

         def finalize_row_group(rg_id: int) -> None:
             def on_complete(final_path: Path | str | None) -> None:
@@ -318,6 +337,15 @@ def on_complete(final_path: Path | str | None) -> None:
         # Write metadata
         buffer_manager.write_metadata(target_num_records=num_records, buffer_size=buffer_size)

+        # Surface partial completion
+        actual = buffer_manager.actual_num_records
+        if actual < num_records:
+            pct = actual / num_records * 100 if num_records > 0 else 0
+            logger.warning(
+                f"⚠️ Generated {actual} of {num_records} requested records ({pct:.0f}%). "
+                "The dataset may be incomplete due to errors or early shutdown."
+            )
+
     def _prepare_async_run(
         self,
         generators: list[ColumnGenerator],
@@ -366,10 +394,10 @@ def _prepare_async_run(
         buffer_manager = RowGroupBufferManager(self.artifact_storage)

         # Pre-batch processor callback: runs after seed tasks complete for a row group.
-        # If it raises, the scheduler drops all rows in the row group (skips it).
+        # If it raises, the scheduler propagates the error as DatasetGenerationError (fail-fast).
         def on_seeds_complete(rg_id: int, rg_size: int) -> None:
             df = buffer_manager.get_dataframe(rg_id)
-            df = self._processor_runner.run_pre_batch_on_df(df)
+            df = self._processor_runner.run_pre_batch_on_df(df, strict_row_count=True)
             buffer_manager.replace_dataframe(rg_id, df)
             for ri in range(rg_size):
                 if buffer_manager.is_dropped(rg_id, ri) and not tracker.is_dropped(rg_id, ri):
@@ -378,7 +406,7 @@ def on_seeds_complete(rg_id: int, rg_size: int) -> None:
         # Post-batch processor callback: runs after all columns, before finalization.
         def on_before_checkpoint(rg_id: int, rg_size: int) -> None:
             df = buffer_manager.get_dataframe(rg_id)
-            df = self._processor_runner.run_post_batch(df, current_batch_number=rg_id)
+            df = self._processor_runner.run_post_batch(df, current_batch_number=rg_id, strict_row_count=True)
             buffer_manager.replace_dataframe(rg_id, df)

         # Coarse upper bound: sums all registered aliases, not just those used
@@ -505,7 +533,7 @@ def _run_cell_by_cell_generator(self, generator: ColumnGenerator) -> None:
         max_workers = self._resource_provider.run_config.non_inference_max_parallel_workers
         if isinstance(generator, ColumnGeneratorWithModel):
             max_workers = generator.inference_parameters.max_parallel_requests
-        if DATA_DESIGNER_ASYNC_ENGINE:
+        if self._use_async:
             logger.info("⚡ Using async engine for concurrent execution")
             self._fan_out_with_async(generator, max_workers=max_workers)
         else:
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py

Lines changed: 37 additions & 6 deletions
@@ -72,13 +72,44 @@ def run_pre_batch(self, batch_manager: DatasetBatchManager) -> None:
         df = self._run_stage(df, ProcessorStage.PRE_BATCH)
         batch_manager.replace_buffer(df.to_dict(orient="records"), allow_resize=True)

-    def run_pre_batch_on_df(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Run PRE_BATCH processors on a DataFrame and return the result."""
-        return self._run_stage(df, ProcessorStage.PRE_BATCH)
+    def run_pre_batch_on_df(self, df: pd.DataFrame, *, strict_row_count: bool = False) -> pd.DataFrame:
+        """Run PRE_BATCH processors on a DataFrame and return the result.
+
+        Args:
+            df: Input DataFrame.
+            strict_row_count: If True, raise ``DatasetProcessingError`` when a
+                processor changes the row count. Used by the async engine where
+                row-count changes are not supported.
+        """
+        original_len = len(df)
+        df = self._run_stage(df, ProcessorStage.PRE_BATCH)
+        if strict_row_count and len(df) != original_len:
+            raise DatasetProcessingError(
+                f"Pre-batch processor changed row count from {original_len} to {len(df)}. "
+                "Row-count changes in pre-batch processors are not supported with the async engine."
+            )
+        return df

-    def run_post_batch(self, df: pd.DataFrame, current_batch_number: int | None) -> pd.DataFrame:
-        """Run process_after_batch() on processors that implement it."""
-        return self._run_stage(df, ProcessorStage.POST_BATCH, current_batch_number=current_batch_number)
+    def run_post_batch(
+        self, df: pd.DataFrame, current_batch_number: int | None, *, strict_row_count: bool = False
+    ) -> pd.DataFrame:
+        """Run process_after_batch() on processors that implement it.
+
+        Args:
+            df: Input DataFrame.
+            current_batch_number: Batch index passed to processors.
+            strict_row_count: If True, raise ``DatasetProcessingError`` when a
+                processor changes the row count. Used by the async engine where
+                row-count changes are not supported.
+        """
+        original_len = len(df)
+        df = self._run_stage(df, ProcessorStage.POST_BATCH, current_batch_number=current_batch_number)
+        if strict_row_count and len(df) != original_len:
+            raise DatasetProcessingError(
+                f"Post-batch processor changed row count from {original_len} to {len(df)}. "
+                "Row-count changes in post-batch processors are not supported with the async engine."
+            )
+        return df

     def run_after_generation_on_df(self, df: pd.DataFrame) -> pd.DataFrame:
         """Run process_after_generation() on a DataFrame (for preview mode)."""
