-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathtest_dts_batch_actions.py
More file actions
57 lines (44 loc) · 2.53 KB
/
Copy pathtest_dts_batch_actions.py
File metadata and controls
57 lines (44 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os
import pytest
from durabletask import client, task
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Connection settings come from the environment, with defaults for a local run.
taskhub_name = os.environ.get("TASKHUB", "default")
endpoint = os.environ.get("ENDPOINT", "http://localhost:8080")

# Apply the "dts" marker to every test in this module.
pytestmark = pytest.mark.dts
def empty_orchestrator(ctx: task.OrchestrationContext, _):
    """Orchestrator that performs no work.

    Neither the orchestration context nor the input (second argument) is
    used; the constant string "Complete" is returned as the result.
    """
    result = "Complete"
    return result
def test_get_all_orchestration_states():
    """Check that listing all orchestrations includes the one scheduled here.

    Schedules ``empty_orchestrator`` with the input "Hello", waits up to 30
    seconds for it to complete, and then lists every orchestration twice:
    once with default arguments and once with
    ``fetch_inputs_and_outputs=True``. The default listing is expected to
    carry no serialized input/output for the instance, while the second
    listing is expected to carry both.
    """
    # Worker and client are built from the same connection settings.
    connection = dict(host_address=endpoint, secure_channel=True,
                      taskhub=taskhub_name, token_credential=None)

    # Start a worker, which will connect to the sidecar in a background thread
    with DurableTaskSchedulerWorker(**connection) as worker:
        worker.add_orchestrator(empty_orchestrator)
        worker.start()

        dts_client = DurableTaskSchedulerClient(**connection)
        instance_id = dts_client.schedule_new_orchestration(empty_orchestrator, input="Hello")
        dts_client.wait_for_orchestration_completion(instance_id, timeout=30)

        # Issue all three queries before asserting on any of the results.
        all_orchestrations = dts_client.get_all_orchestration_states()
        all_orchestrations_with_state = dts_client.get_all_orchestration_states(
            fetch_inputs_and_outputs=True)
        fetched = dts_client.get_orchestration_state(instance_id)

        # Direct lookup by instance id.
        assert fetched is not None
        assert fetched.instance_id == instance_id

        # Listing with default arguments.
        # NOTE(review): "> 1" only holds when the task hub already contains at
        # least one other orchestration besides this one - confirm intended.
        assert all_orchestrations is not None
        assert len(all_orchestrations) > 1
        print(f"Received {len(all_orchestrations)} orchestrations")
        summaries = [o for o in all_orchestrations if o.instance_id == instance_id]
        assert len(summaries) == 1
        summary = summaries[0]
        assert summary.runtime_status == client.OrchestrationStatus.COMPLETED
        assert summary.serialized_input is None
        assert summary.serialized_output is None
        assert summary.failure_details is None

        # Listing with inputs and outputs requested.
        assert all_orchestrations_with_state is not None
        assert len(all_orchestrations_with_state) > 1
        print(f"Received {len(all_orchestrations_with_state)} orchestrations")
        detailed_matches = [o for o in all_orchestrations_with_state
                            if o.instance_id == instance_id]
        assert len(detailed_matches) == 1
        detailed = detailed_matches[0]
        assert detailed.runtime_status == client.OrchestrationStatus.COMPLETED
        assert detailed.serialized_input == '"Hello"'
        assert detailed.serialized_output == '"Complete"'
        assert detailed.failure_details is None