Skip to content

Commit 9fe8c7d

Browse files
committed
🎯 feat: update rerun strategy on release method.
1 parent 838c693 commit 9fe8c7d

2 files changed

Lines changed: 163 additions & 10 deletions

File tree

src/ddeutil/workflow/workflow.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444

4545
from . import DRYRUN
4646
from .__types import DictData
47-
from .audits import NORMAL, RERUN, Audit, ReleaseType, get_audit
47+
from .audits import NORMAL, RERUN, Audit, AuditData, ReleaseType, get_audit
4848
from .conf import YamlParser, dynamic
4949
from .errors import (
5050
WorkflowCancelError,
@@ -465,13 +465,21 @@ def release(
465465
)
466466

467467
if release_type == RERUN:
468-
# TODO: It will load previous audit and use this data to run with
469-
# the `rerun` method.
470-
raise NotImplementedError(
471-
"Release does not support for rerun type yet. Please use the "
472-
"`rerun` method instead."
473-
)
468+
try:
469+
previous: AuditData = audit.find_audit_with_release(
470+
name, release=release
471+
)
472+
values: DictData = previous.context
473+
except FileNotFoundError:
474+
trace.warning(
475+
(
476+
f"Does not find previous audit log with release: "
477+
f"{release:%Y%m%d%H%M%S}"
478+
),
479+
module="release",
480+
)
474481
elif release_type == DRYRUN:
482+
# IMPORTANT: Set system extra parameter for allow dryrun mode,
475483
self.extras.update({"__sys_release_dryrun_mode": True})
476484
trace.debug("[RELEASE]: Mark dryrun mode to the extra params.")
477485
elif release_type == NORMAL and audit.is_pointed(data=audit_data):

tests/test_workflow_release.py

Lines changed: 148 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import shutil
22
from datetime import datetime
3+
from pathlib import Path
34
from zoneinfo import ZoneInfo
45

56
import pytest
67
from ddeutil.workflow import (
78
DRYRUN,
9+
FAILED,
810
FORCE,
911
NORMAL,
1012
RERUN,
@@ -166,6 +168,8 @@ def test_workflow_release_with_datetime_force():
166168

167169
def test_workflow_release_with_datetime(test_path):
168170
test_audit_skip_path = test_path / "tests_workflow_release_audits"
171+
if test_audit_skip_path.exists():
172+
shutil.rmtree(test_audit_skip_path)
169173

170174
workflow: Workflow = Workflow.model_validate(
171175
obj={
@@ -227,7 +231,7 @@ def test_workflow_release_with_auto(test_path):
227231
def test_workflow_release_rerun():
228232
workflow: Workflow = Workflow.model_validate(
229233
obj={
230-
"name": "wf-scheduling-common",
234+
"name": "wf-release-rerun",
231235
"jobs": {
232236
"first-job": {
233237
"stages": [
@@ -239,8 +243,149 @@ def test_workflow_release_rerun():
239243
"extras": {"enable_write_audit": True},
240244
}
241245
)
242-
with pytest.raises(NotImplementedError):
243-
workflow.release(release=datetime.now(), params={}, release_type=RERUN)
246+
dt = datetime.now()
247+
rs: Result = workflow.release(release=dt, params={}, release_type=RERUN)
248+
assert rs.status == SUCCESS
249+
assert exclude_info(rs.context) == {
250+
"status": SUCCESS,
251+
"params": {},
252+
"release": {
253+
"type": RERUN,
254+
"logical_date": dt.replace(
255+
second=0, microsecond=0, tzinfo=ZoneInfo("UTC")
256+
),
257+
},
258+
"jobs": {
259+
"first-job": {
260+
"status": SUCCESS,
261+
"stages": {
262+
"first-stage": {"outputs": {}, "status": SUCCESS},
263+
"second-stage": {"outputs": {}, "status": SUCCESS},
264+
},
265+
}
266+
},
267+
}
268+
269+
270+
def test_workflow_release_rerun_from_failed(test_path: Path):
271+
test_audit_rerun = test_path / "tests_workflow_release_rerun"
272+
if test_audit_rerun.exists():
273+
shutil.rmtree(test_audit_rerun)
274+
275+
workflow: Workflow = Workflow.model_validate(
276+
obj={
277+
"name": "wf-release-rerun-failed",
278+
"jobs": {
279+
"first-job": {
280+
"stages": [
281+
{"name": "First Stage", "id": "first-stage"},
282+
{
283+
"name": "Second Stage",
284+
"id": "second-stage",
285+
"raise": "Some Error",
286+
},
287+
]
288+
}
289+
},
290+
"extras": {
291+
"audit_conf": {
292+
"type": "file",
293+
"path": str(test_audit_rerun.resolve().absolute()),
294+
},
295+
"enable_write_audit": True,
296+
},
297+
}
298+
)
299+
release: datetime = datetime(2025, 8, 10)
300+
rs: Result = workflow.release(
301+
release=release,
302+
params={"asat-dt": datetime(2024, 10, 1)},
303+
run_id="1001",
304+
runs_metadata={"runs_by": "nobody"},
305+
)
306+
assert rs.status == FAILED
307+
assert exclude_info(rs.context) == {
308+
"status": FAILED,
309+
"params": {"asat-dt": datetime(2024, 10, 1, 0, 0)},
310+
"errors": {
311+
"name": "WorkflowError",
312+
"message": "Job execution, 'first-job', was failed.",
313+
},
314+
"release": {
315+
"type": NORMAL,
316+
"logical_date": datetime(2025, 8, 10, tzinfo=ZoneInfo("UTC")),
317+
},
318+
"jobs": {
319+
"first-job": {
320+
"status": FAILED,
321+
"stages": {
322+
"first-stage": {"outputs": {}, "status": SUCCESS},
323+
"second-stage": {
324+
"outputs": {},
325+
"errors": {
326+
"name": "StageError",
327+
"message": "Some Error",
328+
},
329+
"status": FAILED,
330+
},
331+
},
332+
"errors": {
333+
"name": "JobError",
334+
"message": "Strategy execution was break because its nested-stage, 'second-stage', failed.",
335+
},
336+
}
337+
},
338+
"name": "WorkflowError",
339+
"message": "Job execution, 'first-job', was failed.",
340+
}
341+
print()
342+
print(rs.context["info"])
343+
344+
rs: Result = workflow.release(
345+
release=release,
346+
params={"asat-dt": datetime(2024, 10, 1)},
347+
run_id="1002",
348+
runs_metadata={"runs_by": "nobody"},
349+
release_type=RERUN,
350+
)
351+
assert rs.status == FAILED
352+
assert exclude_info(rs.context) == {
353+
"status": FAILED,
354+
"params": {"asat-dt": datetime(2024, 10, 1, 0, 0)},
355+
"errors": {
356+
"name": "WorkflowError",
357+
"message": "Job execution, 'first-job', was failed.",
358+
},
359+
"release": {
360+
"type": RERUN,
361+
"logical_date": datetime(2025, 8, 10, tzinfo=ZoneInfo("UTC")),
362+
},
363+
"jobs": {
364+
"first-job": {
365+
"status": FAILED,
366+
"stages": {
367+
"first-stage": {"outputs": {}, "status": SUCCESS},
368+
"second-stage": {
369+
"outputs": {},
370+
"errors": {
371+
"name": "StageError",
372+
"message": "Some Error",
373+
},
374+
"status": FAILED,
375+
},
376+
},
377+
"errors": {
378+
"name": "JobError",
379+
"message": "Strategy execution was break because its nested-stage, 'second-stage', failed.",
380+
},
381+
}
382+
},
383+
"name": "WorkflowError",
384+
"message": "Job execution, 'first-job', was failed.",
385+
}
386+
print(rs.context["info"])
387+
388+
shutil.rmtree(test_audit_rerun)
244389

245390

246391
def test_workflow_release_dryrun():

0 commit comments

Comments
 (0)