2929from _pytask .node_protocols import PPathNode
3030from _pytask .node_protocols import PTask
3131from _pytask .node_protocols import PTaskWithPath
32- from _pytask .path import find_common_ancestor_of_nodes
32+ from _pytask .nodes import PythonNode
3333from _pytask .report import DagReport
3434from _pytask .shared import reduce_names_of_multiple_nodes
3535from _pytask .shared import reduce_node_name
@@ -87,6 +87,16 @@ def pytask_dag_create_dag(tasks: list[PTask]) -> nx.DiGraph:
8787 tree_map (lambda x : dag .add_node (x .name , node = x ), task .produces )
8888 tree_map (lambda x : dag .add_edge (task .name , x .name ), task .produces )
8989
90+ # If a node is a PythonNode wrapped in another PythonNode, it is a product from
91+ # another task that is a dependency in the current task. Thus, draw an edge
92+ # connecting the two nodes.
93+ tree_map (
94+ lambda x : dag .add_edge (x .value .name , x .name )
95+ if isinstance (x , PythonNode ) and isinstance (x .value , PythonNode )
96+ else None ,
97+ task .depends_on ,
98+ )
99+
90100 _check_if_dag_has_cycles (dag )
91101
92102 return dag
@@ -114,7 +124,7 @@ def pytask_dag_select_execution_dag(session: Session, dag: nx.DiGraph) -> None:
114124def pytask_dag_validate_dag (session : Session , dag : nx .DiGraph ) -> None :
115125 """Validate the DAG."""
116126 _check_if_root_nodes_are_available (dag , session .config ["paths" ])
117- _check_if_tasks_have_the_same_products (dag )
127+ _check_if_tasks_have_the_same_products (dag , session . config [ "paths" ] )
118128
119129
120130def _have_task_or_neighbors_changed (
@@ -292,7 +302,7 @@ def _format_dictionary_to_tree(dict_: dict[str, list[str]], title: str) -> str:
292302 return render_to_string (tree , console = console , strip_styles = True )
293303
294304
295- def _check_if_tasks_have_the_same_products (dag : nx .DiGraph ) -> None :
305+ def _check_if_tasks_have_the_same_products (dag : nx .DiGraph , paths : list [ Path ] ) -> None :
296306 nodes_created_by_multiple_tasks = []
297307
298308 for node in dag .nodes :
@@ -303,19 +313,11 @@ def _check_if_tasks_have_the_same_products(dag: nx.DiGraph) -> None:
303313 nodes_created_by_multiple_tasks .append (node )
304314
305315 if nodes_created_by_multiple_tasks :
306- all_names = nodes_created_by_multiple_tasks + [
307- predecessor
308- for node in nodes_created_by_multiple_tasks
309- for predecessor in dag .predecessors (node )
310- ]
311- common_ancestor = find_common_ancestor_of_nodes (* all_names )
312316 dictionary = {}
313317 for node in nodes_created_by_multiple_tasks :
314- short_node_name = reduce_node_name (
315- dag .nodes [node ]["node" ], [common_ancestor ]
316- )
318+ short_node_name = reduce_node_name (dag .nodes [node ]["node" ], paths )
317319 short_predecessors = reduce_names_of_multiple_nodes (
318- dag .predecessors (node ), dag , [ common_ancestor ]
320+ dag .predecessors (node ), dag , paths
319321 )
320322 dictionary [short_node_name ] = short_predecessors
321323 text = _format_dictionary_to_tree (dictionary , "Products from multiple tasks:" )
0 commit comments