Skip to content

fix: async engine side-effect column propagation and collision resolution#509

Open
andreatgretel wants to merge 3 commits intomainfrom
andreatgretel/fix/async-side-effect-columns
Open

fix: async engine side-effect column propagation and collision resolution#509
andreatgretel wants to merge 3 commits intomainfrom
andreatgretel/fix/async-side-effect-columns

Conversation

@andreatgretel
Copy link
Copy Markdown
Contributor

Summary

Fix two bugs in the async engine's handling of @custom_column_generator side-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running with DATA_DESIGNER_ASYNC_ENGINE=1.

Related Issue

Fixes #508

Changes

  • ExecutionGraph.set_side_effect() now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. Prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages.
  • AsyncTaskScheduler now includes side-effect columns in _instance_to_columns so their values are written to the RowGroupBufferManager and available to downstream prompt templates.

Testing

  • make test passes (1747 engine tests)
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

…tion

ExecutionGraph.set_side_effect() now uses first-writer-wins instead of
last-writer-wins, matching sync engine semantics where earlier consumers
see the first producer's value. This prevents false DAGCircularDependencyError
when multiple generators declare the same side-effect column at different
pipeline stages.

AsyncTaskScheduler now includes side-effect columns in _instance_to_columns
so their values are written to the RowGroupBufferManager and available to
downstream prompt templates.

Fixes #508
@andreatgretel andreatgretel requested a review from a team as a code owner April 8, 2026 15:08
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 8, 2026

PR #509 Review: fix: async engine side-effect column propagation and collision resolution

Summary

This PR fixes two bugs in the async engine's handling of @custom_column_generator side-effect columns that prevented multi-stage pipelines (e.g., Anonymizer's detection workflow) from running with DATA_DESIGNER_ASYNC_ENGINE=1:

  1. ExecutionGraph.set_side_effect() — changed from last-writer-wins to first-writer-wins semantics, preventing false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages.
  2. AsyncTaskScheduler.__init__ — now includes side-effect columns in _instance_to_columns so their values are written to RowGroupBufferManager and available to downstream prompt templates.

Files changed: 3 (37 additions, 3 deletions)

Findings

Correctness

execution_graph.py — first-writer-wins (lines 107-116): The change is clean and correct. The set_side_effect method now guards with if side_effect_col not in self._side_effect_map, which implements first-writer-wins. Since ExecutionGraph.create() iterates column_configs in declared order (line 56-67), and the docstring says this matches sync engine semantics, this is the right fix. The previous last-writer-wins behavior would silently re-map a side-effect to a later producer, which could create a dependency edge back to an earlier stage and trigger a false cycle detection.

async_scheduler.py — side-effect propagation (lines 130-139): The fix correctly adds side-effect columns to _instance_to_columns. The seen_cols set prevents the same side-effect column from being added to multiple generators' output lists when there's a collision (consistent with the first-writer-wins semantics in the graph). The getattr(gen.config, "side_effect_columns", []) pattern is safe — configs that don't define side_effect_columns will return [].

Observations

  1. Two-loop pattern in async_scheduler.py (lines 132-139): The code uses two separate loops over generators.items() — the first populates primary columns, the second adds side-effects. This is intentional and correct: the seen_cols set must be fully populated with all primary column names before side-effect deduplication runs, otherwise a side-effect column that shadows a primary column could slip through. Good design choice.

  2. _instance_to_columns usage is comprehensive. I checked all 6 usage sites of _instance_to_columns in async_scheduler.py (lines ~358, ~379, ~621, ~666, ~768, ~794). All sites iterate output_cols and check if col in result_df.columns or if col in result before writing. This means adding side-effect columns to the list is safe — if a generator doesn't produce a side-effect value for a given invocation, it's simply skipped.

  3. No logging for silently ignored collisions. When set_side_effect silently drops a second registration, there's no debug-level log. This is fine for correctness, but a logger.debug(...) could help users troubleshoot pipelines. Minor — not a blocker.

Testing

The new test test_side_effect_collision_first_writer_wins is well-structured: it registers two producers for the same side-effect, verifies the first wins, and checks downstream edge resolution. The existing test_side_effect_name_collision_prefers_real_column test (already passing) provides complementary coverage for the real-column-wins-over-side-effect case.

Gap: There is no unit test for the AsyncTaskScheduler side-effect propagation change. Testing this would require mocking generators with config.side_effect_columns, which is more of an integration concern. The PR description mentions 1747 engine tests pass, which likely covers this path indirectly. Low risk, but worth noting.

Style / Conventions

  • Code follows the project's absolute import convention.
  • Type annotations are present on all new variables (dict[int, list[str]], set[str]).
  • Docstring on set_side_effect clearly explains the first-writer-wins contract and its rationale.
  • No unnecessary changes or scope creep.

Verdict

LGTM. This is a focused, well-reasoned bug fix. Both changes are correct and consistent with each other. The test coverage for ExecutionGraph is solid. The only minor gap is the lack of a direct unit test for the AsyncTaskScheduler propagation change, but the overall test suite passing mitigates this risk. Ship it.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Apr 8, 2026

Greptile Summary

This PR fixes two bugs in the async engine's handling of @custom_column_generator side-effect columns. ExecutionGraph.set_side_effect() is changed from last-writer-wins to first-writer-wins to prevent false DAGCircularDependencyError when multiple pipeline stages declare the same side-effect column. AsyncTaskScheduler is reworked to maintain two dicts: _gen_instance_to_columns for completion tracking (real columns only) and _gen_instance_to_columns_including_side_effects for buffer writes (includes side-effect columns), resolving the previously-flagged KeyError in CompletionTracker._validate_strategy.

Confidence Score: 5/5

Safe to merge — both bugs are correctly fixed and covered by targeted unit tests.

The two-dict split in the scheduler cleanly resolves the KeyError path raised in prior review without breaking any existing dispatch or dedup logic. The first-writer-wins change in set_side_effect is minimal, well-tested, and matches sync-engine semantics. No P0 or P1 findings remain.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py Adds first-writer-wins semantics to set_side_effect() with a warning log on collision; resolve_side_effect() already correctly prefers real columns over side-effect aliases. Logic is sound and matches the sync-engine contract.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Introduces two generator-identity maps: _gen_instance_to_columns (real graph columns, used for completion tracking) and _gen_instance_to_columns_including_side_effects (adds side-effect columns, used only in the three buffer-write paths). All dispatch-dedup and completion-tracking call sites use the real-columns-only dict, so no side-effect column name is ever passed to CompletionTracker or get_strategy.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py Adds test_side_effect_columns_separated_from_completion_tracking which asserts the structural invariant: side-effect column names are absent from _gen_instance_to_columns and present in _gen_instance_to_columns_including_side_effects. Correctly uses object.__setattr__ to inject a mock config with side_effect_columns.
packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py Adds test_side_effect_collision_first_writer_wins which exercises the new warning log path and verifies that downstream edges resolve to the first-registered producer. Coverage is thorough.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["AsyncTaskScheduler.__init__\n(generators dict)"] --> B["_gen_instance_to_columns\n(real graph columns only)"]
    A --> C["_gen_instance_to_columns_including_side_effects\n(real + side-effect columns)"]

    B --> D["Completion Tracking\nmark_cell_complete /\nmark_row_range_complete\n(lines 706–711)"]
    B --> E["Dispatch Dedup\n_dispatched.add/discard\n(lines 362, 383, 625)"]

    C --> F["Buffer Writes\n_run_from_scratch / _run_cell / _run_batch\n(lines 772, 798, 822)"]

    D --> G["CompletionTracker._validate_strategy\n→ graph.get_strategy(col)\n✓ Only registered columns passed"]
    F --> H["RowGroupBufferManager.update_cell\n✓ Side-effect values written"]
Loading

Reviews (3): Last reviewed commit: "fix: warn on side-effect collision and c..." | Re-trigger Greptile

Comment on lines +135 to +139
for col, gen in generators.items():
for side_effect_col in getattr(gen.config, "side_effect_columns", []):
if side_effect_col not in seen_cols:
instance_to_columns.setdefault(id(gen), []).append(side_effect_col)
seen_cols.add(side_effect_col)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Side-effect columns trigger KeyError in CompletionTracker

Adding side-effect columns to _instance_to_columns causes them to be passed to mark_cell_complete / mark_row_range_complete in _execute_task_inner_impl (lines 702–707). Both methods call _validate_strategy, which calls graph.get_strategy(col). Since side-effect columns are registered only in _side_effect_map and never in _strategies, this raises KeyError at runtime whenever a generator with side-effect columns is executed by the async engine.

The buffer-write loops (in _run_from_scratch, _run_cell, _run_batch) guard with if col in result_df.columns and are correct. The completion-tracking loop has no such guard and will fail for every cell or batch task whose generator declares side-effect columns.

A minimal fix is to track which entries in _instance_to_columns are side-effect-only and skip them in the mark-complete loop — or store them in a separate _instance_to_side_effect_columns dict used only by the buffer-write paths.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 135-139

Comment:
**Side-effect columns trigger `KeyError` in `CompletionTracker`**

Adding side-effect columns to `_instance_to_columns` causes them to be passed to `mark_cell_complete` / `mark_row_range_complete` in `_execute_task_inner_impl` (lines 702–707). Both methods call `_validate_strategy`, which calls `graph.get_strategy(col)`. Since side-effect columns are registered only in `_side_effect_map` and never in `_strategies`, this raises `KeyError` at runtime whenever a generator with side-effect columns is executed by the async engine.

The buffer-write loops (in `_run_from_scratch`, `_run_cell`, `_run_batch`) guard with `if col in result_df.columns` and are correct. The completion-tracking loop has no such guard and will fail for every cell or batch task whose generator declares side-effect columns.

A minimal fix is to track which entries in `_instance_to_columns` are side-effect-only and skip them in the mark-complete loop — or store them in a separate `_instance_to_side_effect_columns` dict used only by the buffer-write paths.

How can I resolve this? If you propose a fix, please make it concise.

…cheduler

Side-effect columns added to _instance_to_columns caused KeyError in
CompletionTracker._validate_strategy() because they are not registered
in the execution graph. Split into _instance_to_write_columns (buffer
writes, includes side-effects) and _instance_to_columns (completion
tracking, real columns only).
Comment on lines +110 to +116
First-writer-wins: if a side-effect column is already mapped to a
different producer, the earlier mapping is kept. This matches sync
engine semantics where earlier consumers see the first producer's
value.
"""
if side_effect_col not in self._side_effect_map:
self._side_effect_map[side_effect_col] = producer
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't a side effect column 1:1 with its producer? In which scenario would multiple producers point to the same side effect column?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that in a correctly configured pipeline it should be 1:1. The collision scenario comes from Anonymizer's detection workflow - it has branching pipeline paths where different stages declare the same side-effect columns (e.g. _merged_tagged_text is declared by both prepare_validation_inputs and merge_and_build_candidates), but only one path is active per run. With last-writer-wins the graph resolves the wrong producer and creates a false cycle.

That said, this is an unusual pattern and silently ignoring the second registration isn't great. I'll add a warning log on collision so misconfigurations don't go unnoticed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this example wouldn't the side effect columns be prepare_validation_inputs _merged_tagged_text and merge_and_build_candidates_merged_tagged_text? It's still 1:1 right?

if side_effect_col not in seen_cols:
instance_to_write_columns.setdefault(id(gen), []).append(side_effect_col)
seen_cols.add(side_effect_col)
self._instance_to_write_columns = instance_to_write_columns
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Perhaps we can clarify this a bit with two name changes? Reads a little clearer in my mind at least...
_instance_to_columns -> _gen_instance_to_columns
_instance_to_write_columns -> _gen_instance_to_columns_including_side_effects

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, that's clearer. I'll rename them:

  • _instance_to_columns -> _gen_instance_to_columns
  • _instance_to_write_columns -> _gen_instance_to_columns_including_side_effects

Log a warning when multiple producers register the same side-effect
column (first-writer-wins still applies). Rename _instance_to_columns
and _instance_to_write_columns per review feedback for clarity.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug: async engine drops side-effect column values and silently misresolves collisions

2 participants