Commit b0a4fc1

Merge branch 'main' into dependabot/github_actions/actions-deps-7a8a6735dd
2 parents 3e6bbce + aa1a435 commit b0a4fc1

3 files changed

Lines changed: 39 additions & 86 deletions

AGENTS.md

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -495,9 +495,10 @@ const allResults = results.getResults();
495495
**Python:**
496496

497497
```python
498+
from collections.abc import Sequence
498499
from aws_durable_execution_sdk_python.concurrency import MapConfig, CompletionConfig
499500

500-
def process_item(ctx: DurableContext, item: dict, index: int) -> dict:
501+
def process_item(ctx: DurableContext, item: dict, index: int, items: Sequence[dict]) -> dict:
501502
    return ctx.step(lambda _: process(item), name=f"process-{index}")
502503

503504
results = context.map(
@@ -833,7 +834,7 @@ Resources:
833834
    Type: AWS::Lambda::Function
834835
    Properties:
835836
      FunctionName: myDurableFunction
836-
      Runtime: nodejs22.x # or python3.12
837+
      Runtime: nodejs22.x # or python3.14
837838
      Handler: index.handler
838839
      Role: !GetAtt DurableFunctionRole.Arn
839840
      Code:
@@ -894,7 +895,7 @@ Resources:
894895
    Type: AWS::Serverless::Function
895896
    Properties:
896897
      FunctionName: myDurableFunction
897-
      Runtime: nodejs22.x # or python3.12
898+
      Runtime: nodejs22.x # or python3.14
898899
      Handler: index.handler
899900
      CodeUri: ./src
900901
      DurableConfig:

docs/core/map.md

Lines changed: 33 additions & 82 deletions
@@ -28,8 +28,6 @@
2828

2929
**Concurrency control** - Limiting how many items process simultaneously using `max_concurrency` in `MapConfig`.
3030

31-
**Item batching** - Grouping multiple items together for processing as a single unit to optimize efficiency.
32-
3331
**Completion criteria** - Rules that determine when a map operation succeeds or fails based on individual item results.
3432

3533
[↑ Back to top](#table-of-contents)
@@ -42,7 +40,7 @@ Use map operations to:
4240
- Transform collections with automatic checkpointing
4341
- Process lists of items in parallel
4442
- Handle large datasets with resilience
45-
- Control concurrency and batching behavior
43+
- Control concurrency behavior
4644
- Define custom success/failure criteria
4745

4846
Map operations use `context.map()` to process collections efficiently. Each item becomes an independent operation that executes in parallel with other items.
@@ -55,7 +53,6 @@ Map operations use `context.map()` to process collections efficiently. Each item
5553
- **Independent checkpointing** - Each item's result is saved separately
5654
- **Partial completion** - Completed items don't reprocess on replay
5755
- **Concurrency control** - Limit simultaneous processing with `max_concurrency`
58-
- **Batching support** - Group items for efficient processing
5956
- **Flexible completion** - Define custom success/failure criteria
6057
- **Result ordering** - Results maintain the same order as inputs
6158

@@ -77,19 +74,20 @@ def square(context: DurableContext, item: int, index: int, items: list[int]) ->
7774
    return item * item
7875

7976
@durable_execution
80-
def handler(event: dict, context: DurableContext) -> BatchResult[int]:
77+
def handler(event: dict, context: DurableContext) -> dict:
8178
    """Process a list of items using map operations."""
8279
    items = [1, 2, 3, 4, 5]
83-
80+
8481
    result = context.map(items, square)
85-
    return result
82+
    # Convert to dict for JSON serialization (BatchResult is not JSON serializable)
83+
    return result.to_dict()
8684
```
8785

8886
When this function runs:
8987
1. Each item is processed in parallel
9088
2. The `square` function is called for each item
9189
3. Each result is checkpointed independently
92-
4. The function returns a `BatchResult` with results `[1, 4, 9, 16, 25]`
90+
4. The function returns a dict with results `[1, 4, 9, 16, 25]`
9391

9492
If the function is interrupted after processing items 0-2, it resumes at item 3 without reprocessing the first three items.
9593

@@ -102,7 +100,7 @@ If the function is interrupted after processing items 0-2, it resumes at item 3
102100
```python
103101
def map(
104102
    inputs: Sequence[U],
105-
    func: Callable[[DurableContext, U | BatchedInput[Any, U], int, Sequence[U]], T],
103+
    func: Callable[[DurableContext, U, int, Sequence[U]], T],
106104
    name: str | None = None,
107105
    config: MapConfig | None = None,
108106
) -> BatchResult[T]
@@ -113,7 +111,7 @@ def map(
113111
- `inputs` - A sequence of items to process (list, tuple, or any sequence type).
114112
- `func` - A callable that processes each item. See [Map function signature](#map-function-signature) for details.
115113
- `name` (optional) - A name for the map operation, useful for debugging and testing.
116-
- `config` (optional) - A `MapConfig` object to configure concurrency, batching, and completion criteria.
114+
- `config` (optional) - A `MapConfig` object to configure concurrency and completion criteria.
117115

118116
**Returns:** A `BatchResult[T]` containing the results from processing all items.
119117

@@ -128,7 +126,7 @@ The map function receives four parameters:
128126
```python
129127
def process_item(
130128
    context: DurableContext,
131-
    item: U | BatchedInput[Any, U],
129+
    item: U,
132130
    index: int,
133131
    items: Sequence[U]
134132
) -> T:
@@ -140,7 +138,7 @@ def process_item(
140138
**Parameters:**
141139

142140
- `context` - A `DurableContext` for the item's processing. Use this to call steps, waits, or other operations.
143-
- `item` - The current item being processed. Can be a `BatchedInput` if batching is configured.
141+
- `item` - The current item being processed.
144142
- `index` - The zero-based index of the item in the original collection.
145143
- `items` - The full collection of items being processed.
146144

@@ -165,10 +163,10 @@ def validate_email(
165163
    }
166164

167165
@durable_execution
168-
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
166+
def handler(event: dict, context: DurableContext) -> dict:
169167
    emails = ["jane_doe@example.com", "john_doe@example.org", "invalid"]
170168
    result = context.map(emails, validate_email)
171-
    return result
169+
    return result.to_dict()
172170
```
173171

174172
[↑ Back to top](#table-of-contents)
@@ -186,34 +184,30 @@ from aws_durable_execution_sdk_python import (
186184
from aws_durable_execution_sdk_python.config import (
187185
    MapConfig,
188186
    CompletionConfig,
189-
    ItemBatcher,
190187
)
191188

192189
def process_item(context: DurableContext, item: int, index: int, items: list[int]) -> dict:
193190
    """Process a single item."""
194191
    return {"item": item, "squared": item * item}
195192

196193
@durable_execution
197-
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
194+
def handler(event: dict, context: DurableContext) -> dict:
198195
    items = list(range(100))
199-
196+
200197
    # Configure map operation
201198
    config = MapConfig(
202199
        max_concurrency=10, # Process 10 items at a time
203-
        item_batcher=ItemBatcher(max_items_per_batch=5), # Batch 5 items together
204200
        completion_config=CompletionConfig.all_successful(), # Require all to succeed
205201
    )
206-
202+
207203
    result = context.map(items, process_item, name="process_numbers", config=config)
208-
    return result
204+
    return result.to_dict()
209205
```
210206

211207
### MapConfig parameters
212208

213209
**max_concurrency** - Maximum number of items to process concurrently. If `None`, all items process in parallel. Use this to control resource usage.
214210

215-
**item_batcher** - Configuration for batching items together. Use `ItemBatcher(max_items_per_batch=N)` to group items.
216-
217211
**completion_config** - Defines when the map operation succeeds or fails:
218212
- `CompletionConfig()` - Default, allows any number of failures
219213
- `CompletionConfig.all_successful()` - Requires all items to succeed
@@ -244,45 +238,14 @@ def fetch_data(context: DurableContext, url: str, index: int, urls: list[str]) -
244238
    return {"url": url, "data": "..."}
245239

246240
@durable_execution
247-
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
241+
def handler(event: dict, context: DurableContext) -> dict:
248242
    urls = [f"https://example.com/api/{i}" for i in range(100)]
249-
243+
250244
    # Process only 5 URLs at a time
251245
    config = MapConfig(max_concurrency=5)
252-
253-
    result = context.map(urls, fetch_data, config=config)
254-
    return result
255-
```
256-
257-
### Batching items
258-
259-
Group multiple items for efficient processing:
260246

261-
```python
262-
from aws_durable_execution_sdk_python.config import MapConfig, ItemBatcher, BatchedInput
263-
264-
def process_batch(
265-
    context: DurableContext,
266-
    batch: BatchedInput[None, int],
267-
    index: int,
268-
    items: list[int]
269-
) -> list[dict]:
270-
    """Process a batch of items together."""
271-
    # Process all items in the batch together
272-
    return [{"item": item, "squared": item * item} for item in batch.items]
273-
274-
@durable_execution
275-
def handler(event: dict, context: DurableContext) -> BatchResult[list[dict]]:
276-
    items = list(range(100))
277-
278-
    # Process items in batches of 10
279-
    config = MapConfig(
280-
        item_batcher=ItemBatcher(max_items_per_batch=10)
281-
    )
282-
283-
    result = context.map(items, process_batch, config=config)
284-
    return result
285-
```
247+
    result = context.map(urls, fetch_data, config=config)
248+
    return result.to_dict()
```
286249

287250
### Custom completion criteria
288251

@@ -299,19 +262,19 @@ def process_item(context: DurableContext, item: int, index: int, items: list[int
299262
    return {"item": item, "processed": True}
300263

301264
@durable_execution
302-
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
265+
def handler(event: dict, context: DurableContext) -> dict:
303266
    items = list(range(20))
304-
267+
305268
    # Succeed if at least 15 items succeed, fail after 5 failures
306269
    config = MapConfig(
307270
        completion_config=CompletionConfig(
308271
            min_successful=15,
309272
            tolerated_failure_count=5,
310273
        )
311274
    )
312-
275+
313276
    result = context.map(items, process_item, config=config)
314-
    return result
277+
    return result.to_dict()
315278
```
316279

317280
### Using context operations in map functions
@@ -344,11 +307,13 @@ def process_user(
344307
    return {"user_id": user_id, "notification_sent": notification["sent"]}
345308

346309
@durable_execution
347-
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
310+
def handler(event: dict, context: DurableContext) -> dict:
311+
    """Process multiple users using context operations within map functions."""
348312
    user_ids = ["user_1", "user_2", "user_3"]
349-
313+
350314
    result = context.map(user_ids, process_user)
351-
    return result
315+
    # Convert to dict for JSON serialization (BatchResult is not JSON serializable)
316+
    return result.to_dict()
352317
```
353318

354319
### Filtering and transforming results
@@ -391,8 +356,6 @@ def handler(event: dict, context: DurableContext) -> list[str]:
391356

392357
**Control concurrency for external calls** - When calling external APIs, use `max_concurrency` to avoid rate limits.
393358

394-
**Batch for efficiency** - For small, fast operations, use `item_batcher` to reduce overhead.
395-
396359
**Define completion criteria** - Use `CompletionConfig` to specify when the operation should succeed or fail.
397360

398361
**Keep map functions focused** - Each map function should process one item. Don't mix collection iteration with item processing.
@@ -401,7 +364,7 @@ def handler(event: dict, context: DurableContext) -> list[str]:
401364

402365
**Handle errors gracefully** - Wrap error-prone code in try-except blocks or use completion criteria to tolerate failures.
403366

404-
**Consider collection size** - For very large collections (10,000+ items), consider batching or processing in chunks.
367+
**Consider collection size** - For very large collections (10,000+ items), consider processing in chunks.
405368

406369
**Monitor memory usage** - Large collections create many checkpoints. Monitor Lambda memory usage.
407370

@@ -415,23 +378,15 @@ def handler(event: dict, context: DurableContext) -> list[str]:
415378

416379
**Use max_concurrency wisely** - Too much concurrency can overwhelm external services or exhaust Lambda resources. Start conservative and increase as needed.
417380

418-
**Batch small operations** - If each item processes quickly (< 100ms), batching reduces overhead:
419-
420-
```python
421-
config = MapConfig(
422-
    item_batcher=ItemBatcher(max_items_per_batch=10)
423-
)
424-
```
425-
426381
**Optimize map functions** - Keep map functions lightweight. Move heavy computation into steps within the map function.
427382

428383
**Use appropriate completion criteria** - Fail fast with `tolerated_failure_count` to avoid processing remaining items when many fail.
429384

430385
**Monitor checkpoint size** - Large result objects increase checkpoint size and Lambda memory usage. Return only necessary data.
431386

432-
**Consider memory limits** - Processing thousands of items creates many checkpoints. Monitor Lambda memory and adjust batch size or concurrency.
387+
**Consider memory limits** - Processing thousands of items creates many checkpoints. Monitor Lambda memory and adjust concurrency.
433388

434-
**Profile your workload** - Test with representative data to find optimal concurrency and batch settings.
389+
**Profile your workload** - Test with representative data to find optimal concurrency settings.
435390

436391
[↑ Back to top](#table-of-contents)
437392

@@ -443,7 +398,7 @@ A: Map operations process a collection of similar items using the same function.
443398

444399
**Q: How many items can I process?**
445400

446-
A: There's no hard limit, but consider Lambda's memory and timeout constraints. For very large collections (10,000+ items), use batching or process in chunks.
401+
A: There's no hard limit, but consider Lambda's memory and timeout constraints. For very large collections (10,000+ items), consider processing in chunks.
447402

448403
**Q: Do items process in order?**
449404

@@ -471,10 +426,6 @@ for item_result in batch_result.results:
471426

472427
A: Yes, you can call `context.map()` inside a map function to process nested collections.
473428

474-
**Q: How does batching work?**
475-
476-
A: When you configure `item_batcher`, multiple items are grouped together and passed as a `BatchedInput` to your map function. Process all items in `batch.items`.
477-
478429
**Q: What's the difference between serdes and item_serdes?**
479430

480431
A: `item_serdes` serializes individual item results as they complete. `serdes` serializes the entire `BatchResult` at the end. Use both for custom serialization at different levels.
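
The map semantics that this diff leaves in docs/core/map.md (a four-parameter callback, a `max_concurrency` cap, and results in input order) can be illustrated outside the SDK. The following is a minimal standalone sketch, not the SDK's implementation: `bounded_map` is a hypothetical helper built on `concurrent.futures`, it passes `None` where the SDK would pass a `DurableContext`, and it does no checkpointing or replay.

```python
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Optional, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def bounded_map(
    inputs: Sequence[U],
    func: Callable[[Any, U, int, Sequence[U]], T],
    max_concurrency: Optional[int] = None,
) -> list[T]:
    """Hypothetical stand-in for context.map(): no checkpointing, no replay."""
    # None means "no limit": allow one worker per item (at least one worker).
    workers = max(1, min(max_concurrency or len(inputs), len(inputs)))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Each call receives (context, item, index, items), as in the docs.
        futures = [
            pool.submit(func, None, item, index, inputs)
            for index, item in enumerate(inputs)
        ]
        # Collect in submission order so results line up with inputs.
        return [future.result() for future in futures]


def square(context: Any, item: int, index: int, items: Sequence[int]) -> int:
    """Same shape as the `square` callback in the documentation example."""
    return item * item


results = bounded_map([1, 2, 3, 4, 5], square, max_concurrency=2)
print(results)  # [1, 4, 9, 16, 25]
```

Collecting futures in submission order, rather than completion order, is what keeps the output aligned with the input even when items finish out of order.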

docs/getting-started.md

Lines changed: 2 additions & 1 deletion
@@ -168,13 +168,14 @@ pip install aws-durable-execution-sdk-python-testing
168168

169169
```python
170170
import pytest
171+
from aws_durable_execution_sdk_python.execution import InvocationStatus
171172
from my_function import handler
172173

173174
@pytest.mark.durable_execution(handler=handler, lambda_function_name="my_function")
174175
def test_my_function(durable_runner):
175176
    with durable_runner:
176177
        result = durable_runner.run(input={"data": "test"}, timeout=10)
177-
        assert result.status == "SUCCEEDED"
178+
        assert result.status == InvocationStatus.SUCCEEDED
178179
```
179180

180181
Run tests without AWS credentials:

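One plausible reason for the assertion change in docs/getting-started.md is that an enum member does not compare equal to its string value unless the enum also subclasses `str`. The sketch below rests on that assumption: `PlainStatus` and `StrStatus` are hypothetical stand-ins, and the real definition of `InvocationStatus` is not shown in this diff.

```python
from enum import Enum


class PlainStatus(Enum):
    """Hypothetical status enum without a str mixin."""

    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


class StrStatus(str, Enum):
    """Hypothetical status enum that also subclasses str."""

    SUCCEEDED = "SUCCEEDED"


status = PlainStatus.SUCCEEDED

# A plain Enum member is never equal to its raw value.
plain_equals_string = status == "SUCCEEDED"

# Comparing against the member works regardless of the mixin.
plain_equals_member = status == PlainStatus.SUCCEEDED

# With the str mixin, comparison against the raw string succeeds.
str_equals_string = StrStatus.SUCCEEDED == "SUCCEEDED"

print(plain_equals_string, plain_equals_member, str_equals_string)
```

Comparing against the enum member, as the updated test does, stays correct under either definition.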