Skip to content

Commit a68fc1c

Browse files
committed
Undo temporary repro stuff
1 parent 67b0ff9 commit a68fc1c

2 files changed

Lines changed: 57 additions & 157 deletions

File tree

temporalio/worker/_workflow.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -526,9 +526,7 @@ def _fmt_duration(td: timedelta) -> str:
526526
extra=extra,
527527
)
528528
else:
529-
# TODO: Undo this before merging
530-
logger.log(
531-
5,
529+
logger.debug(
532530
f"[TMPRL1104] {log_id} Workflow task duration information (%s)",
533531
msg_details,
534532
extra=extra,

tests/worker/test_workflow.py

Lines changed: 56 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -907,8 +907,7 @@ async def wait_cancel() -> str:
907907

908908

909909
class ActivityWaitCancelNotify:
910-
def __init__(self, *, cancel_delay: timedelta = timedelta()) -> None:
911-
self.cancel_delay = cancel_delay
910+
def __init__(self) -> None:
912911
self.wait_cancel_complete = asyncio.Event()
913912

914913
@activity.defn
@@ -923,8 +922,6 @@ async def wait_cancel(self) -> str:
923922
activity.heartbeat()
924923
return "Manually stopped"
925924
except asyncio.CancelledError:
926-
if self.cancel_delay:
927-
await asyncio.sleep(self.cancel_delay.total_seconds())
928925
return "Got cancelled error, cancelled? " + str(activity.is_cancelled())
929926
finally:
930927
self.wait_cancel_complete.set()
@@ -975,169 +972,74 @@ def activity_result(self) -> str:
975972

976973

977974
@pytest.mark.parametrize("local", [True, False])
978-
@pytest.mark.timeout(90000)
979975
async def test_workflow_cancel_activity(client: Client, local: bool):
980976
# Need short task timeout to timeout LA task and longer assert timeout
981977
# so the task can timeout
982978
task_timeout = timedelta(seconds=1)
983979
assert_timeout = timedelta(seconds=10)
984980
activity_inst = ActivityWaitCancelNotify()
985981

986-
for i in range(100):
987-
print(f"iteration {i}")
988-
async with new_worker(
989-
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
990-
) as worker:
991-
# Try cancel - confirm error and activity was sent the cancel
992-
handle = await client.start_workflow(
993-
CancelActivityWorkflow.run,
994-
CancelActivityWorkflowParams(
995-
cancellation_type=workflow.ActivityCancellationType.TRY_CANCEL.name,
996-
local=local,
997-
),
998-
id=f"workflow-{uuid.uuid4()}",
999-
task_queue=worker.task_queue,
1000-
task_timeout=task_timeout,
1001-
)
1002-
1003-
async def activity_result() -> str:
1004-
return await handle.query(CancelActivityWorkflow.activity_result)
1005-
1006-
await assert_eq_eventually(
1007-
"Error: CancelledError", activity_result, timeout=assert_timeout
1008-
)
1009-
await activity_inst.wait_cancel_complete.wait()
1010-
await handle.cancel()
1011-
1012-
# Wait cancel - confirm no error due to graceful cancel handling
1013-
handle = await client.start_workflow(
1014-
CancelActivityWorkflow.run,
1015-
CancelActivityWorkflowParams(
1016-
cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED.name,
1017-
local=local,
1018-
),
1019-
id=f"workflow-{uuid.uuid4()}",
1020-
task_queue=worker.task_queue,
1021-
task_timeout=task_timeout,
1022-
)
1023-
await assert_eq_eventually(
1024-
"Got cancelled error, cancelled? True",
1025-
activity_result,
1026-
timeout=assert_timeout,
1027-
)
1028-
await activity_inst.wait_cancel_complete.wait()
1029-
await handle.cancel()
1030-
1031-
# Abandon - confirm error and that activity stays running
1032-
handle = await client.start_workflow(
1033-
CancelActivityWorkflow.run,
1034-
CancelActivityWorkflowParams(
1035-
cancellation_type=workflow.ActivityCancellationType.ABANDON.name,
1036-
local=local,
1037-
),
1038-
id=f"workflow-{uuid.uuid4()}",
1039-
task_queue=worker.task_queue,
1040-
task_timeout=task_timeout,
1041-
)
1042-
await assert_eq_eventually(
1043-
"Error: CancelledError", activity_result, timeout=assert_timeout
1044-
)
1045-
await asyncio.sleep(0.5)
1046-
assert not activity_inst.wait_cancel_complete.is_set()
1047-
await handle.cancel()
1048-
await activity_inst.wait_cancel_complete.wait()
982+
async with new_worker(
983+
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
984+
) as worker:
985+
# Try cancel - confirm error and activity was sent the cancel
986+
handle = await client.start_workflow(
987+
CancelActivityWorkflow.run,
988+
CancelActivityWorkflowParams(
989+
cancellation_type=workflow.ActivityCancellationType.TRY_CANCEL.name,
990+
local=local,
991+
),
992+
id=f"workflow-{uuid.uuid4()}",
993+
task_queue=worker.task_queue,
994+
task_timeout=task_timeout,
995+
)
1049996

997+
async def activity_result() -> str:
998+
return await handle.query(CancelActivityWorkflow.activity_result)
1050999

1051-
@pytest.mark.skipif(
1052-
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO") != "1",
1053-
reason="Diagnostic repro for local-activity cancellation WFT-timeout race",
1054-
)
1055-
async def test_workflow_cancel_local_activity_wft_timeout_repro(client: Client):
1056-
# This mirrors only the local-activity half of test_workflow_cancel_activity
1057-
# while bounding query/cancel RPCs so failures do not hide the useful logs.
1058-
import temporalio.worker._workflow as workflow_worker
1000+
await assert_eq_eventually(
1001+
"Error: CancelledError", activity_result, timeout=assert_timeout
1002+
)
1003+
await activity_inst.wait_cancel_complete.wait()
1004+
await handle.cancel()
10591005

1060-
workflow_worker.LOG_PROTOS = (
1061-
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_LOG_PROTOS") == "1"
1062-
)
1063-
iterations = int(
1064-
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_ITERS", "20")
1065-
)
1066-
activity_inst = ActivityWaitCancelNotify(
1067-
cancel_delay=timedelta(
1068-
seconds=float(
1069-
os.environ.get("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_DELAY", "0")
1070-
)
1006+
# Wait cancel - confirm no error due to graceful cancel handling
1007+
handle = await client.start_workflow(
1008+
CancelActivityWorkflow.run,
1009+
CancelActivityWorkflowParams(
1010+
cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED.name,
1011+
local=local,
1012+
),
1013+
id=f"workflow-{uuid.uuid4()}",
1014+
task_queue=worker.task_queue,
1015+
task_timeout=task_timeout,
10711016
)
1072-
)
1073-
task_timeout = timedelta(
1074-
milliseconds=int(
1075-
os.environ.get(
1076-
"TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_TASK_TIMEOUT_MS", "1000"
1077-
)
1017+
await assert_eq_eventually(
1018+
"Got cancelled error, cancelled? True",
1019+
activity_result,
1020+
timeout=assert_timeout,
10781021
)
1079-
)
1080-
1081-
try:
1082-
for i in range(iterations):
1083-
print(f"local activity WFT-timeout repro iteration {i}")
1084-
async with new_worker(
1085-
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
1086-
) as worker:
1087-
1088-
async def run_case(
1089-
cancellation_type: workflow.ActivityCancellationType,
1090-
expected_result: str,
1091-
*,
1092-
expect_activity_complete: bool,
1093-
) -> None:
1094-
handle = await client.start_workflow(
1095-
CancelActivityWorkflow.run,
1096-
CancelActivityWorkflowParams(
1097-
cancellation_type=cancellation_type.name,
1098-
local=True,
1099-
),
1100-
id=f"workflow-{uuid.uuid4()}",
1101-
task_queue=worker.task_queue,
1102-
task_timeout=task_timeout,
1103-
)
1104-
1105-
async def activity_result() -> str:
1106-
return await handle.query(
1107-
CancelActivityWorkflow.activity_result,
1108-
rpc_timeout=timedelta(seconds=2),
1109-
)
1110-
1111-
await assert_eq_eventually(
1112-
expected_result,
1113-
activity_result,
1114-
timeout=timedelta(seconds=10),
1115-
interval=timedelta(milliseconds=100),
1116-
)
1117-
if expect_activity_complete:
1118-
await asyncio.wait_for(
1119-
activity_inst.wait_cancel_complete.wait(), timeout=5
1120-
)
1121-
await handle.cancel(rpc_timeout=timedelta(seconds=2))
1022+
await activity_inst.wait_cancel_complete.wait()
1023+
await handle.cancel()
11221024

1123-
await run_case(
1124-
workflow.ActivityCancellationType.TRY_CANCEL,
1125-
"Error: CancelledError",
1126-
expect_activity_complete=True,
1127-
)
1128-
await run_case(
1129-
workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
1130-
"Got cancelled error, cancelled? True",
1131-
expect_activity_complete=True,
1132-
)
1133-
await run_case(
1134-
workflow.ActivityCancellationType.ABANDON,
1135-
"Error: CancelledError",
1136-
expect_activity_complete=False,
1137-
)
1138-
await asyncio.wait_for(activity_inst.wait_cancel_complete.wait(), 5)
1139-
finally:
1140-
workflow_worker.LOG_PROTOS = False
1025+
# Abandon - confirm error and that activity stays running
1026+
handle = await client.start_workflow(
1027+
CancelActivityWorkflow.run,
1028+
CancelActivityWorkflowParams(
1029+
cancellation_type=workflow.ActivityCancellationType.ABANDON.name,
1030+
local=local,
1031+
),
1032+
id=f"workflow-{uuid.uuid4()}",
1033+
task_queue=worker.task_queue,
1034+
task_timeout=task_timeout,
1035+
)
1036+
await assert_eq_eventually(
1037+
"Error: CancelledError", activity_result, timeout=assert_timeout
1038+
)
1039+
await asyncio.sleep(0.5)
1040+
assert not activity_inst.wait_cancel_complete.is_set()
1041+
await handle.cancel()
1042+
await activity_inst.wait_cancel_complete.wait()
11411043

11421044

11431045
@workflow.defn

0 commit comments

Comments
 (0)