@@ -907,8 +907,7 @@ async def wait_cancel() -> str:
907907
908908
909909class ActivityWaitCancelNotify :
910- def __init__ (self , * , cancel_delay : timedelta = timedelta ()) -> None :
911- self .cancel_delay = cancel_delay
910+ def __init__ (self ) -> None :
912911 self .wait_cancel_complete = asyncio .Event ()
913912
914913 @activity .defn
@@ -923,8 +922,6 @@ async def wait_cancel(self) -> str:
923922 activity .heartbeat ()
924923 return "Manually stopped"
925924 except asyncio .CancelledError :
926- if self .cancel_delay :
927- await asyncio .sleep (self .cancel_delay .total_seconds ())
928925 return "Got cancelled error, cancelled? " + str (activity .is_cancelled ())
929926 finally :
930927 self .wait_cancel_complete .set ()
@@ -975,169 +972,74 @@ def activity_result(self) -> str:
975972
976973
977974@pytest .mark .parametrize ("local" , [True , False ])
978- @pytest .mark .timeout (90000 )
979975async def test_workflow_cancel_activity (client : Client , local : bool ):
980976 # Need short task timeout to timeout LA task and longer assert timeout
981977 # so the task can timeout
982978 task_timeout = timedelta (seconds = 1 )
983979 assert_timeout = timedelta (seconds = 10 )
984980 activity_inst = ActivityWaitCancelNotify ()
985981
986- for i in range (100 ):
987- print (f"iteration { i } " )
988- async with new_worker (
989- client , CancelActivityWorkflow , activities = [activity_inst .wait_cancel ]
990- ) as worker :
991- # Try cancel - confirm error and activity was sent the cancel
992- handle = await client .start_workflow (
993- CancelActivityWorkflow .run ,
994- CancelActivityWorkflowParams (
995- cancellation_type = workflow .ActivityCancellationType .TRY_CANCEL .name ,
996- local = local ,
997- ),
998- id = f"workflow-{ uuid .uuid4 ()} " ,
999- task_queue = worker .task_queue ,
1000- task_timeout = task_timeout ,
1001- )
1002-
1003- async def activity_result () -> str :
1004- return await handle .query (CancelActivityWorkflow .activity_result )
1005-
1006- await assert_eq_eventually (
1007- "Error: CancelledError" , activity_result , timeout = assert_timeout
1008- )
1009- await activity_inst .wait_cancel_complete .wait ()
1010- await handle .cancel ()
1011-
1012- # Wait cancel - confirm no error due to graceful cancel handling
1013- handle = await client .start_workflow (
1014- CancelActivityWorkflow .run ,
1015- CancelActivityWorkflowParams (
1016- cancellation_type = workflow .ActivityCancellationType .WAIT_CANCELLATION_COMPLETED .name ,
1017- local = local ,
1018- ),
1019- id = f"workflow-{ uuid .uuid4 ()} " ,
1020- task_queue = worker .task_queue ,
1021- task_timeout = task_timeout ,
1022- )
1023- await assert_eq_eventually (
1024- "Got cancelled error, cancelled? True" ,
1025- activity_result ,
1026- timeout = assert_timeout ,
1027- )
1028- await activity_inst .wait_cancel_complete .wait ()
1029- await handle .cancel ()
1030-
1031- # Abandon - confirm error and that activity stays running
1032- handle = await client .start_workflow (
1033- CancelActivityWorkflow .run ,
1034- CancelActivityWorkflowParams (
1035- cancellation_type = workflow .ActivityCancellationType .ABANDON .name ,
1036- local = local ,
1037- ),
1038- id = f"workflow-{ uuid .uuid4 ()} " ,
1039- task_queue = worker .task_queue ,
1040- task_timeout = task_timeout ,
1041- )
1042- await assert_eq_eventually (
1043- "Error: CancelledError" , activity_result , timeout = assert_timeout
1044- )
1045- await asyncio .sleep (0.5 )
1046- assert not activity_inst .wait_cancel_complete .is_set ()
1047- await handle .cancel ()
1048- await activity_inst .wait_cancel_complete .wait ()
982+ async with new_worker (
983+ client , CancelActivityWorkflow , activities = [activity_inst .wait_cancel ]
984+ ) as worker :
985+ # Try cancel - confirm error and activity was sent the cancel
986+ handle = await client .start_workflow (
987+ CancelActivityWorkflow .run ,
988+ CancelActivityWorkflowParams (
989+ cancellation_type = workflow .ActivityCancellationType .TRY_CANCEL .name ,
990+ local = local ,
991+ ),
992+ id = f"workflow-{ uuid .uuid4 ()} " ,
993+ task_queue = worker .task_queue ,
994+ task_timeout = task_timeout ,
995+ )
1049996
997+ async def activity_result () -> str :
998+ return await handle .query (CancelActivityWorkflow .activity_result )
1050999
1051- @pytest .mark .skipif (
1052- os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO" ) != "1" ,
1053- reason = "Diagnostic repro for local-activity cancellation WFT-timeout race" ,
1054- )
1055- async def test_workflow_cancel_local_activity_wft_timeout_repro (client : Client ):
1056- # This mirrors only the local-activity half of test_workflow_cancel_activity
1057- # while bounding query/cancel RPCs so failures do not hide the useful logs.
1058- import temporalio .worker ._workflow as workflow_worker
1000+ await assert_eq_eventually (
1001+ "Error: CancelledError" , activity_result , timeout = assert_timeout
1002+ )
1003+ await activity_inst .wait_cancel_complete .wait ()
1004+ await handle .cancel ()
10591005
1060- workflow_worker .LOG_PROTOS = (
1061- os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_LOG_PROTOS" ) == "1"
1062- )
1063- iterations = int (
1064- os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_ITERS" , "20" )
1065- )
1066- activity_inst = ActivityWaitCancelNotify (
1067- cancel_delay = timedelta (
1068- seconds = float (
1069- os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_DELAY" , "0" )
1070- )
1006+ # Wait cancel - confirm no error due to graceful cancel handling
1007+ handle = await client .start_workflow (
1008+ CancelActivityWorkflow .run ,
1009+ CancelActivityWorkflowParams (
1010+ cancellation_type = workflow .ActivityCancellationType .WAIT_CANCELLATION_COMPLETED .name ,
1011+ local = local ,
1012+ ),
1013+ id = f"workflow-{ uuid .uuid4 ()} " ,
1014+ task_queue = worker .task_queue ,
1015+ task_timeout = task_timeout ,
10711016 )
1072- )
1073- task_timeout = timedelta (
1074- milliseconds = int (
1075- os .environ .get (
1076- "TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_TASK_TIMEOUT_MS" , "1000"
1077- )
1017+ await assert_eq_eventually (
1018+ "Got cancelled error, cancelled? True" ,
1019+ activity_result ,
1020+ timeout = assert_timeout ,
10781021 )
1079- )
1080-
1081- try :
1082- for i in range (iterations ):
1083- print (f"local activity WFT-timeout repro iteration { i } " )
1084- async with new_worker (
1085- client , CancelActivityWorkflow , activities = [activity_inst .wait_cancel ]
1086- ) as worker :
1087-
1088- async def run_case (
1089- cancellation_type : workflow .ActivityCancellationType ,
1090- expected_result : str ,
1091- * ,
1092- expect_activity_complete : bool ,
1093- ) -> None :
1094- handle = await client .start_workflow (
1095- CancelActivityWorkflow .run ,
1096- CancelActivityWorkflowParams (
1097- cancellation_type = cancellation_type .name ,
1098- local = True ,
1099- ),
1100- id = f"workflow-{ uuid .uuid4 ()} " ,
1101- task_queue = worker .task_queue ,
1102- task_timeout = task_timeout ,
1103- )
1104-
1105- async def activity_result () -> str :
1106- return await handle .query (
1107- CancelActivityWorkflow .activity_result ,
1108- rpc_timeout = timedelta (seconds = 2 ),
1109- )
1110-
1111- await assert_eq_eventually (
1112- expected_result ,
1113- activity_result ,
1114- timeout = timedelta (seconds = 10 ),
1115- interval = timedelta (milliseconds = 100 ),
1116- )
1117- if expect_activity_complete :
1118- await asyncio .wait_for (
1119- activity_inst .wait_cancel_complete .wait (), timeout = 5
1120- )
1121- await handle .cancel (rpc_timeout = timedelta (seconds = 2 ))
1022+ await activity_inst .wait_cancel_complete .wait ()
1023+ await handle .cancel ()
11221024
1123- await run_case (
1124- workflow . ActivityCancellationType . TRY_CANCEL ,
1125- "Error: CancelledError" ,
1126- expect_activity_complete = True ,
1127- )
1128- await run_case (
1129- workflow . ActivityCancellationType . WAIT_CANCELLATION_COMPLETED ,
1130- "Got cancelled error, cancelled? True " ,
1131- expect_activity_complete = True ,
1132- )
1133- await run_case (
1134- workflow . ActivityCancellationType . ABANDON ,
1135- "Error: CancelledError" ,
1136- expect_activity_complete = False ,
1137- )
1138- await asyncio . wait_for ( activity_inst .wait_cancel_complete .wait (), 5 )
1139- finally :
1140- workflow_worker . LOG_PROTOS = False
1025+ # Abandon - confirm error and that activity stays running
1026+ handle = await client . start_workflow (
1027+ CancelActivityWorkflow . run ,
1028+ CancelActivityWorkflowParams (
1029+ cancellation_type = workflow . ActivityCancellationType . ABANDON . name ,
1030+ local = local ,
1031+ ) ,
1032+ id = f"workflow- { uuid . uuid4 () } " ,
1033+ task_queue = worker . task_queue ,
1034+ task_timeout = task_timeout ,
1035+ )
1036+ await assert_eq_eventually (
1037+ "Error: CancelledError" , activity_result , timeout = assert_timeout
1038+ )
1039+ await asyncio . sleep ( 0.5 )
1040+ assert not activity_inst .wait_cancel_complete .is_set ( )
1041+ await handle . cancel ()
1042+ await activity_inst . wait_cancel_complete . wait ()
11411043
11421044
11431045@workflow .defn
0 commit comments