Skip to content

Commit 894d67b

Browse files
authored
fix: per context replaying (#481)
1 parent b4b5461 commit 894d67b

9 files changed

Lines changed: 42 additions & 35 deletions

File tree

docs/adr/004-child-context-execution.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ Inner operation IDs are prefixed with the parent context's operation ID using `-
4646

4747
### Per-context replay state
4848

49-
A global `executionMode` doesn't work for child contexts — a child may be replaying while the parent is already executing. Each `DurableContext` tracks its own replay state via an `isReplaying` field, initialized by checking `ExecutionManager.hasOperationsForContext(contextId)`.
49+
A global `executionMode` doesn't work for child contexts — a child may be replaying while the parent is already executing. Each `DurableContext` tracks its own replay state via an `isReplaying` field, initialized by checking `ExecutionManager.hasOperationsForContext(contextId)`. `StepContext` does not track replay state because steps are retried by attempt, not replayed as independent contexts.
5050

5151
### Thread model
5252

docs/design.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ SuspendExecutionException # Internal: triggers suspension (not
519519

520520
Terminal states (SUCCEEDED, FAILED, CANCELLED, TIMED_OUT, STOPPED) stay in REPLAY mode since we're just returning cached results.
521521

522-
This is a one-way transition (REPLAY → EXECUTION, never back). `DurableLogger` checks `isReplaying()` to suppress duplicate logs during replay.
522+
This is a one-way transition (REPLAY → EXECUTION, never back). `DurableLogger` checks `DurableContext.isReplaying()` to suppress duplicate logs during replay; `StepContext` logs are attempt-based and are never replay-suppressed.
523523

524524
### MDC-Based Context Enrichment
525525

@@ -896,4 +896,3 @@ var result = stepFuture.get();
896896
| 6 | `wait()` returns. `stepFuture.get()` → result already available. |||
897897

898898
If the wait duration hasn't elapsed when the step completes, the execution is suspended. If the step finishes *after* the wait, the step thread keeps the execution alive (prevents suspension) while the wait polls to completion.
899-

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ static DurableContext getCurrentContext() {
2626
return (DurableContext) BaseContext.getCurrentContext();
2727
}
2828

29+
/** Returns whether this context is currently replaying checkpointed durable operations. */
30+
boolean isReplaying();
31+
2932
/**
3033
* Executes a durable step with the given name and blocks until it completes.
3134
*

sdk/src/main/java/software/amazon/lambda/durable/context/BaseContext.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,4 @@ static BaseContext getCurrentContext() {
6262

6363
/** Gets the context name for this context. Null for root context. */
6464
String getContextName();
65-
66-
/** Returns whether this context is currently in replay mode. */
67-
boolean isReplaying();
6865
}

sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ public abstract class BaseContextImpl implements BaseContext {
1717
private final String contextName;
1818
private final ThreadType threadType;
1919

20-
private boolean isReplaying;
21-
2220
/**
2321
* Creates a new BaseContext instance.
2422
*
@@ -41,7 +39,6 @@ protected BaseContextImpl(
4139
this.lambdaContext = lambdaContext;
4240
this.contextId = contextId;
4341
this.contextName = contextName;
44-
this.isReplaying = executionManager.hasOperationsForContext(contextId);
4542
this.threadType = threadType;
4643
}
4744

@@ -99,19 +96,6 @@ public ExecutionManager getExecutionManager() {
9996
return executionManager;
10097
}
10198

102-
/** Returns whether this context is currently in replay mode. */
103-
@Override
104-
public boolean isReplaying() {
105-
return isReplaying;
106-
}
107-
108-
/**
109-
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
110-
*/
111-
public void setExecutionMode() {
112-
this.isReplaying = false;
113-
}
114-
11599
/** Returns a durable logger for this context. */
116100
public DurableLogger getLogger() {
117101
return DurableLogger.INSTANCE;

sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class DurableContextImpl extends BaseContextImpl implements DurableContex
6060
private final OperationIdGenerator operationIdGenerator;
6161
private final DurableContextImpl parentContext;
6262
private final boolean isVirtual;
63+
private boolean isReplaying;
6364

6465
/** Shared initialization — sets all fields. */
6566
private DurableContextImpl(
@@ -74,6 +75,7 @@ private DurableContextImpl(
7475
operationIdGenerator = new OperationIdGenerator(contextId);
7576
this.parentContext = parentContext;
7677
this.isVirtual = isVirtual;
78+
this.isReplaying = executionManager.hasOperationsForContext(contextId);
7779
}
7880

7981
/**
@@ -437,6 +439,19 @@ private String nextOperationId() {
437439
return operationIdGenerator.nextOperationId();
438440
}
439441

442+
/** Returns whether this context is currently in replay mode. */
443+
@Override
444+
public boolean isReplaying() {
445+
return isReplaying;
446+
}
447+
448+
/**
449+
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
450+
*/
451+
public void setExecutionMode() {
452+
this.isReplaying = false;
453+
}
454+
440455
/**
441456
* Get the parent context ID for its child operations, which always points to a non-virtual context
442457
*

sdk/src/main/java/software/amazon/lambda/durable/context/StepContextImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
/**
1212
* Context available inside a step operation's user function.
1313
*
14-
* <p>Provides access to the current retry attempt number and a logger that includes execution metadata. Extends
15-
* {@link BaseContext} for thread lifecycle management.
14+
* <p>Provides access to the current retry attempt number and a logger that includes execution metadata. Steps are
15+
* retried by attempt rather than replayed, so this context does not track replay state.
1616
*/
1717
public class StepContextImpl extends BaseContextImpl implements StepContext {
1818
private final int attempt;

sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,14 @@ public void error(String message, Throwable t) {
122122
}
123123

124124
private boolean shouldSuppress(BaseContext context) {
125-
return context.getDurableConfig().getLoggerConfig().suppressReplayLogs() && context.isReplaying();
125+
return context instanceof DurableContext durableContext
126+
&& context.getDurableConfig().getLoggerConfig().suppressReplayLogs()
127+
&& durableContext.isReplaying();
126128
}
127129

128130
private void log(Runnable logAction) {
129131
var threadLocalContext = BaseContext.getCurrentContext();
130-
if (threadLocalContext == null || !shouldSuppress(threadLocalContext)) {
132+
if (!shouldSuppress(threadLocalContext)) {
131133
logAction.run();
132134
}
133135
}

sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,9 @@ void setsExecutionMdcOnFirstLog() {
114114
@Test
115115
void setStepThreadPropertiesSetsMdc() {
116116
var logger = new DurableLogger(new RecordingLogger().delegate());
117-
var replaying = new AtomicBoolean(false);
118117

119118
BaseContextImpl.setCurrentContext(
120-
createStepContext(replaying, LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2));
119+
createStepContext(LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2));
121120
DurableLogger.attachContext();
122121
try {
123122
logger.info("step log");
@@ -145,6 +144,20 @@ void clearThreadPropertiesRemovesMdc() {
145144
assertNull(MDC.get(DurableLogger.MDC_REQUEST_ID));
146145
}
147146

147+
@Test
148+
void stepLogsAreNotSuppressed() {
149+
var recordingLogger = new RecordingLogger();
150+
var logger = new DurableLogger(recordingLogger.delegate());
151+
152+
withContext(createStepContext(LoggerConfig.defaults(), REQUEST_ID, "op-1", "validateOrder", 2), () -> {
153+
logger.info("step logs should always emit");
154+
});
155+
156+
assertEquals(1, recordingLogger.calls().size());
157+
assertEquals(
158+
"step logs should always emit", recordingLogger.calls().get(0).message());
159+
}
160+
148161
@Test
149162
void replayModeTransitionAllowsSubsequentLogs() {
150163
var recordingLogger = new RecordingLogger();
@@ -223,12 +236,7 @@ private static DurableContext createDurableContext(
223236
}
224237

225238
private static StepContext createStepContext(
226-
AtomicBoolean replaying,
227-
LoggerConfig loggerConfig,
228-
String requestId,
229-
String operationId,
230-
String operationName,
231-
int attempt) {
239+
LoggerConfig loggerConfig, String requestId, String operationId, String operationName, int attempt) {
232240
return (StepContext) Proxy.newProxyInstance(
233241
StepContext.class.getClassLoader(),
234242
new Class<?>[] {StepContext.class},
@@ -239,7 +247,6 @@ private static StepContext createStepContext(
239247
case "getContextId" -> operationId;
240248
case "getContextName" -> operationName;
241249
case "getAttempt" -> attempt;
242-
case "isReplaying" -> replaying.get();
243250
case "toString" -> "TestStepContext";
244251
case "hashCode" -> System.identityHashCode(proxy);
245252
case "equals" -> proxy == args[0];

0 commit comments

Comments
 (0)