Skip to content

Commit 9126337

Browse files
committed
feat(sdk): Add subtypes for Step, Wait, Callback and Invoke operations
Added new OperationSubType values for all operation types that were missing them (step, wait, callback and invoke). And pass these values when creating operations in DurableContextImpl. ChildContextOperation explicitly checks for unreachable subtypes. Updated ChildContextOperation to check for the new subtype values. Updated the tests for the above operations that check the subtype to use these new subtype values. Updated LocalMemoryExecutionClient.applyResult to preserve subtype when updating an operation. Previously the subtype value was not passed on to the updated operation, causing test failures.
1 parent dbd1108 commit 9126337

10 files changed

Lines changed: 45 additions & 13 deletions

File tree

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ private void applyResult(Operation op, OperationResult result) {
190190
.id(op.id())
191191
.name(op.name())
192192
.type(op.type())
193+
.subType(op.subType())
193194
.action(action)
194195
.parentId(op.parentId())
195196
.payload(result.result())

sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,11 @@ public <T> DurableFuture<T> stepAsync(
146146

147147
// Create and start step operation with TypeToken
148148
var operation = new StepOperation<>(
149-
OperationIdentifier.of(operationId, name, OperationType.STEP), func, resultType, config, this);
149+
OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.STEP),
150+
func,
151+
resultType,
152+
config,
153+
this);
150154

151155
operation.execute(); // Start the step (returns immediately)
152156

@@ -161,8 +165,8 @@ public DurableFuture<Void> waitAsync(String name, Duration duration) {
161165
var operationId = nextOperationId();
162166

163167
// Create and start wait operation
164-
var operation =
165-
new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this);
168+
var operation = new WaitOperation(
169+
OperationIdentifier.of(operationId, name, OperationType.WAIT, OperationSubType.WAIT), duration, this);
166170

167171
operation.execute(); // Checkpoint the wait
168172
return operation;
@@ -187,7 +191,8 @@ public <T, U> DurableFuture<T> invokeAsync(
187191

188192
// Create and start invoke operation
189193
var operation = new InvokeOperation<>(
190-
OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE),
194+
OperationIdentifier.of(
195+
operationId, name, OperationType.CHAINED_INVOKE, OperationSubType.CHAINED_INVOKE),
191196
functionName,
192197
payload,
193198
resultType,
@@ -207,7 +212,10 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> res
207212
var operationId = nextOperationId();
208213

209214
var operation = new CallbackOperation<>(
210-
OperationIdentifier.of(operationId, name, OperationType.CALLBACK), resultType, config, this);
215+
OperationIdentifier.of(operationId, name, OperationType.CALLBACK, OperationSubType.CALLBACK),
216+
resultType,
217+
config,
218+
this);
211219
operation.execute();
212220

213221
return operation;

sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55
/**
66
* Fine-grained classification of durable operations beyond the basic operation types.
77
*
8-
* <p>Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the
9-
* {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs.
8+
* <p>Used as the {@code subType} field in checkpoint updates for all operations. Matches the {@code OperationSubType}
9+
* enum in the JavaScript and Python durable execution SDKs.
1010
*/
1111
public enum OperationSubType {
12+
STEP("Step"),
13+
WAIT("Wait"),
14+
CALLBACK("Callback"),
15+
CHAINED_INVOKE("ChainedInvoke"),
1216
RUN_IN_CHILD_CONTEXT("RunInChildContext"),
1317
MAP("Map"),
1418
MAP_ITERATION("MapIteration"),

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,8 @@ private Throwable translateException(Operation op, ErrorObject errorObject) {
248248
case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op);
249249

250250
// the following subtypes should not be able to reach here
251-
case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType());
251+
case PARALLEL, MAP, WAIT_FOR_CONDITION, STEP, WAIT, CALLBACK, CHAINED_INVOKE ->
252+
new IllegalStateException("Unexpected sub-type: " + getSubType());
252253
};
253254
}
254255

sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.lambda.durable.execution.ThreadContext;
2222
import software.amazon.lambda.durable.execution.ThreadType;
2323
import software.amazon.lambda.durable.model.DurableExecutionInput;
24+
import software.amazon.lambda.durable.model.OperationSubType;
2425

2526
class ReplayValidationTest {
2627
private static final String EXECUTION_NAME = "exec-name";
@@ -67,6 +68,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() {
6768
.id(OPERATION_ID1)
6869
.name("test")
6970
.type(OperationType.STEP)
71+
.subType(OperationSubType.STEP.getValue())
7072
.status(OperationStatus.SUCCEEDED)
7173
.stepDetails(StepDetails.builder().result("\"result\"").build())
7274
.build();
@@ -83,6 +85,7 @@ void shouldPassValidationWhenWaitTypeMatches() {
8385
var existingOp = Operation.builder()
8486
.id(OPERATION_ID1)
8587
.type(OperationType.WAIT)
88+
.subType(OperationSubType.WAIT.getValue())
8689
.status(OperationStatus.SUCCEEDED)
8790
.build();
8891

@@ -144,6 +147,7 @@ void shouldHandleNullNamesCorrectly() {
144147
.id(OPERATION_ID1)
145148
.name(null)
146149
.type(OperationType.STEP)
150+
.subType(OperationSubType.STEP.getValue())
147151
.status(OperationStatus.SUCCEEDED)
148152
.stepDetails(StepDetails.builder().result("\"result\"").build())
149153
.build();

sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2626
import software.amazon.lambda.durable.model.DurableExecutionInput;
2727
import software.amazon.lambda.durable.model.ExecutionStatus;
28+
import software.amazon.lambda.durable.model.OperationSubType;
2829

2930
class DurableExecutionTest {
3031

@@ -187,6 +188,7 @@ void testExecuteReplay() {
187188
.id(OPERATION_ID1)
188189
.name("step1")
189190
.type(OperationType.STEP)
191+
.subType(OperationSubType.STEP.getValue())
190192
.status(OperationStatus.SUCCEEDED)
191193
.stepDetails(StepDetails.builder().result("\"First\"").build())
192194
.build();

sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import software.amazon.lambda.durable.execution.ThreadType;
2727
import software.amazon.lambda.durable.model.DurableExecutionInput;
2828
import software.amazon.lambda.durable.model.OperationIdentifier;
29+
import software.amazon.lambda.durable.model.OperationSubType;
2930
import software.amazon.lambda.durable.serde.JacksonSerDes;
3031
import software.amazon.lambda.durable.serde.SerDes;
3132

@@ -34,7 +35,7 @@ class CallbackOperationTest {
3435
private static final String OPERATION_ID = TestUtils.hashOperationId("1");
3536
private static final String OPERATION_NAME = "approval";
3637
private static final OperationIdentifier OPERATION_IDENTIFIER =
37-
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK);
38+
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK, OperationSubType.CALLBACK);
3839
private static final String EXECUTION_NAME = "exec-name";
3940
private static final String EXECUTION_OP_ID = "123";
4041
private static final String EXECUTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:test/durable-execution/"
@@ -139,6 +140,7 @@ void replayReturnsExistingCallbackIdWhenSucceeded() {
139140
.id(OPERATION_ID)
140141
.name(OPERATION_NAME)
141142
.type(OperationType.CALLBACK)
143+
.subType(OperationSubType.CALLBACK.getValue())
142144
.status(OperationStatus.SUCCEEDED)
143145
.callbackDetails(CallbackDetails.builder()
144146
.callbackId("existing-callback-id")
@@ -165,6 +167,7 @@ void getReturnsDeserializedResultWhenSucceeded() {
165167
.id(OPERATION_ID)
166168
.name(OPERATION_NAME)
167169
.type(OperationType.CALLBACK)
170+
.subType(OperationSubType.CALLBACK.getValue())
168171
.status(OperationStatus.SUCCEEDED)
169172
.callbackDetails(CallbackDetails.builder()
170173
.callbackId("callback-id")
@@ -193,6 +196,7 @@ void getThrowsCallbackExceptionWhenFailed() {
193196
.id(OPERATION_ID)
194197
.name(OPERATION_NAME)
195198
.type(OperationType.CALLBACK)
199+
.subType(OperationSubType.CALLBACK.getValue())
196200
.status(OperationStatus.FAILED)
197201
.callbackDetails(CallbackDetails.builder()
198202
.callbackId("callback-id")
@@ -224,6 +228,7 @@ void getThrowsCallbackTimeoutExceptionWhenTimedOut() {
224228
.id(OPERATION_ID)
225229
.name(OPERATION_NAME)
226230
.type(OperationType.CALLBACK)
231+
.subType(OperationSubType.CALLBACK.getValue())
227232
.status(OperationStatus.TIMED_OUT)
228233
.callbackDetails(
229234
CallbackDetails.builder().callbackId("callback-id").build())
@@ -252,6 +257,7 @@ void operationUsesCustomSerDesWhenConfigContainsOne() {
252257
.id(OPERATION_ID)
253258
.name(OPERATION_NAME)
254259
.type(OperationType.CALLBACK)
260+
.subType(OperationSubType.CALLBACK.getValue())
255261
.status(OperationStatus.SUCCEEDED)
256262
.callbackDetails(CallbackDetails.builder()
257263
.callbackId("callback-id")
@@ -280,6 +286,7 @@ void operationUsesDefaultSerDesWhenConfigIsNull() {
280286
.id(OPERATION_ID)
281287
.name(OPERATION_NAME)
282288
.type(OperationType.CALLBACK)
289+
.subType(OperationSubType.CALLBACK.getValue())
283290
.status(OperationStatus.SUCCEEDED)
284291
.callbackDetails(CallbackDetails.builder()
285292
.callbackId("callback-id")
@@ -310,6 +317,7 @@ void operationUsesDefaultSerDesWhenConfigSerDesIsNull() {
310317
.id(OPERATION_ID)
311318
.name(OPERATION_NAME)
312319
.type(OperationType.CALLBACK)
320+
.subType(OperationSubType.CALLBACK.getValue())
313321
.status(OperationStatus.SUCCEEDED)
314322
.callbackDetails(CallbackDetails.builder()
315323
.callbackId("callback-id")
@@ -338,6 +346,7 @@ void getThrowsSerDesExceptionWithHelpfulMessageWhenDeserializationFails() {
338346
.id(OPERATION_ID)
339347
.name(OPERATION_NAME)
340348
.type(OperationType.CALLBACK)
349+
.subType(OperationSubType.CALLBACK.getValue())
341350
.status(OperationStatus.SUCCEEDED)
342351
.callbackDetails(CallbackDetails.builder()
343352
.callbackId("test-callback-123")

sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@
2525
import software.amazon.lambda.durable.execution.ThreadContext;
2626
import software.amazon.lambda.durable.execution.ThreadType;
2727
import software.amazon.lambda.durable.model.OperationIdentifier;
28+
import software.amazon.lambda.durable.model.OperationSubType;
2829
import software.amazon.lambda.durable.serde.JacksonSerDes;
2930

3031
class InvokeOperationTest {
3132
private static final String OPERATION_ID = "2";
3233
private static final String OPERATION_NAME = "test-invoke";
33-
private static final OperationIdentifier OPERATION_IDENTIFIER =
34-
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE);
34+
private static final OperationIdentifier OPERATION_IDENTIFIER = OperationIdentifier.of(
35+
OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE, OperationSubType.CHAINED_INVOKE);
3536

3637
private ExecutionManager executionManager;
3738
private DurableContextImpl durableContext;

sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import software.amazon.lambda.durable.execution.ThreadContext;
2525
import software.amazon.lambda.durable.execution.ThreadType;
2626
import software.amazon.lambda.durable.model.OperationIdentifier;
27+
import software.amazon.lambda.durable.model.OperationSubType;
2728
import software.amazon.lambda.durable.serde.JacksonSerDes;
2829

2930
class StepOperationTest {
@@ -32,7 +33,7 @@ class StepOperationTest {
3233
private static final String OPERATION_NAME = "test-step";
3334
private static final String RESULT = "result";
3435
private static final OperationIdentifier OPERATION_IDENTIFIER =
35-
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP);
36+
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP, OperationSubType.STEP);
3637
private ExecutionManager executionManager;
3738
private DurableContextImpl durableContext;
3839

sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
import software.amazon.lambda.durable.execution.ThreadContext;
2020
import software.amazon.lambda.durable.execution.ThreadType;
2121
import software.amazon.lambda.durable.model.OperationIdentifier;
22+
import software.amazon.lambda.durable.model.OperationSubType;
2223

2324
class WaitOperationTest {
2425
private static final String OPERATION_ID = "2";
2526
private static final String CONTEXT_ID = "handler";
2627
private static final String OPERATION_NAME = "test-wait";
2728
private static final OperationIdentifier OPERATION_IDENTIFIER =
28-
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT);
29+
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT, OperationSubType.WAIT);
2930
private ExecutionManager executionManager;
3031
private DurableContextImpl durableContext;
3132

0 commit comments

Comments
 (0)