diff --git a/luigi/execution_summary.py b/luigi/execution_summary.py index e818a1ed14..a420816124 100644 --- a/luigi/execution_summary.py +++ b/luigi/execution_summary.py @@ -94,24 +94,45 @@ def _partition_tasks(worker): Still_pending_not_ext is only used to get upstream_failure, upstream_missing_dependency and run_by_other_worker """ task_history = worker._add_task_history - pending_tasks = {task for (task, status, ext) in task_history if status == 'PENDING'} - set_tasks = {} - set_tasks["completed"] = {task for (task, status, ext) in task_history if status == 'DONE' and task in pending_tasks} - set_tasks["already_done"] = {task for (task, status, ext) in task_history - if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]} - set_tasks["ever_failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'} - set_tasks["failed"] = set_tasks["ever_failed"] - set_tasks["completed"] - set_tasks["scheduling_error"] = {task for (task, status, ext) in task_history if status == 'UNKNOWN'} - set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history - if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and not ext} - set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history - if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and ext} - set_tasks["run_by_other_worker"] = set() - set_tasks["upstream_failure"] = set() - set_tasks["upstream_missing_dependency"] = set() - set_tasks["upstream_run_by_other_worker"] = set() - set_tasks["upstream_scheduling_error"] = set() - set_tasks["not_run"] = set() + + set_tasks = { + "completed": set(), + "already_done": set(), + "ever_failed": set(), + "failed": set(), + "scheduling_error": set(), + "still_pending_ext": set(), + "still_pending_not_ext": set(), + "run_by_other_worker": set(), + "upstream_failure": set(), + "upstream_missing_dependency": set(), + "upstream_run_by_other_worker": set(), + "upstream_scheduling_error": set(), + "not_run": set() + } + + pending_tasks = set() + + for task, status, ext in task_history: + if status == 'PENDING': + pending_tasks.add(task) + if task not in set_tasks["ever_failed"] and task not in set_tasks["completed"]: + if ext: + set_tasks["still_pending_not_ext"].add(task) + else: + set_tasks["still_pending_ext"].add(task) + elif status == 'DONE': + if task in pending_tasks: + set_tasks["completed"].add(task) + elif task not in set_tasks["completed"]: + set_tasks["already_done"].add(task) + elif status == 'FAILED': + set_tasks["ever_failed"].add(task) + if task not in set_tasks["completed"]: + set_tasks["failed"].add(task) + elif status == 'UNKNOWN': + set_tasks["scheduling_error"].add(task) + return set_tasks