Skip to content

Commit 04d6fe7

Browse files
committed
Add cancel reasons on caller side
1 parent 48cefca commit 04d6fe7

6 files changed

Lines changed: 97 additions & 15 deletions

File tree

temporalio/client/_impl.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ async def cancel_workflow(self, input: CancelWorkflowInput) -> None:
352352
identity=self._client.identity,
353353
request_id=str(uuid.uuid4()),
354354
first_execution_run_id=input.first_execution_run_id or "",
355+
reason=input.reason,
355356
),
356357
retry=True,
357358
metadata=input.rpc_metadata,

temporalio/client/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class CancelWorkflowInput:
111111
id: str
112112
run_id: str | None
113113
first_execution_run_id: str | None
114+
reason: str
114115
rpc_metadata: Mapping[str, str | bytes]
115116
rpc_timeout: timedelta | None
116117

temporalio/client/_workflow.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ async def result(
318318
async def cancel(
319319
self,
320320
*,
321+
reason: str = "",
321322
rpc_metadata: Mapping[str, str | bytes] = {},
322323
rpc_timeout: timedelta | None = None,
323324
) -> None:
@@ -334,6 +335,8 @@ async def cancel(
334335
workflow ID even if it is unrelated to the started workflow.
335336
336337
Args:
338+
reason: Reason recorded with the cancellation request. Available
339+
inside the workflow via :py:func:`temporalio.workflow.cancellation_reason`.
337340
rpc_metadata: Headers used on the RPC call. Keys here override
338341
client-level RPC metadata keys.
339342
rpc_timeout: Optional RPC deadline to set for the RPC call.
@@ -346,6 +349,7 @@ async def cancel(
346349
id=self._id,
347350
run_id=self._run_id,
348351
first_execution_run_id=self._first_execution_run_id,
352+
reason=reason,
349353
rpc_metadata=rpc_metadata,
350354
rpc_timeout=rpc_timeout,
351355
)

temporalio/worker/_workflow_instance.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1988,10 +1988,12 @@ async def _outbound_start_child_workflow(
19881988
handle: _ChildWorkflowHandle
19891989

19901990
# Common code for handling cancel for start and run
1991-
def apply_child_cancel_error() -> None:
1992-
# Send a cancel request to the child
1991+
def apply_child_cancel_error(err: asyncio.CancelledError) -> None:
1992+
# Send a cancel request to the child, forwarding the msg passed to
1993+
# Task.cancel(msg) (if any) as the cancellation reason.
1994+
reason = err.args[0] if err.args and isinstance(err.args[0], str) else ""
19931995
cancel_command = self._add_command()
1994-
handle._apply_cancel_command(cancel_command)
1996+
handle._apply_cancel_command(cancel_command, reason=reason)
19951997
# If the cancel command is for external workflow, we
19961998
# have to add a seq and mark it pending
19971999
if cancel_command.HasField("request_cancel_external_workflow_execution"):
@@ -2014,8 +2016,8 @@ async def run_child() -> Any:
20142016
# We have to shield because we don't want the future itself
20152017
# to be cancelled
20162018
return await asyncio.shield(handle._result_fut)
2017-
except asyncio.CancelledError:
2018-
apply_child_cancel_error()
2019+
except asyncio.CancelledError as err:
2020+
apply_child_cancel_error(err)
20192021
# Clear the cancellation counter on Python 3.11+ so the
20202022
# next await does not immediately re-raise CancelledError
20212023
if (
@@ -2038,8 +2040,8 @@ async def run_child() -> Any:
20382040
# to be cancelled
20392041
await asyncio.shield(handle._start_fut)
20402042
return handle
2041-
except asyncio.CancelledError:
2042-
apply_child_cancel_error()
2043+
except asyncio.CancelledError as err:
2044+
apply_child_cancel_error(err)
20432045
# Clear the cancellation counter on Python 3.11+ so the
20442046
# next await does not immediately re-raise CancelledError
20452047
if (
@@ -3383,8 +3385,12 @@ def _apply_start_command(self) -> None:
33833385
def _apply_cancel_command(
33843386
self,
33853387
command: temporalio.bridge.proto.workflow_commands.WorkflowCommand,
3388+
*,
3389+
reason: str = "",
33863390
) -> None:
3387-
command.cancel_child_workflow_execution.child_workflow_seq = self._seq
3391+
v = command.cancel_child_workflow_execution
3392+
v.child_workflow_seq = self._seq
3393+
v.reason = reason
33883394

33893395

33903396
class _ExternalWorkflowHandle(temporalio.workflow.ExternalWorkflowHandle[Any]):
@@ -3428,14 +3434,15 @@ async def signal(
34283434
)
34293435
)
34303436

3431-
async def cancel(self) -> None:
3437+
async def cancel(self, *, reason: str = "") -> None:
34323438
self._instance._assert_not_read_only("cancel external handle")
34333439
command = self._instance._add_command()
34343440
v = command.request_cancel_external_workflow_execution
34353441
v.workflow_execution.namespace = self._instance._info.namespace
34363442
v.workflow_execution.workflow_id = self._id
34373443
if self._run_id:
34383444
v.workflow_execution.run_id = self._run_id
3445+
v.reason = reason
34393446
await self._instance._cancel_external_workflow(command)
34403447

34413448

temporalio/workflow/_workflow_ops.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,11 +604,15 @@ async def signal(
604604
"""
605605
raise NotImplementedError
606606

607-
async def cancel(self) -> None:
607+
async def cancel(self, *, reason: str = "") -> None:
608608
"""Send a cancellation request to this external workflow.
609609
610610
This will fail if the workflow cannot accept the request (e.g. if the
611611
workflow is not found).
612+
613+
Args:
614+
reason: Reason recorded with the cancellation request. Available in
615+
the target workflow via :py:func:`cancellation_reason`.
612616
"""
613617
raise NotImplementedError
614618

tests/worker/test_workflow.py

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,16 +1149,81 @@ async def started() -> bool:
11491149
# cancel has already been observed.
11501150
assert await handle.query(CancelReasonWorkflow.reason_inner) is None
11511151

1152-
await handle.cancel()
1152+
await handle.cancel(reason="user-supplied reason")
11531153
with pytest.raises(WorkflowFailureError) as err:
11541154
await handle.result()
11551155
assert isinstance(err.value.cause, CancelledError)
11561156

1157-
# After external cancel, reason is a string (empty since the Python
1158-
# client does not send one) — non-None is the load-bearing distinction.
11591157
outer = await handle.query(CancelReasonWorkflow.reason_outer)
1160-
assert outer is not None
1161-
assert isinstance(outer, str)
1158+
assert outer == "user-supplied reason"
1159+
1160+
1161+
@workflow.defn
1162+
class CancelReasonReporter:
1163+
"""Workflow that swallows a cancel and returns the observed reason."""
1164+
1165+
@workflow.run
1166+
async def run(self) -> str:
1167+
try:
1168+
await asyncio.sleep(1000)
1169+
except asyncio.CancelledError:
1170+
return workflow.cancellation_reason() or ""
1171+
raise RuntimeError("unreachable")
1172+
1173+
1174+
@workflow.defn
1175+
class ChildCancelReasonWorkflow:
1176+
@workflow.run
1177+
async def run(self, msg: str) -> str:
1178+
child = await workflow.start_child_workflow(
1179+
CancelReasonReporter.run,
1180+
id=f"{workflow.info().workflow_id}_child",
1181+
)
1182+
child.cancel(msg)
1183+
return await child
1184+
1185+
1186+
async def test_workflow_child_cancel_reason(client: Client):
1187+
async with new_worker(
1188+
client, ChildCancelReasonWorkflow, CancelReasonReporter
1189+
) as worker:
1190+
result = await client.execute_workflow(
1191+
ChildCancelReasonWorkflow.run,
1192+
"from-parent",
1193+
id=f"workflow-{uuid.uuid4()}",
1194+
task_queue=worker.task_queue,
1195+
)
1196+
assert result == "from-parent"
1197+
1198+
1199+
@workflow.defn
1200+
class ExternalCancelReasonWorkflow:
1201+
@workflow.run
1202+
async def run(self, target_id: str) -> None:
1203+
await workflow.get_external_workflow_handle(target_id).cancel(
1204+
reason="from-external-caller"
1205+
)
1206+
1207+
1208+
async def test_workflow_external_cancel_reason(client: Client):
1209+
async with new_worker(
1210+
client, ExternalCancelReasonWorkflow, CancelReasonReporter
1211+
) as worker:
1212+
target_id = f"workflow-{uuid.uuid4()}"
1213+
target = await client.start_workflow(
1214+
CancelReasonReporter.run,
1215+
id=target_id,
1216+
task_queue=worker.task_queue,
1217+
)
1218+
await client.execute_workflow(
1219+
ExternalCancelReasonWorkflow.run,
1220+
target_id,
1221+
id=f"workflow-{uuid.uuid4()}",
1222+
task_queue=worker.task_queue,
1223+
)
1224+
# Server wraps the user-supplied reason with metadata about the caller
1225+
# when one workflow cancels another, so check for substring.
1226+
assert "from-external-caller" in await target.result()
11621227

11631228

11641229
@workflow.defn

0 commit comments

Comments
 (0)