Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private void applyResult(Operation op, OperationResult result) {
.id(op.id())
.name(op.name())
.type(op.type())
.subType(op.subType())
.action(action)
.parentId(op.parentId())
.payload(result.result())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ public <T> DurableFuture<T> stepAsync(

// Create and start step operation with TypeToken
var operation = new StepOperation<>(
OperationIdentifier.of(operationId, name, OperationType.STEP), func, resultType, config, this);
OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.STEP),
func,
resultType,
config,
this);

operation.execute(); // Start the step (returns immediately)

Expand All @@ -161,8 +165,8 @@ public DurableFuture<Void> waitAsync(String name, Duration duration) {
var operationId = nextOperationId();

// Create and start wait operation
var operation =
new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this);
var operation = new WaitOperation(
OperationIdentifier.of(operationId, name, OperationType.WAIT, OperationSubType.WAIT), duration, this);

operation.execute(); // Checkpoint the wait
return operation;
Expand All @@ -187,7 +191,8 @@ public <T, U> DurableFuture<T> invokeAsync(

// Create and start invoke operation
var operation = new InvokeOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE),
OperationIdentifier.of(
operationId, name, OperationType.CHAINED_INVOKE, OperationSubType.CHAINED_INVOKE),
functionName,
payload,
resultType,
Expand All @@ -207,7 +212,10 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> res
var operationId = nextOperationId();

var operation = new CallbackOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CALLBACK), resultType, config, this);
OperationIdentifier.of(operationId, name, OperationType.CALLBACK, OperationSubType.CALLBACK),
resultType,
config,
this);
operation.execute();

return operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
/**
* Fine-grained classification of durable operations beyond the basic operation types.
*
* <p>Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the
* {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs.
* <p>Used as the {@code subType} field in checkpoint updates for all operations. Matches the {@code OperationSubType}
* enum in the JavaScript and Python durable execution SDKs.
*/
public enum OperationSubType {
STEP("Step"),
WAIT("Wait"),
CALLBACK("Callback"),
CHAINED_INVOKE("ChainedInvoke"),
RUN_IN_CHILD_CONTEXT("RunInChildContext"),
MAP("Map"),
MAP_ITERATION("MapIteration"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ private Throwable translateException(Operation op, ErrorObject errorObject) {
case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op);

// the following subtypes should not be able to reach here
case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType());
case PARALLEL, MAP, WAIT_FOR_CONDITION, STEP, WAIT, CALLBACK, CHAINED_INVOKE ->
new IllegalStateException("Unexpected sub-type: " + getSubType());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.model.OperationSubType;

class ReplayValidationTest {
private static final String EXECUTION_NAME = "exec-name";
Expand Down Expand Up @@ -67,6 +68,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() {
.id(OPERATION_ID1)
.name("test")
.type(OperationType.STEP)
.subType(OperationSubType.STEP.getValue())
.status(OperationStatus.SUCCEEDED)
.stepDetails(StepDetails.builder().result("\"result\"").build())
.build();
Expand All @@ -83,6 +85,7 @@ void shouldPassValidationWhenWaitTypeMatches() {
var existingOp = Operation.builder()
.id(OPERATION_ID1)
.type(OperationType.WAIT)
.subType(OperationSubType.WAIT.getValue())
.status(OperationStatus.SUCCEEDED)
.build();

Expand Down Expand Up @@ -144,6 +147,7 @@ void shouldHandleNullNamesCorrectly() {
.id(OPERATION_ID1)
.name(null)
.type(OperationType.STEP)
.subType(OperationSubType.STEP.getValue())
.status(OperationStatus.SUCCEEDED)
.stepDetails(StepDetails.builder().result("\"result\"").build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.model.OperationSubType;

class DurableExecutionTest {

Expand Down Expand Up @@ -187,6 +188,7 @@ void testExecuteReplay() {
.id(OPERATION_ID1)
.name("step1")
.type(OperationType.STEP)
.subType(OperationSubType.STEP.getValue())
.status(OperationStatus.SUCCEEDED)
.stepDetails(StepDetails.builder().result("\"First\"").build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.model.OperationSubType;
import software.amazon.lambda.durable.serde.JacksonSerDes;
import software.amazon.lambda.durable.serde.SerDes;

Expand All @@ -34,7 +35,7 @@ class CallbackOperationTest {
private static final String OPERATION_ID = TestUtils.hashOperationId("1");
private static final String OPERATION_NAME = "approval";
private static final OperationIdentifier OPERATION_IDENTIFIER =
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK);
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK, OperationSubType.CALLBACK);
private static final String EXECUTION_NAME = "exec-name";
private static final String EXECUTION_OP_ID = "123";
private static final String EXECUTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:test/durable-execution/"
Expand Down Expand Up @@ -139,6 +140,7 @@ void replayReturnsExistingCallbackIdWhenSucceeded() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.SUCCEEDED)
.callbackDetails(CallbackDetails.builder()
.callbackId("existing-callback-id")
Expand All @@ -165,6 +167,7 @@ void getReturnsDeserializedResultWhenSucceeded() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.SUCCEEDED)
.callbackDetails(CallbackDetails.builder()
.callbackId("callback-id")
Expand Down Expand Up @@ -193,6 +196,7 @@ void getThrowsCallbackExceptionWhenFailed() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.FAILED)
.callbackDetails(CallbackDetails.builder()
.callbackId("callback-id")
Expand Down Expand Up @@ -224,6 +228,7 @@ void getThrowsCallbackTimeoutExceptionWhenTimedOut() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.TIMED_OUT)
.callbackDetails(
CallbackDetails.builder().callbackId("callback-id").build())
Expand Down Expand Up @@ -252,6 +257,7 @@ void operationUsesCustomSerDesWhenConfigContainsOne() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.SUCCEEDED)
.callbackDetails(CallbackDetails.builder()
.callbackId("callback-id")
Expand Down Expand Up @@ -280,6 +286,7 @@ void operationUsesDefaultSerDesWhenConfigIsNull() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.SUCCEEDED)
.callbackDetails(CallbackDetails.builder()
.callbackId("callback-id")
Expand Down Expand Up @@ -310,6 +317,7 @@ void operationUsesDefaultSerDesWhenConfigSerDesIsNull() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.SUCCEEDED)
.callbackDetails(CallbackDetails.builder()
.callbackId("callback-id")
Expand Down Expand Up @@ -338,6 +346,7 @@ void getThrowsSerDesExceptionWithHelpfulMessageWhenDeserializationFails() {
.id(OPERATION_ID)
.name(OPERATION_NAME)
.type(OperationType.CALLBACK)
.subType(OperationSubType.CALLBACK.getValue())
.status(OperationStatus.SUCCEEDED)
.callbackDetails(CallbackDetails.builder()
.callbackId("test-callback-123")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.model.OperationSubType;
import software.amazon.lambda.durable.serde.JacksonSerDes;

class InvokeOperationTest {
private static final String OPERATION_ID = "2";
private static final String OPERATION_NAME = "test-invoke";
private static final OperationIdentifier OPERATION_IDENTIFIER =
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE);
private static final OperationIdentifier OPERATION_IDENTIFIER = OperationIdentifier.of(
OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE, OperationSubType.CHAINED_INVOKE);

private ExecutionManager executionManager;
private DurableContextImpl durableContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.model.OperationSubType;
import software.amazon.lambda.durable.serde.JacksonSerDes;

class StepOperationTest {
Expand All @@ -32,7 +33,7 @@ class StepOperationTest {
private static final String OPERATION_NAME = "test-step";
private static final String RESULT = "result";
private static final OperationIdentifier OPERATION_IDENTIFIER =
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP);
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP, OperationSubType.STEP);
private ExecutionManager executionManager;
private DurableContextImpl durableContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.model.OperationSubType;

class WaitOperationTest {
private static final String OPERATION_ID = "2";
private static final String CONTEXT_ID = "handler";
private static final String OPERATION_NAME = "test-wait";
private static final OperationIdentifier OPERATION_IDENTIFIER =
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT);
OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT, OperationSubType.WAIT);
private ExecutionManager executionManager;
private DurableContextImpl durableContext;

Expand Down
Loading