forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_entity_failure_handling.py
More file actions
186 lines (145 loc) · 6.86 KB
/
Copy pathtest_entity_failure_handling.py
File metadata and controls
186 lines (145 loc) · 6.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
E2E tests for entity failure handling using the in-memory backend.
"""
import json
import pytest
from durabletask import client, entities, task, worker
from durabletask.testing import create_test_backend
from _port_utils import find_free_port
PORT = find_free_port()
HOST = f"localhost:{PORT}"
@pytest.fixture(autouse=True)
def backend():
"""Create an in-memory backend for entity testing."""
b = create_test_backend(port=PORT)
yield b
b.stop()
b.reset()
def test_class_entity_unhandled_failure_fails():
"""Test that an unhandled exception in a class entity causes the orchestration to fail."""
class FailingEntity(entities.DurableEntity):
def fail(self, _):
raise ValueError("Something went wrong!")
def test_orchestrator(ctx: task.OrchestrationContext, _):
entity_id = entities.EntityInstanceId("FailingEntity", "testEntity")
yield ctx.call_entity(entity_id, "fail")
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
w.add_orchestrator(test_orchestrator)
w.add_entity(FailingEntity)
w.start()
with client.TaskHubGrpcClient(host_address=HOST) as c:
id = c.schedule_new_orchestration(test_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.name == task.get_name(test_orchestrator)
assert state.instance_id == id
assert state.failure_details is not None
assert state.failure_details.error_type == "TaskFailedError"
assert "Something went wrong!" in state.failure_details.message
assert state.runtime_status == client.OrchestrationStatus.FAILED
def test_function_entity_unhandled_failure_fails():
"""Test that an unhandled exception in a function entity causes the orchestration to fail."""
def failing_entity(ctx: entities.EntityContext, _):
raise ValueError("Something went wrong!")
def test_orchestrator(ctx: task.OrchestrationContext, _):
entity_id = entities.EntityInstanceId("failing_entity", "testEntity")
yield ctx.call_entity(entity_id, "fail")
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
w.add_orchestrator(test_orchestrator)
w.add_entity(failing_entity)
w.start()
with client.TaskHubGrpcClient(host_address=HOST) as c:
id = c.schedule_new_orchestration(test_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.name == task.get_name(test_orchestrator)
assert state.instance_id == id
assert state.failure_details is not None
assert state.failure_details.error_type == "TaskFailedError"
assert "Something went wrong!" in state.failure_details.message
assert state.runtime_status == client.OrchestrationStatus.FAILED
def test_class_entity_handled_failure_succeeds():
"""Test that a handled exception in a class entity allows the orchestration to succeed."""
class FailingEntity(entities.DurableEntity):
def fail(self, _):
raise ValueError("Something went wrong!")
def test_orchestrator(ctx: task.OrchestrationContext, _):
entity_id = entities.EntityInstanceId("FailingEntity", "testEntity")
try:
yield ctx.call_entity(entity_id, "fail")
except task.TaskFailedError as e:
return e.details.message
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
w.add_orchestrator(test_orchestrator)
w.add_entity(FailingEntity)
w.start()
with client.TaskHubGrpcClient(host_address=HOST) as c:
id = c.schedule_new_orchestration(test_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.name == task.get_name(test_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.serialized_output is not None
output = json.loads(state.serialized_output)
assert "Something went wrong!" in output
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
def test_function_entity_handled_failure_succeeds():
"""Test that a handled exception in a function entity allows the orchestration to succeed."""
def failing_entity(ctx: entities.EntityContext, _):
raise ValueError("Something went wrong!")
def test_orchestrator(ctx: task.OrchestrationContext, _):
entity_id = entities.EntityInstanceId("failing_entity", "testEntity")
try:
yield ctx.call_entity(entity_id, "fail")
except task.TaskFailedError as e:
return e.details.message
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
w.add_orchestrator(test_orchestrator)
w.add_entity(failing_entity)
w.start()
with client.TaskHubGrpcClient(host_address=HOST) as c:
id = c.schedule_new_orchestration(test_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.name == task.get_name(test_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.serialized_output is not None
output = json.loads(state.serialized_output)
assert "Something went wrong!" in output
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
def test_entity_failure_unlocks_entity():
"""Test that an entity failure properly unlocks the entity for subsequent operations."""
def failing_entity(ctx: entities.EntityContext, _):
raise ValueError("Something went wrong!")
def test_orchestrator(ctx: task.OrchestrationContext, _):
exception_count = 0
entity_id = entities.EntityInstanceId("failing_entity", "testEntity")
with (yield ctx.lock_entities([entity_id])):
try:
yield ctx.call_entity(entity_id, "fail")
except task.TaskFailedError:
exception_count += 1
try:
yield ctx.call_entity(entity_id, "fail")
except task.TaskFailedError:
exception_count += 1
return exception_count
with worker.TaskHubGrpcWorker(host_address=HOST) as w:
w.add_orchestrator(test_orchestrator)
w.add_entity(failing_entity)
w.start()
with client.TaskHubGrpcClient(host_address=HOST) as c:
id = c.schedule_new_orchestration(test_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.name == task.get_name(test_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.serialized_output is not None
output = json.loads(state.serialized_output)
assert output == 2
assert state.runtime_status == client.OrchestrationStatus.COMPLETED