Skip to content

Commit 8dedfd4

Browse files
committed
Implemented Cloud based testing for callback
1 parent f212be2 commit 8dedfd4

9 files changed

Lines changed: 425 additions & 8 deletions

File tree

AGENTS.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ import static org.junit.jupiter.api.Assertions.*; // Tests
6464
import static java.util.Collections.emptyList; // Factory methods
6565
import static com.amazonaws.lambda.durable.model.Status.*; // Enums
6666

67-
// AVOID fully qualified names in code
67+
// ALWAYS use proper imports, NEVER use fully qualified class names in code
68+
// Bad: var lambda = software.amazon.awssdk.services.lambda.LambdaClient.create();
69+
// Good: import software.amazon.awssdk.services.lambda.LambdaClient;
70+
// var lambda = LambdaClient.create();
71+
6872
// Bad: com.amazonaws.lambda.durable.model.Status.SUCCESS
6973
// Good: import static and use SUCCESS directly
7074

examples/src/main/java/com/amazonaws/lambda/durable/examples/CallbackExample.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,9 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
3636
});
3737

3838
// Step 2: Create callback for external approval
39-
// Configure with 24-hour timeout and 1-hour heartbeat timeout
39+
// Configure with 5min timeout
4040
var config = CallbackConfig.builder()
41-
.timeout(Duration.ofHours(24))
42-
.heartbeatTimeout(Duration.ofHours(1))
41+
.timeout(Duration.ofMinutes(5))
4342
.build();
4443

4544
var callback = context.createCallback("approval", String.class, config);

examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,41 @@ void testCustomConfigExample() {
223223
assertTrue(stepResult.contains("user_age"));
224224
assertTrue(stepResult.contains("email_address"));
225225
}
226+
227+
@Test
228+
void testCallbackExample() throws Exception {
229+
var runner = CloudDurableTestRunner.create(
230+
arn("callback-example"), ApprovalRequest.class, String.class);
231+
232+
// Start async execution
233+
var execution = runner.startAsync(new ApprovalRequest("Purchase order", 5000.0));
234+
235+
// Wait for callback to appear
236+
execution.pollUntil(exec -> exec.hasCallback("approval"));
237+
238+
// Get callback ID
239+
var callbackId = execution.getCallbackId("approval");
240+
assertNotNull(callbackId);
241+
242+
// Complete the callback using AWS SDK
243+
var lambda = software.amazon.awssdk.services.lambda.LambdaClient.create();
244+
lambda.sendDurableExecutionCallbackSuccess(req -> req
245+
.callbackId(callbackId)
246+
.result(software.amazon.awssdk.core.SdkBytes.fromUtf8String("\"approved\"")));
247+
248+
// Wait for execution to complete
249+
var result = execution.pollUntilComplete();
250+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
251+
252+
var finalResult = result.getResult(String.class);
253+
assertNotNull(finalResult);
254+
assertTrue(finalResult.contains("Approval request for: Purchase order"));
255+
assertTrue(finalResult.contains("5000"));
256+
assertTrue(finalResult.contains("approved"));
257+
258+
// Verify all operations completed
259+
assertNotNull(execution.getOperation("prepare"));
260+
assertNotNull(execution.getOperation("log-callback-command"));
261+
assertNotNull(execution.getOperation("process-approval"));
262+
}
226263
}

examples/template.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,28 @@ Resources:
208208
DockerContext: ../
209209
DockerTag: durable-examples
210210

211+
CallbackExampleFunction:
212+
Type: AWS::Serverless::Function
213+
Properties:
214+
PackageType: Image
215+
FunctionName: callback-example
216+
ImageConfig:
217+
Command: ["com.amazonaws.lambda.durable.examples.CallbackExample::handleRequest"]
218+
DurableConfig:
219+
ExecutionTimeout: 300
220+
RetentionPeriodInDays: 7
221+
Policies:
222+
- Statement:
223+
- Effect: Allow
224+
Action:
225+
- lambda:CheckpointDurableExecutions
226+
- lambda:GetDurableExecutionState
227+
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:callback-example"
228+
Metadata:
229+
Dockerfile: examples/Dockerfile
230+
DockerContext: ../
231+
DockerTag: durable-examples
232+
211233
Outputs:
212234
SimpleStepExampleFunction:
213235
Description: Simple Step Example Function ARN
@@ -280,3 +302,11 @@ Outputs:
280302
LoggingExampleFunctionName:
281303
Description: Logging Example Function Name
282304
Value: !Ref LoggingExampleFunction
305+
306+
CallbackExampleFunction:
307+
Description: Callback Example Function ARN
308+
Value: !GetAtt CallbackExampleFunction.Arn
309+
310+
CallbackExampleFunctionName:
311+
Description: Callback Example Function Name
312+
Value: !Ref CallbackExampleFunction
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.amazonaws.lambda.durable.testing;
5+
6+
import com.amazonaws.lambda.durable.model.ExecutionStatus;
7+
import java.time.Duration;
8+
import java.time.Instant;
9+
import java.util.List;
10+
import java.util.concurrent.TimeoutException;
11+
import java.util.function.Predicate;
12+
import software.amazon.awssdk.services.lambda.LambdaClient;
13+
import software.amazon.awssdk.services.lambda.model.Event;
14+
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionHistoryRequest;
15+
import software.amazon.awssdk.services.lambda.model.ResourceNotFoundException;
16+
17+
/**
18+
* Handle for an asynchronously executing durable function.
19+
* Allows incremental polling and inspection of execution state.
20+
*/
21+
public class AsyncExecution<O> {
22+
private final String executionArn;
23+
private final LambdaClient lambdaClient;
24+
private final Class<O> outputType;
25+
private final Duration pollInterval;
26+
private final Duration timeout;
27+
private final HistoryEventProcessor processor;
28+
private List<Event> currentHistory;
29+
private TestResult<O> currentResult;
30+
31+
public AsyncExecution(
32+
String executionArn,
33+
LambdaClient lambdaClient,
34+
Class<O> outputType,
35+
Duration pollInterval,
36+
Duration timeout) {
37+
this.executionArn = executionArn;
38+
this.lambdaClient = lambdaClient;
39+
this.outputType = outputType;
40+
this.pollInterval = pollInterval;
41+
this.timeout = timeout;
42+
this.processor = new HistoryEventProcessor();
43+
}
44+
45+
/**
46+
* Poll execution history until the given condition is met.
47+
*
48+
* @param condition predicate to test on each poll
49+
* @return this execution for chaining
50+
* @throws TimeoutException if condition not met within timeout
51+
*/
52+
public AsyncExecution<O> pollUntil(Predicate<AsyncExecution<O>> condition) throws TimeoutException {
53+
var startTime = Instant.now();
54+
55+
while (Duration.between(startTime, Instant.now()).compareTo(timeout) < 0) {
56+
refreshHistory();
57+
58+
if (condition.test(this)) {
59+
return this;
60+
}
61+
62+
try {
63+
Thread.sleep(pollInterval.toMillis());
64+
} catch (InterruptedException e) {
65+
Thread.currentThread().interrupt();
66+
throw new RuntimeException("Polling interrupted", e);
67+
}
68+
}
69+
70+
throw new TimeoutException("Condition not met within timeout of " + timeout);
71+
}
72+
73+
/**
74+
* Poll until execution completes and return the final result.
75+
*
76+
* @return test result with execution status and output
77+
* @throws TimeoutException if execution doesn't complete within timeout
78+
*/
79+
public TestResult<O> pollUntilComplete() throws TimeoutException {
80+
pollUntil(AsyncExecution::isComplete);
81+
return currentResult;
82+
}
83+
84+
/**
85+
* Check if execution has completed (succeeded or failed).
86+
*/
87+
public boolean isComplete() {
88+
if (currentHistory == null) {
89+
return false;
90+
}
91+
return currentHistory.stream()
92+
.anyMatch(e -> {
93+
var eventType = e.eventTypeAsString();
94+
return "ExecutionSucceeded".equals(eventType) || "ExecutionFailed".equals(eventType);
95+
});
96+
}
97+
98+
/**
99+
* Check if an operation with the given name exists.
100+
*/
101+
public boolean hasOperation(String name) {
102+
if (currentResult == null) {
103+
return false;
104+
}
105+
return currentResult.getOperations().stream()
106+
.anyMatch(op -> name.equals(op.getName()));
107+
}
108+
109+
/**
110+
* Check if a callback operation with the given name exists and is started.
111+
*/
112+
public boolean hasCallback(String name) {
113+
if (currentHistory == null) {
114+
return false;
115+
}
116+
// Look for CallbackStarted event with this name
117+
return currentHistory.stream()
118+
.anyMatch(e -> name.equals(e.name()) && "CallbackStarted".equals(e.eventTypeAsString()));
119+
}
120+
121+
/**
122+
* Get the callback ID for a callback operation.
123+
*
124+
* @param operationName name of the callback operation
125+
* @return callback ID
126+
* @throws IllegalStateException if no callback found for operation
127+
*/
128+
public String getCallbackId(String operationName) {
129+
if (currentResult == null) {
130+
throw new IllegalStateException("No history available - call pollUntil first");
131+
}
132+
133+
var operation = currentResult.getOperations().stream()
134+
.filter(op -> operationName.equals(op.getName()))
135+
.findFirst()
136+
.orElseThrow(() -> new IllegalStateException(
137+
"No operation found with name: " + operationName));
138+
139+
var callbackDetails = operation.getCallbackDetails();
140+
if (callbackDetails == null || callbackDetails.callbackId() == null) {
141+
throw new IllegalStateException(
142+
"Operation '" + operationName + "' is not a callback or has no callback ID");
143+
}
144+
145+
return callbackDetails.callbackId();
146+
}
147+
148+
/**
149+
* Get details for a specific operation.
150+
*/
151+
public TestOperation getOperation(String name) {
152+
if (currentResult == null) {
153+
throw new IllegalStateException("No history available - call pollUntil first");
154+
}
155+
return currentResult.getOperation(name);
156+
}
157+
158+
/**
159+
* Get all operations in the execution.
160+
*/
161+
public List<TestOperation> getOperations() {
162+
if (currentResult == null) {
163+
throw new IllegalStateException("No history available - call pollUntil first");
164+
}
165+
return currentResult.getOperations();
166+
}
167+
168+
/**
169+
* Get current execution status.
170+
*/
171+
public ExecutionStatus getStatus() {
172+
if (currentResult == null) {
173+
return ExecutionStatus.PENDING;
174+
}
175+
return currentResult.getStatus();
176+
}
177+
178+
/**
179+
* Get the execution ARN.
180+
*/
181+
public String getExecutionArn() {
182+
return executionArn;
183+
}
184+
185+
private void refreshHistory() {
186+
try {
187+
var request = GetDurableExecutionHistoryRequest.builder()
188+
.durableExecutionArn(executionArn)
189+
.includeExecutionData(true)
190+
.build();
191+
var response = lambdaClient.getDurableExecutionHistory(request);
192+
this.currentHistory = response.events();
193+
this.currentResult = processor.processEvents(currentHistory, outputType);
194+
} catch (ResourceNotFoundException e) {
195+
// Execution doesn't exist yet - this can happen immediately after async invoke
196+
// Leave currentHistory as null, pollUntil will retry
197+
this.currentHistory = null;
198+
this.currentResult = null;
199+
}
200+
}
201+
}

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/CloudDurableTestRunner.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,48 @@ public TestResult<O> run(I input) {
115115
}
116116
}
117117

118+
/**
119+
* Start an asynchronous execution and return a handle for incremental polling.
120+
* Use this for callback-based tests where you need to interact with the execution
121+
* while it's running.
122+
*
123+
* @param input the input to the function
124+
* @return execution handle for polling and inspection
125+
*/
126+
public AsyncExecution<O> startAsync(I input) {
127+
try {
128+
// Serialize input
129+
var serDes = new JacksonSerDes();
130+
var inputJson = serDes.serialize(input);
131+
132+
// Invoke function with EVENT type (async)
133+
var invokeRequest = InvokeRequest.builder()
134+
.functionName(functionArn)
135+
.invocationType(InvocationType.EVENT)
136+
.payload(SdkBytes.fromUtf8String(inputJson))
137+
.build();
138+
139+
var response = lambdaClient.invoke(invokeRequest);
140+
141+
// Extract execution ARN from response
142+
var executionArn = response.durableExecutionArn();
143+
if (executionArn == null) {
144+
throw new RuntimeException("No durable execution ARN returned - function may not be durable");
145+
}
146+
147+
// Give the execution a moment to initialize before returning
148+
// This prevents immediate polling from failing with "execution does not exist"
149+
Thread.sleep(100);
150+
151+
return new AsyncExecution<>(executionArn, lambdaClient, outputType, pollInterval, timeout);
152+
} catch (InterruptedException e) {
153+
Thread.currentThread().interrupt();
154+
throw new RuntimeException("Interrupted while starting async execution", e);
155+
} catch (Exception e) {
156+
throw new RuntimeException("Function invocation failed", e);
157+
}
158+
}
159+
118160
public TestOperation getOperation(String name) {
119161
if (lastResult == null) {
120162
throw new IllegalStateException("No execution has been run yet");

0 commit comments

Comments
 (0)