-
Notifications
You must be signed in to change notification settings - Fork 188
fix: drop invalid expression rows #757
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0bae4eb
a958414
effab2c
cdf9ffd
8311252
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
| from typing import TYPE_CHECKING, Any, Callable | ||
|
|
||
| import data_designer.lazy_heavy_imports as lazy | ||
| from data_designer.config.column_configs import GenerationStrategy | ||
| from data_designer.config.column_configs import ExpressionColumnConfig, GenerationStrategy | ||
| from data_designer.engine.capacity import ( | ||
| AsyncCapacityConfigured, | ||
| AsyncCapacityObservedMaxima, | ||
|
|
@@ -80,6 +80,7 @@ | |
| SchedulerAdmissionEventSink, | ||
| runtime_correlation_provider, | ||
| ) | ||
| from data_designer.engine.processing.ginja.exceptions import UserTemplateError | ||
|
|
||
| if TYPE_CHECKING: | ||
| from data_designer.engine.column_generators.generators.base import ColumnGenerator | ||
|
|
@@ -402,6 +403,8 @@ def first_non_retryable_error(self) -> Exception | None: | |
| def _raise_if_fatal_worker_error(self) -> None: | ||
| if self._fatal_worker_error is None: | ||
| return | ||
| if isinstance(self._fatal_worker_error, UserTemplateError): | ||
| raise DatasetGenerationError(str(self._fatal_worker_error)) from self._fatal_worker_error | ||
| raise DatasetGenerationError( | ||
| "Unexpected internal task failure in async scheduler." | ||
| ) from self._fatal_worker_error | ||
|
|
@@ -1696,6 +1699,19 @@ async def _execute_task_inner_impl(self, task: Task, lease: TaskAdmissionLease, | |
| trace.status = "error" | ||
| trace.error = str(exc) | ||
|
|
||
| if self._is_fatal_expression_template_error(task, generator, exc): | ||
| logger.error( | ||
| "Fatal expression failure on %s[rg=%s, row=%s]: %s", | ||
| task.column, | ||
| task.row_group, | ||
| task.row_index, | ||
| exc, | ||
| exc_info=True, | ||
| ) | ||
| self._fatal_worker_error = exc | ||
| self._wake_event.set() | ||
| raise | ||
|
|
||
| if retryable: | ||
| self._deferred.append(task) | ||
| self._deferred_errors[task] = exc | ||
|
|
@@ -1783,6 +1799,9 @@ async def _run_generator_call(self, task: Task, operation: str, call: Coroutine[ | |
| try: | ||
| return await call | ||
| except Exception as exc: | ||
| generator = self._generators[task.column] | ||
| if self._is_fatal_expression_template_error(task, generator, exc): | ||
| raise | ||
| if self._is_retryable(exc) or self._is_expected_non_retryable(exc): | ||
| raise | ||
| raise DatasetGenerationError( | ||
|
|
@@ -1808,6 +1827,22 @@ def _require_dataframe_result( | |
| ) | ||
| return result | ||
|
|
||
| def _task_supports_row_drops(self, task: Task, generator: ColumnGenerator) -> bool: | ||
| # Root expression columns are scheduled as ``from_scratch`` even though | ||
| # they share the per-row drop / fatal-when-all-dropped contract with | ||
| # ``batch`` expression columns. Treat both task types as row-drop | ||
| # capable so the fatal-error path fires and partial drops don't blow | ||
| # away the whole row group. | ||
| return task.task_type in ("batch", "from_scratch") and isinstance(generator.config, ExpressionColumnConfig) | ||
|
|
||
| def _is_fatal_expression_template_error( | ||
| self, | ||
| task: Task, | ||
| generator: ColumnGenerator, | ||
| exc: BaseException, | ||
| ) -> bool: | ||
| return self._task_supports_row_drops(task, generator) and isinstance(exc, UserTemplateError) | ||
|
|
||
| async def _run_from_scratch(self, task: Task, generator: ColumnGenerator) -> Any: | ||
| """Execute a from_scratch task.""" | ||
| rg_size = self._get_rg_size(task.row_group) | ||
|
|
@@ -1822,6 +1857,16 @@ async def _run_from_scratch(self, task: Task, generator: ColumnGenerator) -> Any | |
| ) | ||
| result_operation = "From-scratch generator" | ||
| else: | ||
| # Root expression columns are scheduled as ``from_scratch`` but must | ||
| # honor the per-row drop / fatal-when-all-dropped contract just like | ||
| # ``batch`` expression columns. Route them through the shared | ||
| # row-drop path so partial drops shrink the row group instead of | ||
| # failing the ``expected_rows=rg_size`` check, and all-dropped | ||
| # raises ``UserTemplateError`` instead of silently dropping the | ||
| # whole row group. | ||
| if self._task_supports_row_drops(task, generator): | ||
| return await self._run_full_column_from_scratch_with_row_drops(task, generator, rg_size) | ||
|
|
||
| # Non-FromScratch generators dispatched as seeds (no upstream columns) | ||
| # operate on existing buffer rows — same contract as the sync engine's | ||
| # FULL_COLUMN path. Pass an ``rg_size``-row snapshot so the generator | ||
|
|
@@ -1855,6 +1900,65 @@ async def _run_from_scratch(self, task: Task, generator: ColumnGenerator) -> Any | |
|
|
||
| return result_df | ||
|
|
||
| async def _run_full_column_from_scratch_with_row_drops( | ||
| self, | ||
| task: Task, | ||
| generator: ColumnGenerator, | ||
| rg_size: int, | ||
| ) -> Any: | ||
| """Execute a row-drop-capable seed task (root expression columns). | ||
|
|
||
| Mirrors ``_run_batch``'s expression handling but without the | ||
| ``pre_skipped`` evaluation that only applies once upstream columns | ||
| exist: root expressions have no upstream refs, so skip metadata can | ||
| never fire here. | ||
| """ | ||
| if self._buffer_manager is not None: | ||
| pre_dropped: set[int] = {ri for ri in range(rg_size) if self._buffer_manager.is_dropped(task.row_group, ri)} | ||
| active_row_indices = [ri for ri in range(rg_size) if ri not in pre_dropped] | ||
| records = [self._buffer_manager.get_row(task.row_group, ri) for ri in active_row_indices] | ||
| input_df = lazy.pd.DataFrame(records, index=active_row_indices) if records else lazy.pd.DataFrame() | ||
| else: | ||
| pre_dropped = set() | ||
| active_row_indices = list(range(rg_size)) | ||
| input_df = lazy.pd.DataFrame(index=range(rg_size)) | ||
|
|
||
| if len(input_df) == 0: | ||
| return input_df | ||
|
|
||
| result_df = await self._run_generator_call( | ||
| task, | ||
| "full-column generation", | ||
| generator.agenerate(input_df), | ||
| ) | ||
|
|
||
| result_df = self._require_expression_row_drop_result( | ||
| task, | ||
| result_df, | ||
| active_row_indices=active_row_indices, | ||
| ) | ||
|
|
||
| if self._buffer_manager is not None: | ||
| write_cols = self._gen_instance_to_columns_including_side_effects.get(id(generator), [task.column]) | ||
| result_by_row_index = self._batch_result_by_row_index( | ||
| result_df, | ||
| active_row_indices=active_row_indices, | ||
| supports_row_drops=True, | ||
| ) | ||
| for ri in range(rg_size): | ||
| if ri in pre_dropped: | ||
| continue | ||
| result_row = result_by_row_index.get(ri) | ||
| if result_row is None: | ||
| self._drop_row(task.row_group, ri, exclude_columns={task.column}) | ||
| continue | ||
| if not self._buffer_manager.is_dropped(task.row_group, ri): | ||
| for col in write_cols: | ||
| if col in result_row: | ||
| self._buffer_manager.update_cell(task.row_group, ri, col, result_row[col]) | ||
|
|
||
| return result_df | ||
|
|
||
| async def _run_cell(self, task: Task, generator: ColumnGenerator) -> tuple[Any, bool]: | ||
| """Execute a cell-by-cell task. Returns ``(result, skipped)``.""" | ||
| if task.row_index is None: | ||
|
|
@@ -1920,6 +2024,7 @@ async def _run_batch(self, task: Task, generator: ColumnGenerator) -> Any: | |
| if self._buffer_manager is not None: | ||
| pre_dropped: set[int] = {ri for ri in range(rg_size) if self._buffer_manager.is_dropped(task.row_group, ri)} | ||
| active_rows_data: list[dict] = [] | ||
| active_row_indices: list[int] = [] | ||
|
|
||
| # Skip evaluation only applies to single-column configs. | ||
| # Multi-column configs (sampler/seed) are rejected by the SkipConfig | ||
|
|
@@ -1937,16 +2042,18 @@ async def _run_batch(self, task: Task, generator: ColumnGenerator) -> Any: | |
| continue | ||
|
|
||
| active_rows_data.append(record) | ||
| active_row_indices.append(ri) | ||
|
|
||
| batch_df = ( | ||
| lazy.pd.DataFrame(strip_skip_metadata_from_records(active_rows_data)) | ||
| lazy.pd.DataFrame(strip_skip_metadata_from_records(active_rows_data), index=active_row_indices) | ||
| if active_rows_data | ||
| else lazy.pd.DataFrame() | ||
| ) | ||
| else: | ||
| batch_df = lazy.pd.DataFrame() | ||
| pre_dropped = set() | ||
| pre_skipped = set() | ||
| active_row_indices = [] | ||
|
|
||
| if len(batch_df) == 0: | ||
| return batch_df | ||
|
|
@@ -1957,28 +2064,81 @@ async def _run_batch(self, task: Task, generator: ColumnGenerator) -> Any: | |
| "batch generation", | ||
| generator.agenerate(batch_df), | ||
| ) | ||
| result_df = self._require_dataframe_result( | ||
| task, | ||
| "Batch generator", | ||
| result_df, | ||
| expected_rows=active_rows, | ||
| ) | ||
| if self._task_supports_row_drops(task, generator): | ||
| result_df = self._require_expression_row_drop_result( | ||
| task, | ||
| result_df, | ||
| active_row_indices=active_row_indices, | ||
| ) | ||
| else: | ||
| result_df = self._require_dataframe_result( | ||
| task, | ||
| "Batch generator", | ||
| result_df, | ||
| expected_rows=active_rows, | ||
| ) | ||
|
|
||
| # Merge result columns back to buffer (include side-effect columns) | ||
| if self._buffer_manager is not None: | ||
| write_cols = self._gen_instance_to_columns_including_side_effects.get(id(generator), [task.column]) | ||
| result_idx = 0 | ||
| result_by_row_index = self._batch_result_by_row_index( | ||
| result_df, | ||
| active_row_indices=active_row_indices, | ||
| supports_row_drops=self._task_supports_row_drops(task, generator), | ||
| ) | ||
| for ri in range(rg_size): | ||
| if ri in pre_dropped or ri in pre_skipped: | ||
| continue | ||
| result_row = result_by_row_index.get(ri) | ||
| if result_row is None: | ||
| self._drop_row(task.row_group, ri, exclude_columns={task.column}) | ||
| continue | ||
| if not self._buffer_manager.is_dropped(task.row_group, ri): | ||
| for col in write_cols: | ||
| if col in result_df.columns: | ||
| self._buffer_manager.update_cell(task.row_group, ri, col, result_df.iloc[result_idx][col]) | ||
| result_idx += 1 | ||
| if col in result_row: | ||
| self._buffer_manager.update_cell(task.row_group, ri, col, result_row[col]) | ||
|
|
||
| return result_df | ||
|
|
||
| def _require_expression_row_drop_result( | ||
| self, | ||
| task: Task, | ||
| result: Any, | ||
| *, | ||
| active_row_indices: list[int], | ||
| ) -> Any: | ||
| result_df = self._require_dataframe_result(task, "Batch generator", result) | ||
| result_indexes = result_df.index.to_list() | ||
| if len(result_indexes) != len(set(result_indexes)): | ||
| raise DatasetGenerationError( | ||
| f"Batch generator for column '{task.column}' returned duplicate row indexes (rg={task.row_group})." | ||
| ) | ||
| active_index_set = set(active_row_indices) | ||
| unexpected_indexes = set(result_indexes) - active_index_set | ||
| if unexpected_indexes: | ||
| raise DatasetGenerationError( | ||
| f"Batch generator for column '{task.column}' returned row indexes outside the active input rows " | ||
| f"(rg={task.row_group}): {sorted(unexpected_indexes)!r}." | ||
| ) | ||
| if len(result_df) > len(active_row_indices): | ||
| raise DatasetGenerationError( | ||
| f"Batch generator for column '{task.column}' returned {len(result_df)} rows " | ||
| f"but at most {len(active_row_indices)} active rows were expected (rg={task.row_group})." | ||
| ) | ||
| return result_df | ||
|
|
||
| @staticmethod | ||
| def _batch_result_by_row_index( | ||
| result_df: Any, | ||
| *, | ||
| active_row_indices: list[int], | ||
| supports_row_drops: bool, | ||
| ) -> dict[int, dict[str, Any]]: | ||
| result_records = result_df.to_dict(orient="records") | ||
| if supports_row_drops: | ||
| return dict(zip(result_df.index.to_list(), result_records, strict=True)) | ||
| return dict(zip(active_row_indices, result_records, strict=True)) | ||
|
|
||
| def _get_rg_size(self, row_group: int) -> int: | ||
| try: | ||
| return self._row_groups.row_group_size(row_group) | ||
|
Comment on lines
+2137
to
2144
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Given that the two preceding checks already pass — (1) no duplicate result indexes, and (2) every result index is a member of …
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 2063-2070
Comment:
**Third guard in `_require_expression_row_drop_result` is unreachable**
Given that the two preceding checks already pass — (1) no duplicate result indexes, and (2) every result index is a member of `active_index_set` — the result set is a subset of `active_row_indices` with no duplicates. Because `active_row_indices` is itself duplicate-free (built from `range(rg_size)` minus pre-dropped rows), `len(active_index_set) == len(active_row_indices)`, so `len(result_df) <= len(active_row_indices)` is guaranteed and the third `if` can never fire. This is dead code, not a runtime bug, but it may create a false sense of coverage for this guard.
How can I resolve this? If you propose a fix, please make it concise.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think root expression columns fall through this check. Root full-column columns are scheduled as `from_scratch`, so an expression with no upstream refs, like `ExpressionColumnConfig(name="empty_root", expr="{{ \"\" }}")`, does not hit the new fatal expression path and the scheduler drops the whole row group instead of raising. It also reaches the zero-column DataFrame case in `expression.py` before rendering. Maybe route root expressions through the batch path, or treat expression `from_scratch` tasks as row-drop-capable too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch — fixed in cdf9ffd by taking option 2 (treat expression `from_scratch` tasks as row-drop-capable). `_task_supports_row_drops` now accepts both `batch` and `from_scratch` for expression configs, so the fatal-error path fires for root expressions too. `_run_full_column_from_scratch_with_row_drops` mirrors `_run_batch`'s expression handling (active-row-index tracking, `_require_expression_row_drop_result` validation, per-row `_drop_row` + `update_cell`) but skips the `pre_skipped` evaluation since root expressions have no upstream refs. `DataFrame.to_dict(orient="records")` returns `[]` for any 0-column DataFrame regardless of row count, so a root expression dispatched against a fresh row group buffer would otherwise skip rendering entirely and fail the `strict=True` zip. `ExpressionColumnGenerator.generate` now synthesizes empty per-row dicts in that case so the loop still fires once per row.
Covered by the new `test_root_expression_all_dropped_async_fails_loudly` regression test.