Skip to content

Commit bd511af

Browse files
authored
refactor code (#157)
1 parent f97ef29 commit bd511af

9 files changed

Lines changed: 133 additions & 156 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@
44

55
import com.amazonaws.services.lambda.runtime.Context;
66
import software.amazon.lambda.durable.execution.ExecutionManager;
7+
import software.amazon.lambda.durable.execution.SuspendExecutionException;
8+
import software.amazon.lambda.durable.execution.ThreadContext;
9+
import software.amazon.lambda.durable.execution.ThreadType;
710
import software.amazon.lambda.durable.logging.DurableLogger;
811

912
public abstract class BaseContext implements AutoCloseable {
10-
protected final ExecutionManager executionManager;
13+
private final ExecutionManager executionManager;
1114
private final DurableConfig durableConfig;
1215
private final Context lambdaContext;
1316
private final ExecutionContext executionContext;
1417
private final String contextId;
1518
private final String contextName;
19+
private final ThreadType threadType;
20+
1621
private boolean isReplaying;
1722

1823
/** Creates a new BaseContext instance. */
@@ -21,14 +26,19 @@ protected BaseContext(
2126
DurableConfig durableConfig,
2227
Context lambdaContext,
2328
String contextId,
24-
String contextName) {
29+
String contextName,
30+
ThreadType threadType) {
2531
this.executionManager = executionManager;
2632
this.durableConfig = durableConfig;
2733
this.lambdaContext = lambdaContext;
2834
this.contextId = contextId;
2935
this.contextName = contextName;
3036
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
3137
this.isReplaying = executionManager.hasOperationsForContext(contextId);
38+
this.threadType = threadType;
39+
40+
// write the thread id and type to thread local
41+
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
3242
}
3343

3444
// =============== accessors ================
@@ -96,4 +106,22 @@ boolean isReplaying() {
96106
void setExecutionMode() {
97107
this.isReplaying = false;
98108
}
109+
110+
public void close() {
111+
// this is called in the user thread, after the context's user code has completed
112+
if (getContextId() != null) {
113+
// if this is a child context or a step context, we need to
114+
// deregister the context's thread from the execution manager
115+
try {
116+
executionManager.deregisterActiveThread(getContextId());
117+
} catch (SuspendExecutionException e) {
118+
// Expected when this is the last active thread. Must catch here because:
119+
// 1/ This runs in a worker thread detached from handlerFuture
120+
// 2/ Uncaught exception would prevent stepAsync().get() from resume
121+
// Suspension/Termination is already signaled via
122+
// suspendExecutionFuture/terminateExecutionFuture
123+
// before the throw.
124+
}
125+
}
126+
}
99127
}

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.function.Supplier;
1616
import org.slf4j.LoggerFactory;
1717
import software.amazon.lambda.durable.execution.ExecutionManager;
18+
import software.amazon.lambda.durable.execution.ThreadType;
1819
import software.amazon.lambda.durable.logging.DurableLogger;
1920
import software.amazon.lambda.durable.model.OperationSubType;
2021
import software.amazon.lambda.durable.operation.CallbackOperation;
@@ -39,7 +40,7 @@ private DurableContext(
3940
Context lambdaContext,
4041
String contextId,
4142
String contextName) {
42-
super(executionManager, durableConfig, lambdaContext, contextId, contextName);
43+
super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
4344
this.operationCounter = new AtomicInteger(0);
4445
}
4546

@@ -66,7 +67,7 @@ public static DurableContext createRootContext(
6667
*/
6768
public DurableContext createChildContext(String childContextId, String childContextName) {
6869
return new DurableContext(
69-
executionManager, getDurableConfig(), getLambdaContext(), childContextId, childContextName);
70+
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
7071
}
7172

7273
/**
@@ -77,7 +78,12 @@ public DurableContext createChildContext(String childContextId, String childCont
7778
*/
7879
public StepContext createStepContext(String stepOperationId, String stepOperationName, int attempt) {
7980
return new StepContext(
80-
executionManager, getDurableConfig(), getLambdaContext(), stepOperationId, stepOperationName, attempt);
81+
getExecutionManager(),
82+
getDurableConfig(),
83+
getLambdaContext(),
84+
stepOperationId,
85+
stepOperationName,
86+
attempt);
8187
}
8288

8389
// ========== step methods ==========
@@ -170,18 +176,19 @@ public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Suppl
170176

171177
// ========== wait methods ==========
172178

173-
public Void wait(String waitName, Duration duration) {
174-
return waitAsync(waitName, duration).get(); // Block (will throw SuspendExecutionException if needed)
179+
public Void wait(String name, Duration duration) {
180+
// Block (will throw SuspendExecutionException if there is no active thread)
181+
return waitAsync(name, duration).get();
175182
}
176183

177-
public DurableFuture<Void> waitAsync(String waitName, Duration duration) {
184+
public DurableFuture<Void> waitAsync(String name, Duration duration) {
178185
ParameterValidator.validateDuration(duration, "Wait duration");
179-
ParameterValidator.validateOperationName(waitName);
186+
ParameterValidator.validateOperationName(name);
180187

181188
var operationId = nextOperationId();
182189

183190
// Create and start wait operation
184-
var operation = new WaitOperation(operationId, waitName, duration, this);
191+
var operation = new WaitOperation(operationId, name, duration, this);
185192

186193
operation.execute(); // Checkpoint the wait
187194
return operation;
@@ -444,6 +451,7 @@ public void close() {
444451
if (logger != null) {
445452
logger.close();
446453
}
454+
super.close();
447455
}
448456

449457
/**

sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2020
import software.amazon.lambda.durable.execution.ExecutionManager;
2121
import software.amazon.lambda.durable.execution.SuspendExecutionException;
22-
import software.amazon.lambda.durable.execution.ThreadContext;
23-
import software.amazon.lambda.durable.execution.ThreadType;
2422
import software.amazon.lambda.durable.model.DurableExecutionInput;
2523
import software.amazon.lambda.durable.model.DurableExecutionOutput;
2624
import software.amazon.lambda.durable.serde.SerDes;
@@ -46,8 +44,6 @@ public static <I, O> DurableExecutionOutput execute(
4644
executionManager.getExecutionOperation(), config.getSerDes(), inputType);
4745
// use try-with-resources to clear logger properties
4846
try (var context = DurableContext.createRootContext(executionManager, config, lambdaContext)) {
49-
// Create context in the executor thread so it detects the correct thread name
50-
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
5147
return handler.apply(userInput, context);
5248
}
5349
},

sdk/src/main/java/software/amazon/lambda/durable/StepContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.amazonaws.services.lambda.runtime.Context;
66
import org.slf4j.LoggerFactory;
77
import software.amazon.lambda.durable.execution.ExecutionManager;
8+
import software.amazon.lambda.durable.execution.ThreadType;
89
import software.amazon.lambda.durable.logging.DurableLogger;
910

1011
public class StepContext extends BaseContext {
@@ -26,7 +27,7 @@ protected StepContext(
2627
String stepOperationId,
2728
String stepOperationName,
2829
int attempt) {
29-
super(executionManager, durableConfig, lambdaContext, stepOperationId, stepOperationName);
30+
super(executionManager, durableConfig, lambdaContext, stepOperationId, stepOperationName, ThreadType.STEP);
3031
this.attempt = attempt;
3132
}
3233

@@ -54,5 +55,6 @@ public void close() {
5455
if (logger != null) {
5556
logger.close();
5657
}
58+
super.close();
5759
}
5860
}

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,6 @@ public String getOperationId() {
8181
return operationId;
8282
}
8383

84-
/** Gets the unique thread id */
85-
protected String getThreadId() {
86-
return getOperationId() + "-" + getType().name().toLowerCase();
87-
}
88-
8984
/** Gets the operation name (maybe null). */
9085
public String getName() {
9186
return name;
@@ -186,7 +181,7 @@ protected Operation waitForOperationCompletion() {
186181
completionFuture.thenRun(() -> registerActiveThread(threadContext.threadId()));
187182

188183
// Deregister the current thread to allow suspension
189-
deregisterActiveThread(threadContext.threadId());
184+
executionManager.deregisterActiveThread(threadContext.threadId());
190185
}
191186
}
192187

@@ -245,10 +240,6 @@ protected T terminateExecutionWithIllegalDurableOperationException(String messag
245240
}
246241

247242
// advanced thread and context control
248-
protected void deregisterActiveThread(String threadId) {
249-
executionManager.deregisterActiveThread(threadId);
250-
}
251-
252243
protected void registerActiveThread(String threadId) {
253244
executionManager.registerActiveThread(threadId);
254245
}
@@ -257,10 +248,6 @@ protected ThreadContext getCurrentThreadContext() {
257248
return executionManager.getCurrentThreadContext();
258249
}
259250

260-
protected void setCurrentThreadContext(ThreadContext threadContext) {
261-
executionManager.setCurrentThreadContext(threadContext);
262-
}
263-
264251
// polling and checkpointing
265252
protected CompletableFuture<Operation> pollForOperationUpdates() {
266253
return executionManager.pollForOperationUpdates(operationId);

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import software.amazon.lambda.durable.exception.StepInterruptedException;
2727
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2828
import software.amazon.lambda.durable.execution.SuspendExecutionException;
29-
import software.amazon.lambda.durable.execution.ThreadContext;
30-
import software.amazon.lambda.durable.execution.ThreadType;
3129
import software.amazon.lambda.durable.model.OperationSubType;
3230
import software.amazon.lambda.durable.serde.SerDes;
3331
import software.amazon.lambda.durable.util.ExceptionHelper;
@@ -96,7 +94,9 @@ protected void replay(Operation existing) {
9694
private void executeChildContext() {
9795
// The operationId is already globally unique (prefixed by parent context path via
9896
// DurableContext.nextOperationId), so we use it directly as the contextId.
99-
// E.g., root child context "1", nested child context "1-2", deeply nested "1-2-1".
97+
// E.g., first level child context "hash(1)",
98+
// second level child context "hash(hash(1)-2)",
99+
// third level child context "hash(hash(hash(1)-2)-1)".
100100
var contextId = getOperationId();
101101

102102
// Thread registration is intentionally split across two threads:
@@ -107,35 +107,34 @@ private void executeChildContext() {
107107
// registerActiveThread is idempotent (no-op if already registered).
108108
registerActiveThread(contextId);
109109

110-
CompletableFuture.runAsync(
111-
() -> {
112-
setCurrentThreadContext(new ThreadContext(contextId, ThreadType.CONTEXT));
113-
// use a try-with-resources to clear logger properties
114-
try (var childContext = getContext().createChildContext(contextId, getName())) {
115-
try {
116-
T result = function.apply(childContext);
117-
118-
if (replayChildContext) {
119-
// Replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
120-
// Advance the phaser so get() doesn't block waiting for a checkpoint response.
121-
this.reconstructedResult = result;
122-
markAlreadyCompleted();
123-
return;
124-
}
125-
126-
checkpointSuccess(result);
127-
} catch (Throwable e) {
128-
handleChildContextFailure(e);
129-
} finally {
130-
try {
131-
deregisterActiveThread(contextId);
132-
} catch (SuspendExecutionException e) {
133-
// Expected when this is the last active thread — suspension already signaled
134-
}
135-
}
136-
}
137-
},
138-
userExecutor);
110+
Runnable userHandler = () -> {
111+
// use a try-with-resources to
112+
// - add thread id/type to thread local when the step starts
113+
// - clear logger properties when the step finishes
114+
try (var childContext = getContext().createChildContext(contextId, getName())) {
115+
try {
116+
T result = function.apply(childContext);
117+
118+
handleChildContextSuccess(result);
119+
} catch (Throwable e) {
120+
handleChildContextFailure(e);
121+
}
122+
}
123+
};
124+
125+
// Execute user provided child context code in user-configured executor
126+
CompletableFuture.runAsync(userHandler, userExecutor);
127+
}
128+
129+
private void handleChildContextSuccess(T result) {
130+
if (replayChildContext) {
131+
// Replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
132+
// Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
133+
this.reconstructedResult = result;
134+
markAlreadyCompleted();
135+
} else {
136+
checkpointSuccess(result);
137+
}
139138
}
140139

141140
private void checkpointSuccess(T result) {

0 commit comments

Comments
 (0)