Skip to content

Commit acfd538

Browse files
[cross-repo from server#387] Conformance blocker: complete remaining child-workflows parity scenarios (#149)
1 parent 9f6c268 commit acfd538

4 files changed

Lines changed: 235 additions & 4 deletions

File tree

src/durable_workflow/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@
5454
from .errors import (
5555
ActivityCancelled,
5656
ActivityFailed,
57+
ChildWorkflowCancelled,
5758
ChildWorkflowFailed,
59+
ChildWorkflowTerminated,
5860
DurableWorkflowError,
5961
InvalidArgument,
6062
NamespaceNotFound,
@@ -188,7 +190,9 @@
188190
"ActivityRetryPolicy",
189191
"BridgeAdapterOutcome",
190192
"ChildWorkflowRetryPolicy",
193+
"ChildWorkflowCancelled",
191194
"ChildWorkflowFailed",
195+
"ChildWorkflowTerminated",
192196
"Client",
193197
"ContinueAsNew",
194198
"NamespaceDescription",

src/durable_workflow/errors.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,66 @@ class ChildWorkflowFailed(DurableWorkflowError):
236236
:attr:`exception_class` mirrors the child's recorded exception class.
237237
"""
238238

239-
def __init__(self, message: str, exception_class: str | None = None) -> None:
239+
def __init__(
240+
self,
241+
message: str,
242+
exception_class: str | None = None,
243+
*,
244+
failure_kind: str | None = None,
245+
child_workflow_run_id: str | None = None,
246+
child_workflow_type: str | None = None,
247+
) -> None:
240248
super().__init__(message)
241249
self.exception_class = exception_class
250+
self.failure_kind = failure_kind
251+
self.failure_category = failure_kind
252+
self.child_workflow_run_id = child_workflow_run_id
253+
self.child_workflow_type = child_workflow_type
254+
255+
256+
class ChildWorkflowCancelled(ChildWorkflowFailed):
257+
"""A child workflow finished in the ``cancelled`` state.
258+
259+
Raised inside the parent workflow when it awaits a child that was
260+
cancelled directly or by parent-close policy. It is a child outcome, so it
261+
remains catchable as :class:`ChildWorkflowFailed`.
262+
"""
263+
264+
def __init__(
265+
self,
266+
message: str = "child workflow was cancelled",
267+
exception_class: str | None = None,
268+
*,
269+
child_workflow_run_id: str | None = None,
270+
child_workflow_type: str | None = None,
271+
) -> None:
272+
super().__init__(
273+
message,
274+
exception_class,
275+
failure_kind="cancelled",
276+
child_workflow_run_id=child_workflow_run_id,
277+
child_workflow_type=child_workflow_type,
278+
)
279+
280+
281+
class ChildWorkflowTerminated(ChildWorkflowFailed):
282+
"""A child workflow finished in the ``terminated`` state."""
283+
284+
def __init__(
285+
self,
286+
message: str = "child workflow was terminated",
287+
exception_class: str | None = None,
288+
*,
289+
child_workflow_run_id: str | None = None,
290+
child_workflow_type: str | None = None,
291+
) -> None:
292+
super().__init__(
293+
message,
294+
exception_class,
295+
failure_kind="terminated",
296+
child_workflow_run_id=child_workflow_run_id,
297+
child_workflow_type=child_workflow_type,
298+
)
242299

243300

244301
class ActivityFailed(DurableWorkflowError):

src/durable_workflow/workflow.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
from .external_storage import ExternalPayloadCache, ExternalStorageDriver
3434
from .errors import (
3535
ActivityFailed,
36+
ChildWorkflowCancelled,
3637
ChildWorkflowFailed,
38+
ChildWorkflowTerminated,
3739
NonDeterministicReplayError,
3840
QueryFailed,
3941
WorkflowPayloadDecodeError,
@@ -2009,6 +2011,8 @@ def _activity_failed_from_payload(payload: Mapping[str, Any]) -> ActivityFailed:
20092011
message = _optional_str(payload.get("message"))
20102012
if message is None and exception is not None:
20112013
message = _optional_str(exception.get("message"))
2014+
if message is None:
2015+
message = _optional_str(payload.get("closed_reason"))
20122016
exception_type = _optional_str(payload.get("exception_type"))
20132017
if exception_type is None and exception is not None:
20142018
exception_type = _optional_str(exception.get("type"))
@@ -2032,6 +2036,51 @@ def _activity_failed_from_payload(payload: Mapping[str, Any]) -> ActivityFailed:
20322036
)
20332037

20342038

2039+
def _child_workflow_failed_from_payload(
2040+
payload: Mapping[str, Any],
2041+
event_type: str,
2042+
) -> ChildWorkflowFailed:
2043+
exception_payload = payload.get("exception")
2044+
exception = dict(exception_payload) if isinstance(exception_payload, Mapping) else None
2045+
2046+
message = _optional_str(payload.get("message"))
2047+
if message is None and exception is not None:
2048+
message = _optional_str(exception.get("message"))
2049+
2050+
exception_class = _optional_str(payload.get("exception_class"))
2051+
if exception_class is None:
2052+
exception_class = _optional_str(payload.get("exception_type"))
2053+
if exception_class is None and exception is not None:
2054+
exception_class = _optional_str(exception.get("class")) or _optional_str(exception.get("type"))
2055+
2056+
child_workflow_run_id = _optional_str(payload.get("child_workflow_run_id"))
2057+
child_workflow_type = _optional_str(payload.get("child_workflow_type"))
2058+
2059+
if event_type == "ChildRunCancelled":
2060+
return ChildWorkflowCancelled(
2061+
message or "child workflow was cancelled",
2062+
exception_class,
2063+
child_workflow_run_id=child_workflow_run_id,
2064+
child_workflow_type=child_workflow_type,
2065+
)
2066+
2067+
if event_type == "ChildRunTerminated":
2068+
return ChildWorkflowTerminated(
2069+
message or "child workflow was terminated",
2070+
exception_class,
2071+
child_workflow_run_id=child_workflow_run_id,
2072+
child_workflow_type=child_workflow_type,
2073+
)
2074+
2075+
return ChildWorkflowFailed(
2076+
message or "child workflow failed",
2077+
exception_class,
2078+
failure_kind=_optional_str(payload.get("failure_category")) or "child_workflow",
2079+
child_workflow_run_id=child_workflow_run_id,
2080+
child_workflow_type=child_workflow_type,
2081+
)
2082+
2083+
20352084
def _first_yield_failure(values: Iterable[Any]) -> ActivityFailed | ChildWorkflowFailed | None:
20362085
for value in values:
20372086
if isinstance(value, ActivityFailed):
@@ -2392,9 +2441,9 @@ def _receiver_condition_wait_bindings() -> dict[int, str | None]:
23922441
shape,
23932442
ev,
23942443
)
2395-
elif etype == "ChildRunFailed":
2444+
elif etype in ("ChildRunFailed", "ChildRunCancelled", "ChildRunTerminated"):
23962445
_append_resolved_result(
2397-
ChildWorkflowFailed(payload.get("message", "child workflow failed")),
2446+
_child_workflow_failed_from_payload(payload, etype),
23982447
"child workflow",
23992448
ev,
24002449
)

tests/test_replay.py

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@
66
import pytest
77

88
from durable_workflow import Replayer, ReplayOutcome, serializer, workflow
9-
from durable_workflow.errors import ActivityFailed, ChildWorkflowFailed, NonDeterministicReplayError
9+
from durable_workflow.errors import (
10+
ActivityFailed,
11+
ChildWorkflowCancelled,
12+
ChildWorkflowFailed,
13+
ChildWorkflowTerminated,
14+
NonDeterministicReplayError,
15+
)
1016
from durable_workflow.workflow import (
1117
ActivityRetryPolicy,
1218
ChildWorkflowRetryPolicy,
@@ -837,6 +843,48 @@ def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
837843
return "handled"
838844

839845

846+
@workflow.defn(name="child-wf-failure-details")
847+
class ChildWorkflowFailureDetailsWf:
848+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
849+
try:
850+
yield ctx.start_child_workflow("sub-workflow", [])
851+
except ChildWorkflowFailed as exc:
852+
return {
853+
"message": str(exc),
854+
"exception_class": exc.exception_class,
855+
"failure_kind": exc.failure_kind,
856+
"child_workflow_run_id": exc.child_workflow_run_id,
857+
"child_workflow_type": exc.child_workflow_type,
858+
}
859+
860+
861+
@workflow.defn(name="child-wf-cancelled")
862+
class ChildWorkflowCancelledWf:
863+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
864+
try:
865+
yield ctx.start_child_workflow("sub-workflow", [])
866+
except ChildWorkflowCancelled as exc:
867+
return {
868+
"message": str(exc),
869+
"failure_kind": exc.failure_kind,
870+
"child_workflow_run_id": exc.child_workflow_run_id,
871+
"child_workflow_type": exc.child_workflow_type,
872+
}
873+
874+
875+
@workflow.defn(name="child-wf-terminated")
876+
class ChildWorkflowTerminatedWf:
877+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
878+
try:
879+
yield ctx.start_child_workflow("sub-workflow", [])
880+
except ChildWorkflowTerminated as exc:
881+
return {
882+
"message": str(exc),
883+
"failure_kind": exc.failure_kind,
884+
"child_workflow_run_id": exc.child_workflow_run_id,
885+
}
886+
887+
840888
@workflow.defn(name="child-wf-failed-fallback")
841889
class ChildWorkflowFailedFallbackWf:
842890
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
@@ -949,6 +997,79 @@ def test_child_failed_caught(self) -> None:
949997
assert isinstance(cmd, CompleteWorkflow)
950998
assert cmd.result == "handled"
951999

1000+
def test_child_failed_preserves_typed_details(self) -> None:
1001+
history = [
1002+
{
1003+
"event_type": "ChildRunFailed",
1004+
"payload": {
1005+
"message": "child failed",
1006+
"exception_class": "ExampleChildError",
1007+
"failure_category": "child_workflow",
1008+
"child_workflow_run_id": "child-run-1",
1009+
"child_workflow_type": "sub-workflow",
1010+
},
1011+
},
1012+
]
1013+
1014+
outcome = replay(ChildWorkflowFailureDetailsWf, history, [])
1015+
1016+
assert len(outcome.commands) == 1
1017+
cmd = outcome.commands[0]
1018+
assert isinstance(cmd, CompleteWorkflow)
1019+
assert cmd.result == {
1020+
"message": "child failed",
1021+
"exception_class": "ExampleChildError",
1022+
"failure_kind": "child_workflow",
1023+
"child_workflow_run_id": "child-run-1",
1024+
"child_workflow_type": "sub-workflow",
1025+
}
1026+
1027+
def test_child_cancelled_is_observed_by_parent_as_typed_cancellation(self) -> None:
1028+
history = [
1029+
{
1030+
"event_type": "ChildRunCancelled",
1031+
"payload": {
1032+
"message": "cancelled by operator",
1033+
"child_workflow_run_id": "child-run-1",
1034+
"child_workflow_type": "sub-workflow",
1035+
},
1036+
},
1037+
]
1038+
1039+
outcome = replay(ChildWorkflowCancelledWf, history, [])
1040+
1041+
assert len(outcome.commands) == 1
1042+
cmd = outcome.commands[0]
1043+
assert isinstance(cmd, CompleteWorkflow)
1044+
assert cmd.result == {
1045+
"message": "cancelled by operator",
1046+
"failure_kind": "cancelled",
1047+
"child_workflow_run_id": "child-run-1",
1048+
"child_workflow_type": "sub-workflow",
1049+
}
1050+
1051+
def test_child_terminated_is_observed_by_parent_as_typed_child_failure(self) -> None:
1052+
history = [
1053+
{
1054+
"event_type": "ChildRunTerminated",
1055+
"payload": {
1056+
"message": "terminated by operator",
1057+
"child_workflow_run_id": "child-run-1",
1058+
},
1059+
},
1060+
]
1061+
1062+
outcome = replay(ChildWorkflowTerminatedWf, history, [])
1063+
1064+
assert len(outcome.commands) == 1
1065+
cmd = outcome.commands[0]
1066+
assert isinstance(cmd, CompleteWorkflow)
1067+
assert cmd.result == {
1068+
"message": "terminated by operator",
1069+
"failure_kind": "terminated",
1070+
"child_workflow_run_id": "child-run-1",
1071+
}
1072+
9521073
def test_child_failed_then_fallback_yields_command(self) -> None:
9531074
history = [{"event_type": "ChildRunFailed", "payload": {"message": "child failed"}}]
9541075
outcome = replay(ChildWorkflowFailedFallbackWf, history, [])

0 commit comments

Comments
 (0)