diff --git a/architecture/config.md b/architecture/config.md index 497cd1dee..236cacfe7 100644 --- a/architecture/config.md +++ b/architecture/config.md @@ -28,7 +28,7 @@ The config layer provides: ### Column Configs -All column configs inherit from `SingleColumnConfig(ConfigBase, ABC)`, which provides `name`, `drop`, `allow_resize`, and the `column_type` discriminator field. +All column configs inherit from `SingleColumnConfig(ConfigBase, ABC)`, which provides `name`, `drop`, `skip`, `propagate_skip`, and the `column_type` discriminator field. Concrete types include: `SamplerColumnConfig`, `LLMTextColumnConfig`, `LLMStructuredColumnConfig`, `LLMCodeColumnConfig`, `LLMJudgeColumnConfig`, `EmbeddingColumnConfig`, `ImageColumnConfig`, `ValidationColumnConfig`, `ExpressionColumnConfig`, `SeedDatasetColumnConfig`, `CustomColumnConfig`. diff --git a/architecture/dataset-builders.md b/architecture/dataset-builders.md index 70b6afed4..f9bd4d85c 100644 --- a/architecture/dataset-builders.md +++ b/architecture/dataset-builders.md @@ -24,7 +24,7 @@ Iterates compiled column order. For each generator: 1. `log_pre_generation()` — logs model and optional MCP tool alias 2. **From-scratch generators** (empty buffer): `generate_from_scratch` → optional `run_pre_batch` after first seed column 3. **`CELL_BY_CELL` generators**: `_fan_out_with_threads` or `_fan_out_with_async` — parallel cell generation -4. **`FULL_COLUMN` generators**: `generate` on the whole batch DataFrame; optional resize via `allow_resize` +4. **`FULL_COLUMN` generators**: `generate` on the whole batch DataFrame; output row count must match input row count ### Async Execution (`_build_async`) @@ -92,7 +92,7 @@ Checkpoint state lives in `metadata.json`. Each metadata write includes the conf Both engines resume the same way: they scan `parquet-files/batch_*.parquet` and read parquet metadata to recover the completed row-group IDs and their actual persisted row counts. `metadata.json` remains the source of truth for the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), but the filesystem is the source of truth for *progress* (`num_completed_batches`, `actual_num_records`). Splitting the two sources is what lets resume survive a crash between writing a batch parquet and updating metadata — the filesystem reflects the durable state even when metadata lags by a step. Reading actual row counts also matters for async early-shutdown salvage, where a completed parquet file can contain fewer rows than the requested row-group size. The async engine tolerates non-contiguous IDs because row groups can complete out of order; the sync engine writes batches sequentially and rejects holes (likely external mutation or a directory written by an incompatible engine). -Resume deliberately rejects `allow_resize=True` columns because resized batches mutate row boundaries and the original remaining batch plan cannot be reconstructed safely from aggregate counters. It also treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension. The terminal-state check raises a clear `DatasetGenerationError` (not a `TypeError`) when the persisted metadata is missing required fields such as `target_num_records`. +Resume relies on stable row-group boundaries within a run. It treats datasets that have completed `process_after_generation()` as terminal: after-generation processors operate on the whole dataset and can re-chunk rows or change schema, invalidating row-group identity for later resume/extension. The terminal-state check raises a clear `DatasetGenerationError` (not a `TypeError`) when the persisted metadata is missing required fields such as `target_num_records`. After-generation processors run unconditionally on the on-disk dataset whenever they are configured — including the case where resume sees every row group already on disk. This closes the crash window between the final row-group parquet write and the `post_generation_state="started"` marker write: in that window, the dataset is complete but post-generation never ran, and the on-disk parquet files are still clean (no processor has touched them). The `post_generation_state="started"` short-circuit still rejects the other direction (`process_after_generation()` crashed mid-rewrite, leaving the parquet files in an ambiguous state), so resume only re-runs after-generation when it is safe to do so. diff --git a/fern/versions/latest/pages/concepts/agent-rollout-ingestion.mdx b/fern/versions/latest/pages/concepts/agent-rollout-ingestion.mdx index 559d10ae0..96a5576f7 100644 --- a/fern/versions/latest/pages/concepts/agent-rollout-ingestion.mdx +++ b/fern/versions/latest/pages/concepts/agent-rollout-ingestion.mdx @@ -171,16 +171,13 @@ You can also explode imported rollouts into a tool-interaction dataset. This exa ```python import data_designer.config as dd +import pandas as pd from data_designer.interface import DataDesigner from pydantic import BaseModel, Field from typing import Literal -@dd.custom_column_generator( - required_columns=["messages"], - side_effect_columns=["tool_call", "tool_response", "tool_name"], -) -def explode_tool_interactions(row: dict) -> list[dict]: +def extract_tool_interactions(row: dict) -> list[dict]: rows = [] tool_calls_by_id = {} context_messages = [] @@ -229,7 +226,23 @@ class ToolInteractionAnalysis(BaseModel): data_designer = DataDesigner() -config_builder = dd.DataDesignerConfigBuilder( +rollout_config = dd.DataDesignerConfigBuilder() +rollout_config.with_seed_dataset( + dd.AgentRolloutSeedSource( + format=dd.AgentRolloutFormat.CLAUDE_CODE, + ) +) + +rollout_result = data_designer.create(rollout_config, num_records=100) +rollout_df = rollout_result.load_dataset() +tool_rows = [ + interaction + for _, row in rollout_df.iterrows() + for interaction in extract_tool_interactions(row.to_dict()) +] +tool_df = pd.DataFrame(tool_rows) + +analysis_config = dd.DataDesignerConfigBuilder( model_configs=[ dd.ModelConfig( alias="tool-analyst", @@ -239,21 +252,9 @@ config_builder = dd.DataDesignerConfigBuilder( ] ) -config_builder.with_seed_dataset( - dd.AgentRolloutSeedSource( - format=dd.AgentRolloutFormat.CLAUDE_CODE, - ) -) - -config_builder.add_column( - dd.CustomColumnConfig( - name="tool_interaction_context", - generator_function=explode_tool_interactions, - allow_resize=True, - ) -) +analysis_config.with_seed_dataset(dd.DataFrameSeedSource(df=tool_df)) -config_builder.add_column( +analysis_config.add_column( dd.LLMStructuredColumnConfig( name="tool_interaction_analysis", model_alias="tool-analyst", @@ -279,11 +280,11 @@ Base your answer on the tool call arguments, the tool response, and the immediat ) ) -preview = data_designer.preview(config_builder, num_records=5) +preview = data_designer.preview(analysis_config, num_records=5) preview.display_sample_record() ``` -This pattern is useful when you want to curate evaluator or monitoring datasets from real traces. The resize-enabled custom column turns each tool interaction into its own record, and the structured column adds a consistent outcome label plus a grounded summary. Because the logic operates on normalized `tool_calls` and `tool` messages, the same pattern transfers across supported rollout formats. If your traces are long, consider adding a second custom or expression column that windows the context before sending it to a model. +This pattern is useful when you want to curate evaluator or monitoring datasets from real traces. The stage-boundary transform turns each tool interaction into its own record, and the structured column adds a consistent outcome label plus a grounded summary. Because the logic operates on normalized `tool_calls` and `tool` messages, the same pattern transfers across supported rollout formats. If your traces are long, consider adding a second custom or expression column that windows the context before sending it to a model. ## Related Guides diff --git a/fern/versions/latest/pages/concepts/architecture-and-performance.mdx b/fern/versions/latest/pages/concepts/architecture-and-performance.mdx index f5d31eed4..3b71e3ce5 100644 --- a/fern/versions/latest/pages/concepts/architecture-and-performance.mdx +++ b/fern/versions/latest/pages/concepts/architecture-and-performance.mdx @@ -185,7 +185,7 @@ Resume has a few important invariants: - `buffer_size` must match the original run. - `num_records` must be at least the original target; you may extend a run by requesting more records. -- Runs with `allow_resize=True` columns are not resumable because row boundaries can change. +- Row counts must stay stable within a run. Put filtering, expansion, aggregation, or deduplication at workflow boundaries. - Once `process_after_generation()` has run, the dataset is considered terminal for resume. Re-running with the same target returns the existing dataset; extending requires a fresh run. - If a run crashed after every row group was written but before `process_after_generation()` could start, resume runs after-generation on the existing on-disk dataset (the parquet files are still clean) and marks it terminal afterwards. A crash _during_ `process_after_generation()` still raises — the parquet files may have been partially rewritten and starting fresh is the only safe option. diff --git a/fern/versions/latest/pages/concepts/custom_columns.mdx b/fern/versions/latest/pages/concepts/custom_columns.mdx index 82b84053d..21e163c17 100644 --- a/fern/versions/latest/pages/concepts/custom_columns.mdx +++ b/fern/versions/latest/pages/concepts/custom_columns.mdx @@ -100,58 +100,17 @@ This gives you direct access to all `ModelFacade` capabilities: custom parsers, | `generator_function` | Callable | Yes | Decorated function | | `generation_strategy` | GenerationStrategy | No | `CELL_BY_CELL` or `FULL_COLUMN` | | `generator_params` | BaseModel | No | Typed params passed to function | -| `allow_resize` | bool | No | Allow 1:N or N:1 generation | ### Resizing (1:N and N:1) -**FULL_COLUMN:** Set `allow_resize=True` and return a DataFrame with more or fewer rows than the input: +Custom column generators must preserve row count. A `CELL_BY_CELL` generator returns one `dict` per input row, and a `FULL_COLUMN` generator returns a DataFrame with the same number of rows it received. -```python -@dd.custom_column_generator( - required_columns=["topic"], - side_effect_columns=["variation_id"], -) -def expand_topics(df: pd.DataFrame, params: None, models: dict) -> pd.DataFrame: - rows = [] - for _, row in df.iterrows(): - for i in range(3): # Generate 3 variations per input - rows.append({ - "topic": row["topic"], - "question": f"Question {i+1} about {row['topic']}", - "variation_id": i, - }) - return pd.DataFrame(rows) - -dd.CustomColumnConfig( - name="question", - generator_function=expand_topics, - generation_strategy=dd.GenerationStrategy.FULL_COLUMN, - allow_resize=True, -) -``` - -**CELL_BY_CELL:** With `allow_resize=True`, your function may return a single row (`dict`) or multiple rows (`list[dict]`). Return `[]` to drop that input row. - -```python -@dd.custom_column_generator(required_columns=["id"]) -def expand_row(row: dict) -> list[dict]: - return [ - {**row, "variant": "a"}, - {**row, "variant": "b"}, - ] - -dd.CustomColumnConfig( - name="variant", - generator_function=expand_row, - generation_strategy=dd.GenerationStrategy.CELL_BY_CELL, - allow_resize=True, -) -``` +For expansion, filtering, aggregation, or deduplication, put the row-count-changing work at a workflow boundary. Use [Workflow Chaining](/concepts/workflow-chaining) to run one stage, transform that stage's output, and seed the next stage from the transformed rows. Use cases: -- **Expansion (1:N)**: Generate multiple variations per input -- **Retraction (N:1)**: Filter, aggregate, or deduplicate records (FULL_COLUMN) or return `[]` per row (CELL_BY_CELL) +- **Expansion (1:N)**: Generate multiple variations per input between workflow stages +- **Retraction (N:1)**: Filter, aggregate, or deduplicate records between workflow stages ## Multi-Turn Example diff --git a/fern/versions/latest/pages/concepts/processors.mdx b/fern/versions/latest/pages/concepts/processors.mdx index 2ee193ca6..80d525cd8 100644 --- a/fern/versions/latest/pages/concepts/processors.mdx +++ b/fern/versions/latest/pages/concepts/processors.mdx @@ -30,8 +30,8 @@ Processors can run at three stages, determined by which callback methods they im Each batch carries the full dataset schema during generation. Post-batch schema changes such as column dropping only alter past batches, so all columns remain accessible to generators while building follow-up batches. - - The async engine (default) enforces row-count invariance in `process_before_batch()` and `process_after_batch()` — a processor returning a different row count raises `DatasetGenerationError`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes. The legacy sync engine (opt-out via `DATA_DESIGNER_ASYNC_ENGINE=0`) is permissive about row-count changes at all stages. + + Data Designer enforces row-count invariance in generation-time processors: `process_before_batch()` and `process_after_batch()`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes, or put the transform at a workflow boundary. diff --git a/fern/versions/latest/pages/plugins/example.mdx b/fern/versions/latest/pages/plugins/example.mdx index 77aeced33..efa0b77fb 100644 --- a/fern/versions/latest/pages/plugins/example.mdx +++ b/fern/versions/latest/pages/plugins/example.mdx @@ -84,16 +84,7 @@ class IndexMultiplierColumnConfig(SingleColumnConfig): - `required_columns` lists any columns this generator depends on (empty if none) - `side_effect_columns` lists any additional columns this generator produces beyond the primary column (empty if none) -**If your plugin can expand or retract the number of rows (1:N or N:1):** set `allow_resize=True` in the config class so the pipeline updates batch bookkeeping correctly. For example: - -```python -class MyColumnConfig(SingleColumnConfig): - column_type: Literal["my-plugin"] = "my-plugin" - allow_resize: bool = True # required when output row count can differ from input - # ... -``` - -The default is `False`; only set it to `True` when your `generate` method can return more or fewer rows than it receives. +Column generator plugins must preserve row count: `generate()` returns one output row for each input row. If your plugin needs to expand or retract records, run it as a stage-boundary transform with [Workflow Chaining](/concepts/workflow-chaining) and feed the transformed rows into the next stage. ### Step 3: Create the implementation class diff --git a/packages/data-designer-config/src/data_designer/config/base.py b/packages/data-designer-config/src/data_designer/config/base.py index be1d387c1..8910e4653 100644 --- a/packages/data-designer-config/src/data_designer/config/base.py +++ b/packages/data-designer-config/src/data_designer/config/base.py @@ -90,9 +90,6 @@ class SingleColumnConfig(ConfigBase, ABC): name: Unique name of the column to be generated. drop: If True, the column will be generated but removed from the final dataset. Useful for intermediate columns that are dependencies for other columns. - allow_resize: If True, the generator may emit a different number of rows than - it received (1:N or N:1). Explicit ``skip`` gates are invalid on resize - columns, and upstream skip propagation is not applied to them. column_type: Discriminator field that identifies the specific column type. Subclasses must override this field to specify the column type with a `Literal` value. skip: Optional expression gate for conditional generation. @@ -102,7 +99,6 @@ class SingleColumnConfig(ConfigBase, ABC): name: str drop: bool = False - allow_resize: bool = False column_type: str skip: SkipConfig | None = None propagate_skip: bool = Field( @@ -122,12 +118,6 @@ def _validate_skip_scope(self) -> Self: "Sampler/seed columns are collapsed into shared multi-column generators " "and cannot be skipped individually." ) - if self.allow_resize: - raise ValueError( - "skip and allow_resize cannot be used together. " - "allow_resize changes buffer size during generation (1:N / N:1), which " - "breaks index-based skip tracking and merge-back." - ) self_refs = {self.name} | set(self.side_effect_columns) if not self_refs.isdisjoint(self.skip.columns): offending = self_refs & set(self.skip.columns) diff --git a/packages/data-designer-config/src/data_designer/config/fingerprint.py b/packages/data-designer-config/src/data_designer/config/fingerprint.py index 13d72af98..4cb32090b 100644 --- a/packages/data-designer-config/src/data_designer/config/fingerprint.py +++ b/packages/data-designer-config/src/data_designer/config/fingerprint.py @@ -34,7 +34,7 @@ if TYPE_CHECKING: from data_designer.config.data_designer_config import DataDesignerConfig -CONFIG_HASH_VERSION = 1 +CONFIG_HASH_VERSION = 2 CONFIG_HASH_ALGO = "sha256" diff --git a/packages/data-designer-config/tests/config/test_columns.py b/packages/data-designer-config/tests/config/test_columns.py index baafb1c43..527637fe1 100644 --- a/packages/data-designer-config/tests/config/test_columns.py +++ b/packages/data-designer-config/tests/config/test_columns.py @@ -700,10 +700,9 @@ def test_default_column_emoji_for_custom_column_type() -> None: assert StubColumnConfig.get_column_emoji() == "🎨" -def test_allow_resize_inherited_by_subclasses() -> None: - """Subclasses inherit allow_resize from SingleColumnConfig.""" - assert StubColumnConfig(name="test").allow_resize is False - assert StubColumnConfig(name="test", allow_resize=True).allow_resize is True +def test_removed_allow_resize_field_rejected() -> None: + with pytest.raises(ValidationError, match="Extra inputs are not permitted"): + StubColumnConfig(name="test", allow_resize=True) def test_get_model_aliases_empty_when_no_model_alias_field() -> None: diff --git a/packages/data-designer-config/tests/config/test_skip_config.py b/packages/data-designer-config/tests/config/test_skip_config.py index 6b94fd21f..a208dd0c9 100644 --- a/packages/data-designer-config/tests/config/test_skip_config.py +++ b/packages/data-designer-config/tests/config/test_skip_config.py @@ -86,15 +86,6 @@ def test_skip_rejected_on_seed_dataset_type() -> None: ) -def test_skip_rejected_with_allow_resize() -> None: - with pytest.raises(ValidationError, match="skip and allow_resize cannot be used together"): - LLMTextColumnConfig( - **_BASE_LLM, - allow_resize=True, - skip=SkipConfig(when="{{ x == 0 }}"), - ) - - def test_skip_self_reference_rejected() -> None: with pytest.raises(ValidationError, match="references itself"): LLMTextColumnConfig( diff --git a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py index fd002f9a6..dff132659 100644 --- a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py +++ b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py @@ -363,6 +363,5 @@ def generate(self, data: pd.DataFrame) -> pd.DataFrame: Returns: DataFrame containing the input columns plus the new column and any side-effect - columns. When ``config.allow_resize`` is ``False``, the row count must match - the input; when it is ``True``, the row count may change. + columns. The row count must match the input. """ diff --git a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py index 08c78120b..749857501 100644 --- a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py +++ b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py @@ -155,11 +155,8 @@ def get_generation_strategy(self) -> GenerationStrategy: """Return strategy based on config.""" return self.config.generation_strategy - def generate(self, data: dict | pd.DataFrame) -> dict | pd.DataFrame | list[dict]: - """Generate column value(s) for a row (dict) or batch (DataFrame). - - For cell_by_cell with allow_resize=True, may return dict or list[dict] (0, 1, or N rows). - """ + def generate(self, data: dict | pd.DataFrame) -> dict | pd.DataFrame: + """Generate column value(s) for a row (dict) or batch (DataFrame).""" is_full_column = self.config.generation_strategy == GenerationStrategy.FULL_COLUMN is_dataframe = not isinstance(data, dict) @@ -177,7 +174,7 @@ def generate(self, data: dict | pd.DataFrame) -> dict | pd.DataFrame | list[dict return self._generate(data, is_dataframe) - async def agenerate(self, data: dict | pd.DataFrame) -> dict | pd.DataFrame | list[dict]: + async def agenerate(self, data: dict | pd.DataFrame) -> dict | pd.DataFrame: """Async generate — branches on strategy and detects coroutine functions.""" is_full_column = self.config.generation_strategy == GenerationStrategy.FULL_COLUMN if is_full_column: @@ -229,7 +226,7 @@ async def _ainvoke_generator_function(self, data: dict) -> dict | pd.DataFrame: models = self._build_models_dict() return await fn(data, self.config.generator_params, models) - def _generate(self, data: dict | pd.DataFrame, is_dataframe: bool) -> dict | pd.DataFrame | list[dict]: + def _generate(self, data: dict | pd.DataFrame, is_dataframe: bool) -> dict | pd.DataFrame: """Unified generation logic for both strategies.""" get_keys = (lambda d: set(d.columns)) if is_dataframe else (lambda d: set(d.keys())) @@ -265,28 +262,11 @@ def _generate(self, data: dict | pd.DataFrame, is_dataframe: bool) -> dict | pd. def _postprocess_result( self, - result: dict | pd.DataFrame | list[dict], + result: dict | pd.DataFrame, is_dataframe: bool, keys_before: set[str], - ) -> dict | pd.DataFrame | list[dict]: + ) -> dict | pd.DataFrame: """Validate type and output columns of a generation result.""" - # Cell-by-cell with allow_resize: accept dict or list[dict] - if not is_dataframe and self.config.allow_resize: - if isinstance(result, dict): - return self._validate_output(result, keys_before, is_dataframe) - if isinstance(result, list): - if not all(isinstance(r, dict) for r in result): - raise CustomColumnGenerationError( - f"Custom generator for column '{self.config.name}' with allow_resize must return " - "dict or list[dict]; list elements must be dicts." - ) - return [self._validate_cell_output(r, keys_before) for r in result] - raise CustomColumnGenerationError( - f"Custom generator for column '{self.config.name}' with allow_resize must return " - f"dict or list[dict], got {type(result).__name__}" - ) - - # Validate return type for non-resize paths expected_type = lazy.pd.DataFrame if is_dataframe else dict type_name = "DataFrame" if is_dataframe else "dict" if not isinstance(result, expected_type): @@ -425,5 +405,3 @@ def log_pre_generation(self) -> None: logger.info(f"{LOG_INDENT}model_aliases: {self.config.model_aliases}") if self.config.generator_params: logger.info(f"{LOG_INDENT}generator_params: {self.config.generator_params}") - if self.config.allow_resize: - logger.info(f"{LOG_INDENT}allow_resize: {self.config.allow_resize}") diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 1bbd51df7..d52b051ba 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -26,7 +26,6 @@ ProcessorType, ) from data_designer.config.utils.type_helpers import StrEnum -from data_designer.config.utils.warning_helpers import warn_at_caller from data_designer.config.version import get_library_version from data_designer.engine import flags from data_designer.engine.column_generators.generators.base import ( @@ -190,8 +189,6 @@ def __init__( self.batch_manager = DatasetBatchManager(resource_provider.artifact_storage) self._resource_provider = resource_provider self._records_to_drop: set[int] = set() - self._cell_resize_results: list[dict | list[dict] | None] = [] - self._cell_resize_mode = False self._task_traces: list[TaskTrace] = [] self._registry = registry or DataDesignerRegistry() self._graph: ExecutionGraph | None = None @@ -312,7 +309,7 @@ def build( Path to the generated dataset directory. """ self._reset_run_state() - self._use_async = flags.DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility() + self._use_async = flags.DATA_DESIGNER_ASYNC_ENGINE run_readiness_check( self.single_column_configs, @@ -381,13 +378,6 @@ def build( resume = ResumeMode.NEVER self.artifact_storage.resume = ResumeMode.NEVER - if resume == ResumeMode.ALWAYS and self._has_allow_resize_columns(): - raise DatasetGenerationError( - "🛑 Cannot resume when any column has allow_resize=True. Resized batches change row boundaries, " - "so the original batch plan cannot be reconstructed safely. Use resume=ResumeMode.NEVER to " - "start a new generation run." - ) - if self._use_async: self._build_async(generators, num_records, buffer_size, on_batch_complete, resume=resume) elif resume == ResumeMode.ALWAYS: @@ -442,9 +432,6 @@ def _set_metadata_defaults(self) -> None: } ) - def _has_allow_resize_columns(self) -> bool: - return any(getattr(config, "allow_resize", False) for config in self.single_column_configs) - def _post_generation_processed_resume_result(self, resume: ResumeMode, num_records: int) -> Path | None: """Decide whether to short-circuit resume based on after-generation processor state. @@ -658,7 +645,7 @@ def _build_with_resume( def build_preview(self, *, num_records: int) -> pd.DataFrame: self._reset_run_state() - self._use_async = flags.DATA_DESIGNER_ASYNC_ENGINE and self._resolve_async_compatibility() + self._use_async = flags.DATA_DESIGNER_ASYNC_ENGINE run_readiness_check( self.single_column_configs, self._resource_provider, @@ -726,34 +713,6 @@ def _build_async_preview(self, generators: list[ColumnGenerator], num_records: i buffer_manager.free_row_group(0) return dataset - def _resolve_async_compatibility(self) -> bool: - """Check if the async engine can be used; auto-fallback to sync if not. - - Returns True if async is usable, False if allow_resize forces sync fallback. - """ - offending = [config.name for config in self.single_column_configs if getattr(config, "allow_resize", False)] - if offending: - msg = ( - f"allow_resize=True detected on column(s) {offending}. " - "Falling back to sync engine for this run. " - "allow_resize is deprecated and will be removed in a future release; " - "use workflow chaining instead (see issue #552)." - ) - logger.warning(f"⚠️ {msg}") - # ``warn_at_caller`` rather than ``warnings.warn(stacklevel=N)`` so - # attribution lands on the user's call site instead of an internal - # ``DatasetBuilder.build`` / ``data_designer.interface`` frame. - # The exact internal-frame depth from this method up to user code - # depends on which entry point invoked the builder (build vs. - # build_preview, sync vs. async wrapping), so a hard-coded - # ``stacklevel`` is brittle; ``warn_at_caller`` walks past every - # ``data_designer.*`` frame regardless of chain shape. Library - # attribution would also be silenced under Python's default - # ``ignore::DeprecationWarning`` filter. See PR #594 review. - warn_at_caller(msg, DeprecationWarning) - return False - return True - def _find_completed_row_groups(self) -> dict[int, int]: """Scan final parquet files and return row-group IDs with persisted row counts. @@ -855,6 +814,9 @@ def _check_resume_config_compatibility(self) -> _ConfigCompatibility: else _ConfigCompatibility.INCOMPATIBLE ) + if not metadata_path.exists(): + return _ConfigCompatibility.COMPATIBLE + config_path = dataset_dir / SDG_CONFIG_FILENAME if not config_path.exists(): logger.warning( @@ -873,10 +835,10 @@ def _check_resume_config_compatibility(self) -> _ConfigCompatibility: ) except (OSError, json.JSONDecodeError, ValidationError): logger.warning( - "⚠️ Could not read stored config at %s for compatibility check — assuming compatible.", + "⚠️ Could not read stored config at %s for compatibility check — treating as incompatible.", config_path, ) - return _ConfigCompatibility.COMPATIBLE + return _ConfigCompatibility.INCOMPATIBLE def _dropped_column_artifact_policy_matches(self, metadata: dict[str, Any]) -> bool: """Return whether stored dropped-column artifact behavior matches this run. @@ -1091,7 +1053,7 @@ def _prepare_async_run( # If it raises, the scheduler propagates the error as DatasetGenerationError (fail-fast). def on_seeds_complete(rg_id: int, rg_size: int) -> FrontierDelta: df = buffer_manager.get_dataframe(rg_id) - df = self._processor_runner.run_pre_batch_on_df(df, strict_row_count=True) + df = self._processor_runner.run_pre_batch_on_df(df) buffer_manager.replace_dataframe(rg_id, df) deltas: list[FrontierDelta] = [] for ri in range(rg_size): @@ -1270,15 +1232,6 @@ def _run_cell_by_cell_generator(self, generator: ColumnGenerator) -> None: def _column_display_name(self, config: ColumnConfigT) -> str: return f"columns {config.column_names}" if hasattr(config, "column_names") else config.name - def _log_resize_if_changed(self, column_name: str, original_count: int, new_count: int, allow_resize: bool) -> None: - if not allow_resize or new_count == original_count: - return - if new_count == 0: - logger.warning(f"⚠️ Column '{column_name}' reduced batch to 0 records. This batch will be skipped.") - else: - emoji = "💥" if new_count > original_count else "✂️" - logger.info(f"{emoji} Column '{column_name}' resized batch: {original_count} -> {new_count} records.") - def _require_graph(self) -> ExecutionGraph: """Return the initialized execution graph for the current run.""" graph = self._graph @@ -1289,15 +1242,11 @@ def _require_graph(self) -> ExecutionGraph: def _column_can_skip(self, column_name: str) -> bool: """Fast check: can *column_name* ever be skipped (expression gate or propagation)? - Returns ``False`` for ``allow_resize=True`` columns because 1:N generators - change the row count — the skip-aware merge path assumes a 1:1 mapping - between input and output rows and would raise on the row-count check. + Returns ``True`` when the column has an expression gate or should + propagate skip metadata from required columns. """ if self._graph is None: return False - config = self.single_column_config_by_name.get(column_name) - if config is not None and config.allow_resize: - return False if self._graph.get_skip_config(column_name) is not None: return True return self._graph.should_propagate_skip(column_name) and bool(self._graph.get_required_columns(column_name)) @@ -1333,29 +1282,25 @@ def _run_full_column_generator(self, generator: ColumnGenerator) -> None: def _run_full_column_generator_without_skip(self, generator: ColumnGenerator) -> None: """Run the generator on the full batch, preserving skip metadata across the replace.""" - original_count = self.batch_manager.num_records_in_buffer - allow_resize = generator.config.allow_resize if not isinstance(generator.config, MultiColumnConfig) else False old_records = [record for _, record in self.batch_manager.iter_current_batch()] input_records, restore_context = prepare_records_for_skip_metadata_round_trip(old_records) df = generator.generate(lazy.pd.DataFrame(input_records)) - self._log_resize_if_changed(self._column_display_name(generator.config), original_count, len(df), allow_resize) new_records = df.to_dict(orient="records") if restore_context is not None: try: - restore_skip_metadata(new_records, context=restore_context, allow_resize=allow_resize) + restore_skip_metadata(new_records, context=restore_context) except ValueError as exc: raise DatasetGenerationError( f"Unable to restore skip provenance after FULL_COLUMN generation for " f"{self._column_display_name(generator.config)}: {exc}" ) from exc - self.batch_manager.replace_buffer(new_records, allow_resize=allow_resize) + self.batch_manager.replace_buffer(new_records) def _run_full_column_generator_with_skip(self, generator: ColumnGenerator, column_name: str) -> None: """Run a FULL_COLUMN generator with per-row skip evaluation and merge-back. - Only reachable when ``_column_can_skip`` is True, which excludes - ``allow_resize=True`` columns, so resize handling is not needed here. + Only reachable when ``_column_can_skip`` is True. """ active_records: list[dict] = [] records_with_skip_status: list[tuple[bool, dict]] = [] @@ -1377,7 +1322,7 @@ def _run_full_column_generator_with_skip(self, generator: ColumnGenerator, colum return batch = self._merge_skipped_and_generated(generator, column_name, active_records, records_with_skip_status) - self.batch_manager.replace_buffer(batch, allow_resize=False) + self.batch_manager.replace_buffer(batch) def _merge_skipped_and_generated( self, @@ -1423,14 +1368,6 @@ def _setup_fan_out( "generator so concurrent fan-out is not supported." ) - allow_resize = generator.config.allow_resize - if allow_resize: - self._cell_resize_results = [None] * self.batch_manager.num_records_batch - self._cell_resize_mode = True - self._current_column_display_name = self._column_display_name(generator.config) - else: - self._cell_resize_mode = False - label = f"{generator.config.column_type} column '{generator.config.name}'" progress_tracker = ProgressTracker( total_records=self.batch_manager.num_records_batch, @@ -1455,27 +1392,7 @@ def _setup_fan_out( def _finalize_fan_out(self, progress_tracker: ProgressTracker) -> None: progress_tracker.log_final() - if self._cell_resize_mode: - # Flatten results in index order; skip indices in _records_to_drop (failed cells), - # so those rows are omitted from the new buffer. - new_records: list[dict] = [] - for i in range(len(self._cell_resize_results)): - if i in self._records_to_drop: - continue - r = self._cell_resize_results[i] - if r is not None: - new_records.extend(r if isinstance(r, list) else [r]) - self._log_resize_if_changed( - self._current_column_display_name, - self.batch_manager.num_records_in_buffer, - len(new_records), - True, - ) - self.batch_manager.replace_buffer(new_records, allow_resize=True) - self._records_to_drop.clear() - self._cell_resize_mode = False - self._cell_resize_results = [] - elif len(self._records_to_drop) > 0: + if len(self._records_to_drop) > 0: self._cleanup_dropped_record_images(self._records_to_drop) self.batch_manager.drop_records(self._records_to_drop) self._records_to_drop.clear() @@ -1540,7 +1457,7 @@ def callback(exc: Exception, *, context: dict | None = None) -> None: return callback def _write_processed_batch(self, dataframe: pd.DataFrame) -> None: - self.batch_manager.replace_buffer(dataframe.to_dict(orient="records"), allow_resize=False) + self.batch_manager.replace_buffer(dataframe.to_dict(orient="records")) self.batch_manager.write() def _validate_column_configs(self) -> None: @@ -1666,11 +1583,8 @@ def _worker_error_callback(self, exc: Exception, *, context: dict | None = None) raise RuntimeError("Worker error callback called without a valid context index.") self._records_to_drop.add(context["index"]) - def _worker_result_callback(self, result: dict | list[dict], *, context: dict | None = None) -> None: - if self._cell_resize_mode: - self._cell_resize_results[context["index"]] = result - else: - self.batch_manager.update_record(context["index"], result) + def _worker_result_callback(self, result: dict, *, context: dict | None = None) -> None: + self.batch_manager.update_record(context["index"], result) def _emit_batch_inference_events( self, batch_mode: str, usage_deltas: dict[str, ModelUsageStats], group_id: str diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py index f2bc39cdd..f0ec37638 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py @@ -213,18 +213,14 @@ def update_record(self, index: int, record: dict) -> None: raise IndexError(f"🛑 Index {index} is out of bounds for buffer of size {len(self._buffer)}.") self._buffer[index] = record - def replace_buffer(self, records: list[dict], *, allow_resize: bool = False) -> None: + def replace_buffer(self, records: list[dict]) -> None: """Replace the buffer contents. Args: records: New records to replace the buffer. - allow_resize: If True, allows the number of records to differ from the current - buffer size (1:N or N:1 patterns). Defaults to False for strict 1:1 mapping. """ - if not allow_resize and len(records) != len(self._buffer): + if len(records) != len(self._buffer): raise DatasetBatchManagementError( f"🛑 Number of records ({len(records)}) must match the current buffer size ({len(self._buffer)})." ) self._buffer = records - if allow_resize and self._num_records_list is not None: - self._num_records_list[self._current_batch_number] = len(records) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py index e8ea468a1..2d7a85a68 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py @@ -59,10 +59,20 @@ def _run_stage(self, df: pd.DataFrame, stage: ProcessorStage, **kwargs) -> pd.Da except Exception as e: raise DatasetProcessingError(f"🛑 Failed in {stage.value} for {processor.name}: {e}") from e if len(df) != original_len: + if stage == ProcessorStage.PRE_BATCH: + self._raise_if_pre_batch_resized(original_len, len(df)) delta = len(df) - original_len logger.info(f"ℹ️ {stage.name} processors changed the record count by {delta:+d} records.") return df + @staticmethod + def _raise_if_pre_batch_resized(original_len: int, new_len: int) -> None: + if new_len != original_len: + raise DatasetProcessingError( + f"Pre-batch processor changed row count from {original_len} to {new_len}. " + "Row-count changes in pre-batch processors are not supported; use workflow chaining instead." + ) + def run_pre_batch(self, batch_manager: DatasetBatchManager) -> None: """Run process_before_batch() on current batch.""" if not self.has_processors_for(ProcessorStage.PRE_BATCH): @@ -70,25 +80,15 @@ def run_pre_batch(self, batch_manager: DatasetBatchManager) -> None: df = batch_manager.get_current_batch(as_dataframe=True) df = self._run_stage(df, ProcessorStage.PRE_BATCH) - batch_manager.replace_buffer(df.to_dict(orient="records"), allow_resize=True) + batch_manager.replace_buffer(df.to_dict(orient="records")) - def run_pre_batch_on_df(self, df: pd.DataFrame, *, strict_row_count: bool = False) -> pd.DataFrame: + def run_pre_batch_on_df(self, df: pd.DataFrame) -> pd.DataFrame: """Run PRE_BATCH processors on a DataFrame and return the result. Args: df: Input DataFrame. - strict_row_count: If True, raise ``DatasetProcessingError`` when a - processor changes the row count. Used by the async engine where - row-count changes are not supported. """ - original_len = len(df) - df = self._run_stage(df, ProcessorStage.PRE_BATCH) - if strict_row_count and len(df) != original_len: - raise DatasetProcessingError( - f"Pre-batch processor changed row count from {original_len} to {len(df)}. " - "Row-count changes in pre-batch processors are not supported with the async engine." - ) - return df + return self._run_stage(df, ProcessorStage.PRE_BATCH) def run_post_batch( self, df: pd.DataFrame, current_batch_number: int | None, *, strict_row_count: bool = False @@ -99,15 +99,14 @@ def run_post_batch( df: Input DataFrame. current_batch_number: Batch index passed to processors. strict_row_count: If True, raise ``DatasetProcessingError`` when a - processor changes the row count. Used by the async engine where - row-count changes are not supported. + processor changes the row count. """ original_len = len(df) df = self._run_stage(df, ProcessorStage.POST_BATCH, current_batch_number=current_batch_number) if strict_row_count and len(df) != original_len: raise DatasetProcessingError( f"Post-batch processor changed row count from {original_len} to {len(df)}. " - "Row-count changes in post-batch processors are not supported with the async engine." + "Row-count changes in post-batch processors are not supported; use workflow chaining instead." ) return df diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_tracker.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_tracker.py index 8529e0687..d827d0568 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_tracker.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_tracker.py @@ -97,7 +97,6 @@ def restore_skip_metadata( records: Sequence[dict], *, context: SkipMetadataRestoreContext, - allow_resize: bool, ) -> None: """Restore skip provenance using hidden restore IDs instead of row position.""" restored_source_ids: list[str] = [] @@ -121,13 +120,11 @@ def restore_skip_metadata( if meta is not None: record[SKIPPED_COLUMNS_RECORD_KEY] = set(meta) - if not allow_resize: - if len(restored_source_ids) != len(context.source_ids) or set(restored_source_ids) != context.source_ids: - raise ValueError( - "Full-column generation changed the row identity mapping while " - "allow_resize=False. Returned rows must preserve a 1:1 mapping " - "to the original input so skip provenance can be restored." - ) + if len(restored_source_ids) != len(context.source_ids) or set(restored_source_ids) != context.source_ids: + raise ValueError( + "Full-column generation changed the row identity mapping. Returned rows must preserve " + "a 1:1 mapping to the original input so skip provenance can be restored." + ) def _choose_restore_id_column(records: Sequence[dict]) -> str: diff --git a/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py b/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py index 683bb6dc2..3ebfa8694 100644 --- a/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py +++ b/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py @@ -142,10 +142,9 @@ def create_resource_provider( ) # Default the client mode from the env var when the caller hasn't decided. - # The interface (DataDesigner) computes the mode based on env var AND the - # config (e.g. allow_resize columns force a sync fallback) and passes the - # result explicitly. Direct callers of this factory still get the env-var - # default for backward compatibility. + # The interface (DataDesigner) computes the mode and passes the result + # explicitly. Direct callers still get the env-var default for backward + # compatibility. if client_concurrency_mode is None: client_concurrency_mode = ( ClientConcurrencyMode.ASYNC if flags.DATA_DESIGNER_ASYNC_ENGINE else ClientConcurrencyMode.SYNC diff --git a/packages/data-designer-engine/src/data_designer/engine/validation.py b/packages/data-designer-engine/src/data_designer/engine/validation.py index ed166577b..0937bc72d 100644 --- a/packages/data-designer-engine/src/data_designer/engine/validation.py +++ b/packages/data-designer-engine/src/data_designer/engine/validation.py @@ -41,7 +41,6 @@ class ViolationType(str, Enum): PROMPT_WITHOUT_REFERENCES = "prompt_without_references" SKIP_REFERENCE_MISSING = "skip_reference_missing" SKIP_ON_SAMPLER_SEED = "skip_on_sampler_seed" - SKIP_WITH_ALLOW_RESIZE = "skip_with_allow_resize" class ViolationLevel(str, Enum): @@ -414,7 +413,7 @@ def validate_skip_references( columns: list[ColumnConfigT], allowed_references: list[str], ) -> list[Violation]: - """Validate ``skip.when`` expressions: reference existence, type scope, and ``allow_resize`` conflicts.""" + """Validate ``skip.when`` expressions for reference existence and type scope.""" violations: list[Violation] = [] for column in columns: if column.skip is None: @@ -434,16 +433,6 @@ def validate_skip_references( ) ) - if getattr(column, "allow_resize", False): - violations.append( - Violation( - column=column.name, - type=ViolationType.SKIP_WITH_ALLOW_RESIZE, - message="skip and allow_resize cannot be used together on the same column.", - level=ViolationLevel.ERROR, - ) - ) - for ref in column.skip.columns: if ref not in allowed_references: violations.append( diff --git a/packages/data-designer-engine/tests/engine/column_generators/generators/test_async_generators.py b/packages/data-designer-engine/tests/engine/column_generators/generators/test_async_generators.py index 7eff29fec..995abde01 100644 --- a/packages/data-designer-engine/tests/engine/column_generators/generators/test_async_generators.py +++ b/packages/data-designer-engine/tests/engine/column_generators/generators/test_async_generators.py @@ -353,8 +353,8 @@ async def async_fn(row: dict) -> dict: @pytest.mark.asyncio(loop_scope="session") -async def test_custom_agenerate_async_allow_resize_invalid_list() -> None: - """Async custom generator with allow_resize rejects invalid non-dict list items.""" +async def test_custom_agenerate_async_rejects_list_return() -> None: + """Async cell-by-cell custom generators must return one dict per input row.""" @custom_column_generator(required_columns=["x"]) async def async_fn(row: dict) -> list: @@ -363,10 +363,9 @@ async def async_fn(row: dict) -> list: config = CustomColumnConfig( name="out", generator_function=async_fn, - allow_resize=True, ) gen = CustomColumnGenerator(config=config, resource_provider=_mock_provider()) - with pytest.raises(CustomColumnGenerationError, match="list elements must be dicts"): + with pytest.raises(CustomColumnGenerationError, match="must return a dict, got list"): await gen.agenerate({"x": 1}) diff --git a/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py b/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py index af7bc7109..a0fa44742 100644 --- a/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py +++ b/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py @@ -123,23 +123,6 @@ def test_config_validation_non_callable() -> None: CustomColumnConfig(name="test", generator_function="not_a_function") -def test_config_validation_allow_resize_allows_full_column_and_cell_by_cell() -> None: - """allow_resize=True is valid with full_column or cell_by_cell.""" - - @custom_column_generator() - def dummy_fn(row: dict) -> dict: - return row - - for strategy in (GenerationStrategy.FULL_COLUMN, GenerationStrategy.CELL_BY_CELL): - config = CustomColumnConfig( - name="test", - generator_function=dummy_fn, - allow_resize=True, - generation_strategy=strategy, - ) - assert config.allow_resize is True - - # Cell-by-cell generation tests @@ -187,25 +170,8 @@ def test_side_effect_columns() -> None: assert result["secondary"] == 15 -# cell_by_cell allow_resize: dict | list[dict] - - -def test_cell_by_cell_allow_resize_return_dict() -> None: - """With allow_resize, returning a single dict (1:1) works like normal cell-by-cell.""" - config = CustomColumnConfig( - name="result", - generator_function=generator_with_required_columns, - generation_strategy=GenerationStrategy.CELL_BY_CELL, - allow_resize=True, - ) - generator = CustomColumnGenerator(config=config, resource_provider=Mock(spec=ResourceProvider)) - result = generator.generate({"input": "hi"}) - assert isinstance(result, dict) - assert result["result"] == "HI" - - -def test_cell_by_cell_allow_resize_return_list_expand() -> None: - """With allow_resize, returning list[dict] expands one row into multiple.""" +def test_cell_by_cell_return_list_rejected() -> None: + """Cell-by-cell generators must return one dict per input row.""" @custom_column_generator(required_columns=["x"]) def expand(row: dict) -> list[dict]: @@ -218,67 +184,9 @@ def expand(row: dict) -> list[dict]: name="out", generator_function=expand, generation_strategy=GenerationStrategy.CELL_BY_CELL, - allow_resize=True, - ) - generator = CustomColumnGenerator(config=config, resource_provider=Mock(spec=ResourceProvider)) - result = generator.generate({"x": 10}) - assert isinstance(result, list) - assert len(result) == 2 - assert result[0] == {"x": 10, "out": 10} - assert result[1] == {"x": 10, "out": 20} - - -def test_cell_by_cell_allow_resize_return_list_single() -> None: - """With allow_resize, returning [dict] (1:1 via list) is valid.""" - - @custom_column_generator(required_columns=["x"]) - def one_row(row: dict) -> list[dict]: - return [{**row, "out": row["x"]}] - - config = CustomColumnConfig( - name="out", - generator_function=one_row, - generation_strategy=GenerationStrategy.CELL_BY_CELL, - allow_resize=True, - ) - generator = CustomColumnGenerator(config=config, resource_provider=Mock(spec=ResourceProvider)) - result = generator.generate({"x": 42}) - assert result == [{"x": 42, "out": 42}] - - -def test_cell_by_cell_allow_resize_return_empty_list() -> None: - """With allow_resize, returning [] drops that row (0 rows).""" - - @custom_column_generator(required_columns=["x"]) - def drop(row: dict) -> list[dict]: - return [] - - config = CustomColumnConfig( - name="out", - generator_function=drop, - generation_strategy=GenerationStrategy.CELL_BY_CELL, - allow_resize=True, - ) - generator = CustomColumnGenerator(config=config, resource_provider=Mock(spec=ResourceProvider)) - result = generator.generate({"x": 1}) - assert result == [] - - -def test_cell_by_cell_allow_resize_invalid_return_type() -> None: - """With allow_resize, return must be dict or list[dict].""" - - @custom_column_generator(required_columns=["x"]) - def bad_return(row: dict): - return [1, 2] - - config = CustomColumnConfig( - name="out", - generator_function=bad_return, - generation_strategy=GenerationStrategy.CELL_BY_CELL, - allow_resize=True, ) generator = CustomColumnGenerator(config=config, resource_provider=Mock(spec=ResourceProvider)) - with pytest.raises(CustomColumnGenerationError, match="list elements must be dicts"): + with pytest.raises(CustomColumnGenerationError, match="must return a dict, got list"): generator.generate({"x": 1}) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py index b2bc98c4a..e223ddd25 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py @@ -5,9 +5,8 @@ import math import tracemalloc -import warnings from types import SimpleNamespace -from unittest.mock import MagicMock, Mock +from unittest.mock import MagicMock import pytest @@ -75,48 +74,6 @@ def generate(self, data: lazy.pd.DataFrame) -> lazy.pd.DataFrame: return data -# -- allow_resize validation test --------------------------------------------- - - -@pytest.mark.parametrize( - "configs,expected", - [ - pytest.param( - [Mock(name="col_a", allow_resize=True), Mock(name="col_b", allow_resize=False)], - False, - id="fallback_on_allow_resize", - ), - pytest.param( - [Mock(name="col_a", allow_resize=False), Mock(name="col_b", allow_resize=False)], - True, - id="async_without_allow_resize", - ), - ], -) -def test_resolve_async_compatibility(configs: list[Mock], expected: bool) -> None: - """allow_resize=True triggers auto-fallback to sync with a deprecation warning.""" - builder = Mock(spec=DatasetBuilder) - builder.single_column_configs = configs - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - result = DatasetBuilder._resolve_async_compatibility(builder) - assert result is expected - if not expected: - assert len(w) == 1 - assert issubclass(w[0].category, DeprecationWarning) - assert "allow_resize" in str(w[0].message) - # Regression for PR #594 review: the warning must attribute to the - # caller's frame (this test file), not to a ``data_designer.*`` library - # frame. Library-attributed ``DeprecationWarning`` entries fall under - # Python's default ``ignore::DeprecationWarning`` filter and are - # silenced. A regression to ``warnings.warn(..., stacklevel=N)`` would - # land somewhere inside the engine package and silently break the - # user-facing nudge. - assert w[0].filename == __file__ - else: - assert len(w) == 0 - - # -- _build_async integration test with mock generators ----------------------- diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index 0a0f192b4..763826578 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -817,6 +817,18 @@ def test_multiple_processors_run_in_definition_order(builder_with_seed): assert call_order == ["a", "b", "c"] +def test_pre_batch_processor_row_count_change_rejected(builder_with_seed, caplog): + mock_processor = create_mock_processor("filtering_processor", ["process_before_batch"]) + mock_processor.process_before_batch.side_effect = lambda df: df.iloc[:2].reset_index(drop=True) + builder_with_seed.set_processor_runner([mock_processor]) + + with caplog.at_level(logging.INFO): + with pytest.raises(DatasetGenerationError, match="Pre-batch processor changed row count"): + builder_with_seed.build(num_records=3) + + assert not any("PRE_BATCH processors changed the record count" in record.message for record in caplog.records) + + def test_process_preview_with_empty_dataframe(simple_builder): """Test that process_preview handles empty DataFrames gracefully.""" mock_processor = create_mock_processor("test_processor", ["process_after_batch", "process_after_generation"]) @@ -829,242 +841,6 @@ def test_process_preview_with_empty_dataframe(simple_builder): mock_processor.process_after_generation.assert_called_once() -# allow_resize integration tests -# -# Factory: _make_resize_full_expand. Stubs: _resize_full_keep_first, _resize_cell_*. - - -def _make_resize_full_expand(n: int, primary_col: str, side_effect_col: str): - """FULL_COLUMN: expand n times per seed_id.""" - - @custom_column_generator(required_columns=["seed_id"], side_effect_columns=[side_effect_col]) - def fn(df: pd.DataFrame) -> pd.DataFrame: - rows = [] - for _, row in df.iterrows(): - for i in range(n): - rows.append({**row.to_dict(), primary_col: f"{row['seed_id']}_v{i}", side_effect_col: i}) - return lazy.pd.DataFrame(rows) - - return fn - - -@custom_column_generator(required_columns=["seed_id"]) -def _resize_full_keep_first(df: pd.DataFrame) -> pd.DataFrame: - """FULL_COLUMN: keep first row per seed_id (retraction).""" - return df.drop_duplicates(subset="seed_id").assign(filtered=True) - - -@custom_column_generator(required_columns=["seed_id"]) -def _resize_full_drop_seed_one(df: pd.DataFrame) -> pd.DataFrame: - """FULL_COLUMN: drop the row with seed_id == 1.""" - return df[df["seed_id"] != 1].reset_index(drop=True).assign(filtered=True) - - -@custom_column_generator(required_columns=["seed_id"]) -def _resize_cell_expand(row: dict) -> list[dict]: - """CELL_BY_CELL: one row -> two rows (doubled).""" - return [ - {**row, "doubled": f"{row['seed_id']}_a"}, - {**row, "doubled": f"{row['seed_id']}_b"}, - ] - - -@custom_column_generator(required_columns=["seed_id"]) -def _resize_cell_filter_odd(row: dict) -> dict | list[dict]: - """CELL_BY_CELL: drop even seed_id, keep odd.""" - if row["seed_id"] % 2 == 0: - return [] - return {**row, "kept": row["seed_id"]} - - -@custom_column_generator(required_columns=["seed_id"]) -def _resize_cell_drop_all(row: dict) -> list[dict]: - """CELL_BY_CELL: return [] for every row (drop all).""" - return [] - - -_RESIZE_SPECS: dict[str, list[tuple[str, object, GenerationStrategy]]] = { - "cell_filter_odd": [("kept", _resize_cell_filter_odd, GenerationStrategy.CELL_BY_CELL)], - "cell_x2": [("doubled", _resize_cell_expand, GenerationStrategy.CELL_BY_CELL)], - "cell_drop_all": [("dropped", _resize_cell_drop_all, GenerationStrategy.CELL_BY_CELL)], - "full_x3": [("expanded", _make_resize_full_expand(3, "expanded", "copy"), GenerationStrategy.FULL_COLUMN)], - "full_chain": [ - ("expanded", _make_resize_full_expand(3, "expanded", "copy"), GenerationStrategy.FULL_COLUMN), - ("filtered", _resize_full_keep_first, GenerationStrategy.FULL_COLUMN), - ("expanded_again", _make_resize_full_expand(3, "expanded_again", "copy2"), GenerationStrategy.FULL_COLUMN), - ], - "cell_plus_full_chain": [ - ("doubled", _resize_cell_expand, GenerationStrategy.CELL_BY_CELL), - ("filtered", _resize_full_keep_first, GenerationStrategy.FULL_COLUMN), - ("expanded_again", _make_resize_full_expand(3, "expanded_again", "copy2"), GenerationStrategy.FULL_COLUMN), - ], -} - - -def _resize_columns(spec: str) -> list[CustomColumnConfig]: - """Return column configs for a given allow_resize recipe.""" - return [ - CustomColumnConfig( - name=name, - generator_function=fn, - generation_strategy=strat, - allow_resize=True, - ) - for name, fn, strat in _RESIZE_SPECS[spec] - ] - - -def _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns): - """Build a DatasetBuilder with the given resize column configs.""" - config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) - config_builder.with_seed_dataset(LocalFileSeedSource(path=str(seed_data_setup["seed_path"]))) - for col in columns: - config_builder.add_column(col) - return DatasetBuilder( - data_designer_config=config_builder.build(), - resource_provider=stub_resource_provider, - ) - - -@pytest.mark.parametrize( - "spec,num_records,expected_len,check_doubled_order", - [ - ("cell_filter_odd", 5, 3, False), - ("cell_x2", 5, 10, True), - ("cell_drop_all", 5, 0, False), - ("full_x3", 5, 15, False), - ("full_chain", 5, 15, False), - ("cell_plus_full_chain", 5, 15, False), - ], - ids=[ - "cell_filter_odd_preview", - "cell_x2_preview", - "cell_drop_all_preview", - "full_x3_preview", - "full_chain_preview", - "cell_plus_full_chain_preview", - ], -) -def test_allow_resize_preview( - stub_resource_provider, - stub_model_configs, - seed_data_setup, - spec, - num_records, - expected_len, - check_doubled_order, -): - """Preview with allow_resize columns (FULL_COLUMN and/or CELL_BY_CELL) yields expected length.""" - columns = _resize_columns(spec) - builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) - result = builder.build_preview(num_records=num_records) - assert len(result) == expected_len - if check_doubled_order: - expected = [x for i in range(1, 6) for x in (f"{i}_a", f"{i}_b")] - assert result["doubled"].tolist() == expected - - -@pytest.mark.parametrize( - "spec,num_records,buffer_size,expected_total_rows", - [ - ("cell_x2", 5, 2, 10), # batches [2,2,1] -> each x2 -> 4+4+2 - ("cell_filter_odd", 5, 2, 3), # batches [2,2,1] -> keep odd -> 1+1+1 - ("cell_drop_all", 5, 2, 0), # each batch -> 0 rows - ("full_x3", 5, 2, 15), # batches [2,2,1] -> each x3 -> 6+6+3 - ("full_x3", 4, 2, 12), # batches [2,2] -> 6+6 - ("full_chain", 5, 2, 15), # batches [2,2,1] -> x3, dedup, x3 -> 15 - ], - ids=[ - "cell_x2_multibatch", - "cell_filter_odd_multibatch", - "cell_drop_all_multibatch", - "full_x3_multibatch_5_2", - "full_x3_multibatch_4_2", - "full_chain_multibatch", - ], -) -def test_allow_resize_multiple_batches( - stub_resource_provider, - stub_model_configs, - seed_data_setup, - spec, - num_records, - buffer_size, - expected_total_rows, -): - """Resized batches are written independently and combine to expected total rows.""" - stub_resource_provider.run_config = RunConfig(buffer_size=buffer_size) - columns = _resize_columns(spec) - builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) - builder.build(num_records=num_records) - final_path = builder.artifact_storage.final_dataset_path - if expected_total_rows == 0 and not final_path.exists(): - df = lazy.pd.DataFrame() - else: - df = lazy.pd.read_parquet(final_path) - assert len(df) == expected_total_rows - - -def test_resume_rejects_allow_resize_columns(stub_resource_provider, stub_model_configs, seed_data_setup, tmp_path): - """Resume is rejected when allow_resize=True would make batch boundaries ambiguous.""" - artifact_path = tmp_path / "artifacts" - artifact_path.mkdir() - _write_metadata( - artifact_path / "dataset", - target_num_records=5, - buffer_size=2, - num_completed_batches=1, - actual_num_records=2, - ) - - stub_resource_provider.artifact_storage = ArtifactStorage(artifact_path=artifact_path, resume=ResumeMode.ALWAYS) - columns = _resize_columns("cell_x2") - builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) - - with pytest.raises(DatasetGenerationError, match="allow_resize=True"): - builder.build(num_records=5, resume=ResumeMode.ALWAYS) - - -def test_if_possible_allows_allow_resize_when_starting_fresh( - stub_resource_provider, stub_model_configs, seed_data_setup -): - """IF_POSSIBLE with allow_resize=True starts fresh when there is no checkpoint to resume.""" - columns = _resize_columns("cell_x2") - builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) - - final_path = builder.build(num_records=5, resume=ResumeMode.IF_POSSIBLE) - - df = lazy.pd.read_parquet(final_path) - assert len(df) == 10 - - -def test_if_possible_allows_allow_resize_when_config_is_incompatible( - stub_resource_provider, stub_model_configs, seed_data_setup, tmp_path -): - """IF_POSSIBLE with allow_resize=True starts fresh when an existing dataset is incompatible.""" - dataset_dir = tmp_path / "dataset" - dataset_dir.mkdir() - sentinel = dataset_dir / "important_file.txt" - sentinel.write_text("precious data") - - storage = ArtifactStorage(artifact_path=tmp_path, resume=ResumeMode.IF_POSSIBLE) - stub_resource_provider.artifact_storage = storage - columns = _resize_columns("cell_x2") - builder = _build_resize_builder(stub_resource_provider, stub_model_configs, seed_data_setup, columns) - _write_incompatible_config_metadata( - dataset_dir, - builder.data_designer_config.fingerprint()["config_hash_version"], - ) - - final_path = builder.build(num_records=5, resume=ResumeMode.IF_POSSIBLE) - - assert storage.resume == ResumeMode.NEVER - assert sentinel.exists() - assert final_path != dataset_dir / "parquet-files" - df = lazy.pd.read_parquet(final_path) - assert len(df) == 10 - - # skip metadata preservation tests @@ -1243,85 +1019,6 @@ def test_skip_propagation_resolves_side_effect_dependencies_in_sync_builder( assert row["analysis"] == "generated_analysis", f"seed_id={row['seed_id']}: analysis should be generated" -def test_skip_metadata_restore_preserves_row_identity_across_allow_resize_full_column( - stub_resource_provider, stub_model_configs, seed_data_setup -): - """Filtering out a skipped row must not transfer its skip provenance to surviving rows.""" - config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) - config_builder.with_seed_dataset(LocalFileSeedSource(path=str(seed_data_setup["seed_path"]))) - - config_builder.add_column( - CustomColumnConfig( - name="review", - generator_function=_make_label_generator("review", "seed_id"), - generation_strategy=GenerationStrategy.FULL_COLUMN, - skip=SkipConfig(when="{{ seed_id == 1 }}"), - ) - ) - config_builder.add_column( - CustomColumnConfig( - name="filtered", - generator_function=_resize_full_drop_seed_one, - generation_strategy=GenerationStrategy.FULL_COLUMN, - allow_resize=True, - propagate_skip=False, - ) - ) - config_builder.add_column( - CustomColumnConfig( - name="analysis", - generator_function=_make_label_generator("analysis", "review"), - generation_strategy=GenerationStrategy.FULL_COLUMN, - propagate_skip=True, - ) - ) - - builder = DatasetBuilder( - data_designer_config=config_builder.build(), - resource_provider=stub_resource_provider, - ) - result = builder.build_preview(num_records=5) - - assert result["seed_id"].tolist() == [2, 3, 4, 5] - assert result["analysis"].tolist() == ["generated_analysis"] * 4 - - -def test_allow_resize_column_not_blocked_by_upstream_skip(stub_resource_provider, stub_model_configs, seed_data_setup): - """An allow_resize=True column depending on a skippable upstream must not - enter the skip-aware branch (which enforces 1:1 row counts). - - Before the fix, _column_can_skip returned True for allow_resize columns - with propagate_skip=True and required_columns pointing to a skippable - upstream, causing a DatasetGenerationError on the row-count check. - """ - config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) - config_builder.with_seed_dataset(LocalFileSeedSource(path=str(seed_data_setup["seed_path"]))) - - config_builder.add_column( - CustomColumnConfig( - name="review", - generator_function=_make_label_generator("review", "seed_id"), - generation_strategy=GenerationStrategy.FULL_COLUMN, - skip=SkipConfig(when="{{ seed_id < 3 }}"), - ) - ) - config_builder.add_column( - CustomColumnConfig( - name="expanded", - generator_function=_make_resize_full_expand(2, "expanded", "copy"), - generation_strategy=GenerationStrategy.FULL_COLUMN, - allow_resize=True, - ) - ) - - builder = DatasetBuilder( - data_designer_config=config_builder.build(), - resource_provider=stub_resource_provider, - ) - result = builder.build_preview(num_records=5) - assert len(result) == 10 - - def test_skip_chained_transitive_propagation_through_three_levels( stub_resource_provider, stub_model_configs, seed_data_setup ) -> None: @@ -1969,6 +1666,25 @@ def test_build_resume_always_raises_on_config_mismatch(stub_resource_provider, s builder.build(num_records=4, resume=ResumeMode.ALWAYS) +def test_build_resume_always_raises_on_unreadable_stored_config( + stub_resource_provider, stub_test_config_builder, tmp_path +): + """resume=ALWAYS rejects legacy stored configs that fail schema validation.""" + dataset_dir = tmp_path / "dataset" + _write_metadata( + dataset_dir, + target_num_records=4, + buffer_size=2, + num_completed_batches=1, + actual_num_records=2, + ) + (dataset_dir / "builder_config.json").write_text('{"data_designer": {"columns": [{"allow_resize": true}]}}') + + builder = _make_resume_builder(stub_resource_provider, stub_test_config_builder, tmp_path) + with pytest.raises(DatasetGenerationError, match="does not match the config used"): + builder.build(num_records=4, resume=ResumeMode.ALWAYS) + + def test_build_resume_logs_warning_when_already_complete( stub_resource_provider, stub_test_config_builder, tmp_path, caplog ): diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py index cf3a600bf..d477a39c5 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dataset_batch_manager.py @@ -169,41 +169,6 @@ def test_replace_buffer_wrong_length(stub_batch_manager_with_data): stub_batch_manager_with_data.replace_buffer(wrong_length_records) -@pytest.mark.parametrize( - "new_size", - [6, 1, 0], - ids=["expansion", "retraction", "empty"], -) -def test_replace_buffer_allow_resize(stub_batch_manager_with_data, new_size): - """allow_resize=True permits any record count change and updates bookkeeping.""" - stub_batch_manager_with_data.add_records([{"id": i} for i in range(3)]) - - new_records = [{"id": i} for i in range(new_size)] - stub_batch_manager_with_data.replace_buffer(new_records, allow_resize=True) - - assert stub_batch_manager_with_data.num_records_in_buffer == new_size - assert stub_batch_manager_with_data._buffer == new_records - assert stub_batch_manager_with_data.num_records_batch == new_size - - -def test_actual_num_records_tracks_expansion(stub_batch_manager_with_data): - """Test that actual_num_records correctly tracks when buffer is resized.""" - # Add 3 records, then expand to 6 - records = [{"id": i} for i in range(3)] - stub_batch_manager_with_data.add_records(records) - expanded = [{"id": i} for i in range(6)] - stub_batch_manager_with_data.replace_buffer(expanded, allow_resize=True) - - # Finish batch and check metadata - stub_batch_manager_with_data.finish_batch() - - with open(stub_batch_manager_with_data.artifact_storage.metadata_file_path) as f: - metadata = json.load(f) - - assert metadata["target_num_records"] == 13 # [6, 3, 3, 1] after resize from [3, 3, 3, 1] - assert metadata["actual_num_records"] == 6 # actual expanded count - - # Test write method def test_write_empty_buffer(stub_batch_manager_with_data): result = stub_batch_manager_with_data.write() diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_skip_tracker.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_skip_tracker.py index 7eeb68789..cf833c8e7 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_skip_tracker.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_skip_tracker.py @@ -172,14 +172,14 @@ def test_restore_skip_metadata_uses_restore_ids_after_reorder() -> None: {"a": 10, restore_id_column: prepared_rows[0][restore_id_column]}, {"a": 20, restore_id_column: prepared_rows[1][restore_id_column]}, ] - restore_skip_metadata(new, context=restore_context, allow_resize=False) + restore_skip_metadata(new, context=restore_context) assert new[0][SKIPPED_COLUMNS_RECORD_KEY] == {"col_z"} assert new[1][SKIPPED_COLUMNS_RECORD_KEY] == {"col_x"} assert SKIPPED_COLUMNS_RECORD_KEY not in new[2] -def test_restore_skip_metadata_allow_resize_handles_filtered_rows() -> None: +def test_restore_skip_metadata_rejects_filtered_rows() -> None: old = [{"a": 1}, {"a": 2}] prepared_rows, restore_context = prepare_records_for_skip_metadata_round_trip(old) assert restore_context is None @@ -193,9 +193,9 @@ def test_restore_skip_metadata_allow_resize_handles_filtered_rows() -> None: restore_id_column = restore_context.restore_id_column new = [{"a": 20, restore_id_column: prepared_rows[1][restore_id_column]}] - restore_skip_metadata(new, context=restore_context, allow_resize=True) - assert SKIPPED_COLUMNS_RECORD_KEY not in new[0] + with pytest.raises(ValueError, match="1:1 mapping"): + restore_skip_metadata(new, context=restore_context) def test_restore_skip_metadata_rejects_missing_restore_id_column() -> None: @@ -204,4 +204,4 @@ def test_restore_skip_metadata_rejects_missing_restore_id_column() -> None: assert restore_context is not None with pytest.raises(ValueError, match="must preserve the internal column"): - restore_skip_metadata([{"a": 10}], context=restore_context, allow_resize=False) + restore_skip_metadata([{"a": 10}], context=restore_context) diff --git a/packages/data-designer-engine/tests/engine/test_validation.py b/packages/data-designer-engine/tests/engine/test_validation.py index 1a1191a9e..6b1cff615 100644 --- a/packages/data-designer-engine/tests/engine/test_validation.py +++ b/packages/data-designer-engine/tests/engine/test_validation.py @@ -460,27 +460,9 @@ def test_validate_skip_on_sampler_seed() -> None: params={"low": 0, "high": 10}, skip=SkipConfig(when="{{ y }}"), drop=False, - allow_resize=False, propagate_skip=True, ) violations = validate_skip_references([col], allowed_references=["y"]) assert len(violations) == 1 assert violations[0].type == ViolationType.SKIP_ON_SAMPLER_SEED assert violations[0].column == "sampler_with_skip" - - -def test_validate_skip_with_allow_resize() -> None: - col = LLMTextColumnConfig.model_construct( - name="with_skip", - column_type="llm-text", - prompt="test {{ gate }}", - model_alias=STUB_MODEL_ALIAS, - skip=SkipConfig(when="{{ gate == 0 }}"), - allow_resize=True, - drop=False, - propagate_skip=True, - ) - violations = validate_skip_references([col], allowed_references=["gate"]) - assert len(violations) == 1 - assert violations[0].type == ViolationType.SKIP_WITH_ALLOW_RESIZE - assert violations[0].column == "with_skip" diff --git a/packages/data-designer/src/data_designer/interface/data_designer.py b/packages/data-designer/src/data_designer/interface/data_designer.py index 270fff96c..32be9e9be 100644 --- a/packages/data-designer/src/data_designer/interface/data_designer.py +++ b/packages/data-designer/src/data_designer/interface/data_designer.py @@ -268,9 +268,8 @@ def create( ) # ``DeprecationWarning`` is re-raised before the generic wrapper so that - # ``warnings.warn(..., DeprecationWarning)`` calls inside the engine — most - # notably ``allow_resize=True`` deprecation in ``_resolve_async_compatibility`` - # — surface their original message under strict warning filters + # ``warnings.warn(..., DeprecationWarning)`` calls inside the engine + # surface their original message under strict warning filters # (``pytest.warns``, ``-W error::DeprecationWarning``, etc.) instead of being # swallowed and re-wrapped as a generic ``DataDesignerGenerationError``. try: @@ -706,21 +705,12 @@ def _create_request_admission_controller(self) -> AdaptiveRequestAdmissionContro def _resolve_client_concurrency_mode(config_builder: DataDesignerConfigBuilder) -> ClientConcurrencyMode: """Pick the model-client mode that matches the engine the run will use. - The async engine is the default, but ``allow_resize=True`` columns force - a sync-engine fallback (see ``DatasetBuilder._resolve_async_compatibility``). - Without aligning the client mode here, those runs would create async-only - clients and then call sync methods on them — raising ``SyncClientUnavailableError`` - from inside the sync engine. Match the client mode to the actual engine - choice so the fallback path is functional. + The async engine is the default. Users can still opt into the legacy sync + engine with ``DATA_DESIGNER_ASYNC_ENGINE=0`` for the transitional release. """ if not flags.DATA_DESIGNER_ASYNC_ENGINE: # Deliberate opt-out via env var. Surface the deprecation so users - # know the sync path is going away. Mirror the ``allow_resize`` shape - # in ``_resolve_async_compatibility``: emit both a ``logger.warning`` - # (visible in the project's logging output) and a ``DeprecationWarning`` - # (programmatic signal callers can filter on). The ``allow_resize`` - # auto-fallback has its own warning from the builder layer; we don't - # double-warn here. + # know the sync path is going away. msg = ( "DATA_DESIGNER_ASYNC_ENGINE=0 selects the legacy sync engine, which is " "deprecated and will be removed in a future release. Unset the variable " @@ -729,8 +719,6 @@ def _resolve_client_concurrency_mode(config_builder: DataDesignerConfigBuilder) logger.warning(f"⚠️ {msg}") warnings.warn(msg, DeprecationWarning, stacklevel=3) return ClientConcurrencyMode.SYNC - if any(c.allow_resize for c in config_builder.get_column_configs()): - return ClientConcurrencyMode.SYNC return ClientConcurrencyMode.ASYNC def _get_interface_info(self, model_providers: list[ModelProvider]) -> InterfaceInfo: diff --git a/packages/data-designer/tests/interface/test_data_designer.py b/packages/data-designer/tests/interface/test_data_designer.py index 67fdf9dfb..69c1f626a 100644 --- a/packages/data-designer/tests/interface/test_data_designer.py +++ b/packages/data-designer/tests/interface/test_data_designer.py @@ -18,13 +18,11 @@ import data_designer.interface.data_designer as dd_mod import data_designer.lazy_heavy_imports as lazy from data_designer.config.column_configs import ( - CustomColumnConfig, ExpressionColumnConfig, LLMTextColumnConfig, SamplerColumnConfig, ) from data_designer.config.config_builder import DataDesignerConfigBuilder -from data_designer.config.custom_column import custom_column_generator from data_designer.config.errors import InvalidConfigError from data_designer.config.models import ChatCompletionInferenceParams, ModelConfig, ModelProvider from data_designer.config.processors import DropColumnsProcessorConfig @@ -394,73 +392,38 @@ def stub_seed_reader(): return StubHuggingFaceSeedReader() -def _builder_with_allow_resize() -> DataDesignerConfigBuilder: - """Config with one allow_resize=True column — forces sync-engine fallback.""" - - @custom_column_generator() - def _expander(row: dict) -> list[dict]: - return [{**row, "item": i} for i in range(2)] - - builder = DataDesignerConfigBuilder() - builder.add_column( - SamplerColumnConfig( - name="seed", - sampler_type=SamplerType.CATEGORY, - params=CategorySamplerParams(values=["a"]), - ) - ) - builder.add_column( - CustomColumnConfig( - name="item", - generator_function=_expander, - allow_resize=True, - ) - ) - return builder - - @pytest.mark.parametrize( - "env_value,with_allow_resize,expected,expect_deprecation", + "env_value,expected,expect_deprecation", [ - ("1", False, "async", False), - ("1", True, "sync", False), - ("0", False, "sync", True), + ("1", "async", False), + ("0", "sync", True), ], ids=[ - "async-on-no-fallback-uses-async-clients", - "async-on-allow-resize-falls-back-to-sync-clients", + "async-on-uses-async-clients", "async-off-uses-sync-clients-and-warns", ], ) def test_resolve_client_concurrency_mode_matches_engine_choice( env_value: str, - with_allow_resize: bool, expected: str, expect_deprecation: bool, monkeypatch: pytest.MonkeyPatch, ) -> None: """Client mode must match the engine the run will actually use. - Without this alignment, a sync-fallback run (e.g. ``allow_resize=True``) - would be left with async-only clients and call sync methods on them, - raising ``SyncClientUnavailableError`` from inside the sync engine. - The ``DATA_DESIGNER_ASYNC_ENGINE=0`` opt-out path also emits a ``DeprecationWarning`` so users on the legacy sync engine see a - pre-removal signal in their logs. The auto-fallback path - (``allow_resize=True``) does not double-warn here; the builder layer - emits its own warning when the run actually executes. + pre-removal signal in their logs. """ monkeypatch.setattr(flags, "DATA_DESIGNER_ASYNC_ENGINE", env_value == "1") - builder = _builder_with_allow_resize() if with_allow_resize else DataDesignerConfigBuilder() - if not with_allow_resize: - builder.add_column( - SamplerColumnConfig( - name="seed", - sampler_type=SamplerType.CATEGORY, - params=CategorySamplerParams(values=["a"]), - ) + builder = DataDesignerConfigBuilder() + builder.add_column( + SamplerColumnConfig( + name="seed", + sampler_type=SamplerType.CATEGORY, + params=CategorySamplerParams(values=["a"]), ) + ) if expect_deprecation: with pytest.warns(DeprecationWarning, match="legacy sync engine"): @@ -1374,29 +1337,6 @@ def test_check_models_invokes_readiness_check( assert kwargs["client_concurrency_mode"] == ClientConcurrencyMode.ASYNC -def test_check_models_passes_sync_mode_for_sync_fallback( - stub_artifact_path, - stub_model_providers, - stub_managed_assets_path, - monkeypatch: pytest.MonkeyPatch, -): - """check_models readiness uses the resolved client mode, including allow_resize fallback.""" - monkeypatch.setattr(flags, "DATA_DESIGNER_ASYNC_ENGINE", True) - config_builder = _builder_with_allow_resize() - data_designer = DataDesigner( - artifact_path=stub_artifact_path, - model_providers=stub_model_providers, - secret_resolver=PlaintextResolver(), - managed_assets_path=stub_managed_assets_path, - ) - - with patch("data_designer.interface.data_designer.run_readiness_check") as mock_check: - data_designer.check_models(config_builder) - - _, kwargs = mock_check.call_args - assert kwargs["client_concurrency_mode"] == ClientConcurrencyMode.SYNC - - def test_check_models_propagates_typed_model_error( stub_artifact_path, stub_model_providers, diff --git a/plans/346/async-generators-and-task-queue.md b/plans/346/async-generators-and-task-queue.md index a1c57aa84..280d46f63 100644 --- a/plans/346/async-generators-and-task-queue.md +++ b/plans/346/async-generators-and-task-queue.md @@ -3,6 +3,8 @@ Created: 2026-02-20 Status: In Progress +Historical note: this plan records the async-engine rollout before #766. References to `allow_resize` below describe removed pre-#766 behavior and are not current guidance. + Issue: [#346](https://github.com/NVIDIA-NeMo/DataDesigner/issues/346) Related: diff --git a/plans/479/skip-when-conditional-generation.md b/plans/479/skip-when-conditional-generation.md index 1bd2c5277..11e656a11 100644 --- a/plans/479/skip-when-conditional-generation.md +++ b/plans/479/skip-when-conditional-generation.md @@ -7,6 +7,8 @@ issue: https://github.com/NVIDIA-NeMo/DataDesigner/issues/479 # Plan: Conditional column generation — `SkipConfig` / `skip.when` +Historical note: this plan records the pre-#766 design. References to `allow_resize` below describe behavior that has since been removed from the config schema and engine. + ## Problem DataDesigner's DAG executes every column for every row unconditionally. In multi-stage synthesis pipelines, expensive downstream generation (LLM calls, segmentation, etc.) runs even when an earlier gate column indicates the row should be filtered out. diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index be9182b20..9fb2a9179 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -27,11 +27,11 @@ As a secondary benefit, chaining also enables the removal of `allow_resize` and ### Secondary benefit: `allow_resize` removal and sync/async convergence -The `allow_resize` flag on column configs lets a generator change the row count mid-generation. This works in the sync engine but is fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid. As of #553, an `allow_resize=True` config in async mode logs a `DeprecationWarning` and silently falls back to the sync engine for that run; it is no longer hard-rejected. +Historical note: this section describes the motivation for work now completed by #766. Current configs no longer support `allow_resize`; row-count-changing work belongs in `process_after_generation()` or at a workflow boundary. -`allow_resize` is one of the remaining divergences between sync and async. The async engine is the default execution path as of #592; sync remains only as a fallback for `allow_resize` runs. Maintaining a sync-only feature to keep one fallback path alive is counterproductive. With composite workflows in place, resize becomes a between-stage concern rather than a mid-generation concern. This lets us remove `allow_resize` and the associated engine complexity, and disallow row-count changes in pre-batch processors. Users who need resize use a composite workflow with a stage boundary at the resize point. +The removed `allow_resize` flag let a generator change the row count mid-generation. That was fundamentally incompatible with the async engine's fixed-size `CompletionTracker` grid and kept a sync-only fallback path alive. With composite workflows in place, resize is a between-stage concern rather than a mid-generation concern. -Note: `allow_resize` is documented in custom columns, plugin examples, and agent rollout ingestion docs (verified post-Fern migration in #581). The deprecation warning has shipped in #553; full removal still requires doc updates and the migration of any in-tree usage. +Note: shipped docs and examples were migrated as part of the removal. ### Why chaining instead of fixing async resize @@ -257,7 +257,7 @@ Adding `async def acreate(...)` on `DataDesigner` is a small, additive change. T `acreate()` is *not* part of chaining v1. It ships as its own small piece of work that can land before, alongside, or after Phase 1; the dependency only becomes hard for Phase 4. Listed as a sidecar under Implementation phases. -### Part 2: Remove `allow_resize` +### Part 2: Remove `allow_resize` (completed in #766) With composite workflows in place, `allow_resize` is no longer needed as an engine-internal mechanism. Resize becomes a between-stage concern. @@ -401,7 +401,7 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200 - Add `compose_workflow(name: str)` factory method on `DataDesigner`. - Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, duplicate stage-name rejection, artifact layout, throttle reuse across stages. -**Status after PR #636:** Implemented `CompositeWorkflow`, `compose_workflow()`, `to_config_builder()`, disk handoff, stage metadata, `acreate()`, shared throttle manager reuse, explicit stage artifact roots, cloned stage builders, concurrent-safe seed reader/resource-provider handling, seeded processor-only configs, stage output processors, and stage output selection. Still deferred after #636: stage-level resume, DAG branches, `allow_resize` removal, config bundles, and broader first-class artifact seeding. +**Status after PR #636:** Implemented `CompositeWorkflow`, `compose_workflow()`, `to_config_builder()`, disk handoff, stage metadata, `acreate()`, shared throttle manager reuse, explicit stage artifact roots, cloned stage builders, concurrent-safe seed reader/resource-provider handling, seeded processor-only configs, stage output processors, and stage output selection. Still deferred after #636: stage-level resume, DAG branches, config bundles, and broader first-class artifact seeding. ### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1) @@ -410,15 +410,15 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200 - Tests: parallel-independent workflows via `asyncio.gather`; verify shared `ThrottleManager` keeps aggregate request rate within configured caps. - Can ship before, alongside, or after Phase 1. Hard dependency for Phase 4. -### Phase 2: Remove `allow_resize` +### Phase 2: Remove `allow_resize` (completed in #766) - (Done in #553) `allow_resize=True` in async mode emits a `DeprecationWarning` and falls back to sync. -- Update docs that still reference `allow_resize` (`docs/concepts/custom_columns.md`, `docs/plugins/example.md`, `docs/concepts/agent-rollout-ingestion.md`) to point at composite workflows. -- Remove resize code from sync engine (`_cell_resize_mode`, `_finalize_fan_out` resize branch, `replace_buffer` `allow_resize` param). -- Remove `_resolve_async_compatibility()` and its sync-fallback branch from `_build_async()`. -- Remove the `allow_resize` field from the config schema. -- Add fail-fast guard in `ProcessorRunner` for pre-batch row-count changes. -- Tests: verify rejection, migration path examples. +- (Done in #766) Update docs that still reference `allow_resize` to point at composite workflows. +- (Done in #766) Remove resize code from sync engine (`_cell_resize_mode`, `_finalize_fan_out` resize branch, `replace_buffer` `allow_resize` param). +- (Done in #766) Remove `_resolve_async_compatibility()` and its sync-fallback branch from `_build_async()`. +- (Done in #766) Remove the `allow_resize` field from the config schema. +- (Done in #766) Add fail-fast guard in `ProcessorRunner` for pre-batch row-count changes. +- (Done in #766) Tests: verify rejection, migration path examples. - Migration examples should prefer `output_processors` for inline processor-expressible transforms, or seeded processor-only configs when the transform deserves its own named stage. Raw `on_success` remains the escape hatch for arbitrary custom transforms. ### Phase 3: Stage-level resume @@ -433,7 +433,7 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200 - For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the workflow layout. - Depends on artifact layout from phase 1. -**Status after stage-level resume slice:** Implemented `workflow.run(resume=...)`, compatible completed-stage reuse, matching partial-stage delegation to `DataDesigner.create(..., resume=ResumeMode.ALWAYS)`, downstream invalidation after changed or missing stages, callback output path checks, target stage materialization, explicit `rerun_from` invalidation, stage output overrides for review gates, and docs for `ResumeMode.IF_POSSIBLE` / `ResumeMode.ALWAYS`. Still deferred: DAG branches, `allow_resize` removal, config bundles, and broader first-class artifact seeding. +**Status after stage-level resume slice:** Implemented `workflow.run(resume=...)`, compatible completed-stage reuse, matching partial-stage delegation to `DataDesigner.create(..., resume=ResumeMode.ALWAYS)`, downstream invalidation after changed or missing stages, callback output path checks, target stage materialization, explicit `rerun_from` invalidation, stage output overrides for review gates, and docs for `ResumeMode.IF_POSSIBLE` / `ResumeMode.ALWAYS`. Still deferred: DAG branches, config bundles, and broader first-class artifact seeding. ### Phase 4: DAG-shaped stages with parallel branches