-
Notifications
You must be signed in to change notification settings - Fork 170
docs: add skip column config option for conditional column generation (#479) #480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+667
β0
Merged
Changes from 4 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
847375c
plan: add skip_when for conditional column generation (#479)
nabinchha 3f415c7
plan: remove HopChain example from skip_when plan
nabinchha 6ee4c70
plan: replace HopChain example with generic product review example
nabinchha 14ab39b
plan: add open questions on skip sentinel value and row filtering
nabinchha 351bc29
plan: major revision β SkipConfig model, sync engine support, decouplβ¦
nabinchha b481e73
Merge branch 'main' into nmulepati/feat/479-skip-when-plan
nabinchha 2219cd6
updates
nabinchha 9790f95
plan: document get_required_columns for skip propagation
nabinchha 7046378
plan: centralize __skipped__ handling in skip_provenance
nabinchha a157a7f
plan: align doc title with SkipConfig / skip.when
nabinchha c5d2dbc
plan: address review β delimiter validation, centralized error handliβ¦
nabinchha 75943d5
plan: add get_side_effect_columns accessor to execution graph spec
nabinchha File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,260 @@ | ||
| --- | ||
| date: 2026-03-30 | ||
| authors: | ||
| - nmulepati | ||
| issue: https://github.com/NVIDIA-NeMo/DataDesigner/issues/479 | ||
| --- | ||
|
|
||
| # Plan: `skip_when` β Conditional Column Generation | ||
|
|
||
| ## Problem | ||
|
|
||
| DataDesigner's DAG executes every column for every row unconditionally. In multi-stage synthesis pipelines, expensive downstream generation (LLM calls, segmentation, etc.) runs even when an earlier gate column indicates the row should be filtered out. | ||
|
|
||
| Today the only workarounds are: | ||
|
|
||
| 1. **Generate all columns unconditionally and post-filter** β wastes LLM calls on rows that will be discarded. | ||
| 2. **Split into multiple `DataDesigner.create()` calls** with intermediate filtering β loses single-pipeline ergonomics and forces the user to manage seed-dataset hand-offs. | ||
|
|
||
| ## Proposed Solution | ||
|
|
||
| Add a `skip_when` field to `SingleColumnConfig`. When its Jinja2 expression evaluates truthy for a row, the cell is set to `None` and the generator is never called. Skips auto-propagate through the DAG: downstream columns whose `required_columns` include a skipped cell also skip automatically. | ||
|
|
||
| Example: a pipeline that generates product reviews only for items in stock. The `sentiment_analysis` and `review` columns are expensive LLM calls that should be skipped for out-of-stock items: | ||
|
|
||
| ```python | ||
| config_builder.add_column( | ||
| name="in_stock", column_type="sampler", | ||
| sampler_type="bernoulli", params=BernoulliSamplerParams(p=0.7), | ||
| ) | ||
| config_builder.add_column( | ||
| name="sentiment_analysis", | ||
| column_type="llm-structured", | ||
| skip_when="{{ in_stock == 0 }}", | ||
| prompt="Analyze the sentiment of reviews for {{ product_name }}...", | ||
| ... | ||
| ) | ||
| # review depends on sentiment_analysis β auto-skips when sentiment_analysis is skipped | ||
| config_builder.add_column( | ||
| name="review", | ||
| column_type="llm-text", | ||
| prompt="Write a {{ sentiment_analysis.tone }} review for {{ product_name }}...", | ||
| ) | ||
| ``` | ||
|
|
||
| Skipped rows stay in the output (row count is preserved). Skipped cells contain `None`. | ||
|
nabinchha marked this conversation as resolved.
Outdated
|
||
|
|
||
| ## Design Decisions | ||
|
|
||
| | Decision | Choice | Rationale | | ||
| |---|---|---| | ||
| | Where does `skip_when` live? | `SingleColumnConfig` base class | Cross-cutting; applies to all column types | | ||
|
nabinchha marked this conversation as resolved.
Outdated
|
||
| | What happens to skipped cells? | Set to `None`, row stays in output | Rows are not dropped β users can post-filter or inspect | | ||
| | Do downstream columns auto-skip? | Yes, always | If upstream data is missing, generating downstream is wasteful and error-prone | | ||
| | How are `skip_when` columns ordered in the DAG? | `skip_when_columns` extracted from the expression become DAG edges | Ensures the gate column is generated before the guarded column | | ||
| | How does this interact with `_records_to_drop`? | Independently β skip does not drop rows | Skip produces `None`; drop removes the row entirely | | ||
|
|
||
| --- | ||
|
|
||
| ## Implementation | ||
|
|
||
| ### 1. Config: `SingleColumnConfig` β add field + property | ||
|
|
||
| **File:** `packages/data-designer-config/src/data_designer/config/base.py` | ||
|
|
||
| Add to `SingleColumnConfig` (after `allow_resize`): | ||
|
|
||
| ```python | ||
| skip_when: str | None = Field( | ||
| default=None, | ||
| description="Jinja2 expression; when truthy, skip generation for this row (cell set to None).", | ||
| ) | ||
| ``` | ||
|
|
||
| Add a `@field_validator("skip_when")` that validates Jinja2 syntax. **Critical constraint:** `base.py` line 4 prohibits `data_designer.*` imports, so use `jinja2` directly: | ||
|
|
||
| ```python | ||
| @field_validator("skip_when") | ||
| @classmethod | ||
| def _validate_skip_when(cls, v: str | None) -> str | None: | ||
| if v is not None: | ||
| from jinja2.sandbox import ImmutableSandboxedEnvironment | ||
| ImmutableSandboxedEnvironment().parse(v) | ||
| return v | ||
| ``` | ||
|
|
||
| Add a concrete property `skip_when_columns` (not abstract β base provides default): | ||
|
|
||
| ```python | ||
| @property | ||
| def skip_when_columns(self) -> list[str]: | ||
| if self.skip_when is None: | ||
| return [] | ||
| from jinja2 import meta | ||
| from jinja2.sandbox import ImmutableSandboxedEnvironment | ||
| env = ImmutableSandboxedEnvironment() | ||
| ast = env.parse(self.skip_when) | ||
| return list(meta.find_undeclared_variables(ast)) | ||
| ``` | ||
|
nabinchha marked this conversation as resolved.
nabinchha marked this conversation as resolved.
|
||
|
|
||
| ### 2. DAG: add `skip_when_columns` edges | ||
|
|
||
| #### 2a. `dag.py` β `topologically_sort_column_configs()` | ||
|
|
||
| **File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py` | ||
|
|
||
| After the `for req_col_name in col.required_columns:` block (line 35-47), add a matching block for `col.skip_when_columns` that adds edges using the same pattern (direct column match + side-effect resolution). | ||
|
|
||
| #### 2b. `execution_graph.py` β `ExecutionGraph.create()` | ||
|
|
||
| **File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py` | ||
|
|
||
| In the second pass (line 78-88), after the `for req in sub.required_columns:` edge loop, add: | ||
|
|
||
| ```python | ||
| for skip_col in sub.skip_when_columns: | ||
| resolved = graph.resolve_side_effect(skip_col) | ||
| if resolved not in known_columns: | ||
| continue # seed/sampler columns not in graph | ||
| if resolved == name: | ||
| continue | ||
| graph.add_edge(upstream=resolved, downstream=name) | ||
| ``` | ||
|
nabinchha marked this conversation as resolved.
|
||
|
|
||
| Store skip metadata on the graph for runtime access: | ||
|
|
||
| - Add `_skip_when: dict[str, str]` to `__init__` | ||
| - Populate during first pass: `graph._skip_when[name] = sub.skip_when` (when not None) | ||
| - Add accessor: `get_skip_when(column) -> str | None` | ||
|
|
||
| ### 3. New utility: `skip_evaluator.py` | ||
|
|
||
| **New file:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_evaluator.py` | ||
|
|
||
| Two pure functions, no engine state dependencies: | ||
|
|
||
| ```python | ||
| def evaluate_skip_when(expression: str, record: dict) -> bool: | ||
|
nabinchha marked this conversation as resolved.
|
||
| """Render expression against record; return True if truthy.""" | ||
|
|
||
| def should_skip_by_propagation( | ||
|
nabinchha marked this conversation as resolved.
|
||
| required_columns: list[str], | ||
| skipped_columns_for_row: set[str], | ||
| ) -> bool: | ||
| """Return True if any required column was skipped.""" | ||
| ``` | ||
|
|
||
| `evaluate_skip_when` wraps expression in `{{ }}`, renders via `ImmutableSandboxedEnvironment`, checks truthiness (result not in `("", "false", "0", "none", "null")`). | ||
|
nabinchha marked this conversation as resolved.
Outdated
nabinchha marked this conversation as resolved.
Outdated
|
||
|
|
||
| `should_skip_by_propagation` returns `True` if the intersection of `required_columns` and `skipped_columns_for_row` is non-empty. | ||
|
|
||
| ### 4. Sync engine: `DatasetBuilder` | ||
|
|
||
| **File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py` | ||
|
|
||
| #### 4a. Add state | ||
| - `self._skipped_cells: dict[int, set[str]] = {}` β buffer index to skipped column names | ||
| - Clear at start of `_run_batch()` (line ~428) | ||
|
|
||
| #### 4b. Add helper methods | ||
| - `_should_skip_cell(config, record_index, record) -> bool` β checks propagation (any upstream skipped?) then evaluates `skip_when` expression | ||
| - `_mark_cell_skipped(record_index, column_name, side_effect_columns, record)` β records skip, writes `None` to buffer | ||
|
|
||
| #### 4c. Modify `_fan_out_with_threads()` (line 638) | ||
|
nabinchha marked this conversation as resolved.
Outdated
|
||
| Before `executor.submit()`, check `_should_skip_cell()`. If skip: write `None` for column + side effects, record success on progress tracker, `continue`. | ||
|
|
||
| #### 4d. Modify `_fan_out_with_async()` (line 621) | ||
| Convert list comprehension to explicit loop with same skip check. | ||
|
|
||
| #### 4e. Modify `_run_full_column_generator()` (line 503) | ||
|
nabinchha marked this conversation as resolved.
Outdated
|
||
| After `generator.generate()` returns, iterate records. For each row where `_should_skip_cell()` is true, overwrite that column + side effects with `None` and record in `_skipped_cells`. Replace buffer with updated records. | ||
|
|
||
|
nabinchha marked this conversation as resolved.
Outdated
|
||
| ### 5. Async engine: `AsyncTaskScheduler` | ||
|
|
||
| #### 5a. `CompletionTracker` β add skip tracking | ||
|
|
||
| **File:** `packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py` | ||
|
|
||
| - Add `_skipped: dict[int, dict[int, set[str]]]` (rg -> ri -> column names) | ||
| - `mark_cell_skipped(column, row_group, row_index)` | ||
| - `get_skipped_columns_for_row(row_group, row_index) -> set[str]` | ||
|
|
||
| #### 5b. Modify `_run_cell()` (line 767 of `async_scheduler.py`) | ||
|
|
||
| After the `is_dropped` guard (line 772), add skip evaluation: | ||
|
|
||
| 1. Get `skipped_cols` from tracker for this row | ||
| 2. Check `should_skip_by_propagation` using `config.required_columns` and `skipped_cols` | ||
| 3. If not propagation-skipped, check `evaluate_skip_when` using `self._graph.get_skip_when(task.column)` against `row_data` | ||
| 4. If skip: write `None` to buffer for all output cols, call `tracker.mark_cell_skipped()`, return `None` | ||
|
|
||
| The caller (`_execute_task_inner_impl`) still marks the task complete β skipped cells ARE complete (they produced `None`). Downstream tasks get unblocked and will themselves check propagation. | ||
|
|
||
| #### 5c. Modify `_run_batch()` (line 792 of `async_scheduler.py`) | ||
|
|
||
| After generation, iterate rows. For skipped rows, overwrite with `None` and mark in tracker. Same pattern as sync path Step 4e. | ||
|
|
||
| ### 6. Expression generator: defensive `None` guard | ||
|
|
||
| **File:** `packages/data-designer-engine/src/data_designer/engine/column_generators/generators/expression.py` | ||
|
|
||
| In `generate()` (line 24), inside the per-record loop: if any `required_columns` value is `None`, set `record[self.config.name] = None` instead of rendering the Jinja2 template (which would crash on `None` arithmetic like `{{ price * 1.1 }}`). | ||
|
|
||
| ### 7. Validation: check `skip_when` references | ||
|
|
||
| **File:** `packages/data-designer-engine/src/data_designer/engine/validation.py` | ||
|
|
||
| - Add `SKIP_WHEN_REFERENCE_MISSING` to `ViolationType` enum | ||
| - Add `validate_skip_when_references(columns, allowed_references)` β iterates columns with `skip_when`, checks each `skip_when_columns` entry exists in `allowed_references` | ||
| - Wire into `validate_data_designer_config()` | ||
|
|
||
| --- | ||
|
|
||
| ## Files Modified | ||
|
|
||
| | File | Change | | ||
| |---|---| | ||
| | `config/base.py` | `skip_when` field + validator + `skip_when_columns` property | | ||
| | `engine/.../dag.py` | Add `skip_when_columns` edges in topological sort | | ||
| | `engine/.../execution_graph.py` | Add `skip_when_columns` edges + skip metadata storage + accessor | | ||
| | `engine/.../skip_evaluator.py` | **NEW** β `evaluate_skip_when()`, `should_skip_by_propagation()` | | ||
| | `engine/.../dataset_builder.py` | `_skipped_cells` state, `_should_skip_cell()`, modify 3 fan-out/generation methods | | ||
| | `engine/.../async_scheduler.py` | Skip checks in `_run_cell()` and `_run_batch()` | | ||
| | `engine/.../completion_tracker.py` | `_skipped` dict + `mark_cell_skipped` + `get_skipped_columns_for_row` | | ||
| | `engine/.../expression.py` | Defensive `None` guard when upstream is null | | ||
| | `engine/validation.py` | `validate_skip_when_references()` | | ||
|
|
||
| --- | ||
|
|
||
| ## Open Questions | ||
|
|
||
| ### 1. What value should skipped cells contain? | ||
|
nabinchha marked this conversation as resolved.
|
||
|
|
||
| The current plan sets skipped cells to `None` (which becomes `NaN`/`pd.NA` in the DataFrame). Alternatives: | ||
|
|
||
| - **`None`** β simple, standard pandas null. Downside: indistinguishable from a legitimate `None` produced by a generator (e.g., an LLM that returns no output). | ||
| - **Sentinel value** (e.g., `SKIPPED = "__SKIPPED__"` or a dedicated `SkippedValue` type) β distinguishable from real nulls. Downside: leaks into user-facing DataFrames unless stripped at output time; complicates type handling. | ||
| - **`pd.NA` with metadata** β store skip status in a sidecar structure (the `_skipped_cells` / `CompletionTracker._skipped` dicts already track this) and write `None` to the cell. Users who need to distinguish skip-null from real-null can inspect the metadata. | ||
|
|
||
| Recommendation: Use `None` in the cell, track skip provenance in engine-internal state. If users need to distinguish, expose a `results.load_skip_mask()` or similar accessor. | ||
|
nabinchha marked this conversation as resolved.
Outdated
|
||
|
|
||
| ### 2. Should there be an option to auto-remove skipped rows from the final output? | ||
|
|
||
| Many pipelines want to discard rows where a gate column failed β they don't need the skipped rows in the output at all. Options: | ||
|
|
||
| - **Post-hoc filtering by the user** β `df = df.dropna(subset=["categories"])`. Simple but manual. | ||
| - **`drop_skipped_rows` option on `DataDesigner.create()`** β automatically remove any row where at least one column was skipped before writing to disk. Clean UX but may surprise users who want to inspect skipped rows. | ||
| - **A built-in `DropSkippedRowsProcessorConfig` processor** β runs as a post-generation processor that removes rows with any skipped cells. Fits the existing processor model and is opt-in. | ||
| - **`drop_when_skipped=True` on individual column configs** β drop the row if *this specific column* was skipped. More granular than a global flag. | ||
|
|
||
| Recommendation: Start with a `DropSkippedRowsProcessorConfig` processor β it's opt-in, composable with other processors, and doesn't require new parameters on `create()` or column configs. | ||
|
|
||
| --- | ||
|
|
||
| ## Verification | ||
|
|
||
| 1. **Unit tests:** Config field defaults, Jinja2 validation, `skip_when_columns` extraction, DAG edge creation, skip evaluator truthiness, CompletionTracker skip tracking | ||
| 2. **Integration tests (sync):** Column with `skip_when` produces `None` for matching rows; downstream auto-skips; row count preserved (no drops) | ||
| 3. **Integration tests (async):** Same scenarios under `DATA_DESIGNER_ASYNC_ENGINE=1` | ||
| 4. **Validation tests:** Unknown column in `skip_when` produces ERROR violation | ||
| 5. **Run:** `make check-all-fix` + `make test` + `make update-license-headers` | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.