forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_worker_concurrency_loop_async.py
More file actions
97 lines (76 loc) · 3.79 KB
/
Copy pathtest_worker_concurrency_loop_async.py
File metadata and controls
97 lines (76 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import asyncio
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
class DummyStub:
    """Test double that records completion calls instead of sending them.

    Every call is stored in ``completed`` as a ``(kind, response)`` tuple,
    where ``kind`` is ``'orchestrator'`` or ``'activity'``.
    """

    def __init__(self):
        self.completed = []

    def _record(self, kind, res):
        # Single place where completions are appended, in call order.
        self.completed.append((kind, res))

    def CompleteOrchestratorTask(self, res):
        """Record ``res`` as an orchestrator completion."""
        self._record('orchestrator', res)

    def CompleteActivityTask(self, res):
        """Record ``res`` as an activity completion."""
        self._record('activity', res)
class DummyRequest:
    """Lightweight fake of a work-item request used by the test.

    ``kind`` is ``'orchestrator'`` or ``'activity'`` and drives the answers
    of ``HasField`` and ``WhichOneof``; ``instance_id`` is exposed both as
    ``instanceId`` and as ``orchestrationInstance.instanceId``.
    """

    def __init__(self, kind, instance_id):
        self.kind = kind
        self.instanceId = instance_id
        # Throwaway classes (not instances) whose class attribute carries
        # the value, so attribute access works like on a nested message.
        self.orchestrationInstance = type(
            'O', (), {'instanceId': instance_id}
        )
        self.name = 'dummy'
        self.taskId = 1
        self.input = type(
            'I', (), {'value': ''}
        )
        self.pastEvents = []
        self.newEvents = []

    def HasField(self, field):
        """Return True only for the request field that matches ``kind``."""
        if field == 'orchestratorRequest':
            return self.kind == 'orchestrator'
        if field == 'activityRequest':
            return self.kind == 'activity'
        return False

    def WhichOneof(self, _):
        """Return the populated field name, ``'<kind>Request'``."""
        return f'{self.kind}Request'
class DummyCompletionToken:
    """Empty placeholder passed wherever a completion token is expected."""
def test_worker_concurrency_loop_async():
    """Run queued work through the worker's async manager on two event loops.

    Three orchestration requests and four activity requests are submitted to
    ``grpc_worker._async_worker_manager`` and each one must report exactly
    one completion on the stub.  The scenario is executed twice, each time
    under a fresh ``asyncio.run`` call, so the same manager is exercised
    again after it has already been shut down once.
    """
    options = ConcurrencyOptions(
        maximum_concurrent_activity_work_items=2,
        maximum_concurrent_orchestration_work_items=1,
        maximum_thread_pool_workers=2,
    )
    grpc_worker = TaskHubGrpcWorker(concurrency_options=options)
    manager = grpc_worker._async_worker_manager
    stub = DummyStub()

    async def dummy_orchestrator(req, stub, completionToken):
        await asyncio.sleep(0.1)
        stub.CompleteOrchestratorTask('ok')

    async def cancel_dummy_orchestrator(req, stub, completionToken):
        pass

    async def dummy_activity(req, stub, completionToken):
        await asyncio.sleep(0.1)
        stub.CompleteActivityTask('ok')

    async def cancel_dummy_activity(req, stub, completionToken):
        pass

    # Patch the worker's _execute_* / _cancel_* handlers with the dummies.
    # The dummies take no ``self``, so they are stored as plain instance
    # attributes: a call with (req, stub, completionToken) then reaches them
    # unchanged.  Binding them with ``__get__`` would inject the worker as
    # ``req`` and make such a call fail with a TypeError.
    grpc_worker._execute_orchestrator = dummy_orchestrator
    grpc_worker._cancel_orchestrator = cancel_dummy_orchestrator
    grpc_worker._execute_activity = dummy_activity
    grpc_worker._cancel_activity = cancel_dummy_activity

    orchestrator_requests = [DummyRequest('orchestrator', f'orch{i}') for i in range(3)]
    activity_requests = [DummyRequest('activity', f'act{i}') for i in range(4)]
    expected_total = len(orchestrator_requests) + len(activity_requests)

    async def run_test():
        # Clear stub state before each run
        stub.completed.clear()
        manager.prepare_for_run()
        worker_task = asyncio.create_task(manager.run())
        try:
            # The previous run leaves _shutdown set to True.  Yield (for at
            # most ~1s) so the run loop can start and the flag is cleared
            # before work is submitted -- presumably by run() itself.
            for _ in range(10):
                if not manager._shutdown:
                    break
                await asyncio.sleep(0.1)

            for req in orchestrator_requests:
                manager.submit_orchestration(
                    dummy_orchestrator, cancel_dummy_orchestrator,
                    req, stub, DummyCompletionToken())
            for req in activity_requests:
                manager.submit_activity(
                    dummy_activity, cancel_dummy_activity,
                    req, stub, DummyCompletionToken())

            # Poll instead of sleeping a fixed second: finishes as soon as
            # every completion has arrived, yet tolerates a slow machine.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + 5.0
            while len(stub.completed) < expected_total and loop.time() < deadline:
                await asyncio.sleep(0.05)

            orchestrator_count = sum(1 for t, _ in stub.completed if t == 'orchestrator')
            activity_count = sum(1 for t, _ in stub.completed if t == 'activity')
            assert orchestrator_count == 3, f"Expected 3 orchestrator completions, got {orchestrator_count}"
            assert activity_count == 4, f"Expected 4 activity completions, got {activity_count}"
        finally:
            # Always stop the run loop and wait for it, even when an
            # assertion above failed, so no worker task is left pending.
            manager._shutdown = True
            await worker_task

    # Two independent event loops: the second pass checks that the manager
    # still works after having been shut down by the first.
    for _ in range(2):
        asyncio.run(run_test())