Skip to content

Commit 7678056

Browse files
authored
[testing]: add custom serDes support (#320)
* add custom serDes support * update test case * make null check a little bit nicer
1 parent 5f16fd1 commit 7678056

13 files changed

Lines changed: 249 additions & 170 deletions

File tree

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.HashMap;
1010
import java.util.List;
1111
import java.util.Map;
12+
import java.util.concurrent.atomic.AtomicInteger;
1213
import org.junit.jupiter.api.BeforeAll;
1314
import org.junit.jupiter.api.Test;
1415
import org.junit.jupiter.api.condition.EnabledIf;
@@ -28,6 +29,8 @@
2829
import software.amazon.lambda.durable.examples.types.GreetingRequest;
2930
import software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample;
3031
import software.amazon.lambda.durable.model.ExecutionStatus;
32+
import software.amazon.lambda.durable.serde.JacksonSerDes;
33+
import software.amazon.lambda.durable.serde.SerDes;
3134
import software.amazon.lambda.durable.testing.CloudDurableTestRunner;
3235

3336
@EnabledIf("isEnabled")
@@ -80,14 +83,41 @@ private static String arn(String functionName) {
8083
+ ":$LATEST";
8184
}
8285

86+
/** Custom SerDes that tracks serialization calls. */
87+
static class TrackingSerDes implements SerDes {
88+
private final JacksonSerDes delegate = new JacksonSerDes();
89+
private final AtomicInteger serializeCount = new AtomicInteger(0);
90+
private final AtomicInteger deserializeCount = new AtomicInteger(0);
91+
92+
@Override
93+
public String serialize(Object value) {
94+
serializeCount.incrementAndGet();
95+
return delegate.serialize(value);
96+
}
97+
98+
@Override
99+
public <T> T deserialize(String data, TypeToken<T> typeToken) {
100+
deserializeCount.incrementAndGet();
101+
return delegate.deserialize(data, typeToken);
102+
}
103+
104+
public int getSerializeCount() {
105+
return serializeCount.get();
106+
}
107+
108+
public int getDeserializeCount() {
109+
return deserializeCount.get();
110+
}
111+
}
112+
83113
@Test
84114
void testSimpleStepExample() {
85115
var runner = CloudDurableTestRunner.create(
86116
arn("simple-step-example"), new TypeToken<Map<String, String>>() {}, get(String.class), lambdaClient);
87117
var result = runner.run(Map.of("message", "test"));
88118

89119
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
90-
assertNotNull(result.getResult(String.class));
120+
assertNotNull(result.getResult());
91121

92122
var createGreetingOp = runner.getOperation("create-greeting");
93123
assertNotNull(createGreetingOp);
@@ -103,7 +133,7 @@ void testNoopExampleWithLargeInput() {
103133
var result = runner.run(Map.of("name", largeInput));
104134

105135
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
106-
assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class));
136+
assertEquals("HELLO, " + largeInput + "!", result.getResult());
107137
}
108138

109139
@Test
@@ -113,7 +143,7 @@ void testSimpleInvokeExample() {
113143
var result = runner.run(Map.of("name", functionNameSuffix));
114144

115145
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
116-
assertNotNull(result.getResult(String.class));
146+
assertNotNull(result.getResult());
117147

118148
var createGreetingOp = runner.getOperation("call-greeting1");
119149
assertNotNull(createGreetingOp);
@@ -131,7 +161,7 @@ void testRetryExample() {
131161

132162
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
133163

134-
var finalResult = result.getResult(String.class);
164+
var finalResult = result.getResult();
135165
assertNotNull(finalResult);
136166
assertTrue(finalResult.contains("Retry example completed"));
137167
assertTrue(finalResult.contains("Flaky API succeeded"));
@@ -152,7 +182,7 @@ void testRetryInProcessExample() {
152182

153183
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
154184

155-
var finalResult = result.getResult(String.class);
185+
var finalResult = result.getResult();
156186
assertNotNull(finalResult);
157187
assertTrue(finalResult.contains("Retry in-process completed"));
158188
assertTrue(finalResult.contains("Long operation completed"));
@@ -175,7 +205,7 @@ void testWaitExample() {
175205

176206
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
177207

178-
var finalResult = result.getResult(String.class);
208+
var finalResult = result.getResult();
179209
assertNotNull(finalResult);
180210
assertTrue(finalResult.contains("Started processing for TestUser"));
181211
assertFalse(finalResult.contains("continued after 10s"));
@@ -195,7 +225,7 @@ void testWaitAtLeastExample() {
195225

196226
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
197227

198-
var finalResult = result.getResult(String.class);
228+
var finalResult = result.getResult();
199229
assertNotNull(finalResult);
200230
assertTrue(finalResult.contains("Processed: TestUser"));
201231

@@ -212,7 +242,7 @@ void testWaitAtLeastInProcessExample() {
212242

213243
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
214244

215-
var finalResult = result.getResult(String.class);
245+
var finalResult = result.getResult();
216246
assertNotNull(finalResult);
217247
assertTrue(finalResult.contains("Processed: TestUser"));
218248

@@ -232,7 +262,7 @@ void testGenericTypesExample() {
232262

233263
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
234264

235-
GenericTypesExample.Output output = result.getResult(GenericTypesExample.Output.class);
265+
GenericTypesExample.Output output = result.getResult();
236266
assertNotNull(output);
237267

238268
// Verify items list
@@ -265,14 +295,16 @@ void testGenericTypesExample() {
265295
void testGenericInputOutputExample() {
266296
final TypeToken<Map<String, Map<String, List<String>>>> resultType = new TypeToken<>() {};
267297
final TypeToken<Map<String, String>> inputType = new TypeToken<>() {};
298+
final TrackingSerDes customSerDes = new TrackingSerDes();
268299

269-
var runner =
270-
CloudDurableTestRunner.create(arn("generic-input-output-example"), inputType, resultType, lambdaClient);
300+
var runner = CloudDurableTestRunner.create(arn("generic-input-output-example"), inputType, resultType)
301+
.withLambdaClient(lambdaClient)
302+
.withSerDes(customSerDes);
271303
var result = runner.run(new HashMap<>(Map.of("userId", "user123")));
272304

273305
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
274306

275-
var output = result.getResult(resultType);
307+
var output = result.getResult();
276308
assertNotNull(output);
277309

278310
// Verify categories nested map
@@ -285,6 +317,10 @@ void testGenericInputOutputExample() {
285317

286318
// Verify operations were executed
287319
assertNotNull(runner.getOperation("fetch-categories"));
320+
321+
// verify custom SerDes was called
322+
assertEquals(1, customSerDes.getDeserializeCount());
323+
assertEquals(1, customSerDes.getSerializeCount());
288324
}
289325

290326
@Test
@@ -295,7 +331,7 @@ void testCustomConfigExample() {
295331

296332
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
297333

298-
var finalResult = result.getResult(String.class);
334+
var finalResult = result.getResult();
299335
assertNotNull(finalResult);
300336
assertTrue(finalResult.contains("Created custom object"));
301337
assertTrue(finalResult.contains("user123"));
@@ -325,7 +361,7 @@ void testErrorHandlingExample() {
325361

326362
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
327363

328-
var finalResult = result.getResult(String.class);
364+
var finalResult = result.getResult();
329365
assertNotNull(finalResult);
330366
assertTrue(finalResult.startsWith("Completed: "));
331367
assertTrue(finalResult.contains("fallback-result"));
@@ -360,7 +396,7 @@ void testCallbackExample() {
360396
var result = execution.pollUntilComplete();
361397
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
362398

363-
var finalResult = result.getResult(String.class);
399+
var finalResult = result.getResult();
364400
assertNotNull(finalResult);
365401
assertTrue(finalResult.contains("preapproved"));
366402
assertTrue(finalResult.contains("Approval request for: Purchase order"));
@@ -484,7 +520,7 @@ void testChildContextExample() {
484520
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
485521
assertEquals(
486522
"Order for Alice [validated] | Stock available for Alice [confirmed] | Base rate for Alice + regional adjustment [shipping ready]",
487-
result.getResult(String.class));
523+
result.getResult());
488524

489525
// Verify child context operations were tracked
490526
assertNotNull(runner.getOperation("order-validation"));
@@ -507,7 +543,7 @@ void testManyAsyncStepsExample(int steps, long maxExecutionTime, long maxReplayT
507543

508544
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
509545

510-
var finalResult = result.getResult(ManyAsyncStepsExample.Output.class);
546+
var finalResult = result.getResult();
511547
System.out.printf("ManyAsyncStepsExample result (%d steps): %s\n", steps, finalResult);
512548
assertNotNull(finalResult);
513549
assertEquals((long) steps * (steps - 1), finalResult.result()); // Sum of 0..steps * 2
@@ -545,7 +581,7 @@ void testManyAsyncChildContextExample(int steps, long maxExecutionTime, long max
545581

546582
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
547583

548-
var finalResult = result.getResult(ManyAsyncChildContextExample.Output.class);
584+
var finalResult = result.getResult();
549585
System.out.printf("ManyAsyncChildContextExample result (%d child contexts): %s\n", steps, finalResult);
550586
assertNotNull(finalResult);
551587
assertEquals((long) steps * (steps - 1), finalResult.result()); // Sum of 0..steps * 2
@@ -576,7 +612,7 @@ void testSimpleMapExample() {
576612
var result = runner.run(new GreetingRequest("Alice"));
577613

578614
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
579-
assertEquals("Hello, Alice! | Hello, ALICE! | Hello, alice!", result.getResult(String.class));
615+
assertEquals("Hello, Alice! | Hello, ALICE! | Hello, alice!", result.getResult());
580616
}
581617

582618
@Test
@@ -585,7 +621,7 @@ void testComplexMapExample() {
585621
var result = runner.run(100);
586622

587623
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
588-
var output = result.getResult(String.class);
624+
var output = result.getResult();
589625
assertNotNull(output);
590626

591627
// Part 1: Concurrent order processing with step + wait + step
@@ -605,7 +641,7 @@ void testWaitForConditionExample() {
605641
var result = runner.run(3);
606642

607643
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
608-
assertEquals(3, result.getResult(Integer.class));
644+
assertEquals(3, result.getResult());
609645
}
610646

611647
@Test
@@ -620,7 +656,7 @@ void testConcurrentWaitForConditionExample() {
620656
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
621657

622658
// Verify each operation finished with 3 attempts
623-
var allOperationsOutput = result.getResult(String.class);
659+
var allOperationsOutput = result.getResult();
624660
var operationOutputs = allOperationsOutput.split(" \\| ");
625661
assertEquals(100, operationOutputs.length);
626662
for (var operationOutput : operationOutputs) {

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/AsyncExecution.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import software.amazon.awssdk.services.lambda.model.ResourceNotFoundException;
1616
import software.amazon.lambda.durable.TypeToken;
1717
import software.amazon.lambda.durable.model.ExecutionStatus;
18+
import software.amazon.lambda.durable.serde.SerDes;
1819
import software.amazon.lambda.durable.testing.cloud.HistoryEventProcessor;
1920

2021
/**
@@ -25,6 +26,7 @@ public class AsyncExecution<O> {
2526
private final String executionArn;
2627
private final LambdaClient lambdaClient;
2728
private final TypeToken<O> outputType;
29+
private final SerDes serDes;
2830
private final Duration pollInterval;
2931
private final Duration timeout;
3032
private final HistoryEventProcessor processor;
@@ -35,13 +37,15 @@ public AsyncExecution(
3537
String executionArn,
3638
LambdaClient lambdaClient,
3739
TypeToken<O> outputType,
40+
SerDes serDes,
3841
Duration pollInterval,
3942
Duration timeout) {
4043
this.executionArn = executionArn;
4144
this.lambdaClient = lambdaClient;
4245
this.outputType = outputType;
4346
this.pollInterval = pollInterval;
4447
this.timeout = timeout;
48+
this.serDes = serDes;
4549
this.processor = new HistoryEventProcessor();
4650
}
4751

@@ -191,7 +195,7 @@ private void refreshHistory() {
191195
.build();
192196
var response = lambdaClient.getDurableExecutionHistory(request);
193197
this.currentHistory = response.events();
194-
this.currentResult = processor.processEvents(currentHistory, outputType);
198+
this.currentResult = processor.processEvents(currentHistory, outputType, serDes);
195199
} catch (ResourceNotFoundException e) {
196200
// Execution doesn't exist yet - this can happen immediately after async invoke
197201
// Leave currentHistory as null, pollUntil will retry

0 commit comments

Comments
 (0)