From b88508dab590d0433fe396d1df1ddb5028229f7d Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 8 Apr 2026 11:15:39 -0300 Subject: [PATCH 1/5] fix: async engine side-effect column propagation and collision resolution ExecutionGraph.set_side_effect() now uses first-writer-wins instead of last-writer-wins, matching sync engine semantics where earlier consumers see the first producer's value. This prevents false DAGCircularDependencyError when multiple generators declare the same side-effect column at different pipeline stages. AsyncTaskScheduler now includes side-effect columns in _instance_to_columns so their values are written to the RowGroupBufferManager and available to downstream prompt templates. Fixes #508 --- .../engine/dataset_builders/async_scheduler.py | 11 ++++++++++- .../dataset_builders/utils/execution_graph.py | 11 +++++++++-- .../utils/test_execution_graph.py | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index 9795afb70..a18f73a26 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -124,10 +124,19 @@ def __init__( self._disable_early_shutdown = disable_early_shutdown self._early_shutdown = False - # Multi-column dedup: group output columns by generator identity + # Multi-column dedup: group output columns by generator identity. + # Include side-effect columns so their values are written to the + # buffer and available to downstream prompt templates. instance_to_columns: dict[int, list[str]] = {} + seen_cols: set[str] = set() for col, gen in generators.items(): instance_to_columns.setdefault(id(gen), []).append(col) + seen_cols.add(col) + for col, gen in generators.items(): + for side_effect_col in getattr(gen.config, "side_effect_columns", []): + if side_effect_col not in seen_cols: + instance_to_columns.setdefault(id(gen), []).append(side_effect_col) + seen_cols.add(side_effect_col) self._instance_to_columns = instance_to_columns # Stateful generator tracking: instance_id → asyncio.Lock diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py index 29db09c83..84ade9303 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py @@ -105,8 +105,15 @@ def add_edge(self, upstream: str, downstream: str) -> None: self._downstream.setdefault(upstream, set()).add(downstream) def set_side_effect(self, side_effect_col: str, producer: str) -> None: - """Map a side-effect column name to its producing column.""" - self._side_effect_map[side_effect_col] = producer + """Map a side-effect column name to its producing column. + + First-writer-wins: if a side-effect column is already mapped to a + different producer, the earlier mapping is kept. This matches sync + engine semantics where earlier consumers see the first producer's + value. + """ + if side_effect_col not in self._side_effect_map: + self._side_effect_map[side_effect_col] = producer def resolve_side_effect(self, column: str) -> str: """Resolve a column name through the side-effect map. diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py index 9d2fa69c4..3826bfa92 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py @@ -156,6 +156,24 @@ def test_side_effect_name_collision_prefers_real_column() -> None: assert graph.get_downstream_columns("summary") == set() +def test_side_effect_collision_first_writer_wins() -> None: + """When two producers declare the same side-effect, the first registration wins.""" + graph = ExecutionGraph() + graph.add_column("producer_a", GenerationStrategy.CELL_BY_CELL) + graph.add_column("producer_b", GenerationStrategy.CELL_BY_CELL) + graph.add_column("consumer", GenerationStrategy.CELL_BY_CELL) + + graph.set_side_effect("shared_se", "producer_a") + graph.set_side_effect("shared_se", "producer_b") # should be ignored + + assert graph.resolve_side_effect("shared_se") == "producer_a" + + # consumer depends on shared_se -> should resolve to producer_a, not producer_b + resolved = graph.resolve_side_effect("shared_se") + graph.add_edge(upstream=resolved, downstream="consumer") + assert graph.get_upstream_columns("consumer") == {"producer_a"} + + # -- Validation tests ------------------------------------------------------- From c378684c386dc5492d5c3425c516c0fda1ad99df Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 9 Apr 2026 01:10:54 +0000 Subject: [PATCH 2/5] fix: separate side-effect columns from completion tracking in async scheduler Side-effect columns added to _instance_to_columns caused KeyError in CompletionTracker._validate_strategy() because they are not registered in the execution graph. Split into _instance_to_write_columns (buffer writes, includes side-effects) and _instance_to_columns (completion tracking, real columns only). --- .../dataset_builders/async_scheduler.py | 33 +++++++------- .../dataset_builders/test_async_scheduler.py | 44 +++++++++++++++++++ 2 files changed, 62 insertions(+), 15 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index a18f73a26..20bc1844a 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -125,19 +125,22 @@ def __init__( self._early_shutdown = False # Multi-column dedup: group output columns by generator identity. - # Include side-effect columns so their values are written to the - # buffer and available to downstream prompt templates. + # _instance_to_columns holds only real (graph-registered) columns and + # is used for completion tracking. _instance_to_write_columns extends + # that with side-effect columns for buffer writes only. instance_to_columns: dict[int, list[str]] = {} - seen_cols: set[str] = set() for col, gen in generators.items(): instance_to_columns.setdefault(id(gen), []).append(col) - seen_cols.add(col) + self._instance_to_columns = instance_to_columns + + seen_cols: set[str] = {col for col in generators} + instance_to_write_columns: dict[int, list[str]] = {k: list(v) for k, v in instance_to_columns.items()} for col, gen in generators.items(): for side_effect_col in getattr(gen.config, "side_effect_columns", []): if side_effect_col not in seen_cols: - instance_to_columns.setdefault(id(gen), []).append(side_effect_col) + instance_to_write_columns.setdefault(id(gen), []).append(side_effect_col) seen_cols.add(side_effect_col) - self._instance_to_columns = instance_to_columns + self._instance_to_write_columns = instance_to_write_columns # Stateful generator tracking: instance_id → asyncio.Lock self._stateful_locks: dict[int, asyncio.Lock] = {} @@ -763,10 +766,10 @@ async def _run_from_scratch(self, task: Task, generator: ColumnGenerator) -> Any else: result_df = await generator.agenerate(lazy.pd.DataFrame()) - # Write results to buffer + # Write results to buffer (include side-effect columns) if self._buffer_manager is not None: - output_cols = self._instance_to_columns.get(id(generator), [task.column]) - for col in output_cols: + write_cols = self._instance_to_write_columns.get(id(generator), [task.column]) + for col in write_cols: if col in result_df.columns: values = result_df[col].tolist() self._buffer_manager.update_batch(task.row_group, col, values) @@ -789,10 +792,10 @@ async def _run_cell(self, task: Task, generator: ColumnGenerator) -> Any: result = await generator.agenerate(row_data) - # Write back to buffer + # Write back to buffer (include side-effect columns) if self._buffer_manager is not None and not self._tracker.is_dropped(task.row_group, task.row_index): - output_cols = self._instance_to_columns.get(id(generator), [task.column]) - for col in output_cols: + write_cols = self._instance_to_write_columns.get(id(generator), [task.column]) + for col in write_cols: if col in result: self._buffer_manager.update_cell(task.row_group, task.row_index, col, result[col]) @@ -813,9 +816,9 @@ async def _run_batch(self, task: Task, generator: ColumnGenerator) -> Any: result_df = await generator.agenerate(batch_df) - # Merge result columns back to buffer + # Merge result columns back to buffer (include side-effect columns) if self._buffer_manager is not None: - output_cols = self._instance_to_columns.get(id(generator), [task.column]) + write_cols = self._instance_to_write_columns.get(id(generator), [task.column]) active_rows = rg_size - len(pre_dropped) if len(result_df) != active_rows: raise ValueError( @@ -828,7 +831,7 @@ async def _run_batch(self, task: Task, generator: ColumnGenerator) -> Any: continue # Skip writing to rows dropped by concurrent tasks during the await if not self._buffer_manager.is_dropped(task.row_group, ri): - for col in output_cols: + for col in write_cols: if col in result_df.columns: self._buffer_manager.update_cell(task.row_group, ri, col, result_df.iloc[result_idx][col]) result_idx += 1 diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index dd4cbca9b..e9bfc818d 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -1430,3 +1430,47 @@ async def test_scheduler_rg_semaphore_deadlock_with_transient_failures() -> None assert tracker.is_row_group_complete(0, 2, ["seed", "col"]) assert tracker.is_row_group_complete(1, 2, ["seed", "col"]) + + +def test_side_effect_columns_separated_from_completion_tracking() -> None: + """Side-effect columns must appear in _instance_to_write_columns (buffer writes) + but NOT in _instance_to_columns (completion tracking), because they are not + registered in the execution graph and would cause KeyError in CompletionTracker. + """ + graph = ExecutionGraph() + graph.add_column("seed", GenerationStrategy.FULL_COLUMN) + graph.add_column("primary", GenerationStrategy.CELL_BY_CELL) + graph.add_edge(upstream="seed", downstream="primary") + + row_groups = [(0, 2)] + tracker = CompletionTracker.with_graph(graph, row_groups) + + provider = _mock_provider() + seed_gen = MockSeedGenerator(config=_expr_config("seed"), resource_provider=provider) + cell_gen = MockCellGenerator(config=_expr_config("primary"), resource_provider=provider) + # Replace the config with a mock that reports side-effect columns. + mock_config = MagicMock() + mock_config.side_effect_columns = ["side_a", "side_b"] + object.__setattr__(cell_gen, "_config", mock_config) + + generators: dict[str, ColumnGenerator] = {"seed": seed_gen, "primary": cell_gen} + + scheduler = AsyncTaskScheduler( + generators=generators, + graph=graph, + tracker=tracker, + row_groups=row_groups, + ) + + cell_id = id(cell_gen) + + # Completion tracking dict: only real columns + assert "side_a" not in scheduler._instance_to_columns.get(cell_id, []) + assert "side_b" not in scheduler._instance_to_columns.get(cell_id, []) + assert "primary" in scheduler._instance_to_columns.get(cell_id, []) + + # Buffer write dict: includes side-effect columns + write_cols = scheduler._instance_to_write_columns.get(cell_id, []) + assert "primary" in write_cols + assert "side_a" in write_cols + assert "side_b" in write_cols From ece8f8c40e4bd3722a1a7d4f37f10d94a5bf3ba0 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Fri, 10 Apr 2026 18:26:04 +0000 Subject: [PATCH 3/5] fix: warn on side-effect collision and clarify scheduler column maps Log a warning when multiple producers register the same side-effect column (first-writer-wins still applies). Rename _instance_to_columns and _instance_to_write_columns per review feedback for clarity. --- .../dataset_builders/async_scheduler.py | 33 ++++++++++--------- .../dataset_builders/utils/execution_graph.py | 10 ++++++ .../dataset_builders/test_async_scheduler.py | 15 +++++---- .../utils/test_execution_graph.py | 11 +++++-- 4 files changed, 43 insertions(+), 26 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index 20bc1844a..db33b2ace 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -125,22 +125,23 @@ def __init__( self._early_shutdown = False # Multi-column dedup: group output columns by generator identity. - # _instance_to_columns holds only real (graph-registered) columns and - # is used for completion tracking. _instance_to_write_columns extends - # that with side-effect columns for buffer writes only. - instance_to_columns: dict[int, list[str]] = {} + # _gen_instance_to_columns holds only real (graph-registered) columns + # and is used for completion tracking. + # _gen_instance_to_columns_including_side_effects extends that with + # side-effect columns for buffer writes only. + gen_instance_to_columns: dict[int, list[str]] = {} for col, gen in generators.items(): - instance_to_columns.setdefault(id(gen), []).append(col) - self._instance_to_columns = instance_to_columns + gen_instance_to_columns.setdefault(id(gen), []).append(col) + self._gen_instance_to_columns = gen_instance_to_columns seen_cols: set[str] = {col for col in generators} - instance_to_write_columns: dict[int, list[str]] = {k: list(v) for k, v in instance_to_columns.items()} + gen_instance_to_columns_incl_se: dict[int, list[str]] = {k: list(v) for k, v in gen_instance_to_columns.items()} for col, gen in generators.items(): for side_effect_col in getattr(gen.config, "side_effect_columns", []): if side_effect_col not in seen_cols: - instance_to_write_columns.setdefault(id(gen), []).append(side_effect_col) + gen_instance_to_columns_incl_se.setdefault(id(gen), []).append(side_effect_col) seen_cols.add(side_effect_col) - self._instance_to_write_columns = instance_to_write_columns + self._gen_instance_to_columns_including_side_effects = gen_instance_to_columns_incl_se # Stateful generator tracking: instance_id → asyncio.Lock self._stateful_locks: dict[int, asyncio.Lock] = {} @@ -358,7 +359,7 @@ async def _salvage_rounds( self._dispatched.discard( Task(column=task.column, row_group=task.row_group, row_index=None, task_type="batch") ) - for sibling in self._instance_to_columns.get(gid, []): + for sibling in self._gen_instance_to_columns.get(gid, []): if sibling != task.column: self._dispatched.discard( Task(column=sibling, row_group=task.row_group, row_index=None, task_type="from_scratch") @@ -379,7 +380,7 @@ async def _salvage_rounds( ) # Re-mark sibling columns as dispatched to mirror _dispatch_seeds # and prevent _drain_frontier from re-dispatching them. - for sibling in self._instance_to_columns.get(gid, []): + for sibling in self._gen_instance_to_columns.get(gid, []): if sibling != task.column: self._dispatched.add( Task(column=sibling, row_group=task.row_group, row_index=None, task_type="from_scratch") @@ -621,7 +622,7 @@ async def _dispatch_seeds(self, rg_id: int, rg_size: int) -> None: self._dispatched.add(task) self._dispatched.add(batch_alias) # Also mark all sibling output columns as dispatched (multi-column dedup) - for sibling_col in self._instance_to_columns.get(gid, []): + for sibling_col in self._gen_instance_to_columns.get(gid, []): if sibling_col != col: self._dispatched.add( Task(column=sibling_col, row_group=rg_id, row_index=None, task_type="from_scratch") @@ -666,7 +667,7 @@ async def _execute_task_inner_impl(self, task: Task) -> None: trace.dispatched_at = time.perf_counter() generator = self._generators[task.column] - output_cols = self._instance_to_columns.get(id(generator), [task.column]) + output_cols = self._gen_instance_to_columns.get(id(generator), [task.column]) retryable = False # When True, skip removing from _dispatched so the task isn't re-dispatched # from the frontier (it was never completed, so it stays in the frontier). @@ -768,7 +769,7 @@ async def _run_from_scratch(self, task: Task, generator: ColumnGenerator) -> Any # Write results to buffer (include side-effect columns) if self._buffer_manager is not None: - write_cols = self._instance_to_write_columns.get(id(generator), [task.column]) + write_cols = self._gen_instance_to_columns_including_side_effects.get(id(generator), [task.column]) for col in write_cols: if col in result_df.columns: values = result_df[col].tolist() @@ -794,7 +795,7 @@ async def _run_cell(self, task: Task, generator: ColumnGenerator) -> Any: # Write back to buffer (include side-effect columns) if self._buffer_manager is not None and not self._tracker.is_dropped(task.row_group, task.row_index): - write_cols = self._instance_to_write_columns.get(id(generator), [task.column]) + write_cols = self._gen_instance_to_columns_including_side_effects.get(id(generator), [task.column]) for col in write_cols: if col in result: self._buffer_manager.update_cell(task.row_group, task.row_index, col, result[col]) @@ -818,7 +819,7 @@ async def _run_batch(self, task: Task, generator: ColumnGenerator) -> Any: # Merge result columns back to buffer (include side-effect columns) if self._buffer_manager is not None: - write_cols = self._instance_to_write_columns.get(id(generator), [task.column]) + write_cols = self._gen_instance_to_columns_including_side_effects.get(id(generator), [task.column]) active_rows = rg_size - len(pre_dropped) if len(result_df) != active_rows: raise ValueError( diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py index 84ade9303..a71b716f4 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py @@ -3,6 +3,7 @@ from __future__ import annotations +import logging import math from collections import deque @@ -14,6 +15,8 @@ from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError from data_designer.engine.dataset_builders.utils.task_model import SliceRef +logger = logging.getLogger(__name__) + class ExecutionGraph: """Column-level static execution graph built from column configs. @@ -114,6 +117,13 @@ def set_side_effect(self, side_effect_col: str, producer: str) -> None: """ if side_effect_col not in self._side_effect_map: self._side_effect_map[side_effect_col] = producer + elif self._side_effect_map[side_effect_col] != producer: + logger.warning( + "Side-effect column %r already mapped to producer %r; ignoring duplicate registration from %r", + side_effect_col, + self._side_effect_map[side_effect_col], + producer, + ) def resolve_side_effect(self, column: str) -> str: """Resolve a column name through the side-effect map. diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index e9bfc818d..6742c0b38 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -1433,9 +1433,10 @@ async def test_scheduler_rg_semaphore_deadlock_with_transient_failures() -> None def test_side_effect_columns_separated_from_completion_tracking() -> None: - """Side-effect columns must appear in _instance_to_write_columns (buffer writes) - but NOT in _instance_to_columns (completion tracking), because they are not - registered in the execution graph and would cause KeyError in CompletionTracker. + """Side-effect columns must appear in _gen_instance_to_columns_including_side_effects + (buffer writes) but NOT in _gen_instance_to_columns (completion tracking), because + they are not registered in the execution graph and would cause KeyError in + CompletionTracker. """ graph = ExecutionGraph() graph.add_column("seed", GenerationStrategy.FULL_COLUMN) @@ -1465,12 +1466,12 @@ def test_side_effect_columns_separated_from_completion_tracking() -> None: cell_id = id(cell_gen) # Completion tracking dict: only real columns - assert "side_a" not in scheduler._instance_to_columns.get(cell_id, []) - assert "side_b" not in scheduler._instance_to_columns.get(cell_id, []) - assert "primary" in scheduler._instance_to_columns.get(cell_id, []) + assert "side_a" not in scheduler._gen_instance_to_columns.get(cell_id, []) + assert "side_b" not in scheduler._gen_instance_to_columns.get(cell_id, []) + assert "primary" in scheduler._gen_instance_to_columns.get(cell_id, []) # Buffer write dict: includes side-effect columns - write_cols = scheduler._instance_to_write_columns.get(cell_id, []) + write_cols = scheduler._gen_instance_to_columns_including_side_effects.get(cell_id, []) assert "primary" in write_cols assert "side_a" in write_cols assert "side_b" in write_cols diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py index 3826bfa92..6a7f1868e 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py @@ -3,6 +3,8 @@ from __future__ import annotations +import logging + import pytest from data_designer.config.column_configs import ( @@ -156,17 +158,20 @@ def test_side_effect_name_collision_prefers_real_column() -> None: assert graph.get_downstream_columns("summary") == set() -def test_side_effect_collision_first_writer_wins() -> None: - """When two producers declare the same side-effect, the first registration wins.""" +def test_side_effect_collision_first_writer_wins(caplog: pytest.LogCaptureFixture) -> None: + """When two producers declare the same side-effect, the first registration wins and a warning is logged.""" graph = ExecutionGraph() graph.add_column("producer_a", GenerationStrategy.CELL_BY_CELL) graph.add_column("producer_b", GenerationStrategy.CELL_BY_CELL) graph.add_column("consumer", GenerationStrategy.CELL_BY_CELL) graph.set_side_effect("shared_se", "producer_a") - graph.set_side_effect("shared_se", "producer_b") # should be ignored + with caplog.at_level(logging.WARNING): + graph.set_side_effect("shared_se", "producer_b") # should be ignored with warning assert graph.resolve_side_effect("shared_se") == "producer_a" + assert "already mapped to producer 'producer_a'" in caplog.text + assert "ignoring duplicate registration from 'producer_b'" in caplog.text # consumer depends on shared_se -> should resolve to producer_a, not producer_b resolved = graph.resolve_side_effect("shared_se") From 604aa21e30dfbb09072bc66f28041a22c6f28063 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Mon, 13 Apr 2026 20:13:25 +0000 Subject: [PATCH 4/5] fix: raise ConfigCompilationError on duplicate side-effect producers Replace first-writer-wins collision handling with a hard error. Each side-effect column must have exactly one producer; duplicates are a configuration issue to be fixed at the source. --- .../dataset_builders/utils/execution_graph.py | 24 +++++++++---------- .../utils/test_execution_graph.py | 22 ++++------------- 2 files changed, 16 insertions(+), 30 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py index a71b716f4..cbf8cf104 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py @@ -12,7 +12,7 @@ DatasetBuilderColumnConfigT, MultiColumnConfig, ) -from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError +from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError from data_designer.engine.dataset_builders.utils.task_model import SliceRef logger = logging.getLogger(__name__) @@ -110,20 +110,18 @@ def add_edge(self, upstream: str, downstream: str) -> None: def set_side_effect(self, side_effect_col: str, producer: str) -> None: """Map a side-effect column name to its producing column. - First-writer-wins: if a side-effect column is already mapped to a - different producer, the earlier mapping is kept. This matches sync - engine semantics where earlier consumers see the first producer's - value. + Each side-effect column must have exactly one producer. Duplicate + registrations from a different producer are a configuration error - + use distinct column names for each pipeline stage instead. """ - if side_effect_col not in self._side_effect_map: - self._side_effect_map[side_effect_col] = producer - elif self._side_effect_map[side_effect_col] != producer: - logger.warning( - "Side-effect column %r already mapped to producer %r; ignoring duplicate registration from %r", - side_effect_col, - self._side_effect_map[side_effect_col], - producer, + existing = self._side_effect_map.get(side_effect_col) + if existing is not None and existing != producer: + raise ConfigCompilationError( + f"Side-effect column {side_effect_col!r} is already produced by {existing!r}; " + f"cannot register a second producer {producer!r}. " + f"Use distinct side-effect column names for each pipeline stage." ) + self._side_effect_map[side_effect_col] = producer def resolve_side_effect(self, column: str) -> str: """Resolve a column name through the side-effect map. diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py index 6a7f1868e..c1aa3eeef 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_execution_graph.py @@ -3,8 +3,6 @@ from __future__ import annotations -import logging - import pytest from data_designer.config.column_configs import ( @@ -21,7 +19,7 @@ from data_designer.config.utils.code_lang import CodeLang from data_designer.config.validator_params import CodeValidatorParams from data_designer.engine.dataset_builders.multi_column_configs import SamplerMultiColumnConfig -from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError +from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError from data_designer.engine.dataset_builders.utils.execution_graph import ExecutionGraph from data_designer.engine.dataset_builders.utils.task_model import SliceRef @@ -158,25 +156,15 @@ def test_side_effect_name_collision_prefers_real_column() -> None: assert graph.get_downstream_columns("summary") == set() -def test_side_effect_collision_first_writer_wins(caplog: pytest.LogCaptureFixture) -> None: - """When two producers declare the same side-effect, the first registration wins and a warning is logged.""" +def test_side_effect_collision_raises() -> None: + """Two producers for the same side-effect column is a configuration error.""" graph = ExecutionGraph() graph.add_column("producer_a", GenerationStrategy.CELL_BY_CELL) graph.add_column("producer_b", GenerationStrategy.CELL_BY_CELL) - graph.add_column("consumer", GenerationStrategy.CELL_BY_CELL) graph.set_side_effect("shared_se", "producer_a") - with caplog.at_level(logging.WARNING): - graph.set_side_effect("shared_se", "producer_b") # should be ignored with warning - - assert graph.resolve_side_effect("shared_se") == "producer_a" - assert "already mapped to producer 'producer_a'" in caplog.text - assert "ignoring duplicate registration from 'producer_b'" in caplog.text - - # consumer depends on shared_se -> should resolve to producer_a, not producer_b - resolved = graph.resolve_side_effect("shared_se") - graph.add_edge(upstream=resolved, downstream="consumer") - assert graph.get_upstream_columns("consumer") == {"producer_a"} + with pytest.raises(ConfigCompilationError, match="already produced by 'producer_a'"): + graph.set_side_effect("shared_se", "producer_b") # -- Validation tests ------------------------------------------------------- From ba84679f2b0279635c1cb0a67fc12c79357d9e6e Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Mon, 13 Apr 2026 20:40:03 +0000 Subject: [PATCH 5/5] fix: reject duplicate side-effect producers in sync DAG path Mirror the async path check: raise ConfigCompilationError when two custom columns declare the same side-effect column name during topological sort. --- .../engine/dataset_builders/utils/dag.py | 14 +++++++++- .../engine/dataset_builders/utils/test_dag.py | 26 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py index fd019137f..60e5583e8 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py @@ -8,7 +8,7 @@ import data_designer.lazy_heavy_imports as lazy from data_designer.config.column_types import ColumnConfigT from data_designer.engine.column_generators.utils.generator_classification import column_type_used_in_execution_dag -from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError +from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError from data_designer.logging import LOG_INDENT logger = logging.getLogger(__name__) @@ -29,6 +29,18 @@ def topologically_sort_column_configs(column_configs: list[ColumnConfigT]) -> li side_effect_dict = {n: list(c.side_effect_columns) for n, c in dag_column_config_dict.items()} + side_effect_to_producer: dict[str, str] = {} + for producer, cols in side_effect_dict.items(): + for col in cols: + existing = side_effect_to_producer.get(col) + if existing is not None and existing != producer: + raise ConfigCompilationError( + f"Side-effect column {col!r} is already produced by {existing!r}; " + f"cannot register a second producer {producer!r}. " + f"Use distinct side-effect column names for each pipeline stage." + ) + side_effect_to_producer[col] = producer + logger.info("⛓️ Sorting column configs into a Directed Acyclic Graph") for name, col in dag_column_config_dict.items(): dag.add_node(name) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py index 8328a8f9d..bbb7aa9c8 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py @@ -1,9 +1,12 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +from typing import Any + import pytest from data_designer.config.column_configs import ( + CustomColumnConfig, ExpressionColumnConfig, LLMCodeColumnConfig, LLMJudgeColumnConfig, @@ -13,12 +16,13 @@ ValidationColumnConfig, ) from data_designer.config.column_types import DataDesignerColumnType +from data_designer.config.custom_column import custom_column_generator from data_designer.config.sampler_params import SamplerType from data_designer.config.utils.code_lang import CodeLang from data_designer.config.validator_params import CodeValidatorParams from data_designer.engine.dataset_builders.multi_column_configs import SamplerMultiColumnConfig from data_designer.engine.dataset_builders.utils.dag import topologically_sort_column_configs -from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError +from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError, DAGCircularDependencyError MODEL_ALIAS = "stub-model-alias" @@ -111,3 +115,23 @@ def test_circular_dependencies(): ) with pytest.raises(DAGCircularDependencyError, match="cyclic dependencies"): topologically_sort_column_configs(column_configs) + + +def test_duplicate_side_effect_producers_raises() -> None: + """Two custom columns declaring the same side-effect column is a configuration error.""" + + @custom_column_generator(required_columns=["text"], side_effect_columns=["shared_col"]) + def gen_a(row: dict[str, Any]) -> dict[str, Any]: + return row + + @custom_column_generator(required_columns=["text"], side_effect_columns=["shared_col"]) + def gen_b(row: dict[str, Any]) -> dict[str, Any]: + return row + + column_configs = [ + LLMTextColumnConfig(name="text", prompt="hello", model_alias=MODEL_ALIAS), + CustomColumnConfig(name="col_a", generator_function=gen_a), + CustomColumnConfig(name="col_b", generator_function=gen_b), + ] + with pytest.raises(ConfigCompilationError, match="already produced by"): + topologically_sort_column_configs(column_configs)