Skip to content

Commit 2677372

Browse files
authored
[feature]: add ContextId and ContextName to child context logger (aws#134)
* lazy initialize loggers * rename close method * minor code improvements
1 parent d057869 commit 2677372

16 files changed

Lines changed: 329 additions & 245 deletions

File tree

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ void testManyAsyncStepsExample() {
389389
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
390390

391391
var finalResult = result.getResult(String.class);
392+
System.out.println("ManyAsyncStepsExample result: " + finalResult);
392393
assertNotNull(finalResult);
393394
assertTrue(finalResult.contains("500 async steps"));
394395
assertTrue(finalResult.contains("249500")); // Sum of 0..499 * 2

sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,27 @@
66
import software.amazon.lambda.durable.execution.ExecutionManager;
77
import software.amazon.lambda.durable.logging.DurableLogger;
88

9-
public abstract class BaseContext {
9+
public abstract class BaseContext implements AutoCloseable {
1010
protected final ExecutionManager executionManager;
1111
private final DurableConfig durableConfig;
1212
private final Context lambdaContext;
1313
private final ExecutionContext executionContext;
1414
private final String contextId;
15+
private final String contextName;
1516
private boolean isReplaying;
1617

1718
/** Creates a new BaseContext instance. */
1819
protected BaseContext(
19-
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) {
20+
ExecutionManager executionManager,
21+
DurableConfig durableConfig,
22+
Context lambdaContext,
23+
String contextId,
24+
String contextName) {
2025
this.executionManager = executionManager;
2126
this.durableConfig = durableConfig;
2227
this.lambdaContext = lambdaContext;
2328
this.contextId = contextId;
29+
this.contextName = contextName;
2430
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
2531
this.isReplaying = executionManager.hasOperationsForContext(contextId);
2632
}
@@ -71,6 +77,10 @@ public String getContextId() {
7177
return contextId;
7278
}
7379

80+
public String getContextName() {
81+
return contextName;
82+
}
83+
7484
public ExecutionManager getExecutionManager() {
7585
return executionManager;
7686
}

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,17 @@
2020

2121
public class DurableContext extends BaseContext {
2222
private final AtomicInteger operationCounter;
23-
private final DurableLogger logger;
23+
private volatile DurableLogger logger;
2424

2525
/** Shared initialization — sets all fields. */
2626
private DurableContext(
27-
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) {
28-
super(executionManager, durableConfig, lambdaContext, contextId);
27+
ExecutionManager executionManager,
28+
DurableConfig durableConfig,
29+
Context lambdaContext,
30+
String contextId,
31+
String contextName) {
32+
super(executionManager, durableConfig, lambdaContext, contextId, contextName);
2933
this.operationCounter = new AtomicInteger(0);
30-
31-
var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null;
32-
this.logger = new DurableLogger(
33-
LoggerFactory.getLogger(DurableContext.class),
34-
executionManager,
35-
requestId,
36-
durableConfig.getLoggerConfig().suppressReplayLogs());
3734
}
3835

3936
/**
@@ -48,7 +45,7 @@ private DurableContext(
4845
*/
4946
public static DurableContext createRootContext(
5047
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) {
51-
return new DurableContext(executionManager, durableConfig, lambdaContext, null);
48+
return new DurableContext(executionManager, durableConfig, lambdaContext, null, null);
5249
}
5350

5451
/**
@@ -57,8 +54,9 @@ public static DurableContext createRootContext(
5754
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
5855
* @return a new DurableContext for the child context
5956
*/
60-
public DurableContext createChildContext(String childContextId) {
61-
return new DurableContext(executionManager, getDurableConfig(), getLambdaContext(), childContextId);
57+
public DurableContext createChildContext(String childContextId, String childContextName) {
58+
return new DurableContext(
59+
executionManager, getDurableConfig(), getLambdaContext(), childContextId, childContextName);
6260
}
6361

6462
/**
@@ -67,8 +65,9 @@ public DurableContext createChildContext(String childContextId) {
6765
* @param stepOperationId the ID of the step operation (used for thread registration)
6866
* @return a new StepContext instance
6967
*/
70-
public StepContext createStepContext(String stepOperationId) {
71-
return new StepContext(executionManager, getDurableConfig(), getLambdaContext(), stepOperationId);
68+
public StepContext createStepContext(String stepOperationId, String stepOperationName, int attempt) {
69+
return new StepContext(
70+
executionManager, getDurableConfig(), getLambdaContext(), stepOperationId, stepOperationName, attempt);
7271
}
7372

7473
// ========== step methods ==========
@@ -305,9 +304,28 @@ public <T> DurableFuture<T> runInChildContextAsync(
305304
* @return the durable logger
306305
*/
307306
public DurableLogger getLogger() {
307+
// lazy initialize logger
308+
if (logger == null) {
309+
synchronized (this) {
310+
if (logger == null) {
311+
logger = new DurableLogger(LoggerFactory.getLogger(DurableContext.class), this);
312+
}
313+
}
314+
}
308315
return logger;
309316
}
310317

318+
/**
319+
* Clears the logger's thread properties. Called during context destruction to prevent memory leaks and ensure clean
320+
* state for subsequent executions.
321+
*/
322+
@Override
323+
public void close() {
324+
if (logger != null) {
325+
logger.close();
326+
}
327+
}
328+
311329
/**
312330
* Get the next operationId. For root contexts, returns sequential IDs like "1", "2", "3". For child contexts,
313331
* prefixes with the contextId to ensure global uniqueness, e.g. "1-1", "1-2" for operations inside child context

sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java

Lines changed: 37 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -38,52 +38,45 @@ public static <I, O> DurableExecutionOutput execute(
3838
Class<I> inputType,
3939
BiFunction<I, DurableContext, O> handler,
4040
DurableConfig config) {
41-
var executionManager = new ExecutionManager(
42-
input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config);
43-
44-
var handlerFuture = CompletableFuture.supplyAsync(
45-
() -> {
46-
var userInput =
47-
extractUserInput(executionManager.getExecutionOperation(), config.getSerDes(), inputType);
48-
// Create context in the executor thread so it detects the correct thread name
49-
var context = DurableContext.createRootContext(executionManager, config, lambdaContext);
50-
executionManager.registerActiveThread(null);
51-
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
52-
return handler.apply(userInput, context);
53-
},
54-
config.getExecutorService()); // Get executor from config for running user code
55-
56-
// Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function
57-
// will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a failure.
58-
return executionManager
59-
.runUntilCompleteOrSuspend(handlerFuture)
60-
.handle((result, ex) -> {
61-
if (ex != null) {
62-
// an exception thrown from handlerFuture or suspension/termination occurred
63-
Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex);
64-
if (cause instanceof SuspendExecutionException) {
65-
return DurableExecutionOutput.pending();
41+
try (var executionManager = new ExecutionManager(input, config)) {
42+
executionManager.registerActiveThread(null);
43+
var handlerFuture = CompletableFuture.supplyAsync(
44+
() -> {
45+
var userInput = extractUserInput(
46+
executionManager.getExecutionOperation(), config.getSerDes(), inputType);
47+
// use try-with-resources to clear logger properties
48+
try (var context = DurableContext.createRootContext(executionManager, config, lambdaContext)) {
49+
// Create context in the executor thread so it detects the correct thread name
50+
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
51+
return handler.apply(userInput, context);
6652
}
53+
},
54+
config.getExecutorService()); // Get executor from config for running user code
55+
56+
// Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function
57+
// will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a
58+
// failure.
59+
return executionManager
60+
.runUntilCompleteOrSuspend(handlerFuture)
61+
.handle((result, ex) -> {
62+
if (ex != null) {
63+
// an exception thrown from handlerFuture or suspension/termination occurred
64+
Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex);
65+
if (cause instanceof SuspendExecutionException) {
66+
return DurableExecutionOutput.pending();
67+
}
68+
69+
logger.debug("Execution failed: {}", cause.getMessage());
70+
return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
71+
}
72+
// user handler complete successfully
73+
var outputPayload = config.getSerDes().serialize(result);
6774

68-
logger.debug("Execution failed: {}", cause.getMessage());
69-
return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
70-
}
71-
// user handler complete successfully
72-
var outputPayload = config.getSerDes().serialize(result);
73-
74-
logger.debug("Execution completed");
75-
return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
76-
})
77-
.whenComplete((v, ex) -> {
78-
// We shutdown the execution to make sure remaining checkpoint calls in the queue are drained
79-
// We DO NOT shutdown the executor since it should stay warm for re-invokes against a warm Lambda
80-
// runtime.
81-
// For example, a re-invoke after a wait should re-use the same executor instance from
82-
// DurableConfig.
83-
// userExecutor.shutdown();
84-
executionManager.shutdown();
85-
})
86-
.join();
75+
logger.debug("Execution completed");
76+
return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
77+
})
78+
.join();
79+
}
8780
}
8881

8982
private static String handleLargePayload(ExecutionManager executionManager, String outputPayload) {

sdk/src/main/java/software/amazon/lambda/durable/StepContext.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import software.amazon.lambda.durable.logging.DurableLogger;
99

1010
public class StepContext extends BaseContext {
11-
private final DurableLogger logger;
11+
private volatile DurableLogger logger;
12+
private final int attempt;
1213

1314
/**
1415
* Creates a new StepContext instance for use in step operations.
@@ -22,19 +23,36 @@ protected StepContext(
2223
ExecutionManager executionManager,
2324
DurableConfig durableConfig,
2425
Context lambdaContext,
25-
String stepOperationId) {
26-
super(executionManager, durableConfig, lambdaContext, stepOperationId);
26+
String stepOperationId,
27+
String stepOperationName,
28+
int attempt) {
29+
super(executionManager, durableConfig, lambdaContext, stepOperationId, stepOperationName);
30+
this.attempt = attempt;
31+
}
2732

28-
var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null;
29-
this.logger = new DurableLogger(
30-
LoggerFactory.getLogger(StepContext.class),
31-
executionManager,
32-
requestId,
33-
durableConfig.getLoggerConfig().suppressReplayLogs());
33+
/** @return the current attempt */
34+
public int getAttempt() {
35+
return attempt;
3436
}
3537

3638
@Override
3739
public DurableLogger getLogger() {
40+
// lazy initialize logger
41+
if (logger == null) {
42+
synchronized (this) {
43+
if (logger == null) {
44+
logger = new DurableLogger(LoggerFactory.getLogger(StepContext.class), this);
45+
}
46+
}
47+
}
3848
return logger;
3949
}
50+
51+
/** Closes the logger for this context. */
52+
@Override
53+
public void close() {
54+
if (logger != null) {
55+
logger.close();
56+
}
57+
}
4058
}

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
2323
import software.amazon.lambda.durable.DurableConfig;
2424
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
25+
import software.amazon.lambda.durable.model.DurableExecutionInput;
2526
import software.amazon.lambda.durable.operation.BaseDurableOperation;
2627

2728
/**
@@ -45,7 +46,7 @@
4546
*
4647
* @see InternalExecutor
4748
*/
48-
public class ExecutionManager {
49+
public class ExecutionManager implements AutoCloseable {
4950

5051
private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class);
5152

@@ -65,25 +66,21 @@ public class ExecutionManager {
6566
// ===== Checkpoint Batching =====
6667
private final CheckpointBatcher checkpointBatcher;
6768

68-
public ExecutionManager(
69-
String durableExecutionArn,
70-
String checkpointToken,
71-
CheckpointUpdatedExecutionState initialExecutionState,
72-
DurableConfig config) {
73-
this.durableExecutionArn = durableExecutionArn;
69+
public ExecutionManager(DurableExecutionInput input, DurableConfig config) {
70+
this.durableExecutionArn = input.durableExecutionArn();
7471

7572
// Create checkpoint batcher for internal coordination
7673
this.checkpointBatcher =
77-
new CheckpointBatcher(config, durableExecutionArn, checkpointToken, this::onCheckpointComplete);
74+
new CheckpointBatcher(config, durableExecutionArn, input.checkpointToken(), this::onCheckpointComplete);
7875

79-
this.operationStorage = checkpointBatcher.fetchAllPages(initialExecutionState).stream()
76+
this.operationStorage = checkpointBatcher.fetchAllPages(input.initialExecutionState()).stream()
8077
.collect(Collectors.toConcurrentMap(Operation::id, op -> op));
8178

8279
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
8380
this.executionMode =
8481
new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
8582

86-
executionOp = findExecutionOp(initialExecutionState);
83+
executionOp = findExecutionOp(input.initialExecutionState());
8784

8885
// Validate initial operation is an EXECUTION operation
8986
if (executionOp == null) {
@@ -248,7 +245,9 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId,
248245
}
249246

250247
// ===== Utilities =====
251-
public void shutdown() {
248+
/** Shutdown the checkpoint batcher. */
249+
@Override
250+
public void close() {
252251
checkpointBatcher.shutdown();
253252
}
254253

0 commit comments

Comments
 (0)