Skip to content

Commit 716b41a

Browse files
committed
[feat]: support FLAT nesting type for map/parallel
1 parent cabc067 commit 716b41a

18 files changed

Lines changed: 476 additions & 184 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java

Lines changed: 174 additions & 66 deletions
Large diffs are not rendered by default.

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestOperation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public List<Event> getEvents() {
4141
return List.copyOf(events);
4242
}
4343

44+
/** Returns the operation ID. */
45+
public String getId() {
46+
return operation.id();
47+
}
48+
4449
/** Returns the operation name. */
4550
public String getName() {
4651
return operation.name();

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ void processUpdate(OperationUpdate update, Operation operation) {
2020
.eventId(eventId.getAndIncrement())
2121
.eventTimestamp(Instant.now())
2222
.id(update.id())
23-
.name(update.name());
23+
.name(update.name())
24+
.parentId(operation.parentId());
2425

2526
Event event =
2627
switch (update.type()) {

sdk/src/main/java/software/amazon/lambda/durable/config/MapConfig.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.config;
44

5+
import java.util.Objects;
56
import software.amazon.lambda.durable.serde.SerDes;
67

78
/**
@@ -13,11 +14,12 @@ public class MapConfig {
1314
private final Integer maxConcurrency;
1415
private final CompletionConfig completionConfig;
1516
private final SerDes serDes;
17+
private final NestingType nestingType;
1618

1719
private MapConfig(Builder builder) {
18-
this.maxConcurrency = builder.maxConcurrency == null ? Integer.MAX_VALUE : builder.maxConcurrency;
19-
this.completionConfig =
20-
builder.completionConfig == null ? CompletionConfig.allCompleted() : builder.completionConfig;
20+
this.maxConcurrency = Objects.requireNonNullElse(builder.maxConcurrency, Integer.MAX_VALUE);
21+
this.completionConfig = Objects.requireNonNullElse(builder.completionConfig, CompletionConfig.allCompleted());
22+
this.nestingType = Objects.requireNonNullElse(builder.nestingType, NestingType.NESTED);
2123
this.serDes = builder.serDes;
2224
}
2325

@@ -36,25 +38,31 @@ public SerDes serDes() {
3638
return serDes;
3739
}
3840

41+
/** @return nesting type, defaults to {@link NestingType#NESTED} */
42+
public NestingType nestingType() {
43+
return nestingType;
44+
}
45+
3946
public static Builder builder() {
40-
return new Builder(null, null, null);
47+
return new Builder();
4148
}
4249

4350
public Builder toBuilder() {
44-
return new Builder(maxConcurrency, completionConfig, serDes);
51+
return new Builder()
52+
.maxConcurrency(maxConcurrency)
53+
.completionConfig(completionConfig)
54+
.serDes(serDes)
55+
.nestingType(nestingType);
4556
}
4657

4758
/** Builder for creating MapConfig instances. */
4859
public static class Builder {
60+
public NestingType nestingType;
4961
private Integer maxConcurrency;
5062
private CompletionConfig completionConfig;
5163
private SerDes serDes;
5264

53-
private Builder(Integer maxConcurrency, CompletionConfig completionConfig, SerDes serDes) {
54-
this.maxConcurrency = maxConcurrency;
55-
this.completionConfig = completionConfig;
56-
this.serDes = serDes;
57-
}
65+
private Builder() {}
5866

5967
public Builder maxConcurrency(Integer maxConcurrency) {
6068
if (maxConcurrency != null && maxConcurrency < 1) {
@@ -86,6 +94,17 @@ public Builder serDes(SerDes serDes) {
8694
return this;
8795
}
8896

97+
/**
98+
* Sets the nesting type for the map operation.
99+
*
100+
* @param nestingType the nesting type (default: {@link NestingType#NESTED})
101+
* @return this builder for method chaining
102+
*/
103+
public Builder nestingType(NestingType nestingType) {
104+
this.nestingType = nestingType;
105+
return this;
106+
}
107+
89108
public MapConfig build() {
90109
return new MapConfig(this);
91110
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.config;
4+
5+
public enum NestingType {
6+
/**
7+
* Create CONTEXT operations for each branch/iteration with full checkpointing. Operations within each
8+
* branch/iteration are wrapped in their own context. - **Observability**: High - each branch/iteration appears as
9+
* separate operation in execution history - **Cost**: Higher - consumes more operations due to CONTEXT creation
10+
* overhead - **Scale**: Lower maximum iterations due to operation limits
11+
*/
12+
NESTED,
13+
14+
/**
15+
* Skip CONTEXT operations for branches/iterations using virtual contexts. Operations execute directly without
16+
* individual context wrapping. - **Observability**: Lower - branches/iterations don't appear as separate operations
17+
* - **Cost**: ~30% lower - reduces operation consumption by skipping CONTEXT overhead - **Scale**: Higher maximum
18+
* iterations possible within operation limits
19+
*/
20+
FLAT,
21+
}

sdk/src/main/java/software/amazon/lambda/durable/config/ParallelConfig.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.config;
44

5+
import java.util.Objects;
6+
57
/**
68
* Configuration options for parallel operations in durable executions.
79
*
@@ -11,22 +13,29 @@
1113
public class ParallelConfig {
1214
private final int maxConcurrency;
1315
private final CompletionConfig completionConfig;
16+
private final NestingType nestingType;
1417

1518
private ParallelConfig(Builder builder) {
16-
this.maxConcurrency = builder.maxConcurrency == null ? Integer.MAX_VALUE : builder.maxConcurrency;
17-
this.completionConfig =
18-
builder.completionConfig == null ? CompletionConfig.allCompleted() : builder.completionConfig;
19+
this.maxConcurrency = Objects.requireNonNullElse(builder.maxConcurrency, Integer.MAX_VALUE);
20+
this.completionConfig = Objects.requireNonNullElseGet(builder.completionConfig, CompletionConfig::allCompleted);
21+
this.nestingType = Objects.requireNonNullElse(builder.nestingType, NestingType.NESTED);
1922
}
2023

2124
/** @return the maximum number of branches running simultaneously, or -1 for unlimited */
2225
public int maxConcurrency() {
2326
return maxConcurrency;
2427
}
2528

29+
/** @return the completion configuration for the parallel operation */
2630
public CompletionConfig completionConfig() {
2731
return completionConfig;
2832
}
2933

34+
/** @return the nesting type for the parallel operation */
35+
public NestingType nestingType() {
36+
return nestingType;
37+
}
38+
3039
/**
3140
* Creates a new builder for ParallelConfig.
3241
*
@@ -36,10 +45,18 @@ public static Builder builder() {
3645
return new Builder();
3746
}
3847

48+
public Builder toBuilder() {
49+
return new Builder()
50+
.maxConcurrency(maxConcurrency)
51+
.completionConfig(completionConfig)
52+
.nestingType(nestingType);
53+
}
54+
3955
/** Builder for creating ParallelConfig instances. */
4056
public static class Builder {
4157
private Integer maxConcurrency;
4258
private CompletionConfig completionConfig;
59+
private NestingType nestingType;
4360

4461
private Builder() {}
4562

@@ -71,6 +88,17 @@ public Builder completionConfig(CompletionConfig completionConfig) {
7188
return this;
7289
}
7390

91+
/**
92+
* Sets the nesting type for the parallel operation.
93+
*
94+
* @param nestingType the nesting type (default: {@link NestingType#NESTED})
95+
* @return this builder for method chaining
96+
*/
97+
public Builder nestingType(NestingType nestingType) {
98+
this.nestingType = nestingType;
99+
return this;
100+
}
101+
74102
/**
75103
* Builds the ParallelConfig instance.
76104
*

sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@
3737
import software.amazon.lambda.durable.model.WaitForConditionResult;
3838
import software.amazon.lambda.durable.operation.CallbackOperation;
3939
import software.amazon.lambda.durable.operation.ChildContextOperation;
40+
import software.amazon.lambda.durable.operation.ConcurrencyOperation;
4041
import software.amazon.lambda.durable.operation.InvokeOperation;
4142
import software.amazon.lambda.durable.operation.MapOperation;
4243
import software.amazon.lambda.durable.operation.ParallelOperation;
4344
import software.amazon.lambda.durable.operation.StepOperation;
4445
import software.amazon.lambda.durable.operation.WaitForConditionOperation;
4546
import software.amazon.lambda.durable.operation.WaitOperation;
47+
import software.amazon.lambda.durable.serde.SerDes;
4648
import software.amazon.lambda.durable.util.ParameterValidator;
4749

4850
/**
@@ -57,6 +59,8 @@ public class DurableContextImpl extends BaseContextImpl implements DurableContex
5759
private static final int MAX_WAIT_FOR_CALLBACK_NAME_LENGTH = ParameterValidator.MAX_OPERATION_NAME_LENGTH
5860
- Math.max(WAIT_FOR_CALLBACK_CALLBACK_SUFFIX.length(), WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX.length());
5961
private final OperationIdGenerator operationIdGenerator;
62+
private final DurableContextImpl parentContext;
63+
private final boolean isVirtual;
6064
private volatile DurableLogger logger;
6165

6266
/** Shared initialization — sets all fields. */
@@ -65,9 +69,13 @@ private DurableContextImpl(
6569
DurableConfig durableConfig,
6670
Context lambdaContext,
6771
String contextId,
68-
String contextName) {
72+
String contextName,
73+
boolean isVirtual,
74+
DurableContextImpl parentContext) {
6975
super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
7076
operationIdGenerator = new OperationIdGenerator(contextId);
77+
this.parentContext = parentContext;
78+
this.isVirtual = isVirtual;
7179
}
7280

7381
/**
@@ -82,19 +90,26 @@ private DurableContextImpl(
8290
*/
8391
public static DurableContextImpl createRootContext(
8492
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) {
85-
return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null);
93+
return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null, false, null);
8694
}
8795

8896
/**
8997
* Creates a child context.
9098
*
9199
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
92100
* @param childContextName the name of the child context
101+
* @param isVirtual whether the context is virtual
93102
* @return a new DurableContext for the child context
94103
*/
95-
public DurableContextImpl createChildContext(String childContextId, String childContextName) {
104+
public DurableContextImpl createChildContext(String childContextId, String childContextName, boolean isVirtual) {
96105
return new DurableContextImpl(
97-
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
106+
getExecutionManager(),
107+
getDurableConfig(),
108+
getLambdaContext(),
109+
childContextId,
110+
childContextName,
111+
isVirtual,
112+
this);
98113
}
99114

100115
/**
@@ -387,4 +402,13 @@ public void close() {
387402
private String nextOperationId() {
388403
return operationIdGenerator.nextOperationId();
389404
}
405+
406+
/**
407+
* Get the parent context ID for its child operations, which always points to a non-virtual context
408+
*
409+
* @return the parent of this context if virtual, otherwise this context id
410+
*/
411+
public String getParentId() {
412+
return isVirtual ? parentContext.getContextId() : getContextId();
413+
}
390414
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.model;
4+
5+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
6+
import software.amazon.lambda.durable.util.ExceptionHelper;
7+
8+
public record DeserializedOperationResult<T>(OperationStatus status, T result, Throwable throwable) {
9+
public static <T> DeserializedOperationResult<T> succeeded(T result) {
10+
return new DeserializedOperationResult<>(OperationStatus.SUCCEEDED, result, null);
11+
}
12+
13+
public static <T> DeserializedOperationResult<T> failed(Throwable throwable) {
14+
return new DeserializedOperationResult<>(OperationStatus.FAILED, null, throwable);
15+
}
16+
17+
public T get() {
18+
if (status == OperationStatus.SUCCEEDED) {
19+
return result;
20+
}
21+
ExceptionHelper.sneakyThrow(throwable);
22+
return null;
23+
}
24+
}

0 commit comments

Comments
 (0)