Skip to content

Commit 838c693

Browse files
committed
⚙️ fixed: refactor set variable on exec method.
1 parent f5779a0 commit 838c693

2 files changed

Lines changed: 30 additions & 32 deletions

File tree

src/ddeutil/workflow/workflow.py

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,18 @@ def process(
622622
max_job_parallel: int = 2,
623623
total_job: Optional[int] = None,
624624
) -> Result:
625-
"""Job process method."""
625+
"""Job process method.
626+
627+
Args:
628+
job_queue:
629+
run_id (str):
630+
context (DictData):
631+
parent_run_id (str, default None):
632+
event (Event, default None):
633+
timeout:
634+
max_job_parallel:
635+
total_job:
636+
"""
626637
ts: float = time.monotonic()
627638
trace: Trace = get_trace(
628639
run_id, parent_run_id=parent_run_id, extras=self.extras
@@ -640,6 +651,7 @@ def process(
640651

641652
# NOTE: Force update internal extras for handler circle execution.
642653
self.extras.update({"__sys_exec_break_circle": self.name})
654+
643655
with ThreadPoolExecutor(max_job_parallel, "wf") as executor:
644656
futures: list[Future] = []
645657

@@ -760,25 +772,19 @@ def process(
760772
future.cancel()
761773

762774
trace.error(
763-
f"[WORKFLOW]: {self.name!r} was timeout because it use exec "
764-
f"time more than {timeout} seconds."
775+
(
776+
f"{self.name!r} was timeout because it use exec time more "
777+
f"than {timeout} seconds."
778+
),
779+
module="workflow",
765780
)
766781

767782
time.sleep(0.0025)
768783

769784
pop_sys_extras(self.extras)
770-
return Result.from_trace(trace).catch(
771-
status=FAILED,
772-
context=catch(
773-
context,
774-
status=FAILED,
775-
updated={
776-
"errors": WorkflowTimeoutError(
777-
f"{self.name!r} was timeout because it use exec time "
778-
f"more than {timeout} seconds."
779-
).to_dict(),
780-
},
781-
),
785+
raise WorkflowTimeoutError(
786+
f"{self.name!r} was timeout because it use exec time more than "
787+
f"{timeout} seconds."
782788
)
783789

784790
def _execute(
@@ -797,11 +803,6 @@ def _execute(
797803
{"jobs": {}, "info": {"exec_start": get_dt_now()}}
798804
| self.parameterize(params)
799805
)
800-
801-
event: ThreadEvent = event or ThreadEvent()
802-
max_job_parallel: int = dynamic(
803-
"max_job_parallel", f=max_job_parallel, extras=self.extras
804-
)
805806
trace.info(
806807
f"[WORKFLOW]: Execute: {self.name!r} ("
807808
f"{'parallel' if max_job_parallel > 1 else 'sequential'} jobs)"
@@ -815,6 +816,7 @@ def _execute(
815816
job_queue: Queue[str] = Queue()
816817
for job_id in self.jobs:
817818
job_queue.put(job_id)
819+
818820
catch(context, status=WAIT)
819821
return self.process(
820822
job_queue,
@@ -850,11 +852,6 @@ def _rerun(
850852

851853
err: dict[str, str] = params.get("errors", {})
852854
trace.info(f"[WORKFLOW]: Previous error: {err}")
853-
854-
event: ThreadEvent = event or ThreadEvent()
855-
max_job_parallel: int = dynamic(
856-
"max_job_parallel", f=max_job_parallel, extras=self.extras
857-
)
858855
trace.info(
859856
f"[WORKFLOW]: Execute: {self.name!r} ("
860857
f"{'parallel' if max_job_parallel > 1 else 'sequential'} jobs)"
@@ -871,7 +868,9 @@ def _rerun(
871868
{
872869
"params": params["params"].copy(),
873870
"jobs": {
874-
j: jobs[j] for j in jobs if jobs[j]["status"] == SUCCESS
871+
j: jobs[j]
872+
for j in jobs
873+
if jobs[j].get("status", FAILED) == SUCCESS
875874
},
876875
}
877876
)
@@ -991,6 +990,10 @@ def execute(
991990
"status": WAIT,
992991
"info": {"exec_start": get_dt_now()},
993992
}
993+
event: ThreadEvent = event or ThreadEvent()
994+
max_job_parallel: int = dynamic(
995+
"max_job_parallel", f=max_job_parallel, extras=self.extras
996+
)
994997
try:
995998
if rerun_mode:
996999
return self._rerun(

tests/test_workflow.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,10 +418,7 @@ def test_workflow_parameterize(test_path):
418418
},
419419
}
420420
)
421-
assert workflow.parameterize({"name": "foo"}) == {
422-
"params": {"name": "foo"},
423-
"jobs": {},
424-
}
421+
assert workflow.parameterize({"name": "foo"}) == {"params": {"name": "foo"}}
425422

426423
# NOTE: Raise if passing parameter that does not set on the workflow.
427424
with pytest.raises(WorkflowError):
@@ -442,11 +439,9 @@ def test_workflow_parameterize(test_path):
442439
)
443440
assert workflow.parameterize({"data": {"foo": {"bar": {"baz": 1}}}}) == {
444441
"params": {"data": {"foo": {"bar": {"baz": 1}}}},
445-
"jobs": {},
446442
}
447443
assert workflow.parameterize({"data": '{"foo": {"bar": {"baz": 1}}}'}) == {
448444
"params": {"data": {"foo": {"bar": {"baz": 1}}}},
449-
"jobs": {},
450445
}
451446

452447

0 commit comments

Comments
 (0)