Process arrays and run operations in parallel with concurrency control.
Process arrays with automatic concurrency control and completion policies:
TypeScript:
const items = [1, 2, 3, 4, 5];
const results = await context.map(
'process-items',
items,
async (ctx, item, index) => {
return await ctx.step(`process-${index}`, async () =>
processItem(item)
);
},
{
maxConcurrency: 3,
completionConfig: {
minSuccessful: 4,
toleratedFailureCount: 1
}
}
);
results.throwIfError();
const allResults = results.getResults();
Python:
# Note: process is decorated with @durable_step
from aws_durable_execution_sdk_python.config import MapConfig, CompletionConfig
items = [1, 2, 3, 4, 5]
def process_item(ctx: DurableContext, item: int, index: int, items: list):
return ctx.step(process(item), name=f'process-{index}')
results = context.map(
inputs=items,
func=process_item,
name='process-items',
config=MapConfig(
max_concurrency=3,
completion_config=CompletionConfig(
min_successful=4,
tolerated_failure_count=1
)
)
)
results.throw_if_error()
all_results = results.get_results()
Run heterogeneous operations concurrently:
TypeScript:
const results = await context.parallel(
'parallel-ops',
[
{
name: 'fetch-user',
func: async (ctx) => ctx.step(async () => fetchUser(userId))
},
{
name: 'fetch-orders',
func: async (ctx) => ctx.step(async () => fetchOrders(userId))
},
{
name: 'fetch-preferences',
func: async (ctx) => ctx.step(async () => fetchPreferences(userId))
}
],
{ maxConcurrency: 3 }
);
const [user, orders, preferences] = results.getResults();
Python:
# Note: fetch_user, fetch_orders, fetch_preferences are decorated with @durable_step
from aws_durable_execution_sdk_python.config import ParallelConfig
def fetch_user_data(ctx: DurableContext):
return ctx.step(fetch_user(user_id))
def fetch_orders_data(ctx: DurableContext):
return ctx.step(fetch_orders(user_id))
def fetch_prefs_data(ctx: DurableContext):
return ctx.step(fetch_preferences(user_id))
results = context.parallel(
[fetch_user_data, fetch_orders_data, fetch_prefs_data],
name='parallel-ops',
config=ParallelConfig(max_concurrency=3)
)
user, orders, preferences = results.get_results()
Require a minimum number of successful operations:
TypeScript:
const results = await context.map(
'process-batch',
items,
async (ctx, item, index) => ctx.step(async () => process(item)),
{
completionConfig: {
minSuccessful: 8 // Need at least 8 successes
}
}
);
Allow a specific number of failures:
TypeScript:
const results = await context.map(
'process-batch',
items,
async (ctx, item, index) => ctx.step(async () => process(item)),
{
completionConfig: {
toleratedFailureCount: 2 // Allow up to 2 failures
}
}
);
Allow a percentage of failures:
TypeScript:
const results = await context.map(
'process-batch',
items,
async (ctx, item, index) => ctx.step(async () => process(item)),
{
completionConfig: {
toleratedFailurePercentage: 10 // Allow up to 10% failures
}
}
);
Python:
results = context.map(
inputs=items,
func=process_item,
config=MapConfig(
completion_config=CompletionConfig(
tolerated_failure_percentage=10
)
),
name='process-batch'
)
TypeScript:
const results = await context.map('process', items, processFunc);
console.log(results.status); // 'SUCCEEDED' | 'FAILED'
console.log(results.totalCount); // Total items
console.log(results.startedCount); // Items started
console.log(results.successCount); // Successful items
console.log(results.failureCount); // Failed items
console.log(results.hasFailure); // Boolean
TypeScript:
// Get all results (throws if any failed)
const allResults = results.getResults();
// Get successful results only
const successful = results.succeeded().map(item => item.result);
// Get failed items
const failed = results.failed().map(item => ({
index: item.index,
error: item.error
}));
// Get all items with status
const all = results.all.map(item => ({
index: item.index,
status: item.status,
result: item.result,
error: item.error
}));
TypeScript:
const results = await context.map('process', items, processFunc);
if (results.hasFailure) {
context.logger.error('Some items failed', {
failureCount: results.failureCount,
failures: results.failed().map(f => f.index)
});
// Retry failed items
const failedItems = results.failed().map(f => items[f.index]);
await context.map('retry-failed', failedItems, processFunc);
}
TypeScript:
const results = await context.map(
'process',
items,
processFunc,
{ maxConcurrency: 5 } // Process 5 items at a time
);
Adjust based on item characteristics:
TypeScript:
const results = await context.map(
'process',
items,
async (ctx, item, index) => {
// Heavy items get their own processing
if (item.size > 1000) {
return await ctx.step(`heavy-${index}`, async () =>
processHeavy(item)
);
}
// Light items can be batched
return await ctx.step(`light-${index}`, async () =>
processLight(item)
);
},
{ maxConcurrency: 10 }
);
TypeScript:
const results = await context.map(
'process-with-approval',
items,
async (ctx, item, index) => {
const processed = await ctx.step('process', async () =>
process(item)
);
const approved = await ctx.waitForCallback(
'approval',
async (callbackId) => sendApproval(item, callbackId),
{ timeout: { hours: 24 } }
);
return { processed, approved };
},
{ maxConcurrency: 3 }
);
TypeScript:
const results = await context.map(
'process-batches',
batches,
async (ctx, batch, batchIndex) => {
return await ctx.map(
`batch-${batchIndex}`,
batch.items,
async (itemCtx, item, itemIndex) => {
return await itemCtx.step(async () => process(item));
}
);
}
);
TypeScript:
const results = await context.map(
'complex-process',
items,
async (ctx, item, index) => {
return await ctx.runInChildContext(`item-${index}`, async (childCtx) => {
const validated = await childCtx.step('validate', async () =>
validate(item)
);
await childCtx.wait({ seconds: 1 });
const processed = await childCtx.step('process', async () =>
process(validated)
);
return processed;
});
},
{ maxConcurrency: 5 }
);
// Small items: Higher concurrency
const results = await context.map(
'small-items',
smallItems,
processFunc,
{ maxConcurrency: 20 }
);
// Large items: Lower concurrency
const results = await context.map(
'large-items',
largeItems,
processFunc,
{ maxConcurrency: 3 }
);
Use completion policies to stop early:
const results = await context.map(
'find-match',
candidates,
async (ctx, candidate) => {
return await ctx.step(async () => checkMatch(candidate));
},
{
completionConfig: {
minSuccessful: 1 // Stop after first success
}
}
);
- Set appropriate maxConcurrency based on downstream system capacity
- Use completion policies to handle partial failures gracefully
- Name all operations for debugging
- Handle batch results explicitly - check for failures
- Consider retry strategies for failed items
- Monitor concurrency limits to avoid overwhelming systems
- Use child contexts for complex per-item workflows
- Implement circuit breakers for external service calls