Skip to content

Commit 9515cf3

Browse files
committed
fix(plugin): Propagate error to onOperationEnd for failed operations
1 parent 54da728 commit 9515cf3

3 files changed

Lines changed: 185 additions & 2 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,52 @@ void waitCompletedDuringSuspension_producesOperationSpan() {
202202
.toList());
203203
}
204204

205+
@Test
206+
void invokeFailedDuringSuspension_producesErrorSpan() {
207+
var runner = LocalDurableTestRunner.create(
208+
String.class,
209+
(input, ctx) -> {
210+
try {
211+
ctx.invoke("call-target", "target-fn", "{}", String.class);
212+
} catch (Exception e) {
213+
// expected failure
214+
}
215+
return "handled";
216+
},
217+
otelConfig);
218+
219+
// First invocation: invoke starts, suspends waiting for target
220+
var result1 = runner.run("input");
221+
assertEquals(ExecutionStatus.PENDING, result1.getStatus());
222+
223+
// Target fails while Lambda is frozen
224+
runner.failChainedInvoke(
225+
"call-target",
226+
software.amazon.awssdk.services.lambda.model.ErrorObject.builder()
227+
.errorType("TargetError")
228+
.errorMessage("target function failed")
229+
.build());
230+
231+
// Resume
232+
var result2 = runner.run("input");
233+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
234+
235+
// The invoke operation span from the second invocation should have ERROR status
236+
var allSpans = spanExporter.getFinishedSpanItems();
237+
var invokeSpans = allSpans.stream()
238+
.filter(s -> s.getName().contains("call-target"))
239+
.toList();
240+
assertTrue(invokeSpans.size() >= 2, "Should have at least 2 invoke spans (one PENDING, one ended with error)");
241+
242+
// The span from the second invocation should be marked as error
243+
var errorSpan = invokeSpans.stream()
244+
.filter(s -> s.getStatus().getStatusCode() == io.opentelemetry.api.trace.StatusCode.ERROR)
245+
.findFirst();
246+
assertTrue(
247+
errorSpan.isPresent(),
248+
"Should have an invoke span with ERROR status when invoke fails during suspension");
249+
}
250+
205251
@Test
206252
void childContext_producesNestedSpans() {
207253
var runner = LocalDurableTestRunner.create(

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/PluginIntegrationTest.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.List;
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import org.junit.jupiter.api.Test;
13+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1314
import software.amazon.lambda.durable.config.StepConfig;
1415
import software.amazon.lambda.durable.model.ExecutionStatus;
1516
import software.amazon.lambda.durable.model.WaitForConditionResult;
@@ -235,6 +236,100 @@ void plugin_operationEnd_firedOnceForStepCompletingInCurrentInvocation() {
235236
assertEquals(1, step1EndCount, "step1 onOperationEnd should fire exactly once");
236237
}
237238

239+
@Test
240+
void plugin_operationEnd_includesError_whenInvokeFailsDuringSuspension() {
241+
var plugin = new RecordingPlugin();
242+
var config = DurableConfig.builder().withPlugins(plugin).build();
243+
244+
var runner = LocalDurableTestRunner.create(
245+
String.class,
246+
(input, context) -> {
247+
try {
248+
context.invoke("call-target", "target-fn", "{}", String.class);
249+
} catch (Exception e) {
250+
// expected
251+
}
252+
return "done";
253+
},
254+
config);
255+
256+
// First invocation: invoke starts, suspends waiting for target
257+
var result1 = runner.run("input");
258+
assertEquals(ExecutionStatus.PENDING, result1.getStatus());
259+
260+
// Target fails while Lambda is frozen
261+
runner.failChainedInvoke(
262+
"call-target",
263+
ErrorObject.builder()
264+
.errorType("TargetError")
265+
.errorMessage("target function failed")
266+
.build());
267+
268+
// Clear plugin state to only track second invocation
269+
plugin.operationEnds.clear();
270+
271+
var result2 = runner.run("input");
272+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
273+
274+
// onOperationEnd for "call-target" should fire with error info
275+
var invokeEnd = plugin.operationEnds.stream()
276+
.filter(info -> "call-target".equals(info.name()))
277+
.findFirst()
278+
.orElse(null);
279+
assertNotNull(invokeEnd, "onOperationEnd should fire for failed invoke during suspension");
280+
assertNotNull(invokeEnd.error(), "error should be propagated for failed invoke");
281+
assertEquals("target function failed", invokeEnd.error().getMessage());
282+
}
283+
284+
@Test
285+
void plugin_operationEnd_includesError_whenStepFailsViaCheckpoint() {
286+
var plugin = new RecordingPlugin();
287+
var config = DurableConfig.builder().withPlugins(plugin).build();
288+
289+
var runner = LocalDurableTestRunner.create(
290+
String.class,
291+
(input, context) -> context.step(
292+
"failing-step",
293+
String.class,
294+
stepCtx -> {
295+
throw new RuntimeException("step exploded");
296+
},
297+
StepConfig.builder()
298+
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
299+
.build()),
300+
config);
301+
302+
var result = runner.run("input");
303+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
304+
305+
// onOperationEnd for "failing-step" should fire with error info
306+
var stepEnd = plugin.operationEnds.stream()
307+
.filter(info -> "failing-step".equals(info.name()))
308+
.findFirst()
309+
.orElse(null);
310+
assertNotNull(stepEnd, "onOperationEnd should fire for failed step");
311+
assertNotNull(stepEnd.error(), "error should be propagated for failed step");
312+
assertTrue(stepEnd.error().getMessage().contains("step exploded"));
313+
}
314+
315+
@Test
316+
void plugin_operationEnd_noError_whenOperationSucceeds() {
317+
var plugin = new RecordingPlugin();
318+
var config = DurableConfig.builder().withPlugins(plugin).build();
319+
320+
var runner = LocalDurableTestRunner.create(
321+
String.class, (input, context) -> context.step("ok-step", String.class, stepCtx -> "success"), config);
322+
323+
runner.runUntilComplete("input");
324+
325+
var stepEnd = plugin.operationEnds.stream()
326+
.filter(info -> "ok-step".equals(info.name()))
327+
.findFirst()
328+
.orElse(null);
329+
assertNotNull(stepEnd, "onOperationEnd should fire for successful step");
330+
assertNull(stepEnd.error(), "error should be null for successful step");
331+
}
332+
238333
// ─── User function hooks ─────────────────────────────────────────────
239334

240335
@Test

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
import java.util.concurrent.atomic.AtomicReference;
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
13+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1314
import software.amazon.awssdk.services.lambda.model.Operation;
15+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1416
import software.amazon.awssdk.services.lambda.model.OperationType;
1517
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1618
import software.amazon.lambda.durable.context.DurableContextImpl;
19+
import software.amazon.lambda.durable.exception.DurableOperationException;
1720
import software.amazon.lambda.durable.exception.IllegalDurableOperationException;
1821
import software.amazon.lambda.durable.exception.NonDeterministicExecutionException;
1922
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
@@ -141,7 +144,7 @@ public void execute() {
141144
// This enables the OTel plugin to emit spans for operations that transitioned while Lambda was frozen.
142145
if (replayCompletedOperation.get()
143146
&& executionManager.isOperationUpdatedSinceLastInvocation(getOperationId())) {
144-
fireOnOperationEnd(existing, null);
147+
fireOnOperationEnd(existing, extractErrorFromOperation(existing));
145148
}
146149

147150
replay(existing);
@@ -354,7 +357,7 @@ public void onCheckpointComplete(Operation operation) {
354357

355358
// Fire onOperationEnd plugin hook — operation reached terminal status for the first time (not replay)
356359
if (!replayCompletedOperation.get()) {
357-
fireOnOperationEnd(operation, null);
360+
fireOnOperationEnd(operation, extractErrorFromOperation(operation));
358361
}
359362

360363
markCompletionFutureCompleted();
@@ -522,4 +525,43 @@ private void fireOnOperationEnd(Operation operation, Throwable error) {
522525
operation, operationIdentifier, durableContext.getParentId(), error);
523526
getPluginRunner().onOperationEnd(info);
524527
}
528+
529+
/**
530+
* Extracts the error from a terminal operation as a Throwable. Returns null if the operation succeeded or has no
531+
* error details.
532+
*/
533+
private Throwable extractErrorFromOperation(Operation operation) {
534+
if (operation.status() != OperationStatus.FAILED
535+
&& operation.status() != OperationStatus.TIMED_OUT
536+
&& operation.status() != OperationStatus.STOPPED) {
537+
return null;
538+
}
539+
var errorObject = getErrorObject(operation);
540+
if (errorObject == null) {
541+
return null;
542+
}
543+
return new DurableOperationException(operation, errorObject);
544+
}
545+
546+
/** Extracts the ErrorObject from an operation based on its type. */
547+
private static ErrorObject getErrorObject(Operation operation) {
548+
if (operation.type() == null) {
549+
return null;
550+
}
551+
return switch (operation.type()) {
552+
case STEP ->
553+
operation.stepDetails() != null ? operation.stepDetails().error() : null;
554+
case CHAINED_INVOKE ->
555+
operation.chainedInvokeDetails() != null
556+
? operation.chainedInvokeDetails().error()
557+
: null;
558+
case CALLBACK ->
559+
operation.callbackDetails() != null
560+
? operation.callbackDetails().error()
561+
: null;
562+
case CONTEXT ->
563+
operation.contextDetails() != null ? operation.contextDetails().error() : null;
564+
default -> null;
565+
};
566+
}
525567
}

0 commit comments

Comments
 (0)