Skip to content

Commit 5e210a3

Browse files
committed
add waitForCallback
1 parent 2677372 commit 5e210a3

19 files changed

Lines changed: 347 additions & 96 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ Your durable function extends `DurableHandler<I, O>` and implements `handleReque
2121
- `ctx.stepAsync()` – Start a concurrent step
2222
- `ctx.wait()` – Suspend execution without compute charges
2323
- `ctx.createCallback()` – Wait for external events (approvals, webhooks)
24+
- `ctx.waitForCallback()` – Run an isolated child context with its own checkpoint log
25+
- `ctx.waitForCallbackAsync()` – Start a concurrent child context
2426
- `ctx.invoke()` – Invoke another Lambda function and wait for the result
2527
- `ctx.invokeAsync()` – Start a concurrent Lambda function invocation
2628
- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log

docs/core/callbacks.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Callbacks suspend execution until an external system sends a result. Use this fo
77
DurableCallbackFuture<String> callback = ctx.createCallback("approval", String.class);
88

99
// Send the callback ID to an external system within a step
10-
ctx.step("send-notification", String.class, () -> {
10+
ctx.step("send-notification", String.class, stepCtx -> {
1111
notificationService.sendApprovalRequest(callback.callbackId(), requestDetails);
1212
return "notification-sent";
1313
});
@@ -18,6 +18,16 @@ String approvalResult = callback.get();
1818

1919
The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload.
2020

21+
#### waitForCallback() ####
22+
23+
`waitForCallback` simplifies callback handling by combining callback creation and submission in one operation. The SDK creates the callback, executes your submitter function with the callback ID, and waits for the result.
24+
25+
```java
26+
ctx.waitForCallback("send-notification", String.class, (callbackId, stepCtx) -> {
27+
notificationService.sendApprovalRequest(callbackId, requestDetails);
28+
})
29+
```
30+
2131
#### Callback Configuration
2232

2333
Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization:
@@ -30,6 +40,12 @@ var config = CallbackConfig.builder()
3040
.build();
3141

3242
var callback = ctx.createCallback("approval", String.class, config);
43+
44+
var waitForCallbackConfig = WaitForCallbackConfig.builder()
45+
.callbackConfig(config)
46+
.stepConfig(StepConfig.builder().retryStrategy(...).build())
47+
.build();
48+
ctx.waitForCallback("approval", String.class, callbackId -> sendApprovalRequest(callbackId), waitForCallbackConfig);
3349
```
3450

3551
| Option | Description |

examples/src/main/java/software/amazon/lambda/durable/examples/CallbackExample.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
4444

4545
var config = CallbackConfig.builder().timeout(timeout).build();
4646

47+
var preapprovalCallback = context.waitForCallbackAsync("preapproval", String.class, (callbackId, ctx) -> {
48+
ctx.getLogger().info("Sending callback {} to preapproval system", callbackId);
49+
});
50+
4751
var callback = context.createCallback("approval", String.class, config);
4852

4953
// Step 2.5: Log AWS CLI command to complete the callback
@@ -57,15 +61,14 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
5761
return null;
5862
});
5963

64+
var preapprovalResult = preapprovalCallback.get();
65+
6066
// Step 3: Wait for external approval (suspends execution)
6167
var approvalResult = callback.get();
6268

6369
// Step 4: Process the approval
64-
var result = context.step("process-approval", String.class, () -> {
65-
return prepared + " - " + approvalResult;
66-
});
67-
68-
return result;
70+
return context.step(
71+
"process-approval", String.class, () -> prepared + " - " + preapprovalResult + " - " + approvalResult);
6972
}
7073
}
7174

examples/src/test/java/software/amazon/lambda/durable/examples/CallbackExampleTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,19 @@ void testCallbackExampleCompletesAfterApproval() {
4646
var callbackId = runner.getCallbackId("approval");
4747
runner.completeCallback(callbackId, "\"Approved by manager\"");
4848

49-
// Second run - callback complete, finishes processing
49+
result = runner.run(input);
50+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
51+
52+
// second run - pending preapproval
53+
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
54+
runner.completeCallback(preapprovalCallbackId, "\"Sent to preapprover\"");
55+
56+
// third run - callback complete, finishes processing
5057
result = runner.run(input);
5158

5259
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
5360
assertEquals(
54-
"Approval request for: New laptop ($1500.0) - Approved by manager", result.getResult(String.class));
61+
"Approval request for: New laptop ($1500.0) - Sent to preapprover - Approved by manager",
62+
result.getResult(String.class));
5563
}
5664
}

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,16 @@ void testErrorHandlingExample() {
275275
@Test
276276
void testCallbackExample() throws Exception {
277277
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
278+
var lambda = LambdaClient.create();
278279

279280
// Start async execution
280281
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));
281282

283+
execution.pollUntil(exec -> exec.hasCallback("preapproval-callback"));
284+
var preapprovalCallbackId = execution.getCallbackId("preapproval-callback");
285+
lambda.sendDurableExecutionCallbackSuccess(
286+
req -> req.callbackId(preapprovalCallbackId).result(SdkBytes.fromUtf8String("\"preapproved\"")));
287+
282288
// Wait for callback to appear
283289
execution.pollUntil(exec -> exec.hasCallback("approval"));
284290

@@ -287,7 +293,6 @@ void testCallbackExample() throws Exception {
287293
assertNotNull(callbackId);
288294

289295
// Complete the callback using AWS SDK
290-
var lambda = LambdaClient.create();
291296
lambda.sendDurableExecutionCallbackSuccess(
292297
req -> req.callbackId(callbackId).result(SdkBytes.fromUtf8String("\"approved\"")));
293298

@@ -297,6 +302,7 @@ void testCallbackExample() throws Exception {
297302

298303
var finalResult = result.getResult(String.class);
299304
assertNotNull(finalResult);
305+
assertTrue(finalResult.contains("preapproved"));
300306
assertTrue(finalResult.contains("Approval request for: Purchase order"));
301307
assertTrue(finalResult.contains("5000"));
302308
assertTrue(finalResult.contains("approved"));

examples/src/test/java/software/amazon/lambda/durable/examples/GenericTypesExampleTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ class GenericTypesExampleTest {
1212
@Test
1313
void testGenericTypesExample() {
1414
var handler = new GenericTypesExample();
15-
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
16-
.withSkipTime(true);
15+
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);
1716

1817
var input = new GenericTypesExample.Input("user123");
1918
var result = runner.run(input);
@@ -48,8 +47,7 @@ void testGenericTypesExample() {
4847
@Test
4948
void testOperationTracking() {
5049
var handler = new GenericTypesExample();
51-
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler)
52-
.withSkipTime(true);
50+
var runner = LocalDurableTestRunner.create(GenericTypesExample.Input.class, handler);
5351

5452
var input = new GenericTypesExample.Input("user456");
5553
var result = runner.run(input);

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ChildContextIntegrationTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ void childContextResultSurvivesReplay() {
2323
var childExecutionCount = new AtomicInteger(0);
2424

2525
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
26-
return ctx.runInChildContext("compute", String.class, child -> {
26+
return ctx.runInChildContext("compute", TypeToken.get(String.class), child -> {
2727
childExecutionCount.incrementAndGet();
2828
return child.step("work", String.class, () -> "result-" + input);
2929
});
@@ -191,7 +191,6 @@ void waitInsideChildContextSuspendsAndResumes() {
191191
return child.step("after-wait", String.class, () -> "done");
192192
});
193193
});
194-
runner.withSkipTime(true);
195194

196195
var result = runner.runUntilComplete("test");
197196
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
@@ -211,8 +210,6 @@ void waitInsideChildContextReturnsPendingThenCompletes() {
211210
return child.step("after-wait", String.class, () -> "done");
212211
});
213212
});
214-
runner.withSkipTime(false);
215-
216213
// First run - should suspend at the wait
217214
var result = runner.run("test");
218215
assertEquals(ExecutionStatus.PENDING, result.getStatus());
@@ -246,16 +243,13 @@ void twoAsyncChildContextsBothWaitSuspendAndResume() {
246243
return f1.get() + "+" + f2.get();
247244
});
248245

249-
runner.withSkipTime(false);
250-
251246
// First run - both child contexts should suspend at their waits
252247
// TODO: Using run() + runUntilComplete() instead of manual run/advanceTime/run due to a
253248
// thread coordination race condition that causes flakiness on slow CI workers.
254249
var result = runner.run("test");
255250
assertEquals(ExecutionStatus.PENDING, result.getStatus());
256251

257252
// Now let runUntilComplete handle the rest (with skipTime so waits auto-advance)
258-
runner.withSkipTime(true);
259253
var finalResult = runner.runUntilComplete("test");
260254
assertEquals(ExecutionStatus.SUCCEEDED, finalResult.getStatus());
261255
assertEquals("a-done+b-done", finalResult.getResult(String.class));
@@ -285,7 +279,6 @@ void oneChildWaitsWhileOtherKeepsProcessingSuspendsAfterWorkDone() {
285279
});
286280
return busy.get() + "|" + waiting.get();
287281
});
288-
runner.withSkipTime(false);
289282

290283
// First run: busy child completes its work, but waiter's wait is still outstanding → PENDING
291284
var result = runner.run("test");

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CustomConfigIntegrationTest.java

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,18 @@ void testCustomConfig_WithMultipleSteps() {
9898
var customConfig = DurableConfig.builder().withSerDes(customSerDes).build();
9999

100100
var runner = LocalDurableTestRunner.create(
101-
String.class,
102-
(input, context) -> {
103-
var step1 = context.step("step1", String.class, () -> "Step1: " + input);
101+
String.class,
102+
(input, context) -> {
103+
var step1 = context.step("step1", String.class, () -> "Step1: " + input);
104104

105-
// Remove wait operation to avoid complexity in this test
106-
var step2 = context.step("step2", String.class, () -> "Step2: " + input);
105+
// Remove wait operation to avoid complexity in this test
106+
var step2 = context.step("step2", String.class, () -> "Step2: " + input);
107107

108-
var step3 = context.step("step3", String.class, () -> "Step3: " + input);
108+
var step3 = context.step("step3", String.class, () -> "Step3: " + input);
109109

110-
return step1 + " | " + step2 + " | " + step3;
111-
},
112-
customConfig)
113-
.withSkipTime(true);
110+
return step1 + " | " + step2 + " | " + step3;
111+
},
112+
customConfig);
114113

115114
var result = runner.run("test");
116115

@@ -137,24 +136,21 @@ void testCustomConfig_WithRetry() {
137136
var customConfig = DurableConfig.builder().withSerDes(customSerDes).build();
138137

139138
var runner = LocalDurableTestRunner.create(
140-
String.class,
141-
(input, context) -> {
142-
return context.step(
143-
"retry-step",
144-
String.class,
145-
() -> {
146-
int currentAttempt = attemptCount.incrementAndGet();
147-
// Always fail to test retry behavior (like existing RetryIntegrationTest)
148-
throw new RuntimeException("Simulated failure attempt " + currentAttempt);
149-
},
150-
StepConfig.builder()
151-
.retryStrategy(
152-
software.amazon.lambda.durable.retry.RetryStrategies.Presets
153-
.DEFAULT)
154-
.build());
155-
},
156-
customConfig)
157-
.withSkipTime(true);
139+
String.class,
140+
(input, context) -> {
141+
return context.step(
142+
"retry-step",
143+
String.class,
144+
() -> {
145+
int currentAttempt = attemptCount.incrementAndGet();
146+
// Always fail to test retry behavior (like existing RetryIntegrationTest)
147+
throw new RuntimeException("Simulated failure attempt " + currentAttempt);
148+
},
149+
StepConfig.builder()
150+
.retryStrategy(software.amazon.lambda.durable.retry.RetryStrategies.Presets.DEFAULT)
151+
.build());
152+
},
153+
customConfig);
158154

159155
// First run should return PENDING (retry scheduled) - matching existing RetryIntegrationTest pattern
160156
var result = runner.run("test");

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/IntegrationTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ void testFullWaitOperation() {
104104
var step2 = context.step("step2", String.class, () -> "Step 2 done");
105105
return new TestOutput(step1 + " + " + step2);
106106
});
107-
runner.withSkipTime(true);
108107

109108
var result = runner.runUntilComplete(new TestInput("test"));
110109

@@ -167,7 +166,6 @@ void testOperationFiltering() {
167166
context.step("good-step", String.class, () -> "ok");
168167
return "done";
169168
});
170-
runner.withSkipTime(true);
171169

172170
var result = runner.runUntilComplete(new TestInput("test"));
173171

@@ -184,9 +182,8 @@ void testWaitOperationWithManualAdvance() {
184182
context.wait(Duration.ofSeconds(5));
185183
return "done";
186184
});
187-
runner.withSkipTime(false);
188185

189-
var result = runner.runUntilComplete(new TestInput("test"));
186+
var result = runner.run(new TestInput("test"));
190187

191188
assertEquals(ExecutionStatus.PENDING, result.getStatus());
192189
assertEquals(1, result.getSucceededOperations().size());

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ void testAtLeastOnceReExecutesAfterCheckpointFailure() {
143143
.semantics(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
144144
.build());
145145
});
146-
runner.withSkipTime(true);
147146

148147
runner.run("test");
149148
assertEquals(1, executionCount.get());

0 commit comments

Comments
 (0)