Skip to content

Commit d98e513

Browse files
committed
Add repro / setup
1 parent 84f8417 commit d98e513

2 files changed

Lines changed: 166 additions & 69 deletions

File tree

temporalio/worker/_workflow.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,9 @@ def _fmt_duration(td: timedelta) -> str:
526526
extra=extra,
527527
)
528528
else:
529-
logger.debug(
529+
# TODO: Undo this before merging
530+
logger.log(
531+
5,
530532
f"[TMPRL1104] {log_id} Workflow task duration information (%s)",
531533
msg_details,
532534
extra=extra,

tests/worker/test_workflow.py

Lines changed: 163 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,8 @@ async def wait_cancel() -> str:
907907

908908

909909
class ActivityWaitCancelNotify:
910-
def __init__(self) -> None:
910+
def __init__(self, *, cancel_delay: timedelta = timedelta()) -> None:
911+
self.cancel_delay = cancel_delay
911912
self.wait_cancel_complete = asyncio.Event()
912913

913914
@activity.defn
@@ -922,6 +923,8 @@ async def wait_cancel(self) -> str:
922923
activity.heartbeat()
923924
return "Manually stopped"
924925
except asyncio.CancelledError:
926+
if self.cancel_delay:
927+
await asyncio.sleep(self.cancel_delay.total_seconds())
925928
return "Got cancelled error, cancelled? " + str(activity.is_cancelled())
926929
finally:
927930
self.wait_cancel_complete.set()
@@ -963,9 +966,6 @@ async def run(self, params: CancelActivityWorkflowParams) -> None:
963966
self._activity_result = await handle
964967
except ActivityError as err:
965968
self._activity_result = f"Error: {err.cause.__class__.__name__}"
966-
# TODO(cretz): Remove when https://github.com/temporalio/sdk-rust/issues/323 is fixed
967-
except CancelledError as err:
968-
self._activity_result = f"Error: {err.__class__.__name__}"
969969
# Wait forever
970970
await asyncio.Future()
971971

@@ -975,74 +975,169 @@ def activity_result(self) -> str:
975975

976976

977977
@pytest.mark.parametrize("local", [True, False])
978+
@pytest.mark.timeout(90000)
978979
async def test_workflow_cancel_activity(client: Client, local: bool):
979980
# Need short task timeout to timeout LA task and longer assert timeout
980981
# so the task can timeout
981982
task_timeout = timedelta(seconds=1)
982983
assert_timeout = timedelta(seconds=10)
983984
activity_inst = ActivityWaitCancelNotify()
984985

985-
async with new_worker(
986-
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
987-
) as worker:
988-
# Try cancel - confirm error and activity was sent the cancel
989-
handle = await client.start_workflow(
990-
CancelActivityWorkflow.run,
991-
CancelActivityWorkflowParams(
992-
cancellation_type=workflow.ActivityCancellationType.TRY_CANCEL.name,
993-
local=local,
994-
),
995-
id=f"workflow-{uuid.uuid4()}",
996-
task_queue=worker.task_queue,
997-
task_timeout=task_timeout,
998-
)
986+
for i in range(100):
987+
print(f"iteration {i}")
988+
async with new_worker(
989+
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
990+
) as worker:
991+
# Try cancel - confirm error and activity was sent the cancel
992+
handle = await client.start_workflow(
993+
CancelActivityWorkflow.run,
994+
CancelActivityWorkflowParams(
995+
cancellation_type=workflow.ActivityCancellationType.TRY_CANCEL.name,
996+
local=local,
997+
),
998+
id=f"workflow-{uuid.uuid4()}",
999+
task_queue=worker.task_queue,
1000+
task_timeout=task_timeout,
1001+
)
9991002

1000-
async def activity_result() -> str:
1001-
return await handle.query(CancelActivityWorkflow.activity_result)
1003+
async def activity_result() -> str:
1004+
return await handle.query(CancelActivityWorkflow.activity_result)
10021005

1003-
await assert_eq_eventually(
1004-
"Error: CancelledError", activity_result, timeout=assert_timeout
1005-
)
1006-
await activity_inst.wait_cancel_complete.wait()
1007-
await handle.cancel()
1006+
await assert_eq_eventually(
1007+
"Error: CancelledError", activity_result, timeout=assert_timeout
1008+
)
1009+
await activity_inst.wait_cancel_complete.wait()
1010+
await handle.cancel()
10081011

1009-
# Wait cancel - confirm no error due to graceful cancel handling
1010-
handle = await client.start_workflow(
1011-
CancelActivityWorkflow.run,
1012-
CancelActivityWorkflowParams(
1013-
cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED.name,
1014-
local=local,
1015-
),
1016-
id=f"workflow-{uuid.uuid4()}",
1017-
task_queue=worker.task_queue,
1018-
task_timeout=task_timeout,
1019-
)
1020-
await assert_eq_eventually(
1021-
"Got cancelled error, cancelled? True",
1022-
activity_result,
1023-
timeout=assert_timeout,
1024-
)
1025-
await activity_inst.wait_cancel_complete.wait()
1026-
await handle.cancel()
1012+
# Wait cancel - confirm no error due to graceful cancel handling
1013+
handle = await client.start_workflow(
1014+
CancelActivityWorkflow.run,
1015+
CancelActivityWorkflowParams(
1016+
cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED.name,
1017+
local=local,
1018+
),
1019+
id=f"workflow-{uuid.uuid4()}",
1020+
task_queue=worker.task_queue,
1021+
task_timeout=task_timeout,
1022+
)
1023+
await assert_eq_eventually(
1024+
"Got cancelled error, cancelled? True",
1025+
activity_result,
1026+
timeout=assert_timeout,
1027+
)
1028+
await activity_inst.wait_cancel_complete.wait()
1029+
await handle.cancel()
10271030

1028-
# Abandon - confirm error and that activity stays running
1029-
handle = await client.start_workflow(
1030-
CancelActivityWorkflow.run,
1031-
CancelActivityWorkflowParams(
1032-
cancellation_type=workflow.ActivityCancellationType.ABANDON.name,
1033-
local=local,
1034-
),
1035-
id=f"workflow-{uuid.uuid4()}",
1036-
task_queue=worker.task_queue,
1037-
task_timeout=task_timeout,
1031+
# Abandon - confirm error and that activity stays running
1032+
handle = await client.start_workflow(
1033+
CancelActivityWorkflow.run,
1034+
CancelActivityWorkflowParams(
1035+
cancellation_type=workflow.ActivityCancellationType.ABANDON.name,
1036+
local=local,
1037+
),
1038+
id=f"workflow-{uuid.uuid4()}",
1039+
task_queue=worker.task_queue,
1040+
task_timeout=task_timeout,
1041+
)
1042+
await assert_eq_eventually(
1043+
"Error: CancelledError", activity_result, timeout=assert_timeout
1044+
)
1045+
await asyncio.sleep(0.5)
1046+
assert not activity_inst.wait_cancel_complete.is_set()
1047+
await handle.cancel()
1048+
await activity_inst.wait_cancel_complete.wait()
1049+
1050+
1051+
@pytest.mark.skipif(
1052+
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO") != "1",
1053+
reason="Diagnostic repro for local-activity cancellation WFT-timeout race",
1054+
)
1055+
async def test_workflow_cancel_local_activity_wft_timeout_repro(client: Client):
1056+
# This mirrors only the local-activity half of test_workflow_cancel_activity
1057+
# while bounding query/cancel RPCs so failures do not hide the useful logs.
1058+
import temporalio.worker._workflow as workflow_worker
1059+
1060+
workflow_worker.LOG_PROTOS = (
1061+
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_LOG_PROTOS") == "1"
1062+
)
1063+
iterations = int(
1064+
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_ITERS", "20")
1065+
)
1066+
activity_inst = ActivityWaitCancelNotify(
1067+
cancel_delay=timedelta(
1068+
seconds=float(
1069+
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_DELAY", "0")
1070+
)
10381071
)
1039-
await assert_eq_eventually(
1040-
"Error: CancelledError", activity_result, timeout=assert_timeout
1072+
)
1073+
task_timeout = timedelta(
1074+
milliseconds=int(
1075+
os.environ.get(
1076+
"TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_TASK_TIMEOUT_MS", "1000"
1077+
)
10411078
)
1042-
await asyncio.sleep(0.5)
1043-
assert not activity_inst.wait_cancel_complete.is_set()
1044-
await handle.cancel()
1045-
await activity_inst.wait_cancel_complete.wait()
1079+
)
1080+
1081+
try:
1082+
for i in range(iterations):
1083+
print(f"local activity WFT-timeout repro iteration {i}")
1084+
async with new_worker(
1085+
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
1086+
) as worker:
1087+
1088+
async def run_case(
1089+
cancellation_type: workflow.ActivityCancellationType,
1090+
expected_result: str,
1091+
*,
1092+
expect_activity_complete: bool,
1093+
) -> None:
1094+
handle = await client.start_workflow(
1095+
CancelActivityWorkflow.run,
1096+
CancelActivityWorkflowParams(
1097+
cancellation_type=cancellation_type.name,
1098+
local=True,
1099+
),
1100+
id=f"workflow-{uuid.uuid4()}",
1101+
task_queue=worker.task_queue,
1102+
task_timeout=task_timeout,
1103+
)
1104+
1105+
async def activity_result() -> str:
1106+
return await handle.query(
1107+
CancelActivityWorkflow.activity_result,
1108+
rpc_timeout=timedelta(seconds=2),
1109+
)
1110+
1111+
await assert_eq_eventually(
1112+
expected_result,
1113+
activity_result,
1114+
timeout=timedelta(seconds=10),
1115+
interval=timedelta(milliseconds=100),
1116+
)
1117+
if expect_activity_complete:
1118+
await asyncio.wait_for(
1119+
activity_inst.wait_cancel_complete.wait(), timeout=5
1120+
)
1121+
await handle.cancel(rpc_timeout=timedelta(seconds=2))
1122+
1123+
await run_case(
1124+
workflow.ActivityCancellationType.TRY_CANCEL,
1125+
"Error: CancelledError",
1126+
expect_activity_complete=True,
1127+
)
1128+
await run_case(
1129+
workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
1130+
"Got cancelled error, cancelled? True",
1131+
expect_activity_complete=True,
1132+
)
1133+
await run_case(
1134+
workflow.ActivityCancellationType.ABANDON,
1135+
"Error: CancelledError",
1136+
expect_activity_complete=False,
1137+
)
1138+
await asyncio.wait_for(activity_inst.wait_cancel_complete.wait(), 5)
1139+
finally:
1140+
workflow_worker.LOG_PROTOS = False
10461141

10471142

10481143
@workflow.defn
@@ -8852,9 +8947,9 @@ async def test_workflow_uncancel_shield_activity(client: Client):
88528947

88538948
# Verify no spurious "exception in shielded future" error logs
88548949
shielded_err = log_capturer.find_log("exception in shielded future")
8855-
assert shielded_err is None, (
8856-
f"Unexpected 'exception in shielded future' log: {shielded_err}"
8857-
)
8950+
assert (
8951+
shielded_err is None
8952+
), f"Unexpected 'exception in shielded future' log: {shielded_err}"
88588953

88598954

88608955
@workflow.defn
@@ -8938,9 +9033,9 @@ async def test_workflow_uncancel_shield_child_workflow(client: Client):
89389033

89399034
# Verify no spurious "exception in shielded future" error logs
89409035
shielded_err = log_capturer.find_log("exception in shielded future")
8941-
assert shielded_err is None, (
8942-
f"Unexpected 'exception in shielded future' log: {shielded_err}"
8943-
)
9036+
assert (
9037+
shielded_err is None
9038+
), f"Unexpected 'exception in shielded future' log: {shielded_err}"
89449039

89459040

89469041
@workflow.defn
@@ -9006,6 +9101,6 @@ async def test_workflow_uncancel_shield_signal_external(client: Client):
90069101

90079102
# Verify no spurious error logs
90089103
shielded_err = log_capturer.find_log("exception in shielded future")
9009-
assert shielded_err is None, (
9010-
f"Unexpected 'exception in shielded future' log: {shielded_err}"
9011-
)
9104+
assert (
9105+
shielded_err is None
9106+
), f"Unexpected 'exception in shielded future' log: {shielded_err}"

0 commit comments

Comments
 (0)