|
114 | 114 | from temporalio.worker import ( |
115 | 115 | ExecuteWorkflowInput, |
116 | 116 | HandleSignalInput, |
| 117 | + Replayer, |
117 | 118 | UnsandboxedWorkflowRunner, |
118 | 119 | Worker, |
119 | 120 | WorkflowInstance, |
@@ -892,6 +893,98 @@ async def test_workflow_simple_local_activity(client: Client): |
892 | 893 | assert result == "Hello, Temporal!" |
893 | 894 |
|
894 | 895 |
|
| 896 | +@activity.defn |
| 897 | +async def local_activity_slow(index: int) -> None: |
| 898 | + if index % 2 == 0: |
| 899 | + await asyncio.sleep(0.05) |
| 900 | + |
| 901 | + |
| 902 | +@activity.defn |
| 903 | +async def local_activity_fast(index: int) -> None: |
| 904 | + return None |
| 905 | + |
| 906 | + |
| 907 | +@activity.defn |
| 908 | +async def local_activity_gate() -> None: |
| 909 | + await asyncio.sleep(0.05) |
| 910 | + |
| 911 | + |
| 912 | +@workflow.defn |
| 913 | +class ConcurrentLocalActivityReplayWorkflow: |
| 914 | + """Workflow that runs two concurrent coroutines with local activities. |
| 915 | +
|
| 916 | + This reproduces a replay nondeterminism bug: during first execution, |
| 917 | + local activities take real time, creating a deterministic interleaving. |
| 918 | + During replay, all local activities return instantly from markers, which |
| 919 | + can reorder coroutine scheduling and produce a different command sequence. |
| 920 | + """ |
| 921 | + |
| 922 | + @workflow.run |
| 923 | + async def run(self) -> list[int]: |
| 924 | + async def lifecycle_a(index: int) -> int: |
| 925 | + await workflow.execute_local_activity( |
| 926 | + local_activity_slow, |
| 927 | + args=[index * 2], |
| 928 | + start_to_close_timeout=timedelta(seconds=5), |
| 929 | + ) |
| 930 | + await workflow.execute_local_activity( |
| 931 | + local_activity_fast, |
| 932 | + args=[index * 2], |
| 933 | + start_to_close_timeout=timedelta(seconds=5), |
| 934 | + ) |
| 935 | + return index * 2 |
| 936 | + |
| 937 | + async def lifecycle_b(index: int) -> int: |
| 938 | + await workflow.execute_local_activity( |
| 939 | + local_activity_gate, |
| 940 | + start_to_close_timeout=timedelta(seconds=5), |
| 941 | + ) |
| 942 | + await workflow.execute_local_activity( |
| 943 | + local_activity_slow, |
| 944 | + args=[index * 2 + 1], |
| 945 | + start_to_close_timeout=timedelta(seconds=5), |
| 946 | + ) |
| 947 | + await workflow.execute_local_activity( |
| 948 | + local_activity_fast, |
| 949 | + args=[index * 2 + 1], |
| 950 | + start_to_close_timeout=timedelta(seconds=5), |
| 951 | + ) |
| 952 | + return index * 2 + 1 |
| 953 | + |
| 954 | + results: list[int] = [] |
| 955 | + for index in range(20): |
| 956 | + results.extend(await asyncio.gather(lifecycle_a(index), lifecycle_b(index))) |
| 957 | + return results |
| 958 | + |
| 959 | + |
| 960 | +async def test_workflow_concurrent_local_activity_replay(client: Client): |
| 961 | + """Test that concurrent local activities replay deterministically. |
| 962 | +
|
| 963 | + Runs a workflow with two concurrent coroutines that each issue multiple |
| 964 | + local activities, then replays the history. Without the fix, replay |
| 965 | + fails with NondeterminismError because the local activity command order |
| 966 | + diverges from the recorded marker order. |
| 967 | + """ |
| 968 | + async with new_worker( |
| 969 | + client, |
| 970 | + ConcurrentLocalActivityReplayWorkflow, |
| 971 | + activities=[local_activity_slow, local_activity_fast, local_activity_gate], |
| 972 | + ) as worker: |
| 973 | + handle = await client.start_workflow( |
| 974 | + ConcurrentLocalActivityReplayWorkflow.run, |
| 975 | + id=f"workflow-{uuid.uuid4()}", |
| 976 | + task_queue=worker.task_queue, |
| 977 | + ) |
| 978 | + expected = [v for i in range(20) for v in (i * 2, i * 2 + 1)] |
| 979 | + assert await handle.result() == expected |
| 980 | + |
| 981 | + history = await handle.fetch_history() |
| 982 | + |
| 983 | + await Replayer( |
| 984 | + workflows=[ConcurrentLocalActivityReplayWorkflow], |
| 985 | + ).replay_workflow(history) |
| 986 | + |
| 987 | + |
895 | 988 | @activity.defn |
896 | 989 | async def wait_cancel() -> str: |
897 | 990 | try: |
|
0 commit comments