Skip to content

Commit 48cefca

Browse files
committed
Add workflow cancellation reason
1 parent c8a052c commit 48cefca

4 files changed

Lines changed: 106 additions & 9 deletions

File tree

temporalio/worker/_workflow_instance.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
265265
)
266266
self._primary_task: asyncio.Task[None] | None = None
267267
self._time_ns = 0
268-
self._cancel_requested = False
268+
self._cancel_reason: str | None = None
269269
self._deployment_version_for_current_task: None | (
270270
temporalio.bridge.proto.common.WorkerDeploymentVersion
271271
) = None
@@ -595,10 +595,9 @@ def _apply(
595595
raise RuntimeError(f"Unrecognized job: {job.WhichOneof('variant')}")
596596

597597
def _apply_cancel_workflow(
598-
self, _job: temporalio.bridge.proto.workflow_activation.CancelWorkflow
598+
self, job: temporalio.bridge.proto.workflow_activation.CancelWorkflow
599599
) -> None:
600-
self._cancel_requested = True
601-
# TODO(cretz): Details or cancel message or whatever?
600+
self._cancel_reason = job.reason
602601
if self._primary_task:
603602
# The primary task may not have started yet and we want to give the
604603
# workflow the ability to receive the cancellation, so we must defer
@@ -799,7 +798,6 @@ def _apply_remove_from_cache(
799798
self, _job: temporalio.bridge.proto.workflow_activation.RemoveFromCache
800799
) -> None:
801800
self._deleting = True
802-
self._cancel_requested = True
803801
# We consider eviction to be under replay so that certain code like
804802
# logging that avoids replaying doesn't run during eviction either
805803
self._is_replaying = True
@@ -1189,6 +1187,9 @@ def workflow_continue_as_new(
11891187
)
11901188
)
11911189

1190+
def workflow_cancellation_reason(self) -> str | None:
1191+
return self._cancel_reason
1192+
11921193
def workflow_extern_functions(self) -> Mapping[str, Callable]:
11931194
return self._extern_functions
11941195

@@ -2046,7 +2047,7 @@ async def run_child() -> Any:
20462047
and (t := asyncio.current_task()) is not None
20472048
):
20482049
t.uncancel() # type: ignore[union-attr]
2049-
if self._cancel_requested:
2050+
if self._cancel_reason is not None or self._deleting:
20502051
raise
20512052

20522053
async def _outbound_start_nexus_operation(
@@ -2102,7 +2103,7 @@ async def operation_handle_fn() -> OutputT:
21022103
and (t := asyncio.current_task()) is not None
21032104
):
21042105
t.uncancel() # type: ignore[union-attr]
2105-
if self._cancel_requested:
2106+
if self._cancel_reason is not None or self._deleting:
21062107
raise
21072108

21082109
#### Miscellaneous helpers ####
@@ -2588,8 +2589,9 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
25882589
# cancel later on will show the workflow as cancelled. But this is
25892590
# a Temporal limitation in that cancellation is a state not an
25902591
# event.
2591-
if self._cancel_requested and temporalio.exceptions.is_cancelled_exception(
2592-
err
2592+
if (
2593+
self._cancel_reason is not None
2594+
and temporalio.exceptions.is_cancelled_exception(err)
25932595
):
25942596
self._add_command().cancel_workflow_execution.SetInParent()
25952597
elif self.workflow_is_failure_exception(err):

temporalio/workflow/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
_current_update_info,
6060
_Runtime,
6161
_set_current_update_info,
62+
cancellation_reason,
6263
current_update_info,
6364
deprecate_patch,
6465
extern_functions,
@@ -196,6 +197,7 @@
196197
"get_last_completion_result",
197198
"get_last_failure",
198199
"has_last_completion_result",
200+
"cancellation_reason",
199201
"in_workflow",
200202
"info",
201203
"instance",

temporalio/workflow/_context.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"ParentInfo",
3737
"RootInfo",
3838
"UpdateInfo",
39+
"cancellation_reason",
3940
"current_update_info",
4041
"deprecate_patch",
4142
"extern_functions",
@@ -295,6 +296,9 @@ def workflow_continue_as_new(
295296
initial_versioning_behavior: ContinueAsNewVersioningBehavior | None,
296297
) -> NoReturn: ...
297298

299+
@abstractmethod
300+
def workflow_cancellation_reason(self) -> str | None: ...
301+
298302
@abstractmethod
299303
def workflow_extern_functions(self) -> Mapping[str, Callable]: ...
300304

@@ -591,6 +595,27 @@ def in_workflow() -> bool:
591595
return _Runtime.maybe_current() is not None
592596

593597

598+
def cancellation_reason() -> str | None:
599+
"""Reason the workflow was cancelled, or None if no external cancellation
600+
request has been received.
601+
602+
A non-None value (including an empty string) indicates that the workflow
603+
received an explicit cancellation request from the server. This can be used
604+
when catching an :py:class:`asyncio.CancelledError` to distinguish a
605+
workflow-level cancel from a cancel that originated from inner asyncio task
606+
cancellation.
607+
608+
Note, this only reflects cancellation requested via the server; it is not
609+
set for cache eviction or for cancels of inner tasks/scopes.
610+
611+
Returns:
612+
The reason string sent with the workflow cancellation request (which
613+
may be empty), or ``None`` if the workflow has not been cancelled via
614+
an external request.
615+
"""
616+
return _Runtime.current().workflow_cancellation_reason()
617+
618+
594619
def memo() -> Mapping[str, Any]:
595620
"""Current workflow's memo values, converted without type hints.
596621

tests/worker/test_workflow.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,74 @@ async def started() -> bool:
10931093
assert (await handle.describe()).status == WorkflowExecutionStatus.CANCELED
10941094

10951095

1096+
@workflow.defn
1097+
class CancelReasonWorkflow:
1098+
def __init__(self) -> None:
1099+
self._started = False
1100+
# Reason observed when the inner task was cancelled (no external
1101+
# workflow cancel has happened yet at that point).
1102+
self._reason_inner: str | None = "unset"
1103+
# Reason observed in the outer CancelledError handler after the
1104+
# external workflow cancel has been delivered.
1105+
self._reason_outer: str | None = "unset"
1106+
1107+
@workflow.run
1108+
async def run(self) -> NoReturn:
1109+
self._started = True
1110+
task = asyncio.create_task(asyncio.sleep(1000))
1111+
try:
1112+
task.cancel()
1113+
await task
1114+
except asyncio.CancelledError:
1115+
self._reason_inner = workflow.cancellation_reason()
1116+
try:
1117+
await asyncio.sleep(1000)
1118+
except asyncio.CancelledError:
1119+
self._reason_outer = workflow.cancellation_reason()
1120+
raise
1121+
raise RuntimeError("unreachable")
1122+
1123+
@workflow.query
1124+
def started(self) -> bool:
1125+
return self._started
1126+
1127+
@workflow.query
1128+
def reason_inner(self) -> str | None:
1129+
return self._reason_inner
1130+
1131+
@workflow.query
1132+
def reason_outer(self) -> str | None:
1133+
return self._reason_outer
1134+
1135+
1136+
async def test_workflow_cancellation_reason(client: Client):
1137+
async with new_worker(client, CancelReasonWorkflow) as worker:
1138+
handle = await client.start_workflow(
1139+
CancelReasonWorkflow.run,
1140+
id=f"workflow-{uuid.uuid4()}",
1141+
task_queue=worker.task_queue,
1142+
)
1143+
1144+
async def started() -> bool:
1145+
return await handle.query(CancelReasonWorkflow.started)
1146+
1147+
await assert_eq_eventually(True, started)
1148+
# Before any external cancel, reason is None even though an inner task
1149+
# cancel has already been observed.
1150+
assert await handle.query(CancelReasonWorkflow.reason_inner) is None
1151+
1152+
await handle.cancel()
1153+
with pytest.raises(WorkflowFailureError) as err:
1154+
await handle.result()
1155+
assert isinstance(err.value.cause, CancelledError)
1156+
1157+
# After external cancel, reason is a string (empty since the Python
1158+
# client does not send one) — non-None is the load-bearing distinction.
1159+
outer = await handle.query(CancelReasonWorkflow.reason_outer)
1160+
assert outer is not None
1161+
assert isinstance(outer, str)
1162+
1163+
10961164
@workflow.defn
10971165
class TrapCancelWorkflow:
10981166
@workflow.run

0 commit comments

Comments
 (0)