Skip to content

Commit 71dd98a

Browse files
committed
🎯 feat: add wrap exec on rerun method.
1 parent 72804ce commit 71dd98a

1 file changed

Lines changed: 53 additions & 193 deletions

File tree

src/ddeutil/workflow/workflow.py

Lines changed: 53 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ def parameterize(self, params: DictData) -> DictData:
374374
if k in self.params
375375
}
376376
),
377-
"jobs": {},
378377
}
379378

380379
def release(
@@ -615,18 +614,18 @@ def process(
615614
event: Optional[ThreadEvent] = None,
616615
timeout: float = 3600,
617616
max_job_parallel: int = 2,
617+
total_job: Optional[int] = None,
618618
) -> Result:
619+
"""Process workflow."""
619620
ts: float = time.monotonic()
620621
trace: Trace = get_trace(
621622
run_id, parent_run_id=parent_run_id, extras=self.extras
622623
)
623624
not_timeout_flag: bool = True
624-
total_job: int = len(self.jobs)
625+
total_job: int = total_job or len(self.jobs)
625626
statuses: list[Status] = [WAIT] * total_job
626627
skip_count: int = 0
627628
sequence_statuses: list[Status] = []
628-
629-
catch(context, status=WAIT)
630629
if event and event.is_set():
631630
err_msg: str = (
632631
"Execution was canceled from the event was set "
@@ -770,6 +769,7 @@ def process(
770769
statuses[total + 1 + skip_count + i] = s
771770

772771
pop_sys_extras(self.extras)
772+
print(statuses)
773773
st: Status = validate_statuses(statuses)
774774
return Result(
775775
run_id=run_id,
@@ -817,6 +817,7 @@ def _execute(
817817
event: Optional[ThreadEvent] = None,
818818
timeout: float = 3600,
819819
max_job_parallel: int = 2,
820+
total_job: Optional[int] = None,
820821
) -> Result:
821822
catch(context, status=WAIT)
822823
return self.process(
@@ -827,6 +828,7 @@ def _execute(
827828
event=event,
828829
timeout=timeout,
829830
max_job_parallel=max_job_parallel,
831+
total_job=total_job,
830832
)
831833

832834
def execute(
@@ -881,18 +883,22 @@ def execute(
881883
--> Ok --> Result
882884
╰-status: SUCCESS
883885
884-
:param params: A parameter data that will parameterize before execution.
885-
:param run_id: (Optional[str]) A workflow running ID.
886-
:param event: (Event) An Event manager instance that use to cancel this
887-
execution if it forces stopped by parent execution.
888-
:param timeout: (float) A workflow execution time out in second unit
889-
that use for limit time of execution and waiting job dependency.
890-
This value does not force stop the task that still running more than
891-
this limit time. (Default: 60 * 60 seconds)
892-
:param max_job_parallel: (int) The maximum workers that use for job
893-
execution in `ThreadPoolExecutor` object. (Default: 2 workers)
894-
895-
:rtype: Result
886+
Args:
887+
params (DictData): A parameter data that will parameterize before
888+
execution.
889+
run_id (str, default None): A workflow running ID.
890+
event (Event, default None): An Event manager instance that use to
891+
cancel this execution if it forces stopped by parent execution.
892+
timeout (float, default 3600): A workflow execution time out in
893+
second unit that use for limit time of execution and waiting job
894+
dependency. This value does not force stop the task that still
895+
running more than this limit time. (Default: 60 * 60 seconds)
896+
max_job_parallel (int, default 2): The maximum workers that use for
897+
job execution in `ThreadPoolExecutor` object.
898+
899+
Returns:
900+
Result: Return Result object that create from execution context with
901+
return mode.
896902
"""
897903
ts: float = time.monotonic()
898904
parent_run_id, run_id = extract_id(
@@ -904,9 +910,11 @@ def execute(
904910
extras=self.extras,
905911
pre_process=True,
906912
)
907-
context: DictData = self.parameterize(params) | {
908-
"info": {"exec_start": get_dt_now()}
909-
}
913+
context: DictData = {
914+
"jobs": {},
915+
"info": {"exec_start": get_dt_now()},
916+
} | self.parameterize(params)
917+
910918
event: ThreadEvent = event or ThreadEvent()
911919
max_job_parallel: int = dynamic(
912920
"max_job_parallel", f=max_job_parallel, extras=self.extras
@@ -962,16 +970,16 @@ def rerun(
962970
it does not support rerun only stage.
963971
964972
Args:
965-
context: A context result that get the failed status.
966-
run_id: (Optional[str]) A workflow running ID.
967-
event: (Event) An Event manager instance that use to cancel this
968-
execution if it forces stopped by parent execution.
969-
timeout: (float) A workflow execution time out in second unit
970-
that use for limit time of execution and waiting job dependency.
971-
This value does not force stop the task that still running more
972-
than this limit time. (Default: 60 * 60 seconds)
973-
max_job_parallel: (int) The maximum workers that use for job
974-
execution in `ThreadPoolExecutor` object. (Default: 2 workers)
973+
context (DictData): A context result that get the failed status.
974+
run_id (str, default None): A workflow running ID.
975+
event (Event, default None): An Event manager instance that use to
976+
cancel this execution if it forces stopped by parent execution.
977+
timeout (float, default 3600): A workflow execution time out in
978+
second unit that use for limit time of execution and waiting job
979+
dependency. This value does not force stop the task that still
980+
running more than this limit time. (Default: 60 * 60 seconds)
981+
max_job_parallel (int, default 2): The maximum workers that use for
982+
job execution in `ThreadPoolExecutor` object.
975983
976984
Returns:
977985
Result: Return Result object that create from execution context with
@@ -1016,6 +1024,7 @@ def rerun(
10161024
context: DictData = {
10171025
"params": context["params"].copy(),
10181026
"jobs": {j: jobs[j] for j in jobs if jobs[j]["status"] == SUCCESS},
1027+
"info": {"exec_start": get_dt_now()},
10191028
}
10201029

10211030
total_job: int = 0
@@ -1038,169 +1047,20 @@ def rerun(
10381047
context=catch(context=context, status=SKIP),
10391048
)
10401049

1041-
not_timeout_flag: bool = True
1042-
statuses: list[Status] = [WAIT] * total_job
1043-
skip_count: int = 0
1044-
sequence_statuses: list[Status] = []
1045-
timeout: float = dynamic(
1046-
"max_job_exec_timeout", f=timeout, extras=self.extras
1047-
)
1048-
1049-
catch(context, status=WAIT)
1050-
if event and event.is_set():
1051-
return Result.from_trace(trace).catch(
1052-
status=CANCEL,
1053-
context=catch(
1054-
context,
1055-
status=CANCEL,
1056-
updated={
1057-
"errors": WorkflowCancelError(
1058-
"Execution was canceled from the event was set "
1059-
"before workflow execution."
1060-
).to_dict(),
1061-
},
1062-
),
1050+
try:
1051+
return self._execute(
1052+
job_queue,
1053+
context,
1054+
trace,
1055+
event=event,
1056+
timeout=timeout,
1057+
max_job_parallel=max_job_parallel,
1058+
total_job=total_job,
10631059
)
1064-
1065-
with ThreadPoolExecutor(max_job_parallel, "wf") as executor:
1066-
futures: list[Future] = []
1067-
backoff_sleep = 0.01
1068-
consecutive_waits = 0
1069-
1070-
while not job_queue.empty() and (
1071-
not_timeout_flag := ((time.monotonic() - ts) < timeout)
1072-
):
1073-
job_id: str = job_queue.get()
1074-
job: Job = self.job(name=job_id)
1075-
if (check := job.check_needs(context["jobs"])) == WAIT:
1076-
job_queue.task_done()
1077-
job_queue.put(job_id)
1078-
consecutive_waits += 1
1079-
1080-
# NOTE: Exponential backoff up to 0.15s max.
1081-
backoff_sleep = min(backoff_sleep * 1.5, 0.15)
1082-
time.sleep(backoff_sleep)
1083-
continue
1084-
1085-
# NOTE: Reset backoff when we can proceed
1086-
consecutive_waits = 0
1087-
backoff_sleep = 0.01
1088-
1089-
if check == FAILED: # pragma: no cov
1090-
return Result.from_trace(trace).catch(
1091-
status=FAILED,
1092-
context=catch(
1093-
context,
1094-
status=FAILED,
1095-
updated={
1096-
"status": FAILED,
1097-
"errors": WorkflowError(
1098-
f"Validate job trigger rule was failed "
1099-
f"with {job.trigger_rule.value!r}."
1100-
).to_dict(),
1101-
},
1102-
),
1103-
)
1104-
elif check == SKIP: # pragma: no cov
1105-
trace.info(
1106-
f"[JOB]: Skip job: {job_id!r} from trigger rule."
1107-
)
1108-
job.set_outputs(output={"status": SKIP}, to=context)
1109-
job_queue.task_done()
1110-
skip_count += 1
1111-
continue
1112-
1113-
if max_job_parallel > 1:
1114-
futures.append(
1115-
executor.submit(
1116-
self.process_job,
1117-
job=job,
1118-
run_id=run_id,
1119-
context=context,
1120-
parent_run_id=parent_run_id,
1121-
event=event,
1122-
),
1123-
)
1124-
job_queue.task_done()
1125-
continue
1126-
1127-
if len(futures) < 1:
1128-
futures.append(
1129-
executor.submit(
1130-
self.process_job,
1131-
job=job,
1132-
run_id=run_id,
1133-
context=context,
1134-
parent_run_id=parent_run_id,
1135-
event=event,
1136-
)
1137-
)
1138-
elif (future := futures.pop(0)).done():
1139-
if e := future.exception():
1140-
sequence_statuses.append(get_status_from_error(e))
1141-
else:
1142-
st, _ = future.result()
1143-
sequence_statuses.append(st)
1144-
job_queue.put(job_id)
1145-
elif future.cancelled():
1146-
sequence_statuses.append(CANCEL)
1147-
job_queue.put(job_id)
1148-
elif future.running() or "state=pending" in str(future):
1149-
futures.insert(0, future)
1150-
job_queue.put(job_id)
1151-
else: # pragma: no cov
1152-
job_queue.put(job_id)
1153-
futures.insert(0, future)
1154-
trace.warning(
1155-
f"[WORKFLOW]: ... Execution non-threading not "
1156-
f"handle: {future}."
1157-
)
1158-
1159-
job_queue.task_done()
1160-
1161-
if not_timeout_flag:
1162-
job_queue.join()
1163-
for total, future in enumerate(as_completed(futures), start=0):
1164-
try:
1165-
statuses[total], _ = future.result()
1166-
except WorkflowError as e:
1167-
statuses[total] = get_status_from_error(e)
1168-
1169-
# NOTE: Update skipped status from the job trigger.
1170-
for i in range(skip_count):
1171-
statuses[total + 1 + i] = SKIP
1172-
1173-
# NOTE: Update status from none-parallel job execution.
1174-
for i, s in enumerate(sequence_statuses, start=0):
1175-
statuses[total + 1 + skip_count + i] = s
1176-
1177-
st: Status = validate_statuses(statuses)
1178-
return Result.from_trace(trace).catch(
1179-
status=st,
1180-
context=catch(context, status=st),
1181-
)
1182-
1183-
event.set()
1184-
for future in futures:
1185-
future.cancel()
1186-
1187-
trace.error(
1188-
f"[WORKFLOW]: {self.name!r} was timeout because it use exec "
1189-
f"time more than {timeout} seconds."
1060+
finally:
1061+
context["info"].update(
1062+
{
1063+
"exec_end": get_dt_now(),
1064+
"exec_latency": round(time.monotonic() - ts, 6),
1065+
}
11901066
)
1191-
1192-
time.sleep(0.0025)
1193-
1194-
return Result.from_trace(trace).catch(
1195-
status=FAILED,
1196-
context=catch(
1197-
context,
1198-
status=FAILED,
1199-
updated={
1200-
"errors": WorkflowTimeoutError(
1201-
f"{self.name!r} was timeout because it use exec time "
1202-
f"more than {timeout} seconds."
1203-
).to_dict(),
1204-
},
1205-
),
1206-
)

0 commit comments

Comments
 (0)