33
44import itertools
55import sys
6- from typing import Sequence
76from typing import TYPE_CHECKING
87
98import networkx as nx
109from _pytask .config import hookimpl
11- from _pytask .config import IS_FILE_SYSTEM_CASE_SENSITIVE
1210from _pytask .console import ARROW_DOWN_ICON
1311from _pytask .console import console
1412from _pytask .console import FILE_ICON
2321from _pytask .database_utils import State
2422from _pytask .exceptions import ResolvingDependenciesError
2523from _pytask .mark import Mark
26- from _pytask .mark_utils import get_marks
27- from _pytask .mark_utils import has_mark
2824from _pytask .node_protocols import PNode
2925from _pytask .node_protocols import PTask
3026from _pytask .nodes import PythonNode
@@ -48,13 +44,12 @@ def pytask_dag(session: Session) -> bool | None:
4844 session = session , tasks = session .tasks
4945 )
5046 session .hook .pytask_dag_modify_dag (session = session , dag = session .dag )
51- session .hook .pytask_dag_validate_dag (session = session , dag = session .dag )
5247 session .hook .pytask_dag_select_execution_dag (session = session , dag = session .dag )
5348
5449 except Exception : # noqa: BLE001
5550 report = DagReport .from_exception (sys .exc_info ())
5651 session .hook .pytask_dag_log (session = session , report = report )
57- session .dag_reports = report
52+ session .dag_report = report
5853
5954 raise ResolvingDependenciesError from None
6055
@@ -63,7 +58,7 @@ def pytask_dag(session: Session) -> bool | None:
6358
6459
6560@hookimpl
66- def pytask_dag_create_dag (tasks : list [PTask ]) -> nx .DiGraph :
61+ def pytask_dag_create_dag (session : Session , tasks : list [PTask ]) -> nx .DiGraph :
6762 """Create the DAG from tasks, dependencies and products."""
6863
6964 def _add_dependency (dag : nx .DiGraph , task : PTask , node : PNode ) -> None :
@@ -101,6 +96,7 @@ def _add_product(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
10196 )
10297
10398 _check_if_dag_has_cycles (dag )
99+ _check_if_tasks_have_the_same_products (dag , session .config ["paths" ])
104100
105101 return dag
106102
@@ -123,13 +119,6 @@ def pytask_dag_select_execution_dag(session: Session, dag: nx.DiGraph) -> None:
123119 )
124120
125121
126- @hookimpl
127- def pytask_dag_validate_dag (session : Session , dag : nx .DiGraph ) -> None :
128- """Validate the DAG."""
129- _check_if_root_nodes_are_available (dag , session .config ["paths" ])
130- _check_if_tasks_have_the_same_products (dag , session .config ["paths" ])
131-
132-
133122def _have_task_or_neighbors_changed (
134123 session : Session , dag : nx .DiGraph , task : PTask
135124) -> bool :
@@ -198,98 +187,6 @@ def _format_cycles(dag: nx.DiGraph, cycles: list[tuple[str, ...]]) -> str:
198187 return "\n " .join (lines [:- 1 ])
199188
200189
201- _TEMPLATE_ERROR : str = (
202- "Some dependencies do not exist or are not produced by any task. See the following "
203- "tree which shows which dependencies are missing for which tasks.\n \n {}"
204- )
205- if IS_FILE_SYSTEM_CASE_SENSITIVE :
206- _TEMPLATE_ERROR += (
207- "\n \n (Hint: Your file-system is case-sensitive. Check the paths' "
208- "capitalization carefully.)"
209- )
210-
211-
212- def _check_if_root_nodes_are_available (dag : nx .DiGraph , paths : Sequence [Path ]) -> None :
213- __tracebackhide__ = True
214-
215- missing_root_nodes = []
216- is_task_skipped : dict [str , bool ] = {}
217-
218- for node in dag .nodes :
219- is_node = "node" in dag .nodes [node ]
220- is_without_parents = len (list (dag .predecessors (node ))) == 0
221- if is_node and is_without_parents :
222- are_all_tasks_skipped , is_task_skipped = _check_if_tasks_are_skipped (
223- node , dag , is_task_skipped
224- )
225- if not are_all_tasks_skipped :
226- try :
227- node_exists = dag .nodes [node ]["node" ].state ()
228- except Exception as e : # noqa: BLE001
229- msg = _format_exception_from_failed_node_state (node , dag , paths )
230- raise ResolvingDependenciesError (msg ) from e
231- if not node_exists :
232- missing_root_nodes .append (node )
233-
234- if missing_root_nodes :
235- dictionary = {}
236- for node in missing_root_nodes :
237- short_node_name = format_node_name (dag .nodes [node ]["node" ], paths ).plain
238- not_skipped_successors = [
239- task for task in dag .successors (node ) if not is_task_skipped [task ]
240- ]
241- short_successors = reduce_names_of_multiple_nodes (
242- not_skipped_successors , dag , paths
243- )
244- dictionary [short_node_name ] = short_successors
245-
246- text = _format_dictionary_to_tree (dictionary , "Missing dependencies:" )
247- raise ResolvingDependenciesError (_TEMPLATE_ERROR .format (text )) from None
248-
249-
250- def _format_exception_from_failed_node_state (
251- node_signature : str , dag : nx .DiGraph , paths : Sequence [Path ]
252- ) -> str :
253- """Format message when ``node.state()`` threw an exception."""
254- tasks = [dag .nodes [i ]["task" ] for i in dag .successors (node_signature )]
255- names = [task .name for task in tasks ]
256- successors = ", " .join ([f"{ name !r} " for name in names ])
257- node_name = format_node_name (dag .nodes [node_signature ]["node" ], paths ).plain
258- return (
259- f"While checking whether dependency { node_name !r} from task(s) "
260- f"{ successors } exists, an error was raised."
261- )
262-
263-
264- def _check_if_tasks_are_skipped (
265- node : PNode , dag : nx .DiGraph , is_task_skipped : dict [str , bool ]
266- ) -> tuple [bool , dict [str , bool ]]:
267- """Check for a given node whether it is only used by skipped tasks."""
268- are_all_tasks_skipped = []
269- for successor in dag .successors (node ):
270- if successor not in is_task_skipped :
271- is_task_skipped [successor ] = _check_if_task_is_skipped (successor , dag )
272- are_all_tasks_skipped .append (is_task_skipped [successor ])
273-
274- return all (are_all_tasks_skipped ), is_task_skipped
275-
276-
277- def _check_if_task_is_skipped (task_name : str , dag : nx .DiGraph ) -> bool :
278- task = dag .nodes [task_name ]["task" ]
279- is_skipped = has_mark (task , "skip" )
280-
281- if is_skipped :
282- return True
283-
284- skip_if_markers = get_marks (task , "skipif" )
285- return any (_skipif (* marker .args , ** marker .kwargs )[0 ] for marker in skip_if_markers )
286-
287-
288- def _skipif (condition : bool , * , reason : str ) -> tuple [bool , str ]:
289- """Shameless copy to circumvent circular imports."""
290- return condition , reason
291-
292-
293190def _format_dictionary_to_tree (dict_ : dict [str , list [str ]], title : str ) -> str :
294191 """Format missing root nodes."""
295192 tree = Tree (title )
0 commit comments