fix: async engine side-effect column propagation and collision resolution#509
fix: async engine side-effect column propagation and collision resolution#509andreatgretel wants to merge 3 commits intomainfrom
Conversation
…tion ExecutionGraph.set_side_effect() now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. This prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages. AsyncTaskScheduler now includes side-effect columns in _instance_to_columns so their values are written to the RowGroupBufferManager and available to downstream prompt templates. Fixes #508
PR #509 Review: fix: async engine side-effect column propagation and collision resolutionSummaryThis PR fixes two bugs in the async engine's handling of
Files changed: 3 (37 additions, 3 deletions) FindingsCorrectness
Observations
TestingThe new test Gap: There is no unit test for the Style / Conventions
VerdictLGTM. This is a focused, well-reasoned bug fix. Both changes are correct and consistent with each other. The test coverage for |
Greptile SummaryThis PR fixes two bugs in the async engine's handling of
|
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py | Adds first-writer-wins semantics to set_side_effect() with a warning log on collision; resolve_side_effect() already correctly prefers real columns over side-effect aliases. Logic is sound and matches the sync-engine contract. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Introduces two generator-identity maps: _gen_instance_to_columns (real graph columns, used for completion tracking) and _gen_instance_to_columns_including_side_effects (adds side-effect columns, used only in the three buffer-write paths). All dispatch-dedup and completion-tracking call sites use the real-columns-only dict, so no side-effect column name is ever passed to CompletionTracker or get_strategy. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Adds test_side_effect_columns_separated_from_completion_tracking which asserts the structural invariant: side-effect column names are absent from _gen_instance_to_columns and present in _gen_instance_to_columns_including_side_effects. Correctly uses object.__setattr__ to inject a mock config with side_effect_columns. |
| packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py | Adds test_side_effect_collision_first_writer_wins which exercises the new warning log path and verifies that downstream edges resolve to the first-registered producer. Coverage is thorough. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["AsyncTaskScheduler.__init__\n(generators dict)"] --> B["_gen_instance_to_columns\n(real graph columns only)"]
A --> C["_gen_instance_to_columns_including_side_effects\n(real + side-effect columns)"]
B --> D["Completion Tracking\nmark_cell_complete /\nmark_row_range_complete\n(lines 706–711)"]
B --> E["Dispatch Dedup\n_dispatched.add/discard\n(lines 362, 383, 625)"]
C --> F["Buffer Writes\n_run_from_scratch / _run_cell / _run_batch\n(lines 772, 798, 822)"]
D --> G["CompletionTracker._validate_strategy\n→ graph.get_strategy(col)\n✓ Only registered columns passed"]
F --> H["RowGroupBufferManager.update_cell\n✓ Side-effect values written"]
Reviews (3): Last reviewed commit: "fix: warn on side-effect collision and c..." | Re-trigger Greptile
| for col, gen in generators.items(): | ||
| for side_effect_col in getattr(gen.config, "side_effect_columns", []): | ||
| if side_effect_col not in seen_cols: | ||
| instance_to_columns.setdefault(id(gen), []).append(side_effect_col) | ||
| seen_cols.add(side_effect_col) |
There was a problem hiding this comment.
Side-effect columns trigger
KeyError in CompletionTracker
Adding side-effect columns to _instance_to_columns causes them to be passed to mark_cell_complete / mark_row_range_complete in _execute_task_inner_impl (lines 702–707). Both methods call _validate_strategy, which calls graph.get_strategy(col). Since side-effect columns are registered only in _side_effect_map and never in _strategies, this raises KeyError at runtime whenever a generator with side-effect columns is executed by the async engine.
The buffer-write loops (in _run_from_scratch, _run_cell, _run_batch) guard with if col in result_df.columns and are correct. The completion-tracking loop has no such guard and will fail for every cell or batch task whose generator declares side-effect columns.
A minimal fix is to track which entries in _instance_to_columns are side-effect-only and skip them in the mark-complete loop — or store them in a separate _instance_to_side_effect_columns dict used only by the buffer-write paths.
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 135-139
Comment:
**Side-effect columns trigger `KeyError` in `CompletionTracker`**
Adding side-effect columns to `_instance_to_columns` causes them to be passed to `mark_cell_complete` / `mark_row_range_complete` in `_execute_task_inner_impl` (lines 702–707). Both methods call `_validate_strategy`, which calls `graph.get_strategy(col)`. Since side-effect columns are registered only in `_side_effect_map` and never in `_strategies`, this raises `KeyError` at runtime whenever a generator with side-effect columns is executed by the async engine.
The buffer-write loops (in `_run_from_scratch`, `_run_cell`, `_run_batch`) guard with `if col in result_df.columns` and are correct. The completion-tracking loop has no such guard and will fail for every cell or batch task whose generator declares side-effect columns.
A minimal fix is to track which entries in `_instance_to_columns` are side-effect-only and skip them in the mark-complete loop — or store them in a separate `_instance_to_side_effect_columns` dict used only by the buffer-write paths.
How can I resolve this? If you propose a fix, please make it concise.…cheduler Side-effect columns added to _instance_to_columns caused KeyError in CompletionTracker._validate_strategy() because they are not registered in the execution graph. Split into _instance_to_write_columns (buffer writes, includes side-effects) and _instance_to_columns (completion tracking, real columns only).
| First-writer-wins: if a side-effect column is already mapped to a | ||
| different producer, the earlier mapping is kept. This matches sync | ||
| engine semantics where earlier consumers see the first producer's | ||
| value. | ||
| """ | ||
| if side_effect_col not in self._side_effect_map: | ||
| self._side_effect_map[side_effect_col] = producer |
There was a problem hiding this comment.
Isn't a side effect column 1:1 with its producer? In which scenario would multiple producers point to the same side effect column?
There was a problem hiding this comment.
You're right that in a correctly configured pipeline it should be 1:1. The collision scenario comes from Anonymizer's detection workflow - it has branching pipeline paths where different stages declare the same side-effect columns (e.g. _merged_tagged_text is declared by both prepare_validation_inputs and merge_and_build_candidates), but only one path is active per run. With last-writer-wins the graph resolves the wrong producer and creates a false cycle.
That said, this is an unusual pattern and silently ignoring the second registration isn't great. I'll add a warning log on collision so misconfigurations don't go unnoticed.
There was a problem hiding this comment.
in this example wouldn't the side effect columns be prepare_validation_inputs _merged_tagged_text and merge_and_build_candidates_merged_tagged_text? It's still 1:1 right?
| if side_effect_col not in seen_cols: | ||
| instance_to_write_columns.setdefault(id(gen), []).append(side_effect_col) | ||
| seen_cols.add(side_effect_col) | ||
| self._instance_to_write_columns = instance_to_write_columns |
There was a problem hiding this comment.
nit: Perhaps we can clarify this a bit with two name changes? Reads a little clearer in my mind at least...
_instance_to_columns -> _gen_instance_to_columns
_instance_to_write_columns -> _gen_instance_to_columns_including_side_effects
There was a problem hiding this comment.
Good call, that's clearer. I'll rename them:
_instance_to_columns->_gen_instance_to_columns_instance_to_write_columns->_gen_instance_to_columns_including_side_effects
Log a warning when multiple producers register the same side-effect column (first-writer-wins still applies). Rename _instance_to_columns and _instance_to_write_columns per review feedback for clarity.
Summary
Fix two bugs in the async engine's handling of
@custom_column_generatorside-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running withDATA_DESIGNER_ASYNC_ENGINE=1.Related Issue
Fixes #508
Changes
ExecutionGraph.set_side_effect()now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. Prevents falseDAGCircularDependencyErrorwhen multiple generators declare the same side-effect column at different pipeline stages.AsyncTaskSchedulernow includes side-effect columns in_instance_to_columnsso their values are written to theRowGroupBufferManagerand available to downstream prompt templates.Testing
make testpasses (1747 engine tests)