Skip to content

Commit eac52b6

Browse files
committed
fix(otel): Fix span emission for cross-invocation operations and replay
1 parent 1468236 commit eac52b6

9 files changed

Lines changed: 624 additions & 20 deletions

File tree

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.otel;
4+
5+
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
6+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
7+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
8+
import java.util.List;
9+
import software.amazon.lambda.durable.DurableConfig;
10+
import software.amazon.lambda.durable.DurableContext;
11+
import software.amazon.lambda.durable.DurableHandler;
12+
import software.amazon.lambda.durable.examples.types.GreetingRequest;
13+
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;
14+
15+
/**
16+
* OTel + X-Ray example: map operation that processes items concurrently.
17+
*
18+
* <p>Exercises map operation tracing — verifies spans for map operation + each item step.
19+
*/
20+
public class OtelXRayMapExample extends DurableHandler<GreetingRequest, String> {
21+
22+
@Override
23+
protected DurableConfig createConfiguration() {
24+
var otlpExporter = OtlpGrpcSpanExporter.getDefault();
25+
var otelPlugin = new OpenTelemetryDurablePlugin(
26+
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
27+
return DurableConfig.builder().withPlugins(otelPlugin).build();
28+
}
29+
30+
@Override
31+
public String handleRequest(GreetingRequest input, DurableContext context) {
32+
context.getLogger().info("Starting OTel X-Ray map example for {}", input.getName());
33+
34+
var items = List.of("alpha", "beta", "gamma");
35+
var result = context.map(
36+
"process-items",
37+
items,
38+
String.class,
39+
(item, index, childCtx) ->
40+
childCtx.step("transform-" + item, String.class, stepCtx -> item.toUpperCase()));
41+
42+
return "Mapped " + result.succeeded().size() + " items";
43+
}
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.otel;
4+
5+
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
6+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
7+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
8+
import software.amazon.lambda.durable.DurableConfig;
9+
import software.amazon.lambda.durable.DurableContext;
10+
import software.amazon.lambda.durable.DurableHandler;
11+
import software.amazon.lambda.durable.examples.types.GreetingRequest;
12+
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;
13+
14+
/**
15+
* OTel + X-Ray example: nested child contexts with inner steps.
16+
*
17+
* <p>Exercises nested context tracing — verifies span hierarchy for outer → inner → step.
18+
*/
19+
public class OtelXRayNestedContextExample extends DurableHandler<GreetingRequest, String> {
20+
21+
@Override
22+
protected DurableConfig createConfiguration() {
23+
var otlpExporter = OtlpGrpcSpanExporter.getDefault();
24+
var otelPlugin = new OpenTelemetryDurablePlugin(
25+
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
26+
return DurableConfig.builder().withPlugins(otelPlugin).build();
27+
}
28+
29+
@Override
30+
public String handleRequest(GreetingRequest input, DurableContext context) {
31+
context.getLogger().info("Starting OTel X-Ray nested context example for {}", input.getName());
32+
33+
return context.runInChildContext("outer", String.class, outerCtx -> {
34+
var intermediate = outerCtx.step("outer-step", String.class, stepCtx -> "Hello, " + input.getName());
35+
return outerCtx.runInChildContext("inner", String.class, innerCtx -> {
36+
return innerCtx.step("deep-step", String.class, stepCtx -> intermediate.toUpperCase() + "!");
37+
});
38+
});
39+
}
40+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.otel;
4+
5+
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
6+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
7+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
8+
import software.amazon.lambda.durable.DurableConfig;
9+
import software.amazon.lambda.durable.DurableContext;
10+
import software.amazon.lambda.durable.DurableHandler;
11+
import software.amazon.lambda.durable.examples.types.GreetingRequest;
12+
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;
13+
14+
/**
15+
* OTel + X-Ray example: parallel operation with multiple branches.
16+
*
17+
* <p>Exercises parallel operation tracing — verifies spans for parallel + branch steps.
18+
*/
19+
public class OtelXRayParallelExample extends DurableHandler<GreetingRequest, String> {
20+
21+
@Override
22+
protected DurableConfig createConfiguration() {
23+
var otlpExporter = OtlpGrpcSpanExporter.getDefault();
24+
var otelPlugin = new OpenTelemetryDurablePlugin(
25+
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
26+
return DurableConfig.builder().withPlugins(otelPlugin).build();
27+
}
28+
29+
@Override
30+
public String handleRequest(GreetingRequest input, DurableContext context) {
31+
context.getLogger().info("Starting OTel X-Ray parallel example for {}", input.getName());
32+
33+
var parallel = context.parallel("fan-out");
34+
try (parallel) {
35+
parallel.branch(
36+
"branch-a",
37+
String.class,
38+
childCtx -> childCtx.step("step-a", String.class, stepCtx -> "A: " + input.getName()));
39+
parallel.branch(
40+
"branch-b",
41+
String.class,
42+
childCtx -> childCtx.step("step-b", String.class, stepCtx -> "B: " + input.getName()));
43+
}
44+
var result = parallel.get();
45+
46+
return "Parallel completed: " + result.succeeded() + " branches";
47+
}
48+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.otel;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.examples.types.GreetingRequest;
9+
import software.amazon.lambda.durable.model.ExecutionStatus;
10+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
11+
12+
/**
13+
* Cloud-like tests for OTel examples using LocalDurableTestRunner.
14+
*
15+
* <p>These verify that the OTel plugin doesn't break execution for various scenarios. When deployed to Lambda with
16+
* CloudDurableTestRunner, these same scenarios validate that flush/export works correctly under real concurrency.
17+
*/
18+
class OtelXRayCloudTest {
19+
20+
@Test
21+
void mapExample_executesSuccessfully() {
22+
var handler = new OtelXRayMapExample();
23+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
24+
25+
var result = runner.runUntilComplete(new GreetingRequest("test"));
26+
27+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
28+
assertEquals("Mapped 3 items", result.getResult(String.class));
29+
}
30+
31+
@Test
32+
void parallelExample_executesSuccessfully() {
33+
var handler = new OtelXRayParallelExample();
34+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
35+
36+
var result = runner.runUntilComplete(new GreetingRequest("test"));
37+
38+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
39+
assertTrue(result.getResult(String.class).contains("Parallel completed: 2 branches"));
40+
}
41+
42+
@Test
43+
void nestedContextExample_executesSuccessfully() {
44+
var handler = new OtelXRayNestedContextExample();
45+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
46+
47+
var result = runner.runUntilComplete(new GreetingRequest("World"));
48+
49+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
50+
assertEquals("HELLO, WORLD!", result.getResult(String.class));
51+
}
52+
}

otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.ConcurrentHashMap;
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
22+
import software.amazon.lambda.durable.execution.SuspendExecutionException;
2223
import software.amazon.lambda.durable.plugin.DurableExecutionPlugin;
2324
import software.amazon.lambda.durable.plugin.InvocationEndInfo;
2425
import software.amazon.lambda.durable.plugin.InvocationInfo;
@@ -258,16 +259,45 @@ public void onOperationStart(OperationInfo info) {
258259
public void onOperationEnd(OperationEndInfo info) {
259260
if (info.id() == null) return;
260261

261-
// End the operation span that was started in onOperationStart
262262
var span = operationSpans.remove(info.id());
263-
if (span == null) return;
264263

265-
if (info.error() != null) {
266-
span.setStatus(StatusCode.ERROR, info.error().getMessage());
267-
span.recordException(info.error());
268-
}
264+
if (span != null) {
265+
// Operation was started in this invocation — end normally
266+
if (info.error() != null) {
267+
span.setStatus(StatusCode.ERROR, info.error().getMessage());
268+
span.recordException(info.error());
269+
}
270+
span.end();
271+
} else {
272+
// Operation was started in a prior invocation — create a continuation span with Link
273+
// to the deterministic span ID from the original invocation.
274+
var deterministicSpanId = idGenerator.generateSpanIdForOperation(info.id());
275+
var traceId = idGenerator.generateTraceId();
276+
var linkedSpanContext =
277+
SpanContext.create(traceId, deterministicSpanId, TraceFlags.getSampled(), TraceState.getDefault());
278+
279+
var parentContext = resolveParentContext(info.parentId());
280+
281+
var spanBuilder = tracer.spanBuilder(spanName(info.type(), info.subType(), info.name()))
282+
.setParent(parentContext)
283+
.addLink(linkedSpanContext)
284+
.setAttribute(DURABLE_EXECUTION_ARN, durableExecutionArn)
285+
.setAttribute(DURABLE_OPERATION_ID, info.id())
286+
.setAttribute(DURABLE_OPERATION_TYPE, info.type());
269287

270-
span.end();
288+
if (info.name() != null) {
289+
spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name());
290+
}
291+
292+
var continuationSpan = spanBuilder.startSpan();
293+
294+
if (info.error() != null) {
295+
continuationSpan.setStatus(StatusCode.ERROR, info.error().getMessage());
296+
continuationSpan.recordException(info.error());
297+
}
298+
299+
continuationSpan.end();
300+
}
271301
}
272302

273303
// ─── User function hooks ─────────────────────────────────────────────
@@ -327,8 +357,12 @@ public void onUserFunctionEnd(UserFunctionEndInfo info) {
327357
span.setAttribute(DURABLE_ATTEMPT_OUTCOME, outcome);
328358

329359
if (!info.succeeded() && info.error() != null) {
330-
span.setStatus(StatusCode.ERROR, info.error().getMessage());
331-
span.recordException(info.error());
360+
if (info.error() instanceof SuspendExecutionException) {
361+
span.setAttribute(DURABLE_ATTEMPT_OUTCOME, "pending");
362+
} else {
363+
span.setStatus(StatusCode.ERROR, info.error().getMessage());
364+
span.recordException(info.error());
365+
}
332366
}
333367

334368
if (info.endTimestamp() != null) {

0 commit comments

Comments
 (0)