@@ -907,7 +907,8 @@ async def wait_cancel() -> str:
907907
908908
909909class ActivityWaitCancelNotify :
910- def __init__ (self ) -> None :
910+ def __init__ (self , * , cancel_delay : timedelta = timedelta ()) -> None :
911+ self .cancel_delay = cancel_delay
911912 self .wait_cancel_complete = asyncio .Event ()
912913
913914 @activity .defn
@@ -922,6 +923,8 @@ async def wait_cancel(self) -> str:
922923 activity .heartbeat ()
923924 return "Manually stopped"
924925 except asyncio .CancelledError :
926+ if self .cancel_delay :
927+ await asyncio .sleep (self .cancel_delay .total_seconds ())
925928 return "Got cancelled error, cancelled? " + str (activity .is_cancelled ())
926929 finally :
927930 self .wait_cancel_complete .set ()
@@ -963,9 +966,6 @@ async def run(self, params: CancelActivityWorkflowParams) -> None:
963966 self ._activity_result = await handle
964967 except ActivityError as err :
965968 self ._activity_result = f"Error: { err .cause .__class__ .__name__ } "
966- # TODO(cretz): Remove when https://github.com/temporalio/sdk-rust/issues/323 is fixed
967- except CancelledError as err :
968- self ._activity_result = f"Error: { err .__class__ .__name__ } "
969969 # Wait forever
970970 await asyncio .Future ()
971971
@@ -975,74 +975,169 @@ def activity_result(self) -> str:
975975
976976
977977@pytest .mark .parametrize ("local" , [True , False ])
978+ @pytest .mark .timeout (90000 )
978979async def test_workflow_cancel_activity (client : Client , local : bool ):
979980 # Need short task timeout to timeout LA task and longer assert timeout
980981 # so the task can timeout
981982 task_timeout = timedelta (seconds = 1 )
982983 assert_timeout = timedelta (seconds = 10 )
983984 activity_inst = ActivityWaitCancelNotify ()
984985
985- async with new_worker (
986- client , CancelActivityWorkflow , activities = [activity_inst .wait_cancel ]
987- ) as worker :
988- # Try cancel - confirm error and activity was sent the cancel
989- handle = await client .start_workflow (
990- CancelActivityWorkflow .run ,
991- CancelActivityWorkflowParams (
992- cancellation_type = workflow .ActivityCancellationType .TRY_CANCEL .name ,
993- local = local ,
994- ),
995- id = f"workflow-{ uuid .uuid4 ()} " ,
996- task_queue = worker .task_queue ,
997- task_timeout = task_timeout ,
998- )
986+ for i in range (100 ):
987+ print (f"iteration { i } " )
988+ async with new_worker (
989+ client , CancelActivityWorkflow , activities = [activity_inst .wait_cancel ]
990+ ) as worker :
991+ # Try cancel - confirm error and activity was sent the cancel
992+ handle = await client .start_workflow (
993+ CancelActivityWorkflow .run ,
994+ CancelActivityWorkflowParams (
995+ cancellation_type = workflow .ActivityCancellationType .TRY_CANCEL .name ,
996+ local = local ,
997+ ),
998+ id = f"workflow-{ uuid .uuid4 ()} " ,
999+ task_queue = worker .task_queue ,
1000+ task_timeout = task_timeout ,
1001+ )
9991002
1000- async def activity_result () -> str :
1001- return await handle .query (CancelActivityWorkflow .activity_result )
1003+ async def activity_result () -> str :
1004+ return await handle .query (CancelActivityWorkflow .activity_result )
10021005
1003- await assert_eq_eventually (
1004- "Error: CancelledError" , activity_result , timeout = assert_timeout
1005- )
1006- await activity_inst .wait_cancel_complete .wait ()
1007- await handle .cancel ()
1006+ await assert_eq_eventually (
1007+ "Error: CancelledError" , activity_result , timeout = assert_timeout
1008+ )
1009+ await activity_inst .wait_cancel_complete .wait ()
1010+ await handle .cancel ()
10081011
1009- # Wait cancel - confirm no error due to graceful cancel handling
1010- handle = await client .start_workflow (
1011- CancelActivityWorkflow .run ,
1012- CancelActivityWorkflowParams (
1013- cancellation_type = workflow .ActivityCancellationType .WAIT_CANCELLATION_COMPLETED .name ,
1014- local = local ,
1015- ),
1016- id = f"workflow-{ uuid .uuid4 ()} " ,
1017- task_queue = worker .task_queue ,
1018- task_timeout = task_timeout ,
1019- )
1020- await assert_eq_eventually (
1021- "Got cancelled error, cancelled? True" ,
1022- activity_result ,
1023- timeout = assert_timeout ,
1024- )
1025- await activity_inst .wait_cancel_complete .wait ()
1026- await handle .cancel ()
1012+ # Wait cancel - confirm no error due to graceful cancel handling
1013+ handle = await client .start_workflow (
1014+ CancelActivityWorkflow .run ,
1015+ CancelActivityWorkflowParams (
1016+ cancellation_type = workflow .ActivityCancellationType .WAIT_CANCELLATION_COMPLETED .name ,
1017+ local = local ,
1018+ ),
1019+ id = f"workflow-{ uuid .uuid4 ()} " ,
1020+ task_queue = worker .task_queue ,
1021+ task_timeout = task_timeout ,
1022+ )
1023+ await assert_eq_eventually (
1024+ "Got cancelled error, cancelled? True" ,
1025+ activity_result ,
1026+ timeout = assert_timeout ,
1027+ )
1028+ await activity_inst .wait_cancel_complete .wait ()
1029+ await handle .cancel ()
10271030
1028- # Abandon - confirm error and that activity stays running
1029- handle = await client .start_workflow (
1030- CancelActivityWorkflow .run ,
1031- CancelActivityWorkflowParams (
1032- cancellation_type = workflow .ActivityCancellationType .ABANDON .name ,
1033- local = local ,
1034- ),
1035- id = f"workflow-{ uuid .uuid4 ()} " ,
1036- task_queue = worker .task_queue ,
1037- task_timeout = task_timeout ,
1031+ # Abandon - confirm error and that activity stays running
1032+ handle = await client .start_workflow (
1033+ CancelActivityWorkflow .run ,
1034+ CancelActivityWorkflowParams (
1035+ cancellation_type = workflow .ActivityCancellationType .ABANDON .name ,
1036+ local = local ,
1037+ ),
1038+ id = f"workflow-{ uuid .uuid4 ()} " ,
1039+ task_queue = worker .task_queue ,
1040+ task_timeout = task_timeout ,
1041+ )
1042+ await assert_eq_eventually (
1043+ "Error: CancelledError" , activity_result , timeout = assert_timeout
1044+ )
1045+ await asyncio .sleep (0.5 )
1046+ assert not activity_inst .wait_cancel_complete .is_set ()
1047+ await handle .cancel ()
1048+ await activity_inst .wait_cancel_complete .wait ()
1049+
1050+
1051+ @pytest .mark .skipif (
1052+ os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO" ) != "1" ,
1053+ reason = "Diagnostic repro for local-activity cancellation WFT-timeout race" ,
1054+ )
1055+ async def test_workflow_cancel_local_activity_wft_timeout_repro (client : Client ):
1056+ # This mirrors only the local-activity half of test_workflow_cancel_activity
1057+ # while bounding query/cancel RPCs so failures do not hide the useful logs.
1058+ import temporalio .worker ._workflow as workflow_worker
1059+
1060+ workflow_worker .LOG_PROTOS = (
1061+ os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_LOG_PROTOS" ) == "1"
1062+ )
1063+ iterations = int (
1064+ os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_ITERS" , "20" )
1065+ )
1066+ activity_inst = ActivityWaitCancelNotify (
1067+ cancel_delay = timedelta (
1068+ seconds = float (
1069+ os .environ .get ("TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_DELAY" , "0" )
1070+ )
10381071 )
1039- await assert_eq_eventually (
1040- "Error: CancelledError" , activity_result , timeout = assert_timeout
1072+ )
1073+ task_timeout = timedelta (
1074+ milliseconds = int (
1075+ os .environ .get (
1076+ "TEMPORAL_RUN_LOCAL_ACTIVITY_CANCEL_REPRO_TASK_TIMEOUT_MS" , "1000"
1077+ )
10411078 )
1042- await asyncio .sleep (0.5 )
1043- assert not activity_inst .wait_cancel_complete .is_set ()
1044- await handle .cancel ()
1045- await activity_inst .wait_cancel_complete .wait ()
1079+ )
1080+
1081+ try :
1082+ for i in range (iterations ):
1083+ print (f"local activity WFT-timeout repro iteration { i } " )
1084+ async with new_worker (
1085+ client , CancelActivityWorkflow , activities = [activity_inst .wait_cancel ]
1086+ ) as worker :
1087+
1088+ async def run_case (
1089+ cancellation_type : workflow .ActivityCancellationType ,
1090+ expected_result : str ,
1091+ * ,
1092+ expect_activity_complete : bool ,
1093+ ) -> None :
1094+ handle = await client .start_workflow (
1095+ CancelActivityWorkflow .run ,
1096+ CancelActivityWorkflowParams (
1097+ cancellation_type = cancellation_type .name ,
1098+ local = True ,
1099+ ),
1100+ id = f"workflow-{ uuid .uuid4 ()} " ,
1101+ task_queue = worker .task_queue ,
1102+ task_timeout = task_timeout ,
1103+ )
1104+
1105+ async def activity_result () -> str :
1106+ return await handle .query (
1107+ CancelActivityWorkflow .activity_result ,
1108+ rpc_timeout = timedelta (seconds = 2 ),
1109+ )
1110+
1111+ await assert_eq_eventually (
1112+ expected_result ,
1113+ activity_result ,
1114+ timeout = timedelta (seconds = 10 ),
1115+ interval = timedelta (milliseconds = 100 ),
1116+ )
1117+ if expect_activity_complete :
1118+ await asyncio .wait_for (
1119+ activity_inst .wait_cancel_complete .wait (), timeout = 5
1120+ )
1121+ await handle .cancel (rpc_timeout = timedelta (seconds = 2 ))
1122+
1123+ await run_case (
1124+ workflow .ActivityCancellationType .TRY_CANCEL ,
1125+ "Error: CancelledError" ,
1126+ expect_activity_complete = True ,
1127+ )
1128+ await run_case (
1129+ workflow .ActivityCancellationType .WAIT_CANCELLATION_COMPLETED ,
1130+ "Got cancelled error, cancelled? True" ,
1131+ expect_activity_complete = True ,
1132+ )
1133+ await run_case (
1134+ workflow .ActivityCancellationType .ABANDON ,
1135+ "Error: CancelledError" ,
1136+ expect_activity_complete = False ,
1137+ )
1138+ await asyncio .wait_for (activity_inst .wait_cancel_complete .wait (), 5 )
1139+ finally :
1140+ workflow_worker .LOG_PROTOS = False
10461141
10471142
10481143@workflow .defn
@@ -8852,9 +8947,9 @@ async def test_workflow_uncancel_shield_activity(client: Client):
88528947
88538948 # Verify no spurious "exception in shielded future" error logs
88548949 shielded_err = log_capturer .find_log ("exception in shielded future" )
8855- assert shielded_err is None , (
8856- f"Unexpected 'exception in shielded future' log: { shielded_err } "
8857- )
8950+ assert (
8951+ shielded_err is None
8952+ ), f"Unexpected 'exception in shielded future' log: { shielded_err } "
88588953
88598954
88608955@workflow .defn
@@ -8938,9 +9033,9 @@ async def test_workflow_uncancel_shield_child_workflow(client: Client):
89389033
89399034 # Verify no spurious "exception in shielded future" error logs
89409035 shielded_err = log_capturer .find_log ("exception in shielded future" )
8941- assert shielded_err is None , (
8942- f"Unexpected 'exception in shielded future' log: { shielded_err } "
8943- )
9036+ assert (
9037+ shielded_err is None
9038+ ), f"Unexpected 'exception in shielded future' log: { shielded_err } "
89449039
89459040
89469041@workflow .defn
@@ -9006,6 +9101,6 @@ async def test_workflow_uncancel_shield_signal_external(client: Client):
90069101
90079102 # Verify no spurious error logs
90089103 shielded_err = log_capturer .find_log ("exception in shielded future" )
9009- assert shielded_err is None , (
9010- f"Unexpected 'exception in shielded future' log: { shielded_err } "
9011- )
9104+ assert (
9105+ shielded_err is None
9106+ ), f"Unexpected 'exception in shielded future' log: { shielded_err } "
0 commit comments