Skip to content

Commit 39d2143

Browse files
maschnetworkzhongkechenphipag
authored
feat: createCallback implementation and asyncTest setup
* feat: initial callback design * feat: implemented callback with serdes and samples * Added Callback Serdes * Simplified Callback Tests * Implemented Cloud based testing for callback * Finalize tests * removed temp design * Adapted readme * Adding serdes to config * Adapted readme * Review comments and formatting * add a method to construct TypeToken from Class (#19) * add a method to construct TypeToken from Class * remove resultType from StepOperation * fix tests and examples * revert the accidental changes made by IntelliJ * feat(error-handling): Support specific type error reconstruction from… (#15) * feat: implemented callback with serdes and samples * Implemented Cloud based testing for callback * Formatting * Review fixes and simplifications * Review fixes and simplifications * Review comments * Fixed branches for testing * History Event Process - fix missing cases * Review fixes and simplifications --------- Co-authored-by: Frank Chen <65260095+zhongkechen@users.noreply.github.com> Co-authored-by: Philipp Page <pagejep@amazon.com>
1 parent ba2f2d5 commit 39d2143

24 files changed

Lines changed: 1819 additions & 63 deletions

File tree

AGENTS.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ mvn test -Dtest=DurableContextTest
2828

2929
# Skip tests
3030
mvn install -DskipTests
31+
32+
# Format code (ALWAYS run after making changes)
33+
mvn spotless:apply
3134
```
3235

3336
## Key Directories
@@ -64,7 +67,11 @@ import static org.junit.jupiter.api.Assertions.*; // Tests
6467
import static java.util.Collections.emptyList; // Factory methods
6568
import static com.amazonaws.lambda.durable.model.Status.*; // Enums
6669

67-
// AVOID fully qualified names in code
70+
// ALWAYS use proper imports, NEVER use fully qualified class names in code
71+
// Bad: var lambda = software.amazon.awssdk.services.lambda.LambdaClient.create();
72+
// Good: import software.amazon.awssdk.services.lambda.LambdaClient;
73+
// var lambda = LambdaClient.create();
74+
6875
// Bad: com.amazonaws.lambda.durable.model.Status.SUCCESS
6976
// Good: import static and use SUCCESS directly
7077

@@ -232,6 +239,10 @@ Check `ExecutionManager` for thread registration and coordination logic if debug
232239
- Check existing code for patterns (especially in `operation/` package)
233240
- Prefer minimal changes over large refactors
234241

242+
## After Making Changes
243+
244+
**ALWAYS run `mvn spotless:apply` after making code changes** to ensure consistent formatting across the codebase. This applies code formatting rules automatically.
245+
235246
## Further Reading
236247

237248
### Official AWS SDKs

README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Your durable function extends `DurableHandler<I, O>` and implements `handleReque
2020
- `ctx.step()` – Execute code and checkpoint the result
2121
- `ctx.stepAsync()` – Start concurrent operations
2222
- `ctx.wait()` – Suspend execution without compute charges
23+
- `ctx.createCallback()` – Wait for external events (approvals, webhooks)
2324

2425
## Quick Start
2526

@@ -110,6 +111,63 @@ ctx.wait(Duration.ofMinutes(30));
110111
ctx.wait("cooling-off-period", Duration.ofDays(7));
111112
```
112113

114+
### createCallback() – Wait for External Events
115+
116+
Callbacks suspend execution until an external system sends a result. Use this for human approvals, webhooks, or any event-driven workflow.
117+
118+
```java
119+
// Create a callback and get the ID to share with external systems
120+
DurableCallbackFuture<String> callback = ctx.createCallback("approval", String.class);
121+
122+
// Send the callback ID to an external system within a step
123+
ctx.step("send-notification", String.class, () -> {
124+
notificationService.sendApprovalRequest(callback.callbackId(), requestDetails);
125+
return "notification-sent";
126+
});
127+
128+
// Suspend until the external system calls back with a result
129+
String approvalResult = callback.get();
130+
```
131+
132+
The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload.
133+
134+
#### Callback Configuration
135+
136+
Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization:
137+
138+
```java
139+
var config = CallbackConfig.builder()
140+
.timeout(Duration.ofHours(24)) // Max time to wait for callback
141+
.heartbeatTimeout(Duration.ofHours(1)) // Max time between heartbeats
142+
.serDes(new CustomSerDes()) // Custom serialization/deserialization
143+
.build();
144+
145+
var callback = ctx.createCallback("approval", String.class, config);
146+
```
147+
148+
| Option | Description |
149+
|--------|-------------|
150+
| `timeout()` | Maximum duration to wait for the callback to complete |
151+
| `heartbeatTimeout()` | Maximum duration between heartbeat signals from the external system |
152+
| `serDes()` | Custom SerDes for deserializing callback results (e.g., encryption, custom formats) |
153+
154+
#### Callback Exceptions
155+
156+
| Exception | When Thrown |
157+
|-----------|-------------|
158+
| `CallbackTimeoutException` | Callback exceeded its timeout duration |
159+
| `CallbackFailedException` | External system sent an error response |
160+
161+
```java
162+
try {
163+
var result = callback.get();
164+
} catch (CallbackTimeoutException e) {
165+
// Callback timed out - implement fallback logic
166+
} catch (CallbackFailedException e) {
167+
// External system reported an error
168+
}
169+
```
170+
113171
## Step Configuration
114172

115173
Configure step behavior with `StepConfig`:
@@ -307,6 +365,8 @@ The SDK throws specific exceptions to help you handle different failure scenario
307365
|-----------|-------------|---------------|
308366
| `StepFailedException` | Step exhausted all retry attempts | Catch to implement fallback logic or let execution fail |
309367
| `StepInterruptedException` | `AT_MOST_ONCE` step was interrupted before completion | Implement manual recovery (check if operation completed externally) |
368+
| `CallbackTimeoutException` | Callback exceeded its timeout duration | Implement fallback logic or escalation |
369+
| `CallbackFailedException` | External system sent an error response to the callback | Handle the error or propagate failure |
310370
| `NonDeterministicExecutionException` | Code changed between original execution and replay | Fix code to maintain determinism; don't change step order/names |
311371

312372
```java
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import com.amazonaws.lambda.durable.CallbackConfig;
6+
import com.amazonaws.lambda.durable.DurableContext;
7+
import com.amazonaws.lambda.durable.DurableHandler;
8+
import java.time.Duration;
9+
10+
/**
11+
* Example demonstrating callback operations for external system integration.
12+
*
13+
* <p>This handler demonstrates a human approval workflow:
14+
*
15+
* <ol>
16+
* <li>Prepare the request for approval
17+
* <li>Create a callback and send the callback ID to an external approval system
18+
* <li>Suspend execution until the external system responds
19+
* <li>Process the approval result
20+
* </ol>
21+
*
22+
* <p>External systems respond using AWS Lambda APIs:
23+
*
24+
* <ul>
25+
* <li>{@code SendDurableExecutionCallbackSuccess} - approve with result
26+
* <li>{@code SendDurableExecutionCallbackFailure} - reject with error
27+
* <li>{@code SendDurableExecutionCallbackHeartbeat} - keep callback alive
28+
* </ul>
29+
*/
30+
public class CallbackExample extends DurableHandler<ApprovalRequest, String> {
31+
32+
@Override
33+
public String handleRequest(ApprovalRequest input, DurableContext context) {
34+
// Step 1: Prepare the approval request
35+
var prepared = context.step(
36+
"prepare",
37+
String.class,
38+
() -> "Approval request for: " + input.description() + " ($" + input.amount() + ")");
39+
40+
// Step 2: Create callback for external approval
41+
// Use timeout from input if provided, otherwise default to 5 minutes
42+
var timeout =
43+
input.timeoutSeconds() != null ? Duration.ofSeconds(input.timeoutSeconds()) : Duration.ofMinutes(5);
44+
45+
var config = CallbackConfig.builder().timeout(timeout).build();
46+
47+
var callback = context.createCallback("approval", String.class, config);
48+
49+
// Step 2.5: Log AWS CLI command to complete the callback
50+
context.step("log-callback-command", Void.class, () -> {
51+
var callbackId = callback.callbackId();
52+
// The result must be base64-encoded JSON
53+
var command = String.format(
54+
"aws lambda send-durable-execution-callback-success --callback-id %s --result $(echo -n '\"approved\"' | base64)",
55+
callbackId);
56+
context.getLogger().info("To complete this callback, run: {}", command);
57+
return null;
58+
});
59+
60+
// Step 3: Wait for external approval (suspends execution)
61+
var approvalResult = callback.get();
62+
63+
// Step 4: Process the approval
64+
var result = context.step("process-approval", String.class, () -> {
65+
return prepared + " - " + approvalResult;
66+
});
67+
68+
return result;
69+
}
70+
}
71+
72+
/** Input for the approval workflow. */
73+
record ApprovalRequest(String description, double amount, Integer timeoutSeconds) {
74+
// Convenience constructor for default timeout
75+
public ApprovalRequest(String description, double amount) {
76+
this(description, amount, null);
77+
}
78+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import com.amazonaws.lambda.durable.model.ExecutionStatus;
8+
import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner;
9+
import org.junit.jupiter.api.Test;
10+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
11+
import software.amazon.awssdk.services.lambda.model.OperationType;
12+
13+
class CallbackExampleTest {
14+
15+
@Test
16+
void testCallbackExampleSuspendsForApproval() {
17+
var handler = new CallbackExample();
18+
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);
19+
20+
var input = new ApprovalRequest("New laptop", 1500.00);
21+
22+
// First run - prepares request and creates callback, then suspends
23+
var result = runner.run(input);
24+
25+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
26+
27+
// Verify the callback was created
28+
var callbackOp = runner.getOperation("approval");
29+
assertNotNull(callbackOp);
30+
assertEquals(OperationType.CALLBACK, callbackOp.getType());
31+
assertEquals(OperationStatus.STARTED, callbackOp.getStatus());
32+
}
33+
34+
@Test
35+
void testCallbackExampleCompletesAfterApproval() {
36+
var handler = new CallbackExample();
37+
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);
38+
39+
var input = new ApprovalRequest("New laptop", 1500.00);
40+
41+
// First run - suspends waiting for callback
42+
var result = runner.run(input);
43+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
44+
45+
// Simulate external system approving the request
46+
var callbackId = runner.getCallbackId("approval");
47+
runner.completeCallback(callbackId, "\"Approved by manager\"");
48+
49+
// Second run - callback complete, finishes processing
50+
result = runner.run(input);
51+
52+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
53+
assertEquals(
54+
"Approval request for: New laptop ($1500.0) - Approved by manager", result.getResult(String.class));
55+
}
56+
}

examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
import org.junit.jupiter.api.Test;
1212
import org.junit.jupiter.api.condition.EnabledIf;
1313
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
14+
import software.amazon.awssdk.core.SdkBytes;
15+
import software.amazon.awssdk.services.lambda.LambdaClient;
16+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1417
import software.amazon.awssdk.services.sts.StsClient;
1518

1619
@EnabledIf("isEnabled")
17-
public class CloudBasedIntegrationTest {
20+
class CloudBasedIntegrationTest {
1821

1922
private static String account;
2023
private static String region;
@@ -237,4 +240,96 @@ void testErrorHandlingExample() {
237240
assertTrue(finalResult.contains("fallback-result"));
238241
assertTrue(finalResult.contains("payment-"));
239242
}
243+
244+
@Test
245+
void testCallbackExample() throws Exception {
246+
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
247+
248+
// Start async execution
249+
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));
250+
251+
// Wait for callback to appear
252+
execution.pollUntil(exec -> exec.hasCallback("approval"));
253+
254+
// Get callback ID
255+
var callbackId = execution.getCallbackId("approval");
256+
assertNotNull(callbackId);
257+
258+
// Complete the callback using AWS SDK
259+
var lambda = LambdaClient.create();
260+
lambda.sendDurableExecutionCallbackSuccess(
261+
req -> req.callbackId(callbackId).result(SdkBytes.fromUtf8String("\"approved\"")));
262+
263+
// Wait for execution to complete
264+
var result = execution.pollUntilComplete();
265+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
266+
267+
var finalResult = result.getResult(String.class);
268+
assertNotNull(finalResult);
269+
assertTrue(finalResult.contains("Approval request for: Purchase order"));
270+
assertTrue(finalResult.contains("5000"));
271+
assertTrue(finalResult.contains("approved"));
272+
273+
// Verify all operations completed
274+
assertNotNull(execution.getOperation("prepare"));
275+
assertNotNull(execution.getOperation("log-callback-command"));
276+
assertNotNull(execution.getOperation("process-approval"));
277+
}
278+
279+
@Test
280+
void testCallbackExampleWithFailure() {
281+
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
282+
283+
// Start async execution
284+
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));
285+
286+
// Wait for callback to appear
287+
execution.pollUntil(exec -> exec.hasCallback("approval"));
288+
289+
// Get callback ID
290+
var callbackId = execution.getCallbackId("approval");
291+
assertNotNull(callbackId);
292+
293+
// Fail the callback using AWS SDK
294+
var lambda = LambdaClient.create();
295+
lambda.sendDurableExecutionCallbackFailure(req -> req.callbackId(callbackId)
296+
.error(err -> err.errorType("ApprovalRejected").errorMessage("Approval rejected by manager")));
297+
298+
// Wait for execution to complete
299+
var result = execution.pollUntilComplete();
300+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
301+
302+
// Verify the callback operation shows failure
303+
var approvalOp = execution.getOperation("approval");
304+
assertNotNull(approvalOp);
305+
var callbackDetails = approvalOp.getCallbackDetails();
306+
assertNotNull(callbackDetails);
307+
assertNotNull(callbackDetails.error());
308+
// Error message is redacted in the response, just verify error exists
309+
assertTrue(callbackDetails.error().toString().contains("ErrorObject"));
310+
}
311+
312+
@Test
313+
void testCallbackExampleWithTimeout() {
314+
var runner = CloudDurableTestRunner.create(arn("callback-example"), ApprovalRequest.class, String.class);
315+
316+
// Start async execution with 10 second timeout
317+
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0, 10));
318+
319+
// Wait for callback to appear
320+
execution.pollUntil(exec -> exec.hasCallback("approval"));
321+
322+
// Get callback ID but don't complete it - let it timeout
323+
var callbackId = execution.getCallbackId("approval");
324+
assertNotNull(callbackId);
325+
326+
// Wait for execution to complete (should timeout after 10 seconds)
327+
var result = execution.pollUntilComplete();
328+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
329+
330+
// Verify the callback operation shows timeout status
331+
var approvalOp = execution.getOperation("approval");
332+
assertNotNull(approvalOp);
333+
assertEquals(OperationStatus.TIMED_OUT, approvalOp.getStatus());
334+
}
240335
}

0 commit comments

Comments
 (0)