Skip to content

Commit ba17250

Browse files
committed
🎨 format: clean and revise job exec on workflow model.
1 parent 2c1900c commit ba17250

10 files changed

Lines changed: 94 additions & 40 deletions

src/ddeutil/workflow/stages.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,7 +1598,7 @@ def execute_branch(
15981598
result: Result,
15991599
*,
16001600
event: Optional[Event] = None,
1601-
) -> Result:
1601+
) -> tuple[Status, Result]:
16021602
"""Execute branch that will execute all nested-stage that was set in
16031603
this stage with specific branch ID.
16041604
@@ -1611,7 +1611,7 @@ def execute_branch(
16111611
16121612
:raise StageCancelError: If event was set.
16131613
1614-
:rtype: Result
1614+
:rtype: tuple[Status, Result]
16151615
"""
16161616
result.trace.debug(f"[STAGE]: Execute Branch: {branch!r}")
16171617

@@ -1701,7 +1701,7 @@ def execute_branch(
17011701
raise StageCancelError(error_msg, refs=branch)
17021702

17031703
status: Status = SKIP if sum(skips) == total_stage else SUCCESS
1704-
return result.catch(
1704+
return status, result.catch(
17051705
status=status,
17061706
parallel={
17071707
branch: {
@@ -1760,7 +1760,7 @@ def execute(
17601760
statuses: list[Status] = [WAIT] * len_parallel
17611761
for i, future in enumerate(as_completed(futures), start=0):
17621762
try:
1763-
statuses[i] = future.result().status
1763+
statuses[i], _ = future.result()
17641764
except StageError as e:
17651765
statuses[i] = get_status_from_error(e)
17661766
self.mark_errors(context, e)
@@ -1827,7 +1827,7 @@ def execute_item(
18271827
result: Result,
18281828
*,
18291829
event: Optional[Event] = None,
1830-
) -> Result:
1830+
) -> tuple[Status, Result]:
18311831
"""Execute item that will execute all nested-stage that was set in this
18321832
stage with specific foreach item.
18331833
@@ -1849,7 +1849,7 @@ def execute_item(
18491849
:raise StageError: If the stage execution raise any Exception error.
18501850
:raise StageError: If the result from execution has `FAILED` status.
18511851
1852-
:rtype: Result
1852+
:rtype: tuple[Status, Result]
18531853
"""
18541854
result.trace.debug(f"[STAGE]: Execute Item: {item!r}")
18551855
key: StrOrInt = index if self.use_index_as_key else item
@@ -1939,7 +1939,7 @@ def execute_item(
19391939
raise StageCancelError(error_msg, refs=key)
19401940

19411941
status: Status = SKIP if sum(skips) == total_stage else SUCCESS
1942-
return result.catch(
1942+
return status, result.catch(
19431943
status=status,
19441944
foreach={
19451945
key: {
@@ -2051,7 +2051,7 @@ def execute(
20512051

20522052
for i, future in enumerate(done, start=0):
20532053
try:
2054-
statuses[i] = future.result().status
2054+
statuses[i], _ = future.result()
20552055
except StageError as e:
20562056
statuses[i] = get_status_from_error(e)
20572057
self.mark_errors(context, e)
@@ -2123,7 +2123,7 @@ def execute_loop(
21232123
params: DictData,
21242124
result: Result,
21252125
event: Optional[Event] = None,
2126-
) -> tuple[Result, T]:
2126+
) -> tuple[Status, Result, T]:
21272127
"""Execute loop that will execute all nested-stage that was set in this
21282128
stage with specific loop and item.
21292129
@@ -2134,7 +2134,7 @@ def execute_loop(
21342134
:param event: (Event) An Event manager instance that use to cancel this
21352135
execution if it forces stopped by parent execution.
21362136
2137-
:rtype: tuple[Result, T]
2137+
:rtype: tuple[Status, Result, T]
21382138
:return: Return a pair of Result and changed item.
21392139
"""
21402140
result.trace.debug(f"[STAGE]: Execute Loop: {loop} (Item {item!r})")
@@ -2234,6 +2234,7 @@ def execute_loop(
22342234

22352235
status: Status = SKIP if sum(skips) == total_stage else SUCCESS
22362236
return (
2237+
status,
22372238
result.catch(
22382239
status=status,
22392240
until={
@@ -2287,7 +2288,7 @@ def execute(
22872288
"Execution was canceled from the event before start loop."
22882289
)
22892290

2290-
result, item = self.execute_loop(
2291+
status, result, item = self.execute_loop(
22912292
item=item,
22922293
loop=loop,
22932294
params=params,
@@ -2320,7 +2321,7 @@ def execute(
23202321
f": {next_track!r}"
23212322
)
23222323
until_rs: bool = not next_track
2323-
statuses.append(result.status)
2324+
statuses.append(status)
23242325
delay(0.005)
23252326

23262327
if exceed_loop:

src/ddeutil/workflow/workflow.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from pydantic.functional_validators import field_validator, model_validator
3232
from typing_extensions import Self
3333

34-
from . import WorkflowSkipError, get_status_from_error
34+
from . import get_status_from_error
3535
from .__types import DictData
3636
from .conf import FileLoad, Loader, dynamic
3737
from .errors import WorkflowCancelError, WorkflowError, WorkflowTimeoutError
@@ -479,7 +479,7 @@ def execute_job(
479479
*,
480480
result: Optional[Result] = None,
481481
event: Optional[Event] = None,
482-
) -> Result:
482+
) -> tuple[Status, Result]:
483483
"""Job execution with passing dynamic parameters from the main workflow
484484
execution to the target job object via job's ID.
485485
@@ -505,13 +505,12 @@ def execute_job(
505505
"Job execution was canceled because the event was set "
506506
"before start job execution."
507507
)
508-
result.catch(
508+
return CANCEL, result.catch(
509509
status=CANCEL,
510510
context={
511511
"errors": WorkflowCancelError(error_msg).to_dict(),
512512
},
513513
)
514-
raise WorkflowCancelError(error_msg)
515514

516515
result.trace.info(f"[WORKFLOW]: Execute Job: {job.id!r}")
517516
rs: Result = job.execute(
@@ -524,33 +523,28 @@ def execute_job(
524523

525524
if rs.status == FAILED:
526525
error_msg: str = f"Job execution, {job.id!r}, was failed."
527-
result.catch(
526+
return FAILED, result.catch(
528527
status=FAILED,
529528
context={
530529
"errors": WorkflowError(error_msg).to_dict(),
531530
**params,
532531
},
533532
)
534-
raise WorkflowError(error_msg)
535533

536534
elif rs.status == CANCEL:
537535
error_msg: str = (
538536
f"Job execution, {job.id!r}, was canceled from the event after "
539537
f"end job execution."
540538
)
541-
result.catch(
539+
return CANCEL, result.catch(
542540
status=CANCEL,
543541
context={
544542
"errors": WorkflowCancelError(error_msg).to_dict(),
545543
**params,
546544
},
547545
)
548-
raise WorkflowCancelError(error_msg)
549546

550-
result.catch(status=rs.status, context=params)
551-
if rs.status == SKIP:
552-
raise WorkflowSkipError(f"Job execution, {job.id!r}, was skipped")
553-
return result
547+
return rs.status, result.catch(status=rs.status, context=params)
554548

555549
def execute(
556550
self,
@@ -710,8 +704,8 @@ def execute(
710704
if e := future.exception():
711705
sequence_statuses.append(get_status_from_error(e))
712706
else:
713-
rs: Result = future.result()
714-
sequence_statuses.append(rs.status)
707+
st, _ = future.result()
708+
sequence_statuses.append(st)
715709
job_queue.put(job_id)
716710
elif future.cancelled():
717711
sequence_statuses.append(CANCEL)
@@ -734,7 +728,7 @@ def execute(
734728
total_future: int = 0
735729
for i, future in enumerate(as_completed(futures), start=0):
736730
try:
737-
statuses[i] = future.result().status
731+
statuses[i], _ = future.result()
738732
except WorkflowError as e:
739733
statuses[i] = get_status_from_error(e)
740734
total_future += 1

tests/stages/test_stage_call.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import pytest
22
from ddeutil.workflow import FAILED, SUCCESS, Result, Workflow
33
from ddeutil.workflow.stages import CallStage, Stage
4-
from utils import dump_yaml_context
4+
5+
from ..utils import dump_yaml_context
56

67

78
def test_call_stage_exec_necessary_args():

tests/stages/test_stage_case.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from ddeutil.workflow import CANCEL, FAILED, SKIP, SUCCESS, Result
22
from ddeutil.workflow.stages import CaseStage, Stage
3-
from utils import MockEvent
3+
4+
from ..utils import MockEvent
45

56

67
def test_case_stage_exec(test_path):

tests/stages/test_stage_foreach.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
Workflow,
88
)
99
from ddeutil.workflow.stages import ForEachStage, Stage
10-
from utils import MockEvent, dump_yaml_context
10+
11+
from ..utils import MockEvent, dump_yaml_context
1112

1213

1314
def test_foreach_stage_exec_all_skipped():
@@ -62,6 +63,59 @@ def test_foreach_stage_exec_all_skipped():
6263
}
6364

6465

66+
def test_foreach_stage_exec_other_skipped():
67+
stage: Stage = ForEachStage(
68+
name="Start run for-each stage",
69+
id="foreach-stage",
70+
foreach=[1, 2, 3],
71+
concurrent=3,
72+
stages=[
73+
{
74+
"name": "Echo stage",
75+
"if": "${{ item }} == 3",
76+
"echo": "Start run with item ${{ item }}",
77+
},
78+
{
79+
"name": "Final Echo",
80+
"if": "${{ item }} == 3",
81+
"echo": "Final stage",
82+
},
83+
],
84+
)
85+
rs: Result = stage.handler_execute(params={})
86+
assert rs.status == SUCCESS
87+
assert rs.context == {
88+
"status": SUCCESS,
89+
"items": [1, 2, 3],
90+
"foreach": {
91+
1: {
92+
"status": SKIP,
93+
"item": 1,
94+
"stages": {
95+
"2709471980": {"outputs": {}, "status": SKIP},
96+
"9263488742": {"outputs": {}, "status": SKIP},
97+
},
98+
},
99+
2: {
100+
"status": SKIP,
101+
"item": 2,
102+
"stages": {
103+
"2709471980": {"outputs": {}, "status": SKIP},
104+
"9263488742": {"outputs": {}, "status": SKIP},
105+
},
106+
},
107+
3: {
108+
"status": SUCCESS,
109+
"item": 3,
110+
"stages": {
111+
"2709471980": {"outputs": {}, "status": SUCCESS},
112+
"9263488742": {"outputs": {}, "status": SUCCESS},
113+
},
114+
},
115+
},
116+
}
117+
118+
65119
def test_foreach_stage_exec_skipped():
66120
stage: Stage = ForEachStage(
67121
name="Start run for-each stage",

tests/stages/test_stage_parallel.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
Result,
77
)
88
from ddeutil.workflow.stages import ParallelStage, Stage
9-
from utils import MockEvent
9+
10+
from ..utils import MockEvent
1011

1112

1213
def test_parallel_stage_exec():

tests/stages/test_stage_py_virtual.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from ddeutil.workflow import Result, Stage, StageError, Workflow
2-
from utils import dump_yaml_context
2+
3+
from ..utils import dump_yaml_context
34

45

56
def test_stage_py_virtual(test_path):

tests/stages/test_stage_trigger.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
Workflow,
99
)
1010
from ddeutil.workflow.stages import Stage
11-
from utils import MockEvent
1211

1312
from src.ddeutil.workflow import TriggerStage
1413

14+
from ..utils import MockEvent
15+
1516

1617
def test_trigger_stage_exec():
1718
workflow = Workflow.from_conf(name="wf-trigger", extras={})

tests/stages/test_stage_until.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from ddeutil.workflow import FAILED, SKIP, SUCCESS, Result, Stage, Workflow
22
from ddeutil.workflow.stages import EmptyStage, UntilStage
3-
from utils import dump_yaml_context
3+
4+
from ..utils import dump_yaml_context
45

56

67
def test_until_stage():

tests/test_workflow_exec_job.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import pytest
2-
from ddeutil.workflow import FAILED, SUCCESS, Result, Workflow, WorkflowError
1+
from ddeutil.workflow import FAILED, SUCCESS, Workflow
32
from ddeutil.workflow.job import Job
43

54

@@ -18,7 +17,8 @@ def test_workflow_execute_job():
1817
],
1918
)
2019
workflow: Workflow = Workflow(name="workflow", jobs={"demo-run": job})
21-
rs = workflow.execute_job(job=workflow.job("demo-run"), params={})
20+
st, rs = workflow.execute_job(job=workflow.job("demo-run"), params={})
21+
assert st == SUCCESS
2222
assert rs.status == SUCCESS
2323
assert rs.context == {
2424
"status": SUCCESS,
@@ -44,9 +44,8 @@ def test_workflow_execute_job_raise_inside():
4444
],
4545
)
4646
workflow: Workflow = Workflow(name="workflow", jobs={"demo-run": job})
47-
rs = Result()
48-
with pytest.raises(WorkflowError):
49-
workflow.execute_job(job=workflow.job("demo-run"), result=rs, params={})
47+
st, rs = workflow.execute_job(job=workflow.job("demo-run"), params={})
48+
assert st == FAILED
5049
assert rs.status == FAILED
5150
assert rs.context == {
5251
"status": FAILED,

0 commit comments

Comments
 (0)