Skip to content

Commit 96dea7a

Browse files
committed
move nextOperationId from DurableContext to ExecutionManager
1 parent c4f440f commit 96dea7a

13 files changed

Lines changed: 98 additions & 138 deletions

sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import com.amazonaws.services.lambda.runtime.Context;
1313
import java.time.Duration;
1414
import java.util.Objects;
15-
import java.util.concurrent.atomic.AtomicInteger;
1615
import java.util.function.Supplier;
1716
import org.slf4j.LoggerFactory;
1817

@@ -22,7 +21,6 @@ public class DurableContext {
2221
private final ExecutionManager executionManager;
2322
private final DurableConfig durableConfig;
2423
private final Context lambdaContext;
25-
private final AtomicInteger operationCounter;
2624
private final DurableLogger logger;
2725
private final ExecutionContext executionContext;
2826

@@ -31,7 +29,6 @@ public class DurableContext {
3129
this.executionManager = executionManager;
3230
this.durableConfig = durableConfig;
3331
this.lambdaContext = lambdaContext;
34-
this.operationCounter = new AtomicInteger(0);
3532
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
3633

3734
var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null;
@@ -89,11 +86,8 @@ public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Suppl
8986
if (config.serDes() == null) {
9087
config = config.toBuilder().serDes(durableConfig.getSerDes()).build();
9188
}
92-
var operationId = nextOperationId();
93-
9489
// Create and start step operation with TypeToken
95-
var operation = new StepOperation<>(
96-
operationId, name, func, typeToken, config, executionManager, logger, durableConfig);
90+
var operation = new StepOperation<>(name, func, typeToken, config, executionManager, logger, durableConfig);
9791

9892
operation.execute(); // Start the step (returns immediately)
9993

@@ -107,10 +101,8 @@ public Void wait(Duration duration) {
107101
}
108102

109103
public Void wait(String waitName, Duration duration) {
110-
var operationId = nextOperationId();
111-
112104
// Create and start wait operation
113-
var operation = new WaitOperation(operationId, waitName, duration, executionManager);
105+
var operation = new WaitOperation(waitName, duration, executionManager);
114106

115107
operation.execute(); // Checkpoint the wait
116108
return operation.get(); // Block (will throw SuspendExecutionException if needed)
@@ -176,11 +168,8 @@ public <T, U> DurableFuture<T> invokeAsync(
176168
if (config.payloadSerDes() == null) {
177169
config = config.toBuilder().payloadSerDes(durableConfig.getSerDes()).build();
178170
}
179-
var operationId = nextOperationId();
180-
181171
// Create and start invoke operation
182-
var operation =
183-
new InvokeOperation<>(operationId, name, functionName, payload, typeToken, config, executionManager);
172+
var operation = new InvokeOperation<>(name, functionName, payload, typeToken, config, executionManager);
184173

185174
operation.execute(); // checkpoint the invoke operation
186175
return operation; // Block (will throw SuspendExecutionException if needed)
@@ -204,9 +193,7 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> typ
204193
if (config.serDes() == null) {
205194
config = config.toBuilder().serDes(durableConfig.getSerDes()).build();
206195
}
207-
var operationId = nextOperationId();
208-
209-
var operation = new CallbackOperation<>(operationId, name, typeToken, config, executionManager);
196+
var operation = new CallbackOperation<>(name, typeToken, config, executionManager);
210197
operation.execute();
211198

212199
return operation;
@@ -234,11 +221,4 @@ public DurableLogger getLogger() {
234221
public ExecutionContext getExecutionContext() {
235222
return executionContext;
236223
}
237-
238-
// ============= internal utilities ===============
239-
240-
/** Get the next operationId (latest operationId + 1) */
241-
private String nextOperationId() {
242-
return String.valueOf(operationCounter.incrementAndGet());
243-
}
244224
}

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class CheckpointBatcher {
4949
MAX_ITEM_COUNT, MAX_BATCH_SIZE_BYTES, CheckpointBatcher::estimateSize, this::checkpointBatch);
5050
}
5151

52-
/** Queues a checkpoint request for batched execution */
52+
/** Queues a operation update for batched checkpoint */
5353
CompletableFuture<Void> checkpoint(OperationUpdate update) {
5454
logger.debug("Checkpoint request received: Action {}", update.action());
5555
return checkpointApiRequestBatcher.submit(update, config.getCheckpointDelay());

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Map;
1212
import java.util.concurrent.CompletableFuture;
1313
import java.util.concurrent.Phaser;
14+
import java.util.concurrent.atomic.AtomicInteger;
1415
import java.util.concurrent.atomic.AtomicReference;
1516
import java.util.stream.Collectors;
1617
import org.slf4j.Logger;
@@ -47,6 +48,7 @@ public class ExecutionManager {
4748
private final String executionOperationId;
4849
private final String durableExecutionArn;
4950
private final AtomicReference<ExecutionMode> executionMode;
51+
private final AtomicInteger operationCounter;
5052

5153
// ===== Thread Coordination =====
5254
private final Map<String, ThreadType> activeThreads = Collections.synchronizedMap(new HashMap<>());
@@ -62,6 +64,7 @@ public ExecutionManager(
6264
String checkpointToken,
6365
CheckpointUpdatedExecutionState initialExecutionState,
6466
DurableConfig config) {
67+
this.operationCounter = new AtomicInteger(0);
6568
this.durableExecutionArn = durableExecutionArn;
6669
this.executionOperationId = initialExecutionState.operations().get(0).id();
6770

@@ -217,7 +220,15 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId,
217220
return checkpointBatcher.pollForUpdate(operationId, delay);
218221
}
219222

220-
// ===== Utilities =====
223+
// ============= internal utilities ===============
224+
225+
/** Get the next operationId (latest operationId + 1) */
226+
public String nextOperationId() {
227+
return String.valueOf(operationCounter.incrementAndGet());
228+
}
229+
230+
// ============= lifecycle management =================
231+
221232
public void shutdown() {
222233
checkpointBatcher.shutdown();
223234
}

sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,20 @@ public abstract class BaseDurableOperation<T> implements DurableFuture<T> {
5454
private final Phaser phaser;
5555

5656
public BaseDurableOperation(
57-
String operationId,
5857
String name,
5958
OperationType operationType,
6059
TypeToken<T> resultTypeToken,
6160
SerDes resultSerDes,
6261
ExecutionManager executionManager) {
63-
this.operationId = operationId;
6462
this.name = name;
6563
this.operationType = operationType;
6664
this.executionManager = executionManager;
6765
this.resultTypeToken = resultTypeToken;
6866
this.resultSerDes = resultSerDes;
6967

68+
// get the next operation id from executionManager
69+
this.operationId = executionManager.nextOperationId();
70+
7071
// todo: phaser could be used only in ExecutionManager and invisible from operations.
7172
this.phaser = executionManager.startPhaser(operationId);
7273
}

sdk/src/main/java/com/amazonaws/lambda/durable/operation/CallbackOperation.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,8 @@ public class CallbackOperation<T> extends BaseDurableOperation<T> implements Dur
2525
private String callbackId;
2626

2727
public CallbackOperation(
28-
String operationId,
29-
String name,
30-
TypeToken<T> resultTypeToken,
31-
CallbackConfig config,
32-
ExecutionManager executionManager) {
33-
super(operationId, name, OperationType.CALLBACK, resultTypeToken, config.serDes(), executionManager);
28+
String name, TypeToken<T> resultTypeToken, CallbackConfig config, ExecutionManager executionManager) {
29+
super(name, OperationType.CALLBACK, resultTypeToken, config.serDes(), executionManager);
3430
this.config = config;
3531
}
3632

sdk/src/main/java/com/amazonaws/lambda/durable/operation/InvokeOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@ public class InvokeOperation<T, U> extends BaseDurableOperation<T> {
2222
private final SerDes payloadSerDes;
2323

2424
public InvokeOperation(
25-
String operationId,
2625
String name,
2726
String functionName,
2827
U payload,
2928
TypeToken<T> resultTypeToken,
3029
InvokeConfig config,
3130
ExecutionManager executionManager) {
32-
super(operationId, name, OperationType.CHAINED_INVOKE, resultTypeToken, config.serDes(), executionManager);
31+
super(name, OperationType.CHAINED_INVOKE, resultTypeToken, config.serDes(), executionManager);
3332

3433
this.functionName = functionName;
3534
this.payload = payload;

sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ public class StepOperation<T> extends BaseDurableOperation<T> {
3737
private final ExecutorService userExecutor;
3838

3939
public StepOperation(
40-
String operationId,
4140
String name,
4241
Supplier<T> function,
4342
TypeToken<T> resultTypeToken,
4443
StepConfig config,
4544
ExecutionManager executionManager,
4645
DurableLogger durableLogger,
4746
DurableConfig durableConfig) {
48-
super(operationId, name, OperationType.STEP, resultTypeToken, config.serDes(), executionManager);
47+
super(name, OperationType.STEP, resultTypeToken, config.serDes(), executionManager);
4948

5049
this.function = function;
5150
this.config = config;

sdk/src/main/java/com/amazonaws/lambda/durable/operation/WaitOperation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ public class WaitOperation extends BaseDurableOperation<Void> {
2323

2424
private final Duration duration;
2525

26-
public WaitOperation(String operationId, String name, Duration duration, ExecutionManager executionManager) {
27-
super(operationId, name, OperationType.WAIT, TypeToken.get(Void.class), NOOP_SER_DES, executionManager);
26+
public WaitOperation(String name, Duration duration, ExecutionManager executionManager) {
27+
super(name, OperationType.WAIT, TypeToken.get(Void.class), NOOP_SER_DES, executionManager);
2828
this.duration = duration;
2929
}
3030

0 commit comments

Comments
 (0)