-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathtest_class_based_entities_e2e.py
More file actions
138 lines (105 loc) · 4.72 KB
/
Copy pathtest_class_based_entities_e2e.py
File metadata and controls
138 lines (105 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
E2E tests for class-based durable entities using the in-memory backend.
"""
import time
import pytest
from durabletask import client, entities, task, worker
from durabletask.testing import create_test_backend
HOST = "localhost:50059"
@pytest.fixture(autouse=True)
def backend():
    """Provide an in-memory test backend to every test in this module.

    The backend is created on port 50059 (the port named in ``HOST``),
    yielded to the test, and then stopped and reset once the test is done.
    """
    test_backend = create_test_backend(port=50059)
    yield test_backend
    # Teardown: stop first, then reset.
    test_backend.stop()
    test_backend.reset()
def test_client_signal_class_entity_and_custom_name():
    """Test signaling a class-based entity with a custom registration name from the client.

    ``EmptyEntity`` is registered under the name ``"EntityNameCustom"`` and
    the client addresses it by that name. The test passes once the entity's
    ``do_nothing`` operation has been invoked.
    """
    invoked = False

    class EmptyEntity(entities.DurableEntity):
        def do_nothing(self, _):
            nonlocal invoked  # don't do this in a real app!
            invoked = True

    with worker.TaskHubGrpcWorker(host_address=HOST) as w:
        w.add_entity(EmptyEntity, name="EntityNameCustom")
        w.start()

        with client.TaskHubGrpcClient(host_address=HOST) as c:
            entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity")
            c.signal_entity(entity_id, "do_nothing")

            # The signal is processed by the worker some time after
            # signal_entity() returns. Poll with an upper bound instead of a
            # fixed sleep: the test finishes as soon as the entity has run,
            # and a slow machine gets more headroom before it is a failure.
            deadline = time.monotonic() + 10
            while not invoked and time.monotonic() < deadline:
                time.sleep(0.05)

    assert invoked
def test_client_get_class_entity():
    """Signal a class-based entity, then read its state back through the client.

    The entity's ``do_nothing`` operation stores ``1`` as its state; the test
    checks that ``get_entity`` returns metadata for the same entity id whose
    state deserializes to that integer.
    """
    invoked = False

    class EmptyEntity(entities.DurableEntity):
        def do_nothing(self, _):
            nonlocal invoked  # don't do this in a real app!
            self.set_state(1)
            invoked = True

    with worker.TaskHubGrpcWorker(host_address=HOST) as hub_worker:
        hub_worker.add_entity(EmptyEntity)
        hub_worker.start()

        with client.TaskHubGrpcClient(host_address=HOST) as hub_client:
            target = entities.EntityInstanceId("EmptyEntity", "testEntity")
            hub_client.signal_entity(target, "do_nothing")
            time.sleep(2)  # wait for the signal to be processed

            entity_state = hub_client.get_entity(target, include_state=True)
            assert entity_state is not None
            assert entity_state.id == target
            assert entity_state.get_state(int) == 1

    assert invoked
def test_orchestration_signal_class_entity_and_custom_name():
    """Test signaling a class-based entity with a custom name from an orchestration.

    The orchestrator signals the entity registered as ``"EntityNameCustom"``
    without yielding on the result, so the test also waits for the entity to
    be invoked after the orchestration itself has completed.
    """
    invoked = False

    class EmptyEntity(entities.DurableEntity):
        def do_nothing(self, _):
            nonlocal invoked  # don't do this in a real app!
            invoked = True

    def empty_orchestrator(ctx: task.OrchestrationContext, _):
        entity_id = entities.EntityInstanceId("EntityNameCustom", "testEntity")
        ctx.signal_entity(entity_id, "do_nothing")

    with worker.TaskHubGrpcWorker(host_address=HOST) as w:
        w.add_orchestrator(empty_orchestrator)
        w.add_entity(EmptyEntity, name="EntityNameCustom")
        w.start()

        with client.TaskHubGrpcClient(host_address=HOST) as c:
            # Named instance_id rather than id so the builtin id() is not shadowed.
            instance_id = c.schedule_new_orchestration(empty_orchestrator)
            state = c.wait_for_orchestration_completion(instance_id, timeout=30)

            # The signal may still be pending when the orchestration is
            # reported complete. Poll with an upper bound instead of a fixed
            # sleep so the test ends as soon as the entity has run.
            deadline = time.monotonic() + 10
            while not invoked and time.monotonic() < deadline:
                time.sleep(0.05)

    assert invoked
    assert state is not None
    assert state.name == task.get_name(empty_orchestrator)
    assert state.instance_id == instance_id
    assert state.failure_details is None
    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
    assert state.serialized_input is None
    assert state.serialized_output is None
    assert state.serialized_custom_status is None
def test_orchestration_call_class_entity():
    """Test calling a class-based entity from an orchestration and awaiting the result.

    The orchestrator yields on ``ctx.call_entity``, and the test asserts that
    the entity operation was invoked and that the orchestration completed
    with no input, output, custom status, or failure details.
    """
    invoked = False

    class EmptyEntity(entities.DurableEntity):
        def do_nothing(self, _):
            nonlocal invoked  # don't do this in a real app!
            invoked = True

    def empty_orchestrator(ctx: task.OrchestrationContext, _):
        entity_id = entities.EntityInstanceId("EmptyEntity", "testEntity")
        yield ctx.call_entity(entity_id, "do_nothing")

    with worker.TaskHubGrpcWorker(host_address=HOST) as w:
        w.add_orchestrator(empty_orchestrator)
        w.add_entity(EmptyEntity)
        w.start()

        with client.TaskHubGrpcClient(host_address=HOST) as c:
            # Named instance_id rather than id so the builtin id() is not shadowed.
            instance_id = c.schedule_new_orchestration(empty_orchestrator)
            state = c.wait_for_orchestration_completion(instance_id, timeout=30)

    assert invoked
    assert state is not None
    assert state.name == task.get_name(empty_orchestrator)
    assert state.instance_id == instance_id
    assert state.failure_details is None
    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
    assert state.serialized_input is None
    assert state.serialized_output is None
    assert state.serialized_custom_status is None