Skip to content

Commit 72804ce

Browse files
committed
🎯 feat: split handler exec func to warp.
1 parent 8a99594 commit 72804ce

1 file changed

Lines changed: 150 additions & 97 deletions

File tree

src/ddeutil/workflow/workflow.py

Lines changed: 150 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ def release(
526526
),
527527
)
528528

529-
def execute_job(
529+
def process_job(
530530
self,
531531
job: Job,
532532
run_id: str,
@@ -605,112 +605,27 @@ def execute_job(
605605

606606
return result.status, catch(context, status=result.status)
607607

608-
def execute(
608+
def process(
609609
self,
610-
params: DictData,
610+
job_queue: Queue[str],
611+
run_id: str,
612+
context: DictData,
611613
*,
612-
run_id: Optional[str] = None,
614+
parent_run_id: Optional[str] = None,
613615
event: Optional[ThreadEvent] = None,
614616
timeout: float = 3600,
615617
max_job_parallel: int = 2,
616618
) -> Result:
617-
"""Execute workflow with passing a dynamic parameters to all jobs that
618-
included in this workflow model with `jobs` field.
619-
620-
The result of execution process for each job and stages on this
621-
workflow will keep in dict which able to catch out with all jobs and
622-
stages by dot annotation.
623-
624-
For example with non-strategy job, when I want to use the output
625-
from previous stage, I can access it with syntax:
626-
627-
... ${job-id}.stages.${stage-id}.outputs.${key}
628-
... ${job-id}.stages.${stage-id}.errors.${key}
629-
630-
But example for strategy job:
631-
632-
... ${job-id}.strategies.${strategy-id}.stages.${stage-id}.outputs.${key}
633-
... ${job-id}.strategies.${strategy-id}.stages.${stage-id}.errors.${key}
634-
635-
This method already handle all exception class that can raise from
636-
the job execution. It will warp that error and keep it in the key `errors`
637-
at the result context.
638-
639-
640-
Execution --> Ok --> Result
641-
|-status: CANCEL
642-
╰-context:
643-
╰-errors:
644-
|-name: ...
645-
╰-message: ...
646-
647-
--> Ok --> Result
648-
|-status: FAILED
649-
╰-context:
650-
╰-errors:
651-
|-name: ...
652-
╰-message: ...
653-
654-
--> Ok --> Result
655-
╰-status: SKIP
656-
657-
--> Ok --> Result
658-
╰-status: SUCCESS
659-
660-
:param params: A parameter data that will parameterize before execution.
661-
:param run_id: (Optional[str]) A workflow running ID.
662-
:param event: (Event) An Event manager instance that use to cancel this
663-
execution if it forces stopped by parent execution.
664-
:param timeout: (float) A workflow execution time out in second unit
665-
that use for limit time of execution and waiting job dependency.
666-
This value does not force stop the task that still running more than
667-
this limit time. (Default: 60 * 60 seconds)
668-
:param max_job_parallel: (int) The maximum workers that use for job
669-
execution in `ThreadPoolExecutor` object. (Default: 2 workers)
670-
671-
:rtype: Result
672-
"""
673619
ts: float = time.monotonic()
674-
parent_run_id, run_id = extract_id(
675-
self.name, run_id=run_id, extras=self.extras
676-
)
677620
trace: Trace = get_trace(
678-
run_id,
679-
parent_run_id=parent_run_id,
680-
extras=self.extras,
681-
pre_process=True,
682-
)
683-
context: DictData = self.parameterize(params)
684-
event: ThreadEvent = event or ThreadEvent()
685-
max_job_parallel: int = dynamic(
686-
"max_job_parallel", f=max_job_parallel, extras=self.extras
687-
)
688-
trace.info(
689-
f"[WORKFLOW]: Execute: {self.name!r} ("
690-
f"{'parallel' if max_job_parallel > 1 else 'sequential'} jobs)"
621+
run_id, parent_run_id=parent_run_id, extras=self.extras
691622
)
692-
if not self.jobs:
693-
trace.warning(f"[WORKFLOW]: {self.name!r} does not set jobs")
694-
return Result(
695-
run_id=run_id,
696-
parent_run_id=parent_run_id,
697-
status=SUCCESS,
698-
context=catch(context, status=SUCCESS),
699-
extras=self.extras,
700-
)
701-
702-
job_queue: Queue = Queue()
703-
for job_id in self.jobs:
704-
job_queue.put(job_id)
705-
706623
not_timeout_flag: bool = True
707624
total_job: int = len(self.jobs)
708625
statuses: list[Status] = [WAIT] * total_job
709626
skip_count: int = 0
710627
sequence_statuses: list[Status] = []
711-
timeout: float = dynamic(
712-
"max_job_exec_timeout", f=timeout, extras=self.extras
713-
)
628+
714629
catch(context, status=WAIT)
715630
if event and event.is_set():
716631
err_msg: str = (
@@ -791,7 +706,7 @@ def execute(
791706
if max_job_parallel > 1:
792707
futures.append(
793708
executor.submit(
794-
self.execute_job,
709+
self.process_job,
795710
job=job,
796711
run_id=run_id,
797712
context=context,
@@ -805,7 +720,7 @@ def execute(
805720
if len(futures) < 1:
806721
futures.append(
807722
executor.submit(
808-
self.execute_job,
723+
self.process_job,
809724
job=job,
810725
run_id=run_id,
811726
context=context,
@@ -893,6 +808,144 @@ def execute(
893808
extras=self.extras,
894809
)
895810

811+
def _execute(
812+
self,
813+
job_queue: Queue[str],
814+
context: DictData,
815+
trace: Trace,
816+
*,
817+
event: Optional[ThreadEvent] = None,
818+
timeout: float = 3600,
819+
max_job_parallel: int = 2,
820+
) -> Result:
821+
catch(context, status=WAIT)
822+
return self.process(
823+
job_queue,
824+
run_id=trace.run_id,
825+
context=context,
826+
parent_run_id=trace.parent_run_id,
827+
event=event,
828+
timeout=timeout,
829+
max_job_parallel=max_job_parallel,
830+
)
831+
832+
def execute(
833+
self,
834+
params: DictData,
835+
*,
836+
run_id: Optional[str] = None,
837+
event: Optional[ThreadEvent] = None,
838+
timeout: float = 3600,
839+
max_job_parallel: int = 2,
840+
) -> Result:
841+
"""Execute workflow with passing a dynamic parameters to all jobs that
842+
included in this workflow model with `jobs` field.
843+
844+
The result of execution process for each job and stages on this
845+
workflow will keep in dict which able to catch out with all jobs and
846+
stages by dot annotation.
847+
848+
For example with non-strategy job, when I want to use the output
849+
from previous stage, I can access it with syntax:
850+
851+
... ${job-id}.stages.${stage-id}.outputs.${key}
852+
... ${job-id}.stages.${stage-id}.errors.${key}
853+
854+
But example for strategy job:
855+
856+
... ${job-id}.strategies.${strategy-id}.stages.${stage-id}.outputs.${key}
857+
... ${job-id}.strategies.${strategy-id}.stages.${stage-id}.errors.${key}
858+
859+
This method already handle all exception class that can raise from
860+
the job execution. It will warp that error and keep it in the key `errors`
861+
at the result context.
862+
863+
864+
Execution --> Ok --> Result
865+
|-status: CANCEL
866+
╰-context:
867+
╰-errors:
868+
|-name: ...
869+
╰-message: ...
870+
871+
--> Ok --> Result
872+
|-status: FAILED
873+
╰-context:
874+
╰-errors:
875+
|-name: ...
876+
╰-message: ...
877+
878+
--> Ok --> Result
879+
╰-status: SKIP
880+
881+
--> Ok --> Result
882+
╰-status: SUCCESS
883+
884+
:param params: A parameter data that will parameterize before execution.
885+
:param run_id: (Optional[str]) A workflow running ID.
886+
:param event: (Event) An Event manager instance that use to cancel this
887+
execution if it forces stopped by parent execution.
888+
:param timeout: (float) A workflow execution time out in second unit
889+
that use for limit time of execution and waiting job dependency.
890+
This value does not force stop the task that still running more than
891+
this limit time. (Default: 60 * 60 seconds)
892+
:param max_job_parallel: (int) The maximum workers that use for job
893+
execution in `ThreadPoolExecutor` object. (Default: 2 workers)
894+
895+
:rtype: Result
896+
"""
897+
ts: float = time.monotonic()
898+
parent_run_id, run_id = extract_id(
899+
self.name, run_id=run_id, extras=self.extras
900+
)
901+
trace: Trace = get_trace(
902+
run_id,
903+
parent_run_id=parent_run_id,
904+
extras=self.extras,
905+
pre_process=True,
906+
)
907+
context: DictData = self.parameterize(params) | {
908+
"info": {"exec_start": get_dt_now()}
909+
}
910+
event: ThreadEvent = event or ThreadEvent()
911+
max_job_parallel: int = dynamic(
912+
"max_job_parallel", f=max_job_parallel, extras=self.extras
913+
)
914+
trace.info(
915+
f"[WORKFLOW]: Execute: {self.name!r} ("
916+
f"{'parallel' if max_job_parallel > 1 else 'sequential'} jobs)"
917+
)
918+
if not self.jobs:
919+
trace.warning(f"[WORKFLOW]: {self.name!r} does not set jobs")
920+
return Result(
921+
run_id=run_id,
922+
parent_run_id=parent_run_id,
923+
status=SUCCESS,
924+
context=catch(context, status=SUCCESS),
925+
extras=self.extras,
926+
)
927+
928+
job_queue: Queue[str] = Queue()
929+
for job_id in self.jobs:
930+
job_queue.put(job_id)
931+
932+
try:
933+
return self._execute(
934+
job_queue,
935+
context,
936+
trace,
937+
event=event,
938+
timeout=timeout,
939+
max_job_parallel=max_job_parallel,
940+
)
941+
finally:
942+
context["info"].update(
943+
{
944+
"exec_end": get_dt_now(),
945+
"exec_latency": round(time.monotonic() - ts, 6),
946+
}
947+
)
948+
896949
def rerun(
897950
self,
898951
context: DictData,
@@ -1060,7 +1113,7 @@ def rerun(
10601113
if max_job_parallel > 1:
10611114
futures.append(
10621115
executor.submit(
1063-
self.execute_job,
1116+
self.process_job,
10641117
job=job,
10651118
run_id=run_id,
10661119
context=context,
@@ -1074,7 +1127,7 @@ def rerun(
10741127
if len(futures) < 1:
10751128
futures.append(
10761129
executor.submit(
1077-
self.execute_job,
1130+
self.process_job,
10781131
job=job,
10791132
run_id=run_id,
10801133
context=context,

0 commit comments

Comments
 (0)