Skip to content

Commit de17465

Browse files
committed
[feat]: support FLAT nesting type for map/parallel
[feat]: support FLAT nesting type for map/parallel add more tests
1 parent cabc067 commit de17465

18 files changed

Lines changed: 555 additions & 218 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java

Lines changed: 256 additions & 100 deletions
Large diffs are not rendered by default.

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestOperation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public List<Event> getEvents() {
4141
return List.copyOf(events);
4242
}
4343

44+
/** Returns the operation ID. */
45+
public String getId() {
46+
return operation.id();
47+
}
48+
4449
/** Returns the operation name. */
4550
public String getName() {
4651
return operation.name();

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ void processUpdate(OperationUpdate update, Operation operation) {
2020
.eventId(eventId.getAndIncrement())
2121
.eventTimestamp(Instant.now())
2222
.id(update.id())
23-
.name(update.name());
23+
.name(update.name())
24+
.parentId(operation.parentId());
2425

2526
Event event =
2627
switch (update.type()) {

sdk/src/main/java/software/amazon/lambda/durable/config/MapConfig.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.config;
44

5+
import java.util.Objects;
56
import software.amazon.lambda.durable.serde.SerDes;
67

78
/**
@@ -13,11 +14,12 @@ public class MapConfig {
1314
private final Integer maxConcurrency;
1415
private final CompletionConfig completionConfig;
1516
private final SerDes serDes;
17+
private final NestingType nestingType;
1618

1719
private MapConfig(Builder builder) {
18-
this.maxConcurrency = builder.maxConcurrency == null ? Integer.MAX_VALUE : builder.maxConcurrency;
19-
this.completionConfig =
20-
builder.completionConfig == null ? CompletionConfig.allCompleted() : builder.completionConfig;
20+
this.maxConcurrency = Objects.requireNonNullElse(builder.maxConcurrency, Integer.MAX_VALUE);
21+
this.completionConfig = Objects.requireNonNullElse(builder.completionConfig, CompletionConfig.allCompleted());
22+
this.nestingType = Objects.requireNonNullElse(builder.nestingType, NestingType.NESTED);
2123
this.serDes = builder.serDes;
2224
}
2325

@@ -36,25 +38,31 @@ public SerDes serDes() {
3638
return serDes;
3739
}
3840

41+
/** @return nesting type, defaults to {@link NestingType#NESTED} */
42+
public NestingType nestingType() {
43+
return nestingType;
44+
}
45+
3946
public static Builder builder() {
40-
return new Builder(null, null, null);
47+
return new Builder();
4148
}
4249

4350
public Builder toBuilder() {
44-
return new Builder(maxConcurrency, completionConfig, serDes);
51+
return new Builder()
52+
.maxConcurrency(maxConcurrency)
53+
.completionConfig(completionConfig)
54+
.serDes(serDes)
55+
.nestingType(nestingType);
4556
}
4657

4758
/** Builder for creating MapConfig instances. */
4859
public static class Builder {
60+
public NestingType nestingType;
4961
private Integer maxConcurrency;
5062
private CompletionConfig completionConfig;
5163
private SerDes serDes;
5264

53-
private Builder(Integer maxConcurrency, CompletionConfig completionConfig, SerDes serDes) {
54-
this.maxConcurrency = maxConcurrency;
55-
this.completionConfig = completionConfig;
56-
this.serDes = serDes;
57-
}
65+
private Builder() {}
5866

5967
public Builder maxConcurrency(Integer maxConcurrency) {
6068
if (maxConcurrency != null && maxConcurrency < 1) {
@@ -86,6 +94,17 @@ public Builder serDes(SerDes serDes) {
8694
return this;
8795
}
8896

97+
/**
98+
* Sets the nesting type for the map operation.
99+
*
100+
* @param nestingType the nesting type (default: {@link NestingType#NESTED})
101+
* @return this builder for method chaining
102+
*/
103+
public Builder nestingType(NestingType nestingType) {
104+
this.nestingType = nestingType;
105+
return this;
106+
}
107+
89108
public MapConfig build() {
90109
return new MapConfig(this);
91110
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.config;
4+
5+
public enum NestingType {
6+
/**
7+
* Create CONTEXT operations for each branch/iteration with full checkpointing. Operations within each
8+
* branch/iteration are wrapped in their own context. - **Observability**: High - each branch/iteration appears as
9+
* separate operation in execution history - **Cost**: Higher - consumes more operations due to CONTEXT creation
10+
* overhead - **Scale**: Lower maximum iterations due to operation limits
11+
*/
12+
NESTED,
13+
14+
/**
15+
* Skip CONTEXT operations for branches/iterations using virtual contexts. Operations execute directly without
16+
* individual context wrapping. - **Observability**: Lower - branches/iterations don't appear as separate operations
17+
* - **Cost**: ~30% lower - reduces operation consumption by skipping CONTEXT overhead - **Scale**: Higher maximum
18+
* iterations possible within operation limits
19+
*/
20+
FLAT,
21+
}

sdk/src/main/java/software/amazon/lambda/durable/config/ParallelConfig.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.config;
44

5+
import java.util.Objects;
6+
57
/**
68
* Configuration options for parallel operations in durable executions.
79
*
@@ -11,22 +13,29 @@
1113
public class ParallelConfig {
1214
private final int maxConcurrency;
1315
private final CompletionConfig completionConfig;
16+
private final NestingType nestingType;
1417

1518
private ParallelConfig(Builder builder) {
16-
this.maxConcurrency = builder.maxConcurrency == null ? Integer.MAX_VALUE : builder.maxConcurrency;
17-
this.completionConfig =
18-
builder.completionConfig == null ? CompletionConfig.allCompleted() : builder.completionConfig;
19+
this.maxConcurrency = Objects.requireNonNullElse(builder.maxConcurrency, Integer.MAX_VALUE);
20+
this.completionConfig = Objects.requireNonNullElseGet(builder.completionConfig, CompletionConfig::allCompleted);
21+
this.nestingType = Objects.requireNonNullElse(builder.nestingType, NestingType.NESTED);
1922
}
2023

2124
/** @return the maximum number of branches running simultaneously, or -1 for unlimited */
2225
public int maxConcurrency() {
2326
return maxConcurrency;
2427
}
2528

29+
/** @return the completion configuration for the parallel operation */
2630
public CompletionConfig completionConfig() {
2731
return completionConfig;
2832
}
2933

34+
/** @return the nesting type for the parallel operation */
35+
public NestingType nestingType() {
36+
return nestingType;
37+
}
38+
3039
/**
3140
* Creates a new builder for ParallelConfig.
3241
*
@@ -36,10 +45,18 @@ public static Builder builder() {
3645
return new Builder();
3746
}
3847

48+
public Builder toBuilder() {
49+
return new Builder()
50+
.maxConcurrency(maxConcurrency)
51+
.completionConfig(completionConfig)
52+
.nestingType(nestingType);
53+
}
54+
3955
/** Builder for creating ParallelConfig instances. */
4056
public static class Builder {
4157
private Integer maxConcurrency;
4258
private CompletionConfig completionConfig;
59+
private NestingType nestingType;
4360

4461
private Builder() {}
4562

@@ -71,6 +88,17 @@ public Builder completionConfig(CompletionConfig completionConfig) {
7188
return this;
7289
}
7390

91+
/**
92+
* Sets the nesting type for the parallel operation.
93+
*
94+
* @param nestingType the nesting type (default: {@link NestingType#NESTED})
95+
* @return this builder for method chaining
96+
*/
97+
public Builder nestingType(NestingType nestingType) {
98+
this.nestingType = nestingType;
99+
return this;
100+
}
101+
74102
/**
75103
* Builds the ParallelConfig instance.
76104
*

sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class DurableContextImpl extends BaseContextImpl implements DurableContex
5757
private static final int MAX_WAIT_FOR_CALLBACK_NAME_LENGTH = ParameterValidator.MAX_OPERATION_NAME_LENGTH
5858
- Math.max(WAIT_FOR_CALLBACK_CALLBACK_SUFFIX.length(), WAIT_FOR_CALLBACK_SUBMITTER_SUFFIX.length());
5959
private final OperationIdGenerator operationIdGenerator;
60+
private final DurableContextImpl parentContext;
61+
private final boolean isVirtual;
6062
private volatile DurableLogger logger;
6163

6264
/** Shared initialization — sets all fields. */
@@ -65,9 +67,13 @@ private DurableContextImpl(
6567
DurableConfig durableConfig,
6668
Context lambdaContext,
6769
String contextId,
68-
String contextName) {
70+
String contextName,
71+
boolean isVirtual,
72+
DurableContextImpl parentContext) {
6973
super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
7074
operationIdGenerator = new OperationIdGenerator(contextId);
75+
this.parentContext = parentContext;
76+
this.isVirtual = isVirtual;
7177
}
7278

7379
/**
@@ -82,19 +88,26 @@ private DurableContextImpl(
8288
*/
8389
public static DurableContextImpl createRootContext(
8490
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) {
85-
return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null);
91+
return new DurableContextImpl(executionManager, durableConfig, lambdaContext, null, null, false, null);
8692
}
8793

8894
/**
8995
* Creates a child context.
9096
*
9197
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
9298
* @param childContextName the name of the child context
99+
* @param isVirtual whether the context is virtual
93100
* @return a new DurableContext for the child context
94101
*/
95-
public DurableContextImpl createChildContext(String childContextId, String childContextName) {
102+
public DurableContextImpl createChildContext(String childContextId, String childContextName, boolean isVirtual) {
96103
return new DurableContextImpl(
97-
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
104+
getExecutionManager(),
105+
getDurableConfig(),
106+
getLambdaContext(),
107+
childContextId,
108+
childContextName,
109+
isVirtual,
110+
this);
98111
}
99112

100113
/**
@@ -387,4 +400,13 @@ public void close() {
387400
private String nextOperationId() {
388401
return operationIdGenerator.nextOperationId();
389402
}
403+
404+
/**
405+
* Get the parent context ID for its child operations, which always points to a non-virtual context
406+
*
407+
* @return the parent of this context if virtual, otherwise this context id
408+
*/
409+
public String getParentId() {
410+
return isVirtual ? parentContext.getContextId() : getContextId();
411+
}
390412
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.model;
4+
5+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
6+
import software.amazon.lambda.durable.util.ExceptionHelper;
7+
8+
public record DeserializedOperationResult<T>(OperationStatus status, T result, Throwable throwable) {
9+
public static <T> DeserializedOperationResult<T> succeeded(T result) {
10+
return new DeserializedOperationResult<>(OperationStatus.SUCCEEDED, result, null);
11+
}
12+
13+
public static <T> DeserializedOperationResult<T> failed(Throwable throwable) {
14+
return new DeserializedOperationResult<>(OperationStatus.FAILED, null, throwable);
15+
}
16+
17+
public T get() {
18+
if (status == OperationStatus.SUCCEEDED) {
19+
return result;
20+
}
21+
ExceptionHelper.sneakyThrow(throwable);
22+
return null;
23+
}
24+
}

0 commit comments

Comments
 (0)