Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.otel.DeterministicIdGenerator;
import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin;

/**
Expand Down Expand Up @@ -40,12 +39,8 @@ public class OtelExample extends DurableHandler<GreetingRequest, String> {

@Override
protected DurableConfig createConfiguration() {
var idGenerator = new DeterministicIdGenerator();
var tracerProvider = SdkTracerProvider.builder()
.setIdGenerator(idGenerator)
.addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create()))
.build();
var otelPlugin = new OpenTelemetryDurablePlugin(tracerProvider, idGenerator);
var otelPlugin = new OpenTelemetryDurablePlugin(
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create())));

return DurableConfig.builder().withPlugins(otelPlugin).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -52,17 +52,15 @@ public class OpenTelemetryDurablePlugin implements DurableExecutionPlugin {
private static final Logger logger = LoggerFactory.getLogger(OpenTelemetryDurablePlugin.class);
private static final String INSTRUMENTATION_NAME = "aws-durable-execution-sdk-java";

private final TracerProvider tracerProvider;
private final SdkTracerProvider tracerProvider;
private final Tracer tracer;
private final DeterministicIdGenerator idGenerator;
private final ContextExtractor contextExtractor;
private final double samplingRate;
private final boolean enableMdc;

// Per-invocation state
private volatile Span invocationSpan;
private volatile String executionArn;
private volatile boolean sampled = true;

// Thread-safe storage for operation spans (keyed by operationId) — open spans that need ending
private final ConcurrentHashMap<String, Span> operationSpans = new ConcurrentHashMap<>();
Expand All @@ -75,75 +73,52 @@ public class OpenTelemetryDurablePlugin implements DurableExecutionPlugin {
private final ConcurrentHashMap<String, SpanContext> operationContexts = new ConcurrentHashMap<>();

/**
* Creates an OTel plugin with default settings: X-Ray context extraction, 100% sampling, MDC enabled.
* Creates an OTel plugin with default settings: X-Ray context extraction, MDC enabled.
*
* @param tracerProvider the OTel tracer provider (should use {@link DeterministicIdGenerator})
* @param idGenerator the deterministic ID generator (same instance configured in the tracer provider)
* @param tracerProviderBuilder the tracer provider builder (ID generator will be overridden)
*/
public OpenTelemetryDurablePlugin(TracerProvider tracerProvider, DeterministicIdGenerator idGenerator) {
this(tracerProvider, idGenerator, new XRayContextExtractor(), 1.0, true);
public OpenTelemetryDurablePlugin(SdkTracerProviderBuilder tracerProviderBuilder) {
Comment thread
ayushiahjolia marked this conversation as resolved.
this(tracerProviderBuilder, new XRayContextExtractor(), true);
}

/**
* Creates an OTel plugin with a custom context extractor and sampling rate, MDC enabled.
* Creates an OTel plugin with a custom context extractor, MDC enabled.
*
* @param tracerProvider the OTel tracer provider
* @param idGenerator the deterministic ID generator
* @param tracerProviderBuilder the tracer provider builder (ID generator will be overridden)
* @param contextExtractor extracts parent trace context from the Lambda environment
* @param samplingRate value between 0.0 and 1.0 — fraction of executions to trace
*/
public OpenTelemetryDurablePlugin(
TracerProvider tracerProvider,
DeterministicIdGenerator idGenerator,
ContextExtractor contextExtractor,
double samplingRate) {
this(tracerProvider, idGenerator, contextExtractor, samplingRate, true);
SdkTracerProviderBuilder tracerProviderBuilder, ContextExtractor contextExtractor) {
this(tracerProviderBuilder, contextExtractor, true);
}

/**
* Creates an OTel plugin with full configuration.
*
* @param tracerProvider the OTel tracer provider
* @param idGenerator the deterministic ID generator
* <p>The plugin internally creates a {@link DeterministicIdGenerator} and sets it on the provided builder before
* building the tracer provider. Customers configure exporters, span processors, and samplers on the builder — the
* plugin handles ID generation.
*
* @param tracerProviderBuilder the tracer provider builder (ID generator will be overridden)
* @param contextExtractor extracts parent trace context from the Lambda environment
* @param samplingRate value between 0.0 and 1.0 — fraction of executions to trace
* @param enableMdc if true, injects trace_id/span_id into SLF4J MDC for log correlation
*/
public OpenTelemetryDurablePlugin(
TracerProvider tracerProvider,
DeterministicIdGenerator idGenerator,
ContextExtractor contextExtractor,
double samplingRate,
boolean enableMdc) {
this.tracerProvider = tracerProvider;
this.idGenerator = idGenerator;
SdkTracerProviderBuilder tracerProviderBuilder, ContextExtractor contextExtractor, boolean enableMdc) {
this.idGenerator = new DeterministicIdGenerator();
this.tracerProvider = tracerProviderBuilder.setIdGenerator(idGenerator).build();
this.tracer = tracerProvider.get(INSTRUMENTATION_NAME);
this.contextExtractor = contextExtractor;
this.samplingRate = samplingRate;
this.enableMdc = enableMdc;
}

/**
* Creates an OTel plugin using a pre-configured {@link SdkTracerProvider}.
*
* @param sdkTracerProvider the SDK tracer provider
* @param idGenerator the deterministic ID generator
*/
public OpenTelemetryDurablePlugin(SdkTracerProvider sdkTracerProvider, DeterministicIdGenerator idGenerator) {
this((TracerProvider) sdkTracerProvider, idGenerator);
}

// ─── Invocation hooks ────────────────────────────────────────────────

@Override
public void onInvocationStart(InvocationInfo info) {
this.executionArn = info.executionArn();
idGenerator.setExecutionArn(info.executionArn());

// Determine sampling (consistent across all invocations of same execution)
this.sampled = SamplingUtil.shouldSampleExecution(info.executionArn(), samplingRate);
if (!sampled) return;

// Extract parent context from Lambda environment (X-Ray, W3C, etc.)
var extractedParentContext = contextExtractor.extract();

Expand All @@ -162,7 +137,7 @@ public void onInvocationStart(InvocationInfo info) {

@Override
public void onInvocationEnd(InvocationEndInfo info) {
if (!sampled || invocationSpan == null) return;
if (invocationSpan == null) return;

// End any operation spans that are still open (operations that didn't complete in this invocation)
for (var entry : operationSpans.entrySet()) {
Expand Down Expand Up @@ -196,19 +171,17 @@ public void onInvocationEnd(InvocationEndInfo info) {
invocationSpan = null;

// Flush spans before Lambda freezes
if (tracerProvider instanceof SdkTracerProvider sdkProvider) {
var flushResult = sdkProvider.forceFlush().join(5, java.util.concurrent.TimeUnit.SECONDS);
if (!flushResult.isSuccess()) {
logger.warn("OTel span flush failed or timed out — some spans may be lost");
}
var flushResult = tracerProvider.forceFlush().join(5, java.util.concurrent.TimeUnit.SECONDS);
if (!flushResult.isSuccess()) {
logger.warn("OTel span flush failed or timed out — some spans may be lost");
}
}

// ─── Operation hooks ─────────────────────────────────────────────────

@Override
public void onOperationStart(OperationInfo info) {
if (!sampled || info.id() == null) return;
if (info.id() == null) return;

idGenerator.setNextSpanOperationId(info.id());

Expand Down Expand Up @@ -239,7 +212,7 @@ public void onOperationStart(OperationInfo info) {

@Override
public void onOperationEnd(OperationEndInfo info) {
if (!sampled || info.id() == null) return;
if (info.id() == null) return;

// End the operation span that was started in onOperationStart
var span = operationSpans.remove(info.id());
Expand All @@ -257,7 +230,6 @@ public void onOperationEnd(OperationEndInfo info) {

@Override
public void onUserFunctionStart(UserFunctionStartInfo info) {
if (!sampled) return;
var key = attemptKey(info.id(), info.attempt());

// Use the operation span as parent for the attempt span
Expand Down Expand Up @@ -295,7 +267,6 @@ public void onUserFunctionStart(UserFunctionStartInfo info) {

@Override
public void onUserFunctionEnd(UserFunctionEndInfo info) {
if (!sampled) return;
var key = attemptKey(info.id(), info.attempt());

// Close scope first (must happen on same thread as makeCurrent)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,11 @@ void inject_withNullArn_doesNotSetArn() {
void plugin_withMdcEnabled_setsArnInMdc() {
// Test MDC through the full plugin lifecycle (where makeCurrent is called on same thread)
var spanExporter = InMemorySpanExporter.create();
var idGenerator = new DeterministicIdGenerator();
var tracerProvider = SdkTracerProvider.builder()
.setIdGenerator(idGenerator)
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();

var plugin = new OpenTelemetryDurablePlugin(
tracerProvider, idGenerator, () -> io.opentelemetry.context.Context.root(), 1.0, true);
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(spanExporter)),
() -> io.opentelemetry.context.Context.root(),
true);

plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec-mdc-test", true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@ class OpenTelemetryDurablePluginTest {

private InMemorySpanExporter spanExporter;
private OpenTelemetryDurablePlugin plugin;
private DeterministicIdGenerator idGenerator;

@BeforeEach
void setUp() {
spanExporter = InMemorySpanExporter.create();
idGenerator = new DeterministicIdGenerator();

var tracerProvider = SdkTracerProvider.builder()
.setIdGenerator(idGenerator)
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();

plugin = new OpenTelemetryDurablePlugin(
tracerProvider, idGenerator, () -> io.opentelemetry.context.Context.root(), 1.0, false);
SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(spanExporter)),
() -> io.opentelemetry.context.Context.root(),
false);
}

@Test
Expand Down Expand Up @@ -215,16 +210,13 @@ void operationNotCompleted_spanEndedAtInvocationEnd() {
}

@Test
void sampling_disabledExecution_producesNoSpans() {
void sampling_disabled_producesNoSpans() {
spanExporter = InMemorySpanExporter.create();
var sampledPlugin = new OpenTelemetryDurablePlugin(
SdkTracerProvider.builder()
.setIdGenerator(idGenerator)
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build(),
idGenerator,
.setSampler(io.opentelemetry.sdk.trace.samplers.Sampler.alwaysOff())
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter)),
() -> io.opentelemetry.context.Context.root(),
0.0, // 0% sampling — nothing should be traced
false);

sampledPlugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true));
Expand Down

This file was deleted.

Loading