Skip to content

Commit 8998719

Browse files
committed
Added Callback Serdes
1 parent 9133818 commit 8998719

11 files changed

Lines changed: 1073 additions & 0 deletions

File tree

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import com.amazonaws.lambda.durable.CallbackConfig;
6+
import com.amazonaws.lambda.durable.DurableContext;
7+
import com.amazonaws.lambda.durable.DurableHandler;
8+
import java.time.Duration;
9+
10+
/**
11+
* Example demonstrating callback operations for external system integration.
12+
*
13+
* <p>This handler demonstrates a human approval workflow:
14+
*
15+
* <ol>
16+
* <li>Prepare the request for approval
17+
* <li>Create a callback and send the callback ID to an external approval system
18+
* <li>Suspend execution until the external system responds
19+
* <li>Process the approval result
20+
* </ol>
21+
*
22+
* <p>External systems respond using AWS Lambda APIs:
23+
* <ul>
24+
* <li>{@code SendDurableExecutionCallbackSuccess} - approve with result
25+
* <li>{@code SendDurableExecutionCallbackFailure} - reject with error
26+
* <li>{@code SendDurableExecutionCallbackHeartbeat} - keep callback alive
27+
* </ul>
28+
*/
29+
public class CallbackExample extends DurableHandler<ApprovalRequest, String> {
30+
31+
@Override
32+
public String handleRequest(ApprovalRequest input, DurableContext context) {
33+
// Step 1: Prepare the approval request
34+
var prepared = context.step("prepare", String.class, () -> {
35+
return "Approval request for: " + input.description() + " ($" + input.amount() + ")";
36+
});
37+
38+
// Step 2: Create callback for external approval
39+
// Configure with 24-hour timeout and 1-hour heartbeat timeout
40+
var config = CallbackConfig.builder()
41+
.timeout(Duration.ofHours(24))
42+
.heartbeatTimeout(Duration.ofHours(1))
43+
.build();
44+
45+
var callback = context.createCallback("approval", String.class, config);
46+
47+
// Step 2.5: Log AWS CLI command to complete the callback
48+
context.step("log-callback-command", Void.class, () -> {
49+
var callbackId = callback.callbackId();
50+
var command = String.format(
51+
"aws lambda send-durable-execution-callback-success --callback-id %s --payload '{\"result\":\"approved\"}'",
52+
callbackId
53+
);
54+
context.getLogger().info("To complete this callback, run: {}", command);
55+
return null;
56+
});
57+
58+
// Step 3: Wait for external approval (suspends execution)
59+
var approvalResult = callback.future().get();
60+
61+
// Step 4: Process the approval
62+
var result = context.step("process-approval", String.class, () -> {
63+
return prepared + " - " + approvalResult;
64+
});
65+
66+
return result;
67+
}
68+
}
69+
70+
/** Input for the approval workflow. */
71+
record ApprovalRequest(String description, double amount) {}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import com.amazonaws.lambda.durable.model.ExecutionStatus;
8+
import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner;
9+
import org.junit.jupiter.api.Test;
10+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
11+
import software.amazon.awssdk.services.lambda.model.OperationType;
12+
13+
class CallbackExampleTest {
14+
15+
@Test
16+
void testCallbackExampleSuspendsForApproval() {
17+
var handler = new CallbackExample();
18+
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);
19+
20+
var input = new ApprovalRequest("New laptop", 1500.00);
21+
22+
// First run - prepares request and creates callback, then suspends
23+
var result = runner.run(input);
24+
25+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
26+
27+
// Verify the callback was created
28+
var callbackOp = runner.getOperation("approval");
29+
assertNotNull(callbackOp);
30+
assertEquals(OperationType.CALLBACK, callbackOp.getType());
31+
assertEquals(OperationStatus.STARTED, callbackOp.getStatus());
32+
}
33+
34+
@Test
35+
void testCallbackExampleCompletesAfterApproval() {
36+
var handler = new CallbackExample();
37+
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);
38+
39+
var input = new ApprovalRequest("New laptop", 1500.00);
40+
41+
// First run - suspends waiting for callback
42+
var result = runner.run(input);
43+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
44+
45+
// Simulate external system approving the request
46+
var callbackId = runner.getCallbackId("approval");
47+
runner.completeCallback(callbackId, "\"Approved by manager\"");
48+
49+
// Second run - callback complete, finishes processing
50+
result = runner.run(input);
51+
52+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
53+
assertEquals(
54+
"Approval request for: New laptop ($1500.0) - Approved by manager",
55+
result.getResult(String.class));
56+
}
57+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.amazonaws.lambda.durable;
5+
6+
import static org.junit.jupiter.api.Assertions.*;
7+
8+
import com.amazonaws.lambda.durable.exception.CallbackFailedException;
9+
import com.amazonaws.lambda.durable.exception.CallbackTimeoutException;
10+
import com.amazonaws.lambda.durable.model.ExecutionStatus;
11+
import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner;
12+
import java.time.Duration;
13+
import org.junit.jupiter.api.Test;
14+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
15+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
16+
import software.amazon.awssdk.services.lambda.model.OperationType;
17+
18+
class CallbackIntegrationTest {
19+
20+
@Test
21+
void callbackSuccessFlow() {
22+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
23+
var cb = ctx.createCallback("approval", String.class);
24+
return cb.future().get();
25+
});
26+
27+
// First run - creates callback, suspends
28+
var result = runner.run("test");
29+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
30+
31+
var op = runner.getOperation("approval");
32+
assertNotNull(op);
33+
assertEquals(OperationType.CALLBACK, op.getType());
34+
assertEquals(OperationStatus.STARTED, op.getStatus());
35+
36+
// Simulate external system completing callback
37+
var callbackId = runner.getCallbackId("approval");
38+
assertNotNull(callbackId);
39+
runner.completeCallback(callbackId, "\"approved\"");
40+
41+
// Re-run - callback complete, returns result
42+
result = runner.run("test");
43+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
44+
assertEquals("approved", result.getResult(String.class));
45+
}
46+
47+
@Test
48+
void callbackFailureFlow() {
49+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
50+
var cb = ctx.createCallback("approval", String.class);
51+
return cb.future().get();
52+
});
53+
54+
// First run - creates callback, suspends
55+
var result = runner.run("test");
56+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
57+
58+
// Simulate external system failing callback
59+
var callbackId = runner.getCallbackId("approval");
60+
var error = ErrorObject.builder().errorType("Rejected").errorMessage("Request denied").build();
61+
runner.failCallback(callbackId, error);
62+
63+
// Re-run - callback failed, throws exception
64+
result = runner.run("test");
65+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
66+
}
67+
68+
@Test
69+
void callbackTimeoutFlow() {
70+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
71+
var cb = ctx.createCallback(
72+
"approval", String.class, CallbackConfig.builder().timeout(Duration.ofMinutes(5)).build());
73+
return cb.future().get();
74+
});
75+
76+
// First run - creates callback, suspends
77+
var result = runner.run("test");
78+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
79+
80+
// Simulate timeout
81+
var callbackId = runner.getCallbackId("approval");
82+
runner.timeoutCallback(callbackId);
83+
84+
// Re-run - callback timed out, throws exception
85+
result = runner.run("test");
86+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
87+
}
88+
89+
@Test
90+
void multipleCallbacksInSameExecution() {
91+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
92+
var cb1 = ctx.createCallback("approval1", String.class);
93+
var cb2 = ctx.createCallback("approval2", String.class);
94+
95+
var result1 = cb1.future().get();
96+
var result2 = cb2.future().get();
97+
98+
return result1 + " and " + result2;
99+
});
100+
101+
// First run - creates both callbacks, suspends on first
102+
var result = runner.run("test");
103+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
104+
105+
// Complete first callback
106+
var callbackId1 = runner.getCallbackId("approval1");
107+
runner.completeCallback(callbackId1, "\"first\"");
108+
109+
// Second run - first callback done, suspends on second
110+
result = runner.run("test");
111+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
112+
113+
// Complete second callback
114+
var callbackId2 = runner.getCallbackId("approval2");
115+
runner.completeCallback(callbackId2, "\"second\"");
116+
117+
// Third run - both callbacks done, returns result
118+
result = runner.run("test");
119+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
120+
assertEquals("first and second", result.getResult(String.class));
121+
}
122+
123+
@Test
124+
void callbackWithSteps() {
125+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
126+
var step1 = ctx.step("prepare", String.class, () -> "prepared");
127+
128+
var cb = ctx.createCallback("approval", String.class);
129+
var approval = cb.future().get();
130+
131+
var step2 = ctx.step("finalize", String.class, () -> step1 + " -> " + approval + " -> done");
132+
133+
return step2;
134+
});
135+
136+
// First run - step1 completes, callback created, suspends
137+
var result = runner.run("test");
138+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
139+
140+
// Complete callback
141+
var callbackId = runner.getCallbackId("approval");
142+
runner.completeCallback(callbackId, "\"approved\"");
143+
144+
// Second run - callback done, step2 completes
145+
result = runner.run("test");
146+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
147+
assertEquals("prepared -> approved -> done", result.getResult(String.class));
148+
}
149+
}

0 commit comments

Comments
 (0)