-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_parallel_flat.py
More file actions
38 lines (30 loc) · 1.27 KB
/
test_parallel_flat.py
File metadata and controls
38 lines (30 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
"""Tests for parallel example."""
import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
from src.parallel import parallel_flat
from test.conftest import deserialize_operation_payload
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=parallel_flat.handler,
    lambda_function_name="Parallel Operations",
)
def test_parallel_flat(durable_runner):
    """Run the flat parallel example and verify its result and operations.

    Checks that the invocation succeeds, that the deserialized result equals
    the three expected task messages in order, and that the context named
    "parallel_operation" succeeded with exactly three succeeded children.
    """
    expected_messages = [
        "task 1 completed",
        "task 2 completed",
        "task 3 completed after wait",
    ]

    with durable_runner:
        outcome = durable_runner.run(input="test", timeout=100)

    # The invocation as a whole must have finished successfully.
    assert outcome.status is InvocationStatus.SUCCEEDED

    # The handler's result, once deserialized, lists each task's message.
    payload = deserialize_operation_payload(outcome.result)
    assert payload == expected_messages

    # Look up the parallel operation by name (CONTEXT type with PARALLEL
    # subtype) and make sure it completed.
    parallel_ctx = outcome.get_context("parallel_operation")
    assert parallel_ctx is not None
    assert parallel_ctx.status is OperationStatus.SUCCEEDED

    # One child operation is expected per parallel branch.
    branches = parallel_ctx.child_operations
    assert len(branches) == 3

    # Every branch must have succeeded individually.
    for branch in branches:
        assert branch.status is OperationStatus.SUCCEEDED