Skip to content

Commit a17eb7f

Browse files
committed
add nesting_type to concurrency operations
1 parent 950f203 commit a17eb7f

4 files changed

Lines changed: 127 additions & 0 deletions

File tree

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Example demonstrating map operations for processing collections durably."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import MapConfig, NestingType
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> list[int]:
12+
"""Process a list of items using context.map()."""
13+
items = [1, 2, 3, 4, 5]
14+
15+
# Use context.map() to process items concurrently and extract results immediately
16+
return context.map(
17+
inputs=items,
18+
func=lambda ctx, item, index, _: ctx.step(
19+
lambda _: item * 2, name=f"map_item_{index}"
20+
),
21+
name="map_operation",
22+
config=MapConfig(max_concurrency=2, nesting_type=NestingType.FLAT),
23+
).get_results()
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""Example demonstrating parallel operations for concurrent execution."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import ParallelConfig, NestingType
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
from aws_durable_execution_sdk_python.config import Duration
9+
10+
11+
@durable_execution
12+
def handler(_event: Any, context: DurableContext) -> list[str]:
13+
"""Execute multiple operations in parallel using context.parallel()."""
14+
15+
# Use context.parallel() to execute functions concurrently and extract results immediately
16+
return context.parallel(
17+
functions=[
18+
lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"),
19+
lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"),
20+
lambda ctx: (
21+
ctx.wait(Duration.from_seconds(1), name="wait_in_task3"),
22+
"task 3 completed after wait",
23+
)[1],
24+
],
25+
name="parallel_operation",
26+
config=ParallelConfig(max_concurrency=2, nesting_type=NestingType.FLAT),
27+
).get_results()
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Tests for map_operations example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
6+
7+
from src.map import map_operations_flat
8+
from test.conftest import deserialize_operation_payload
9+
10+
11+
@pytest.mark.example
12+
@pytest.mark.durable_execution(
13+
handler=map_operations_flat.handler,
14+
lambda_function_name="map operations",
15+
)
16+
def test_map_operations(durable_runner):
17+
"""Test map_operations example using context.map()."""
18+
with durable_runner:
19+
result = durable_runner.run(input="test", timeout=10)
20+
21+
assert result.status is InvocationStatus.SUCCEEDED
22+
assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10]
23+
24+
# Get the map operation (CONTEXT type with MAP subtype)
25+
map_op = result.get_context("map_operation")
26+
assert map_op is not None
27+
assert map_op.status is OperationStatus.SUCCEEDED
28+
29+
# Verify all five child operations exist
30+
assert len(map_op.child_operations) == 5
31+
32+
# Verify child step operation names
33+
child_names = {op.name for op in map_op.child_operations}
34+
expected_names = {f"map_item_{i}" for i in range(5)}
35+
assert child_names == expected_names
36+
37+
# Verify all children succeeded
38+
for child in map_op.child_operations:
39+
assert child.status is OperationStatus.SUCCEEDED
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""Tests for parallel example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
6+
7+
from src.parallel import parallel_flat
8+
from test.conftest import deserialize_operation_payload
9+
10+
11+
@pytest.mark.example
12+
@pytest.mark.durable_execution(
13+
handler=parallel_flat.handler,
14+
lambda_function_name="Parallel Operations",
15+
)
16+
def test_parallel_flat(durable_runner):
17+
"""Test parallel example using context.parallel()."""
18+
with durable_runner:
19+
result = durable_runner.run(input="test", timeout=100)
20+
21+
assert result.status is InvocationStatus.SUCCEEDED
22+
assert deserialize_operation_payload(result.result) == [
23+
"task 1 completed",
24+
"task 2 completed",
25+
"task 3 completed after wait",
26+
]
27+
28+
# Get the parallel operation (CONTEXT type with PARALLEL subtype)
29+
parallel_op = result.get_context("parallel_operation")
30+
assert parallel_op is not None
31+
assert parallel_op.status is OperationStatus.SUCCEEDED
32+
33+
# Verify all three child operations exist
34+
assert len(parallel_op.child_operations) == 3
35+
36+
# Verify all children succeeded
37+
for child in parallel_op.child_operations:
38+
assert child.status is OperationStatus.SUCCEEDED

0 commit comments

Comments
 (0)