Skip to content

Commit 8a99594

Browse files
committed
🎯 feat: update exec method on job module.
1 parent 9243d80 commit 8a99594

4 files changed

Lines changed: 147 additions & 100 deletions

File tree

src/ddeutil/workflow/job.py

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -884,17 +884,31 @@ def process(
884884
params: DictData,
885885
run_id: str,
886886
context: DictData,
887+
*,
887888
parent_run_id: Optional[str] = None,
888889
event: Optional[Event] = None,
889890
) -> Result:
890891
"""Process routing method that will route the provider function depend
891892
on runs-on value.
893+
894+
Args:
895+
params (DictData): A parameter data that want to use in this
896+
execution.
897+
run_id (str): A running stage ID.
898+
context (DictData): A context data that was passed from handler
899+
method.
900+
parent_run_id (str, default None): A parent running ID.
901+
event (Event, default None): An event manager that use to track
902+
parent process was not force stopped.
903+
904+
Returns:
905+
Result: The execution result with status and context data.
892906
"""
893907
trace: Trace = get_trace(
894908
run_id, parent_run_id=parent_run_id, extras=self.extras
895909
)
896910
trace.info(
897-
f"[JOB]: Routing for "
911+
f"[JOB]: Routing "
898912
f"{''.join(self.runs_on.type.value.split('_')).title()}: "
899913
f"{self.id!r}"
900914
)
@@ -952,6 +966,7 @@ def process(
952966
run_id=parent_run_id,
953967
event=event,
954968
)
969+
955970
if rs is None:
956971
trace.error(
957972
f"[JOB]: Execution not support runs-on: {self.runs_on.type.value!r} "
@@ -970,20 +985,20 @@ def process(
970985
},
971986
extras=self.extras,
972987
)
973-
if rs.status in (CANCEL, SKIP):
974-
trace.debug(
975-
f"[JOB]: Job process routing got result status be {rs.status}"
976-
)
988+
989+
if rs.status == SKIP:
990+
raise JobSkipError("Job got skipped status.")
991+
elif rs.status == CANCEL:
992+
raise JobCancelError("Job got canceled status.")
977993
elif rs.status == FAILED:
978-
raise JobError("[JOB]: Job process error")
994+
raise JobError("Job process error")
979995
return rs
980996

981997
def _execute(
982998
self,
983999
params: DictData,
984-
run_id: str,
9851000
context: DictData,
986-
parent_run_id: Optional[str] = None,
1001+
trace: Trace,
9871002
event: Optional[Event] = None,
9881003
) -> Result:
9891004
"""Wrapped the route execute method before returning to handler
@@ -994,45 +1009,40 @@ def _execute(
9941009
9951010
Args:
9961011
params: A parameter data that want to use in this execution
997-
run_id:
9981012
context:
999-
parent_run_id:
1000-
event:
1013+
trace (Trace):
1014+
event (Event, default None):
10011015
10021016
Returns:
10031017
Result: The wrapped execution result.
10041018
"""
10051019
current_retry: int = 0
1020+
maximum_retry: int = self.retry + 1
10061021
exception: Exception
10071022
catch(context, status=WAIT)
1008-
trace: Trace = get_trace(
1009-
run_id, parent_run_id=parent_run_id, extras=self.extras
1010-
)
10111023
try:
10121024
return self.process(
10131025
params,
1014-
run_id,
1026+
run_id=trace.run_id,
10151027
context=context,
1016-
parent_run_id=parent_run_id,
1028+
parent_run_id=trace.parent_run_id,
10171029
event=event,
10181030
)
10191031
except (JobCancelError, JobSkipError):
10201032
trace.debug("[JOB]: process raise skip or cancel error.")
10211033
raise
10221034
except Exception as e:
1035+
if self.retry == 0:
1036+
raise
1037+
10231038
current_retry += 1
10241039
exception = e
1025-
finally:
1026-
trace.debug("[JOB]: Failed at the first execution.")
1027-
1028-
if self.retry == 0:
1029-
raise exception
10301040

10311041
trace.warning(
1032-
f"[JOB]: Retry count: {current_retry} ... "
1042+
f"[JOB]: Retry count: {current_retry}/{maximum_retry} ... "
10331043
f"( {exception.__class__.__name__} )"
10341044
)
1035-
while current_retry < (self.retry + 1):
1045+
while current_retry < maximum_retry:
10361046
try:
10371047
catch(
10381048
context=context,
@@ -1041,18 +1051,18 @@ def _execute(
10411051
)
10421052
return self.process(
10431053
params,
1044-
run_id,
1054+
run_id=trace.run_id,
10451055
context=context,
1046-
parent_run_id=parent_run_id,
1056+
parent_run_id=trace.parent_run_id,
10471057
event=event,
10481058
)
10491059
except (JobCancelError, JobSkipError):
10501060
trace.debug("[JOB]: process raise skip or cancel error.")
10511061
raise
1052-
except exception as e:
1062+
except Exception as e:
10531063
current_retry += 1
10541064
trace.warning(
1055-
f"[JOB]: Retry count: {current_retry} ... "
1065+
f"[JOB]: Retry count: {current_retry}/{maximum_retry} ... "
10561066
f"( {e.__class__.__name__} )"
10571067
)
10581068
exception = e
@@ -1103,22 +1113,24 @@ def execute(
11031113
)
11041114
result: Result = self._execute(
11051115
params,
1106-
run_id=run_id,
11071116
context=context,
1108-
parent_run_id=parent_run_id,
1117+
trace=trace,
11091118
event=event,
11101119
)
11111120
return result
1112-
except JobError: # pragma: no cov
1121+
except JobError as e: # pragma: no cov
1122+
if isinstance(e, JobSkipError):
1123+
trace.error(f"[JOB]: ⏭️ Skip: {e}")
1124+
1125+
st: Status = get_status_from_error(e)
11131126
return Result.from_trace(trace).catch(
1114-
status=FAILED,
1115-
context=catch(context, status=FAILED),
1127+
status=st, context=catch(context, status=st)
11161128
)
11171129
finally:
11181130
context["info"].update(
11191131
{
11201132
"exec_end": get_dt_now(),
1121-
"exec_latency": time.monotonic() - ts,
1133+
"exec_latency": round(time.monotonic() - ts, 6),
11221134
}
11231135
)
11241136
trace.debug("[JOB]: End Handler job execution.")

0 commit comments

Comments
 (0)