Skip to content

Commit 89857f7

Browse files
przemekboruta and claude committed
refactor: unify DAG construction by moving topological sort into execution_graph.py
Eliminates dag.py and its networkx dependency by moving topologically_sort_column_configs into execution_graph.py as a module-level function. Side-effect resolution is now O(1) via a side_effect_map dict (previously O(n²) linear scan). Kahn's algorithm is reused in-place rather than leaning on networkx.topological_sort. Closes #510 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 088e037 commit 89857f7

File tree

4 files changed

+60
-61
lines changed

4 files changed

+60
-61
lines changed

packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/config_compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
SamplerMultiColumnConfig,
1212
SeedDatasetMultiColumnConfig,
1313
)
14-
from data_designer.engine.dataset_builders.utils.dag import topologically_sort_column_configs
1514
from data_designer.engine.dataset_builders.utils.errors import ConfigCompilationError
15+
from data_designer.engine.dataset_builders.utils.execution_graph import topologically_sort_column_configs
1616

1717

1818
def compile_dataset_builder_column_configs(config: DataDesignerConfig) -> list[DatasetBuilderColumnConfigT]:

packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py

Lines changed: 0 additions & 59 deletions
This file was deleted.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@
33

44
from __future__ import annotations
55

6+
import logging
67
import math
78
from collections import deque
89

910
from data_designer.config.column_configs import GenerationStrategy
11+
from data_designer.config.column_types import ColumnConfigT
12+
from data_designer.engine.column_generators.utils.generator_classification import column_type_used_in_execution_dag
1013
from data_designer.engine.dataset_builders.multi_column_configs import (
1114
DatasetBuilderColumnConfigT,
1215
MultiColumnConfig,
1316
)
1417
from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError
1518
from data_designer.engine.dataset_builders.utils.task_model import SliceRef
19+
from data_designer.logging import LOG_INDENT
20+
21+
logger = logging.getLogger(__name__)
1622

1723

1824
class ExecutionGraph:
@@ -258,3 +264,55 @@ def to_mermaid(self) -> str:
258264
for dep in sorted(self._upstream.get(col, set())):
259265
lines.append(f" {dep} --> {col}")
260266
return "\n".join(lines)
267+
268+
269+
def topologically_sort_column_configs(column_configs: list[ColumnConfigT]) -> list[ColumnConfigT]:
    """Return the column configs ordered so each column follows its dependencies.

    Columns whose type is not used in the execution DAG are emitted first, in
    their original order. The remaining columns are ordered with Kahn's
    algorithm, using each column's ``required_columns`` as incoming edges. A
    required name that matches another column's ``side_effect_columns`` entry
    is treated as a dependency on the column that produces it. Required names
    that resolve to no DAG column, or to the column itself, are ignored.

    The result is deterministic for a given input: ties are broken by the
    order in which the columns appear in ``column_configs``.

    Args:
        column_configs: Column configs to sort.

    Returns:
        Non-DAG columns followed by the DAG columns in dependency order.

    Raises:
        DAGCircularDependencyError: If the dependencies contain a cycle.
    """
    # Classify each column once instead of evaluating the predicate twice.
    non_dag_cols: list[ColumnConfigT] = []
    dag_col_dict: dict[str, ColumnConfigT] = {}
    for col in column_configs:
        if column_type_used_in_execution_dag(col.column_type):
            dag_col_dict[col.name] = col
        else:
            non_dag_cols.append(col)

    if not dag_col_dict:
        return non_dag_cols

    # side_effect_col_name -> producing column name
    side_effect_map: dict[str, str] = {}
    for name, col in dag_col_dict.items():
        for se_col in col.side_effect_columns:
            side_effect_map[se_col] = name

    upstream: dict[str, set[str]] = {name: set() for name in dag_col_dict}
    # Dependents are kept in lists (insertion order) rather than sets: iterating
    # a set of strings depends on per-process hash randomization, which would
    # make the resulting order vary between runs.
    downstream: dict[str, list[str]] = {name: [] for name in dag_col_dict}

    logger.info("⛓️ Sorting column configs into a Directed Acyclic Graph")
    for name, col in dag_col_dict.items():
        for req in col.required_columns:
            # A direct column name wins over a side-effect column of the same name.
            resolved = req if req in dag_col_dict else side_effect_map.get(req)
            if resolved is None or resolved == name:
                continue
            if resolved in upstream[name]:
                # Edge already recorded (e.g. two side effects of one producer).
                continue
            logger.debug(f"{LOG_INDENT}🔗 `{name}` depends on `{resolved}`")
            upstream[name].add(resolved)
            downstream[resolved].append(name)

    # Kahn's algorithm: repeatedly emit columns with no unresolved dependencies.
    in_degree = {name: len(ups) for name, ups in upstream.items()}
    queue: deque[str] = deque(name for name, deg in in_degree.items() if deg == 0)
    order: list[str] = []
    while queue:
        name = queue.popleft()
        order.append(name)
        for child in downstream[name]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    # Any column never reaching in-degree zero is part of (or behind) a cycle.
    if len(order) != len(dag_col_dict):
        raise DAGCircularDependencyError(
            "🛑 The Data Designer column configurations contain cyclic dependencies. Please "
            "inspect the column configurations and ensure they can be sorted without "
            "circular references."
        )

    return non_dag_cols + [dag_col_dict[n] for n in order]

packages/data-designer-engine/tests/engine/dataset_builders/utils/test_dag.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from data_designer.config.utils.code_lang import CodeLang
1818
from data_designer.config.validator_params import CodeValidatorParams
1919
from data_designer.engine.dataset_builders.multi_column_configs import SamplerMultiColumnConfig
20-
from data_designer.engine.dataset_builders.utils.dag import topologically_sort_column_configs
2120
from data_designer.engine.dataset_builders.utils.errors import DAGCircularDependencyError
21+
from data_designer.engine.dataset_builders.utils.execution_graph import topologically_sort_column_configs
2222

2323
MODEL_ALIAS = "stub-model-alias"
2424

0 commit comments

Comments
 (0)