Skip to content

Commit 7988a91

Browse files
committed
feat(sdk): Add subtypes for Step, Wait, Callback and Invoke operations
Added new OperationSubType values for all operation types that were missing them (step, wait, callback and invoke). And pass these values when creating operations in DurableContextImpl. ChildContextOperation explicitly checks for unreachable subtypes. Updated ChildContextOperation to check for the new subtype values. Updated the tests for the above operations that check the subtype to use these new subtype values. Updated LocalMemoryExecutionClient.applyResult to preserve subtype when updating an operation. Previously the subtype value was not passed on to the updated operation, causing test failures.
1 parent dbd1108 commit 7988a91

18 files changed

Lines changed: 80 additions & 61 deletions

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ private void applyResult(Operation op, OperationResult result) {
190190
.id(op.id())
191191
.name(op.name())
192192
.type(op.type())
193+
.subType(op.subType())
193194
.action(action)
194195
.parentId(op.parentId())
195196
.payload(result.result())

sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.function.BiFunction;
1212
import java.util.function.Function;
1313
import org.slf4j.LoggerFactory;
14-
import software.amazon.awssdk.services.lambda.model.OperationType;
1514
import software.amazon.lambda.durable.DurableCallbackFuture;
1615
import software.amazon.lambda.durable.DurableConfig;
1716
import software.amazon.lambda.durable.DurableContext;
@@ -146,7 +145,7 @@ public <T> DurableFuture<T> stepAsync(
146145

147146
// Create and start step operation with TypeToken
148147
var operation = new StepOperation<>(
149-
OperationIdentifier.of(operationId, name, OperationType.STEP), func, resultType, config, this);
148+
OperationIdentifier.of(operationId, name, OperationSubType.STEP), func, resultType, config, this);
150149

151150
operation.execute(); // Start the step (returns immediately)
152151

@@ -162,7 +161,7 @@ public DurableFuture<Void> waitAsync(String name, Duration duration) {
162161

163162
// Create and start wait operation
164163
var operation =
165-
new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this);
164+
new WaitOperation(OperationIdentifier.of(operationId, name, OperationSubType.WAIT), duration, this);
166165

167166
operation.execute(); // Checkpoint the wait
168167
return operation;
@@ -187,7 +186,7 @@ public <T, U> DurableFuture<T> invokeAsync(
187186

188187
// Create and start invoke operation
189188
var operation = new InvokeOperation<>(
190-
OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE),
189+
OperationIdentifier.of(operationId, name, OperationSubType.CHAINED_INVOKE),
191190
functionName,
192191
payload,
193192
resultType,
@@ -207,7 +206,7 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> res
207206
var operationId = nextOperationId();
208207

209208
var operation = new CallbackOperation<>(
210-
OperationIdentifier.of(operationId, name, OperationType.CALLBACK), resultType, config, this);
209+
OperationIdentifier.of(operationId, name, OperationSubType.CALLBACK), resultType, config, this);
211210
operation.execute();
212211

213212
return operation;
@@ -248,11 +247,7 @@ private <T> DurableFuture<T> runInChildContextAsync(
248247
var operationId = nextOperationId();
249248

250249
var operation = new ChildContextOperation<>(
251-
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType),
252-
func,
253-
resultType,
254-
config,
255-
this);
250+
OperationIdentifier.of(operationId, name, subType), func, resultType, config, this);
256251

257252
operation.execute();
258253
return operation;
@@ -277,7 +272,7 @@ public <I, O> DurableFuture<MapResult<O>> mapAsync(
277272
var operationId = nextOperationId();
278273

279274
var operation = new MapOperation<>(
280-
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.MAP),
275+
OperationIdentifier.of(operationId, name, OperationSubType.MAP),
281276
itemList,
282277
function,
283278
resultType,
@@ -293,7 +288,7 @@ public ParallelDurableFuture parallel(String name, ParallelConfig config) {
293288
var operationId = nextOperationId();
294289

295290
var parallelOp = new ParallelOperation(
296-
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL),
291+
OperationIdentifier.of(operationId, name, OperationSubType.PARALLEL),
297292
getDurableConfig().getSerDes(),
298293
this,
299294
config);

sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,21 @@
55
import software.amazon.awssdk.services.lambda.model.OperationType;
66

77
/**
8-
* Identifies a durable operation by its unique ID, human-readable name, type, and optional sub-type.
8+
* Identifies a durable operation by its unique ID, human-readable name, type and sub-type.
99
*
1010
* @param operationId unique sequential identifier for the operation within an execution
1111
* @param name human-readable name for the operation
12-
* @param operationType the kind of operation (STEP, WAIT, CALLBACK, etc.)
13-
* @param subType optional sub-type for operations that need further classification (e.g. child contexts)
12+
* @param subType the operation sub-type which also determines the operation type
1413
*/
15-
public record OperationIdentifier(
16-
String operationId, String name, OperationType operationType, OperationSubType subType) {
14+
public record OperationIdentifier(String operationId, String name, OperationSubType subType) {
1715

18-
/** Creates an identifier without a sub-type. */
19-
public static OperationIdentifier of(String operationId, String name, OperationType type) {
20-
return new OperationIdentifier(operationId, name, type, null);
16+
/** Returns the operation type derived from the sub-type. */
17+
public OperationType operationType() {
18+
return subType.getOperationType();
2119
}
2220

23-
/** Creates an identifier with a sub-type. */
24-
public static OperationIdentifier of(
25-
String operationId, String name, OperationType type, OperationSubType subType) {
26-
return new OperationIdentifier(operationId, name, type, subType);
21+
/** Creates an identifier with the given sub-type. */
22+
public static OperationIdentifier of(String operationId, String name, OperationSubType subType) {
23+
return new OperationIdentifier(operationId, name, subType);
2724
}
2825
}

sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,41 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.model;
44

5+
import software.amazon.awssdk.services.lambda.model.OperationType;
6+
57
/**
6-
* Fine-grained classification of durable operations beyond the basic operation types.
8+
* Fine-grained classification of durable operations that pairs each subtype with its parent {@link OperationType}.
79
*
8-
* <p>Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the
9-
* {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs.
10+
* <p>Used as the source of both the {@code type} and {@code subType} fields in checkpoint updates. Matches
11+
* the {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs.
1012
*/
1113
public enum OperationSubType {
12-
RUN_IN_CHILD_CONTEXT("RunInChildContext"),
13-
MAP("Map"),
14-
MAP_ITERATION("MapIteration"),
15-
PARALLEL("Parallel"),
16-
PARALLEL_BRANCH("ParallelBranch"),
17-
WAIT_FOR_CALLBACK("WaitForCallback"),
18-
WAIT_FOR_CONDITION("WaitForCondition"),
19-
WITH_RETRY("WithRetry");
14+
STEP(OperationType.STEP, "Step"),
15+
WAIT(OperationType.WAIT, "Wait"),
16+
CALLBACK(OperationType.CALLBACK, "Callback"),
17+
CHAINED_INVOKE(OperationType.CHAINED_INVOKE, "ChainedInvoke"),
18+
RUN_IN_CHILD_CONTEXT(OperationType.CONTEXT, "RunInChildContext"),
19+
MAP(OperationType.CONTEXT, "Map"),
20+
MAP_ITERATION(OperationType.CONTEXT, "MapIteration"),
21+
PARALLEL(OperationType.CONTEXT, "Parallel"),
22+
PARALLEL_BRANCH(OperationType.CONTEXT, "ParallelBranch"),
23+
WAIT_FOR_CALLBACK(OperationType.CONTEXT, "WaitForCallback"),
24+
WAIT_FOR_CONDITION(OperationType.STEP, "WaitForCondition"),
25+
WITH_RETRY(OperationType.CONTEXT, "WithRetry");
2026

27+
private final OperationType operationType;
2128
private final String value;
2229

23-
OperationSubType(String value) {
30+
OperationSubType(OperationType operationType, String value) {
31+
this.operationType = operationType;
2432
this.value = value;
2533
}
2634

35+
/** Returns the parent {@link OperationType} for this subtype. */
36+
public OperationType getOperationType() {
37+
return operationType;
38+
}
39+
2740
/** Returns the wire-format string value sent in checkpoint updates. */
2841
public String getValue() {
2942
return value;

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,11 @@ protected void sendOperationUpdate(OperationUpdate.Builder builder) {
410410

411411
/** Sends an operation update asynchronously. */
412412
protected CompletableFuture<Void> sendOperationUpdateAsync(OperationUpdate.Builder builder) {
413-
var updateBuilder =
414-
builder.id(getOperationId()).name(getName()).type(getType()).parentId(durableContext.getParentId());
415-
if (getSubType() != null) {
416-
updateBuilder.subType(getSubType().getValue());
417-
}
413+
var updateBuilder = builder.id(getOperationId())
414+
.name(getName())
415+
.type(getType())
416+
.subType(getSubType().getValue())
417+
.parentId(durableContext.getParentId());
418418
var update = updateBuilder.build();
419419
if (replayCompletedOperation.get()) {
420420
// We are replaying a completed operation, so complete the completableFuture without checkpointing
@@ -444,9 +444,7 @@ protected void validateReplay(Operation checkpointed) {
444444
getOperationId(), checkpointed.name(), getName())));
445445
}
446446

447-
if ((getSubType() == null && checkpointed.subType() != null)
448-
|| getSubType() != null
449-
&& !Objects.equals(checkpointed.subType(), getSubType().getValue())) {
447+
if (!Objects.equals(checkpointed.subType(), getSubType().getValue())) {
450448
throw terminateExecution(new NonDeterministicExecutionException(String.format(
451449
"Operation subType mismatch for \"%s\". Expected \"%s\", got \"%s\"",
452450
getOperationId(), checkpointed.subType(), getSubType())));

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ private Throwable translateException(Operation op, ErrorObject errorObject) {
248248
case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op);
249249

250250
// the following subtypes should not be able to reach here
251-
case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType());
251+
case PARALLEL, MAP, WAIT_FOR_CONDITION, STEP, WAIT, CALLBACK, CHAINED_INVOKE ->
252+
new IllegalStateException("Unexpected sub-type: " + getSubType());
252253
};
253254
}
254255

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.function.Function;
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
19-
import software.amazon.awssdk.services.lambda.model.OperationType;
2019
import software.amazon.lambda.durable.DurableContext;
2120
import software.amazon.lambda.durable.TypeToken;
2221
import software.amazon.lambda.durable.config.NestingType;
@@ -116,7 +115,7 @@ protected <R> ChildContextOperation<R> createItem(
116115
SerDes serDes,
117116
OperationSubType branchSubType) {
118117
return new ChildContextOperation<>(
119-
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, branchSubType),
118+
OperationIdentifier.of(operationId, name, branchSubType),
120119
function,
121120
resultType,
122121
RunInChildContextConfig.builder()

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import software.amazon.awssdk.services.lambda.model.Operation;
99
import software.amazon.awssdk.services.lambda.model.OperationAction;
1010
import software.amazon.awssdk.services.lambda.model.OperationStatus;
11-
import software.amazon.awssdk.services.lambda.model.OperationType;
1211
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1312
import software.amazon.awssdk.services.lambda.model.StepOptions;
1413
import software.amazon.lambda.durable.StepContext;
@@ -48,7 +47,7 @@ public WaitForConditionOperation(
4847
WaitForConditionConfig<T> config,
4948
DurableContextImpl durableContext) {
5049
super(
51-
OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.WAIT_FOR_CONDITION),
50+
OperationIdentifier.of(operationId, name, OperationSubType.WAIT_FOR_CONDITION),
5251
resultTypeToken,
5352
config.serDes(),
5453
durableContext);

sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.lambda.durable.execution.ThreadContext;
2222
import software.amazon.lambda.durable.execution.ThreadType;
2323
import software.amazon.lambda.durable.model.DurableExecutionInput;
24+
import software.amazon.lambda.durable.model.OperationSubType;
2425

2526
class ReplayValidationTest {
2627
private static final String EXECUTION_NAME = "exec-name";
@@ -67,6 +68,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() {
6768
.id(OPERATION_ID1)
6869
.name("test")
6970
.type(OperationType.STEP)
71+
.subType(OperationSubType.STEP.getValue())
7072
.status(OperationStatus.SUCCEEDED)
7173
.stepDetails(StepDetails.builder().result("\"result\"").build())
7274
.build();
@@ -83,6 +85,7 @@ void shouldPassValidationWhenWaitTypeMatches() {
8385
var existingOp = Operation.builder()
8486
.id(OPERATION_ID1)
8587
.type(OperationType.WAIT)
88+
.subType(OperationSubType.WAIT.getValue())
8689
.status(OperationStatus.SUCCEEDED)
8790
.build();
8891

@@ -144,6 +147,7 @@ void shouldHandleNullNamesCorrectly() {
144147
.id(OPERATION_ID1)
145148
.name(null)
146149
.type(OperationType.STEP)
150+
.subType(OperationSubType.STEP.getValue())
147151
.status(OperationStatus.SUCCEEDED)
148152
.stepDetails(StepDetails.builder().result("\"result\"").build())
149153
.build();

sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2626
import software.amazon.lambda.durable.model.DurableExecutionInput;
2727
import software.amazon.lambda.durable.model.ExecutionStatus;
28+
import software.amazon.lambda.durable.model.OperationSubType;
2829

2930
class DurableExecutionTest {
3031

@@ -187,6 +188,7 @@ void testExecuteReplay() {
187188
.id(OPERATION_ID1)
188189
.name("step1")
189190
.type(OperationType.STEP)
191+
.subType(OperationSubType.STEP.getValue())
190192
.status(OperationStatus.SUCCEEDED)
191193
.stepDetails(StepDetails.builder().result("\"First\"").build())
192194
.build();

0 commit comments

Comments
 (0)