Skip to content

Commit 043f2bf

Browse files
authored
🎯 feat: handle workflow rerun method for job failed. (#53)
* 🎯 feat: draft rerun method. * 🧪 tests: add testcase for rerun only one failed job. * ⚙️ fixed: fix workflow job does not copy job model before validation. * 🎯 feat: improve duplicate stage validation in the job model. * ⚙️ fixed: filter failure job before release rerun job step. * ⚙️ fixed: filter failure job before release rerun job step.
1 parent 0b604c6 commit 043f2bf

5 files changed

Lines changed: 410 additions & 13 deletions

File tree

src/ddeutil/workflow/job.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,13 +402,19 @@ def __validate_stage_id__(cls, value: list[Stage]) -> list[Stage]:
402402
"""
403403
# VALIDATE: Validate stage id should not duplicate.
404404
rs: list[str] = []
405+
rs_raise: list[str] = []
405406
for stage in value:
406407
name: str = stage.iden
407408
if name in rs:
408-
raise ValueError(
409-
f"Stage name, {name!r}, should not be duplicate."
410-
)
409+
rs_raise.append(name)
410+
continue
411411
rs.append(name)
412+
413+
if rs_raise:
414+
raise ValueError(
415+
f"Stage name, {', '.join(repr(s) for s in rs_raise)}, should "
416+
f"not be duplicate."
417+
)
412418
return value
413419

414420
@model_validator(mode="after")

src/ddeutil/workflow/workflow.py

Lines changed: 218 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,10 @@ def __validate_jobs_need__(self) -> Self:
257257
f"{self.name!r}."
258258
)
259259

260-
# NOTE: Set job ID to the job model.
261-
self.jobs[job].id = job
260+
# NOTE: Copy the job model and set job ID to the job model.
261+
job_model = self.jobs[job].model_copy()
262+
job_model.id = job
263+
self.jobs[job] = job_model
262264

263265
# VALIDATE: Validate workflow name should not dynamic with params
264266
# template.
@@ -771,3 +773,217 @@ def execute(
771773
).to_dict(),
772774
},
773775
)
776+
777+
def rerun(
778+
self,
779+
context: DictData,
780+
*,
781+
parent_run_id: Optional[str] = None,
782+
event: Optional[Event] = None,
783+
timeout: float = 3600,
784+
max_job_parallel: int = 2,
785+
) -> Result:
786+
"""Re-Execute workflow with passing the error context data.
787+
788+
:param context: A context result that get the failed status.
789+
:param parent_run_id: (Optional[str]) A parent workflow running ID.
790+
:param event: (Event) An Event manager instance that use to cancel this
791+
execution if it forces stopped by parent execution.
792+
:param timeout: (float) A workflow execution time out in second unit
793+
that use for limit time of execution and waiting job dependency.
794+
This value does not force stop the task that still running more than
795+
this limit time. (Default: 60 * 60 seconds)
796+
:param max_job_parallel: (int) The maximum workers that use for job
797+
execution in `ThreadPoolExecutor` object. (Default: 2 workers)
798+
799+
:rtype: Result
800+
"""
801+
ts: float = time.monotonic()
802+
803+
result: Result = Result.construct_with_rs_or_id(
804+
parent_run_id=parent_run_id,
805+
id_logic=self.name,
806+
extras=self.extras,
807+
)
808+
if context["status"] == SUCCESS:
809+
result.trace.info(
810+
"[WORKFLOW]: Does not rerun because it already executed with "
811+
"success status."
812+
)
813+
return result.catch(status=SUCCESS, context=context)
814+
815+
err = context["errors"]
816+
result.trace.info(f"[WORKFLOW]: Previous error: {err}")
817+
818+
event: Event = event or Event()
819+
max_job_parallel: int = dynamic(
820+
"max_job_parallel", f=max_job_parallel, extras=self.extras
821+
)
822+
result.trace.info(
823+
f"[WORKFLOW]: Execute: {self.name!r} ("
824+
f"{'parallel' if max_job_parallel > 1 else 'sequential'} jobs)"
825+
)
826+
if not self.jobs:
827+
result.trace.warning(f"[WORKFLOW]: {self.name!r} does not set jobs")
828+
return result.catch(status=SUCCESS, context=context)
829+
830+
# NOTE: Prepare the new context for rerun process.
831+
jobs: DictData = context.get("jobs")
832+
new_context: DictData = {
833+
"params": context["params"].copy(),
834+
"jobs": {j: jobs[j] for j in jobs if jobs[j]["status"] == SUCCESS},
835+
}
836+
837+
total_job: int = 0
838+
job_queue: Queue = Queue()
839+
for job_id in self.jobs:
840+
841+
if job_id in new_context["jobs"]:
842+
continue
843+
844+
job_queue.put(job_id)
845+
total_job += 1
846+
847+
if total_job == 0:
848+
result.trace.warning("[WORKFLOW]: It does not have job to rerun.")
849+
return result.catch(status=SUCCESS, context=context)
850+
851+
not_timeout_flag: bool = True
852+
statuses: list[Status] = [WAIT] * total_job
853+
skip_count: int = 0
854+
sequence_statuses: list[Status] = []
855+
timeout: float = dynamic(
856+
"max_job_exec_timeout", f=timeout, extras=self.extras
857+
)
858+
859+
result.catch(status=WAIT, context=new_context)
860+
if event and event.is_set():
861+
return result.catch(
862+
status=CANCEL,
863+
context={
864+
"errors": WorkflowCancelError(
865+
"Execution was canceled from the event was set before "
866+
"workflow execution."
867+
).to_dict(),
868+
},
869+
)
870+
871+
with ThreadPoolExecutor(max_job_parallel, "wf") as executor:
872+
futures: list[Future] = []
873+
874+
while not job_queue.empty() and (
875+
not_timeout_flag := ((time.monotonic() - ts) < timeout)
876+
):
877+
job_id: str = job_queue.get()
878+
job: Job = self.job(name=job_id)
879+
if (check := job.check_needs(new_context["jobs"])) == WAIT:
880+
job_queue.task_done()
881+
job_queue.put(job_id)
882+
time.sleep(0.15)
883+
continue
884+
elif check == FAILED: # pragma: no cov
885+
return result.catch(
886+
status=FAILED,
887+
context={
888+
"status": FAILED,
889+
"errors": WorkflowError(
890+
f"Validate job trigger rule was failed with "
891+
f"{job.trigger_rule.value!r}."
892+
).to_dict(),
893+
},
894+
)
895+
elif check == SKIP: # pragma: no cov
896+
result.trace.info(
897+
f"[JOB]: Skip job: {job_id!r} from trigger rule."
898+
)
899+
job.set_outputs(output={"status": SKIP}, to=new_context)
900+
job_queue.task_done()
901+
skip_count += 1
902+
continue
903+
904+
if max_job_parallel > 1:
905+
futures.append(
906+
executor.submit(
907+
self.execute_job,
908+
job=job,
909+
params=new_context,
910+
result=result,
911+
event=event,
912+
),
913+
)
914+
job_queue.task_done()
915+
continue
916+
917+
if len(futures) < 1:
918+
futures.append(
919+
executor.submit(
920+
self.execute_job,
921+
job=job,
922+
params=new_context,
923+
result=result,
924+
event=event,
925+
)
926+
)
927+
elif (future := futures.pop(0)).done():
928+
if e := future.exception():
929+
sequence_statuses.append(get_status_from_error(e))
930+
else:
931+
st, _ = future.result()
932+
sequence_statuses.append(st)
933+
job_queue.put(job_id)
934+
elif future.cancelled():
935+
sequence_statuses.append(CANCEL)
936+
job_queue.put(job_id)
937+
elif future.running() or "state=pending" in str(future):
938+
futures.insert(0, future)
939+
job_queue.put(job_id)
940+
else: # pragma: no cov
941+
job_queue.put(job_id)
942+
futures.insert(0, future)
943+
result.trace.warning(
944+
f"[WORKFLOW]: ... Execution non-threading not "
945+
f"handle: {future}."
946+
)
947+
948+
job_queue.task_done()
949+
950+
if not_timeout_flag:
951+
job_queue.join()
952+
for total, future in enumerate(as_completed(futures), start=0):
953+
try:
954+
statuses[total], _ = future.result()
955+
except WorkflowError as e:
956+
statuses[total] = get_status_from_error(e)
957+
958+
# NOTE: Update skipped status from the job trigger.
959+
for i in range(skip_count):
960+
statuses[total + 1 + i] = SKIP
961+
962+
# NOTE: Update status from none-parallel job execution.
963+
for i, s in enumerate(sequence_statuses, start=0):
964+
statuses[total + 1 + skip_count + i] = s
965+
966+
return result.catch(
967+
status=validate_statuses(statuses), context=new_context
968+
)
969+
970+
event.set()
971+
for future in futures:
972+
future.cancel()
973+
974+
result.trace.error(
975+
f"[WORKFLOW]: {self.name!r} was timeout because it use exec "
976+
f"time more than {timeout} seconds."
977+
)
978+
979+
time.sleep(0.0025)
980+
981+
return result.catch(
982+
status=FAILED,
983+
context={
984+
"errors": WorkflowTimeoutError(
985+
f"{self.name!r} was timeout because it use exec time more "
986+
f"than {timeout} seconds."
987+
).to_dict(),
988+
},
989+
)

tests/test_workflow.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ def test_workflow():
4242
)
4343

4444
assert workflow.name == "manual-workflow"
45-
assert workflow.job("demo-run") == job
45+
46+
set_job_id = job.model_copy()
47+
set_job_id.id = "demo-run"
48+
assert workflow.job("demo-run") == set_job_id
4649

4750
# NOTE: Raise ValueError when get a job with ID that does not exist.
4851
with pytest.raises(ValueError):

tests/test_workflow_exec.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,21 @@ def test_workflow_exec():
2525
name="demo-workflow",
2626
jobs={"sleep-run": job, "sleep-again-run": job},
2727
)
28+
assert all(j in workflow.jobs for j in ("sleep-run", "sleep-again-run"))
29+
2830
rs: Result = workflow.execute(params={}, max_job_parallel=1)
2931
assert rs.status == SUCCESS
3032
assert rs.context == {
3133
"status": SUCCESS,
3234
"params": {},
3335
"jobs": {
36+
"sleep-run": {
37+
"status": SUCCESS,
38+
"stages": {"7972360640": {"outputs": {}, "status": SUCCESS}},
39+
},
3440
"sleep-again-run": {
3541
"status": SUCCESS,
36-
"stages": {
37-
"7972360640": {
38-
"outputs": {},
39-
"status": SUCCESS,
40-
}
41-
},
42+
"stages": {"7972360640": {"outputs": {}, "status": SUCCESS}},
4243
},
4344
},
4445
}
@@ -64,7 +65,7 @@ def test_workflow_exec_timeout():
6465
"status": FAILED,
6566
"params": {},
6667
"jobs": {
67-
"sleep-again-run": {
68+
"sleep-run": {
6869
"status": CANCEL,
6970
"stages": {"7972360640": {"outputs": {}, "status": SUCCESS}},
7071
"errors": {
@@ -189,6 +190,10 @@ def test_workflow_exec_parallel():
189190
"status": SUCCESS,
190191
"params": {},
191192
"jobs": {
193+
"sleep-run": {
194+
"status": SUCCESS,
195+
"stages": {"7972360640": {"outputs": {}, "status": SUCCESS}},
196+
},
192197
"sleep-again-run": {
193198
"status": SUCCESS,
194199
"stages": {"7972360640": {"outputs": {}, "status": SUCCESS}},

0 commit comments

Comments
 (0)