Skip to content

Commit cb78812

Browse files
authored
[refactor]: add subType to all operations (#196)
* add subType to all operations * add validation of subType
1 parent 64dc8b4 commit cb78812

17 files changed

Lines changed: 161 additions & 142 deletions

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/HistoryEventProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ private Operation createContextOperation(String id, String name, OperationStatus
359359
.name(name)
360360
.status(status)
361361
.type(OperationType.CONTEXT)
362+
.subType(event.subType())
362363
.contextDetails(builder.build())
363364
.build();
364365
}

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalMemoryExecutionClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ private Operation toOperation(OperationUpdate update) {
189189
.id(update.id())
190190
.name(update.name())
191191
.type(update.type())
192+
.subType(update.subType())
192193
.parentId(update.parentId())
193194
.status(deriveStatus(update.action()));
194195

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
import java.util.function.Function;
1515
import java.util.function.Supplier;
1616
import org.slf4j.LoggerFactory;
17+
import software.amazon.awssdk.services.lambda.model.OperationType;
1718
import software.amazon.lambda.durable.execution.ExecutionManager;
1819
import software.amazon.lambda.durable.execution.ThreadType;
1920
import software.amazon.lambda.durable.logging.DurableLogger;
21+
import software.amazon.lambda.durable.model.OperationIdentifier;
2022
import software.amazon.lambda.durable.model.OperationSubType;
2123
import software.amazon.lambda.durable.operation.CallbackOperation;
2224
import software.amazon.lambda.durable.operation.ChildContextOperation;
@@ -132,7 +134,8 @@ public <T> DurableFuture<T> stepAsync(
132134
var operationId = nextOperationId();
133135

134136
// Create and start step operation with TypeToken
135-
var operation = new StepOperation<>(operationId, name, func, typeToken, config, this);
137+
var operation = new StepOperation<>(
138+
new OperationIdentifier(operationId, name, OperationType.STEP, null), func, typeToken, config, this);
136139

137140
operation.execute(); // Start the step (returns immediately)
138141

@@ -201,7 +204,8 @@ public DurableFuture<Void> waitAsync(String name, Duration duration) {
201204
var operationId = nextOperationId();
202205

203206
// Create and start wait operation
204-
var operation = new WaitOperation(operationId, name, duration, this);
207+
var operation =
208+
new WaitOperation(new OperationIdentifier(operationId, name, OperationType.WAIT, null), duration, this);
205209

206210
operation.execute(); // Checkpoint the wait
207211
return operation;
@@ -274,7 +278,13 @@ public <T, U> DurableFuture<T> invokeAsync(
274278
var operationId = nextOperationId();
275279

276280
// Create and start invoke operation
277-
var operation = new InvokeOperation<>(operationId, name, functionName, payload, typeToken, config, this);
281+
var operation = new InvokeOperation<>(
282+
new OperationIdentifier(operationId, name, OperationType.CHAINED_INVOKE, null),
283+
functionName,
284+
payload,
285+
typeToken,
286+
config,
287+
this);
278288

279289
operation.execute(); // checkpoint the invoke operation
280290
return operation; // Block (will throw SuspendExecutionException if needed)
@@ -302,7 +312,8 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> typ
302312
}
303313
var operationId = nextOperationId();
304314

305-
var operation = new CallbackOperation<>(operationId, name, typeToken, config, this);
315+
var operation = new CallbackOperation<>(
316+
new OperationIdentifier(operationId, name, OperationType.CALLBACK, null), typeToken, config, this);
306317
operation.execute();
307318

308319
return operation;
@@ -335,7 +346,11 @@ private <T> DurableFuture<T> runInChildContextAsync(
335346
var operationId = nextOperationId();
336347

337348
var operation = new ChildContextOperation<>(
338-
operationId, name, func, subType, typeToken, getDurableConfig().getSerDes(), this);
349+
new OperationIdentifier(operationId, name, OperationType.CONTEXT, subType),
350+
func,
351+
typeToken,
352+
getDurableConfig().getSerDes(),
353+
this);
339354

340355
operation.execute();
341356
return operation;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.model;
4+
5+
import software.amazon.awssdk.services.lambda.model.OperationType;
6+
7+
public record OperationIdentifier(
8+
String operationId, String name, OperationType operationType, OperationSubType subType) {}

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import software.amazon.lambda.durable.execution.ExecutionManager;
2323
import software.amazon.lambda.durable.execution.ThreadContext;
2424
import software.amazon.lambda.durable.execution.ThreadType;
25+
import software.amazon.lambda.durable.model.OperationIdentifier;
26+
import software.amazon.lambda.durable.model.OperationSubType;
2527
import software.amazon.lambda.durable.serde.SerDes;
2628
import software.amazon.lambda.durable.util.ExceptionHelper;
2729

@@ -46,26 +48,20 @@
4648
public abstract class BaseDurableOperation<T> implements DurableFuture<T> {
4749
private static final Logger logger = LoggerFactory.getLogger(BaseDurableOperation.class);
4850

49-
private final String operationId;
50-
private final String name;
51-
private final OperationType operationType;
51+
private final OperationIdentifier operationIdentifier;
5252
private final ExecutionManager executionManager;
5353
private final TypeToken<T> resultTypeToken;
5454
private final SerDes resultSerDes;
5555
protected final CompletableFuture<Void> completionFuture;
5656
private final DurableContext durableContext;
5757

5858
protected BaseDurableOperation(
59-
String operationId,
60-
String name,
61-
OperationType operationType,
59+
OperationIdentifier operationIdentifier,
6260
TypeToken<T> resultTypeToken,
6361
SerDes resultSerDes,
6462
DurableContext durableContext) {
65-
this.operationId = operationId;
66-
this.name = name;
63+
this.operationIdentifier = operationIdentifier;
6764
this.durableContext = durableContext;
68-
this.operationType = operationType;
6965
this.executionManager = durableContext.getExecutionManager();
7066
this.resultTypeToken = resultTypeToken;
7167
this.resultSerDes = resultSerDes;
@@ -76,14 +72,19 @@ protected BaseDurableOperation(
7672
executionManager.registerOperation(this);
7773
}
7874

75+
/** Gets the operation identifier containing id, name, type, and subType. */
76+
public OperationSubType getSubType() {
77+
return operationIdentifier.subType();
78+
}
79+
7980
/** Gets the unique identifier for this operation. */
8081
public String getOperationId() {
81-
return operationId;
82+
return operationIdentifier.operationId();
8283
}
8384

8485
/** Gets the operation name (maybe null). */
8586
public String getName() {
86-
return name;
87+
return operationIdentifier.name();
8788
}
8889

8990
/** Gets the parent context. */
@@ -93,7 +94,7 @@ protected DurableContext getContext() {
9394

9495
/** Gets the operation type */
9596
public OperationType getType() {
96-
return operationType;
97+
return operationIdentifier.operationType();
9798
}
9899

99100
/** Starts the operation, processes the operation updates from backend. Does not block. */
@@ -202,7 +203,7 @@ public void onCheckpointComplete(Operation operation) {
202203
if (ExecutionManager.isTerminalStatus(operation.status())) {
203204
// This method handles only terminal status updates. Override this method if a DurableOperation needs to
204205
// handle other updates.
205-
logger.trace("In onCheckpointComplete, completing operation {} ({})", operationId, completionFuture);
206+
logger.trace("In onCheckpointComplete, completing operation {} ({})", getOperationId(), completionFuture);
206207
// It's important that we synchronize access to the future, otherwise the processing could happen
207208
// on someone else's thread and cause a race condition.
208209
synchronized (completionFuture) {
@@ -219,7 +220,7 @@ public void onCheckpointComplete(Operation operation) {
219220
protected void markAlreadyCompleted() {
220221
// When the operation is already completed in a replay, we complete completionFuture immediately
221222
// so that the `get` method will be unblocked and the context thread will be registered
222-
logger.trace("In markAlreadyCompleted, completing operation: {} ({}).", operationId, completionFuture);
223+
logger.trace("In markAlreadyCompleted, completing operation: {} ({}).", getOperationId(), completionFuture);
223224

224225
// It's important that we synchronize access to the future, otherwise the processing could happen
225226
// on someone else's thread and cause a race condition.
@@ -250,21 +251,21 @@ protected ThreadContext getCurrentThreadContext() {
250251

251252
// polling and checkpointing
252253
protected CompletableFuture<Operation> pollForOperationUpdates() {
253-
return executionManager.pollForOperationUpdates(operationId);
254+
return executionManager.pollForOperationUpdates(getOperationId());
254255
}
255256

256257
protected CompletableFuture<Operation> pollForOperationUpdates(Duration delay) {
257-
return executionManager.pollForOperationUpdates(operationId, delay);
258+
return executionManager.pollForOperationUpdates(getOperationId(), delay);
258259
}
259260

260261
protected void sendOperationUpdate(OperationUpdate.Builder builder) {
261262
sendOperationUpdateAsync(builder).join();
262263
}
263264

264265
protected CompletableFuture<Void> sendOperationUpdateAsync(OperationUpdate.Builder builder) {
265-
return executionManager.sendOperationUpdate(builder.id(operationId)
266-
.name(name)
267-
.type(operationType)
266+
return executionManager.sendOperationUpdate(builder.id(getOperationId())
267+
.name(getName())
268+
.type(getType())
268269
.parentId(durableContext.getContextId())
269270
.build());
270271
}
@@ -329,13 +330,21 @@ protected void validateReplay(Operation checkpointed) {
329330
if (!checkpointed.type().equals(getType())) {
330331
terminateExecution(new NonDeterministicExecutionException(String.format(
331332
"Operation type mismatch for \"%s\". Expected %s, got %s",
332-
operationId, checkpointed.type(), getType())));
333+
getOperationId(), checkpointed.type(), getType())));
333334
}
334335

335336
if (!Objects.equals(checkpointed.name(), getName())) {
336337
terminateExecution(new NonDeterministicExecutionException(String.format(
337338
"Operation name mismatch for \"%s\". Expected \"%s\", got \"%s\"",
338-
operationId, checkpointed.name(), getName())));
339+
getOperationId(), checkpointed.name(), getName())));
340+
}
341+
342+
if ((getSubType() == null && checkpointed.subType() != null)
343+
|| getSubType() != null
344+
&& !Objects.equals(checkpointed.subType(), getSubType().getValue())) {
345+
terminateExecution(new NonDeterministicExecutionException(String.format(
346+
"Operation subType mismatch for \"%s\". Expected \"%s\", got \"%s\"",
347+
getOperationId(), checkpointed.subType(), getSubType())));
339348
}
340349
}
341350
}

sdk/src/main/java/software/amazon/lambda/durable/operation/CallbackOperation.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import software.amazon.awssdk.services.lambda.model.CallbackOptions;
66
import software.amazon.awssdk.services.lambda.model.Operation;
77
import software.amazon.awssdk.services.lambda.model.OperationAction;
8-
import software.amazon.awssdk.services.lambda.model.OperationType;
98
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
109
import software.amazon.lambda.durable.CallbackConfig;
1110
import software.amazon.lambda.durable.DurableCallbackFuture;
1211
import software.amazon.lambda.durable.DurableContext;
1312
import software.amazon.lambda.durable.TypeToken;
1413
import software.amazon.lambda.durable.exception.CallbackFailedException;
1514
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
15+
import software.amazon.lambda.durable.model.OperationIdentifier;
1616

1717
/** Durable operation for creating and waiting on external callbacks. */
1818
public class CallbackOperation<T> extends BaseDurableOperation<T> implements DurableCallbackFuture<T> {
@@ -22,12 +22,11 @@ public class CallbackOperation<T> extends BaseDurableOperation<T> implements Dur
2222
private String callbackId;
2323

2424
public CallbackOperation(
25-
String operationId,
26-
String name,
25+
OperationIdentifier operationIdentifier,
2726
TypeToken<T> resultTypeToken,
2827
CallbackConfig config,
2928
DurableContext durableContext) {
30-
super(operationId, name, OperationType.CALLBACK, resultTypeToken, config.serDes(), durableContext);
29+
super(operationIdentifier, resultTypeToken, config.serDes(), durableContext);
3130
this.config = config;
3231
}
3332

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import software.amazon.lambda.durable.exception.StepInterruptedException;
2727
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2828
import software.amazon.lambda.durable.execution.SuspendExecutionException;
29-
import software.amazon.lambda.durable.model.OperationSubType;
29+
import software.amazon.lambda.durable.model.OperationIdentifier;
3030
import software.amazon.lambda.durable.serde.SerDes;
3131
import software.amazon.lambda.durable.util.ExceptionHelper;
3232

@@ -44,28 +44,25 @@ public class ChildContextOperation<T> extends BaseDurableOperation<T> {
4444
private final ExecutorService userExecutor;
4545
private boolean replayChildContext;
4646
private T reconstructedResult;
47-
private final OperationSubType subType;
4847

4948
public ChildContextOperation(
50-
String operationId,
51-
String name,
49+
OperationIdentifier operationIdentifier,
5250
Function<DurableContext, T> function,
53-
OperationSubType subType,
5451
TypeToken<T> resultTypeToken,
5552
SerDes resultSerDes,
5653
DurableContext durableContext) {
57-
super(operationId, name, OperationType.CONTEXT, resultTypeToken, resultSerDes, durableContext);
54+
super(operationIdentifier, resultTypeToken, resultSerDes, durableContext);
5855
this.function = function;
5956
this.userExecutor = getContext().getDurableConfig().getExecutorService();
60-
this.subType = subType;
6157
}
6258

6359
/** Starts the operation. */
6460
@Override
6561
protected void start() {
6662
// First execution: fire-and-forget START checkpoint, then run
67-
sendOperationUpdateAsync(
68-
OperationUpdate.builder().action(OperationAction.START).subType(subType.getValue()));
63+
sendOperationUpdateAsync(OperationUpdate.builder()
64+
.action(OperationAction.START)
65+
.subType(getSubType().getValue()));
6966
executeChildContext();
7067
}
7168

@@ -144,15 +141,15 @@ private void checkpointSuccess(T result) {
144141
if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
145142
sendOperationUpdate(OperationUpdate.builder()
146143
.action(OperationAction.SUCCEED)
147-
.subType(subType.getValue())
144+
.subType(getSubType().getValue())
148145
.payload(serialized));
149146
} else {
150147
// Large result: checkpoint with empty payload + ReplayChildren flag.
151148
// Store the result so get() can return it directly without deserializing the empty payload.
152149
this.reconstructedResult = result;
153150
sendOperationUpdate(OperationUpdate.builder()
154151
.action(OperationAction.SUCCEED)
155-
.subType(subType.getValue())
152+
.subType(getSubType().getValue())
156153
.payload("")
157154
.contextOptions(
158155
ContextOptions.builder().replayChildren(true).build()));
@@ -178,7 +175,7 @@ private void handleChildContextFailure(Throwable exception) {
178175

179176
sendOperationUpdate(OperationUpdate.builder()
180177
.action(OperationAction.FAIL)
181-
.subType(subType.getValue())
178+
.subType(getSubType().getValue())
182179
.error(errorObject));
183180
}
184181

@@ -204,7 +201,7 @@ public T get() {
204201
}
205202

206203
// throw a general failed exception if a user exception is not reconstructed
207-
return switch (subType) {
204+
return switch (getSubType()) {
208205
case WAIT_FOR_CALLBACK -> handleWaitForCallbackFailure(op);
209206
// todo: handle MAP/PARALLEL
210207
case MAP -> throw new ChildContextFailedException(op);

sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import software.amazon.awssdk.services.lambda.model.ChainedInvokeOptions;
66
import software.amazon.awssdk.services.lambda.model.Operation;
77
import software.amazon.awssdk.services.lambda.model.OperationAction;
8-
import software.amazon.awssdk.services.lambda.model.OperationType;
98
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
109
import software.amazon.lambda.durable.DurableContext;
1110
import software.amazon.lambda.durable.InvokeConfig;
@@ -14,6 +13,7 @@
1413
import software.amazon.lambda.durable.exception.InvokeFailedException;
1514
import software.amazon.lambda.durable.exception.InvokeStoppedException;
1615
import software.amazon.lambda.durable.exception.InvokeTimedOutException;
16+
import software.amazon.lambda.durable.model.OperationIdentifier;
1717
import software.amazon.lambda.durable.serde.SerDes;
1818

1919
public class InvokeOperation<T, U> extends BaseDurableOperation<T> {
@@ -23,14 +23,13 @@ public class InvokeOperation<T, U> extends BaseDurableOperation<T> {
2323
private final SerDes payloadSerDes;
2424

2525
public InvokeOperation(
26-
String operationId,
27-
String name,
26+
OperationIdentifier operationIdentifier,
2827
String functionName,
2928
U payload,
3029
TypeToken<T> resultTypeToken,
3130
InvokeConfig config,
3231
DurableContext durableContext) {
33-
super(operationId, name, OperationType.CHAINED_INVOKE, resultTypeToken, config.serDes(), durableContext);
32+
super(operationIdentifier, resultTypeToken, config.serDes(), durableContext);
3433

3534
this.functionName = functionName;
3635
this.payload = payload;

0 commit comments

Comments
 (0)