-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathOpenTelemetryDurablePlugin.java
More file actions
420 lines (354 loc) · 17.8 KB
/
Copy pathOpenTelemetryDurablePlugin.java
File metadata and controls
420 lines (354 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.otel;
import static software.amazon.lambda.durable.otel.SpanAttributes.*;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.time.Instant;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.plugin.DurableExecutionPlugin;
import software.amazon.lambda.durable.plugin.InvocationEndInfo;
import software.amazon.lambda.durable.plugin.InvocationInfo;
import software.amazon.lambda.durable.plugin.InvocationStatus;
import software.amazon.lambda.durable.plugin.OperationEndInfo;
import software.amazon.lambda.durable.plugin.OperationInfo;
import software.amazon.lambda.durable.plugin.UserFunctionEndInfo;
import software.amazon.lambda.durable.plugin.UserFunctionStartInfo;
/**
 * OpenTelemetry plugin for the AWS Lambda Durable Execution SDK.
 *
 * <p>Creates spans at three levels:
 *
 * <ul>
 *   <li><b>Invocation span</b> — one per Lambda invocation
 *   <li><b>Operation span</b> — created when an operation starts, ended when it completes or when the invocation ends
 *   <li><b>Attempt span</b> — one per user function execution (step attempt, child context run)
 * </ul>
 *
 * <p>Trace ID resolution:
 *
 * <ol>
 *   <li>Uses the X-Ray trace ID from {@code _X_AMZN_TRACE_ID} when available. The durable execution backend propagates
 *       the same Root to all invocations of the same execution, naturally unifying the trace.
 *   <li>Falls back to a deterministic trace ID derived from the execution ARN (for local tests or non-Lambda
 *       environments).
 * </ol>
 *
 * <p>Requires the ADOT Lambda Layer for trace export. Configure with:
 *
 * <ul>
 *   <li>Lambda Layer: {@code aws-otel-java-agent} (ADOT Java auto-instrumentation layer)
 *   <li>Env var: {@code AWS_LAMBDA_EXEC_WRAPPER=/opt/otel-handler}
 *   <li>Tracing: Active (to populate {@code _X_AMZN_TRACE_ID})
 * </ul>
 *
 * <p>Thread-safe: uses {@link ConcurrentHashMap} for span/scope storage since the SDK runs user code on multiple
 * threads. Spans are always taken out of those maps with an atomic {@code remove}, so each span is ended by exactly
 * one caller even when an operation/attempt end hook overlaps with the end of the invocation.
 *
 * @deprecated This is a preview API that is experimental and may be changed or removed in future releases.
 */
@Deprecated
public class OpenTelemetryDurablePlugin implements DurableExecutionPlugin {

    private static final Logger logger = LoggerFactory.getLogger(OpenTelemetryDurablePlugin.class);
    private static final String INSTRUMENTATION_NAME = "aws-durable-execution-sdk-java";

    // Attribute keys that are not part of SpanAttributes; built once instead of on every hook call.
    private static final AttributeKey<String> FAAS_INVOCATION_ID_KEY = AttributeKey.stringKey("faas.invocation_id");
    private static final AttributeKey<String> OPERATION_STATUS_KEY =
            AttributeKey.stringKey("durable.operation.status");

    // Span-name segment used when an operation reports neither a subtype nor a type.
    private static final String UNKNOWN_OPERATION_LABEL = "unknown";

    // Upper bound on how long onInvocationEnd waits for the span flush.
    private static final long FLUSH_TIMEOUT_SECONDS = 5;

    private final SdkTracerProvider tracerProvider;
    private final Tracer tracer;
    private final DeterministicIdGenerator idGenerator;
    private final ContextExtractor contextExtractor;
    private final boolean enableMdc;

    // Per-invocation state
    private volatile Span invocationSpan;
    private volatile String durableExecutionArn;

    // Thread-safe storage for operation spans (keyed by operationId) — open spans that need ending
    private final ConcurrentHashMap<String, Span> operationSpans = new ConcurrentHashMap<>();

    // Thread-safe storage for attempt spans/scopes (keyed by operationId + "-" + attempt)
    private final ConcurrentHashMap<String, Span> attemptSpans = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Scope> attemptScopes = new ConcurrentHashMap<>();

    // Store operation span contexts for parent resolution (keyed by operationId)
    private final ConcurrentHashMap<String, SpanContext> operationContexts = new ConcurrentHashMap<>();

    /**
     * Creates an OTel plugin with default settings: X-Ray context extraction, MDC enabled.
     *
     * <p>Uses the provided tracer provider builder. Customers configure exporters and span processors on the builder —
     * the plugin handles ID generation.
     *
     * <p>For ADOT layer usage, configure with an OTLP exporter:
     *
     * <pre>{@code
     * var otlpExporter = OtlpGrpcSpanExporter.getDefault(); // sends to localhost:4317
     * var plugin = new OpenTelemetryDurablePlugin(
     *         SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(otlpExporter)));
     * }</pre>
     *
     * @param tracerProviderBuilder the tracer provider builder (ID generator will be overridden)
     * @throws NullPointerException if {@code tracerProviderBuilder} is null
     */
    public OpenTelemetryDurablePlugin(SdkTracerProviderBuilder tracerProviderBuilder) {
        this(tracerProviderBuilder, new XRayContextExtractor(), true);
    }

    /**
     * Creates an OTel plugin with a custom context extractor, MDC enabled.
     *
     * @param tracerProviderBuilder the tracer provider builder (ID generator will be overridden)
     * @param contextExtractor extracts parent trace context from the Lambda environment
     * @throws NullPointerException if either argument is null
     */
    public OpenTelemetryDurablePlugin(
            SdkTracerProviderBuilder tracerProviderBuilder, ContextExtractor contextExtractor) {
        this(tracerProviderBuilder, contextExtractor, true);
    }

    /**
     * Creates an OTel plugin with full configuration.
     *
     * @param tracerProviderBuilder the tracer provider builder (ID generator will be overridden)
     * @param contextExtractor extracts parent trace context from the Lambda environment
     * @param enableMdc if true, injects trace_id/span_id into SLF4J MDC for log correlation
     * @throws NullPointerException if {@code tracerProviderBuilder} or {@code contextExtractor} is null
     */
    public OpenTelemetryDurablePlugin(
            SdkTracerProviderBuilder tracerProviderBuilder, ContextExtractor contextExtractor, boolean enableMdc) {
        // Fail at construction time with a named argument instead of on the first invocation hook.
        Objects.requireNonNull(tracerProviderBuilder, "tracerProviderBuilder");
        Objects.requireNonNull(contextExtractor, "contextExtractor");
        this.idGenerator = new DeterministicIdGenerator();
        this.tracerProvider = tracerProviderBuilder.setIdGenerator(idGenerator).build();
        this.tracer = tracerProvider.get(INSTRUMENTATION_NAME);
        this.contextExtractor = contextExtractor;
        this.enableMdc = enableMdc;
    }

    // ─── Invocation hooks ────────────────────────────────────────────────

    /**
     * Starts the {@code durable.invocation} span for this Lambda invocation.
     *
     * <p>Feeds the execution ARN (and, when the extractor yields one, the extracted trace ID) to the ID generator,
     * then parents the invocation span on the extracted remote span when a parent span ID is available, or on the
     * root context otherwise.
     *
     * @param info invocation details (execution ARN, first-invocation flag, optional request ID)
     */
    @Override
    public void onInvocationStart(InvocationInfo info) {
        this.durableExecutionArn = info.durableExecutionArn();

        // Set execution ARN for deterministic span ID generation
        idGenerator.setDurableExecutionArn(info.durableExecutionArn());

        // Extract trace context from environment (X-Ray header)
        var extractedContext = contextExtractor.extract();
        if (extractedContext != null) {
            // Use the X-Ray trace ID — backend propagates same Root across all invocations
            idGenerator.setExtractedTraceId(extractedContext.traceId());
        }
        // If no extracted context, idGenerator falls back to ARN-derived trace ID.
        // NOTE(review): nothing here resets a trace ID set by an earlier invocation on the same plugin
        // instance — confirm that DeterministicIdGenerator.setDurableExecutionArn clears it.

        // Determine parent context for the invocation span
        Context parentContext;
        if (extractedContext != null && extractedContext.parentSpanId() != null) {
            // Create a remote parent span context from the X-Ray Parent field.
            // NOTE(review): the parent is always marked sampled; confirm whether the extracted
            // sampling decision should be honoured instead.
            var parentSpanContext = SpanContext.createFromRemoteParent(
                    extractedContext.traceId(),
                    extractedContext.parentSpanId(),
                    TraceFlags.getSampled(),
                    TraceState.getDefault());
            parentContext = Context.root().with(Span.wrap(parentSpanContext));
        } else {
            parentContext = Context.root();
        }

        // Create invocation span as child of Lambda's X-Ray segment (via Parent field)
        var spanBuilder = tracer.spanBuilder("durable.invocation")
                .setParent(parentContext)
                .setAttribute(DURABLE_EXECUTION_ARN, info.durableExecutionArn())
                .setAttribute(DURABLE_FIRST_INVOCATION, info.isFirstInvocation());
        if (info.requestId() != null) {
            spanBuilder.setAttribute(FAAS_INVOCATION_ID_KEY, info.requestId());
        }
        invocationSpan = spanBuilder.startSpan();
    }

    /**
     * Ends everything that is still open for this invocation and flushes the tracer provider.
     *
     * <p>Open operation spans are tagged {@code PENDING} and ended, open attempt scopes are closed, open attempt
     * spans are ended, and finally the invocation span (if one was started) receives its status and is ended.
     * Per-invocation state is reset and the flush is attempted even when no invocation span exists or when one of
     * the span calls throws, so state never leaks into the next invocation handled by this plugin instance.
     *
     * @param info invocation outcome (status and optional execution error)
     */
    @Override
    public void onInvocationEnd(InvocationEndInfo info) {
        // Read the volatile field once so the null check and the use see the same span.
        var currentInvocationSpan = invocationSpan;
        try {
            endOpenOperationSpans();
            endOpenAttempts();

            if (currentInvocationSpan != null) {
                var status = info.invocationStatus();
                if (status != null) {
                    currentInvocationSpan.setAttribute(DURABLE_INVOCATION_STATUS, status.name());
                }
                if (status == InvocationStatus.FAILED && info.executionError() != null) {
                    currentInvocationSpan.setStatus(
                            StatusCode.ERROR, info.executionError().getMessage());
                    currentInvocationSpan.recordException(info.executionError());
                }
                currentInvocationSpan.end();
            }
        } finally {
            invocationSpan = null;
            operationContexts.clear();

            // Flush spans before Lambda freezes
            var flushResult = tracerProvider.forceFlush().join(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!flushResult.isSuccess()) {
                logger.warn("OTel span flush failed or timed out — some spans may be lost");
            }
        }
    }

    /** Ends operation spans that did not complete in this invocation, tagging each as {@code PENDING}. */
    private void endOpenOperationSpans() {
        for (var operationId : operationSpans.keySet()) {
            // remove() is atomic: if onOperationEnd wins the race for this id, it ends the span instead.
            var span = operationSpans.remove(operationId);
            if (span != null) {
                span.setAttribute(OPERATION_STATUS_KEY, "PENDING");
                span.end();
            }
        }
        // Backstop for entries added while draining, so nothing carries over to the next invocation.
        operationSpans.clear();
    }

    /**
     * Closes attempt scopes and ends attempt spans that are still open (e.g. when SuspendExecutionException skipped
     * {@code onUserFunctionEnd}).
     */
    private void endOpenAttempts() {
        // NOTE(review): these scopes were opened by makeCurrent() in onUserFunctionStart, possibly on a
        // different thread than the one running this hook — closing them here is best-effort.
        for (var key : attemptScopes.keySet()) {
            var scope = attemptScopes.remove(key);
            if (scope != null) {
                scope.close();
            }
        }
        attemptScopes.clear();

        for (var key : attemptSpans.keySet()) {
            var span = attemptSpans.remove(key);
            if (span != null) {
                span.end();
            }
        }
        attemptSpans.clear();
    }

    // ─── Operation hooks ─────────────────────────────────────────────────

    /**
     * Starts a span for an operation and keeps it open until {@link #onOperationEnd} or {@link #onInvocationEnd}.
     *
     * <p>Does nothing when the operation has no ID.
     *
     * @param info operation details (ID, parent ID, type, subtype, name)
     */
    @Override
    public void onOperationStart(OperationInfo info) {
        if (info.id() == null) {
            return;
        }

        // Must precede span creation: tells the ID generator which operation the next span belongs to.
        idGenerator.setNextSpanOperationId(info.id());

        var span = startOperationSpan(info.id(), info.parentId(), info.type(), info.subType(), info.name(), null);

        // Store the open span — will be ended in onOperationEnd or onInvocationEnd
        operationSpans.put(info.id(), span);
        operationContexts.put(info.id(), span.getSpanContext());
    }

    /**
     * Ends the span of a completed operation.
     *
     * <p>If the operation was started in this invocation, its stored span is ended. Otherwise a short continuation
     * span is created and linked to the deterministic span ID derived for that operation. In both cases an operation
     * error marks the span as {@code ERROR} and is recorded as an exception event. Does nothing when the operation
     * has no ID.
     *
     * @param info operation outcome (ID, parent ID, type, subtype, name, optional error)
     */
    @Override
    public void onOperationEnd(OperationEndInfo info) {
        if (info.id() == null) {
            return;
        }

        var span = operationSpans.remove(info.id());
        if (span == null) {
            // Operation was started in a prior invocation — create a continuation span with Link
            // to the deterministic span ID from the original invocation.
            var deterministicSpanId = idGenerator.generateSpanIdForOperation(info.id());
            var traceId = idGenerator.generateTraceId();
            var linkedSpanContext =
                    SpanContext.create(traceId, deterministicSpanId, TraceFlags.getSampled(), TraceState.getDefault());
            span = startOperationSpan(
                    info.id(), info.parentId(), info.type(), info.subType(), info.name(), linkedSpanContext);
        }

        if (info.error() != null) {
            span.setStatus(StatusCode.ERROR, info.error().getMessage());
            span.recordException(info.error());
        }
        span.end();
    }

    /**
     * Builds and starts an operation span with the attribute set shared by regular and continuation spans.
     *
     * @param id operation ID (non-null)
     * @param parentId parent operation ID, or null to parent on the invocation span
     * @param type operation type, may be null
     * @param subType operation subtype, may be null
     * @param name operation name, may be null
     * @param linkedContext span context to link to, or null for no link
     * @return the started span; the caller is responsible for ending it
     */
    private Span startOperationSpan(
            String id, String parentId, String type, String subType, String name, SpanContext linkedContext) {
        var parentContext = resolveParentContext(parentId);
        var spanBuilder = tracer.spanBuilder(spanName(type, subType, name))
                .setParent(parentContext)
                .setAttribute(DURABLE_EXECUTION_ARN, durableExecutionArn)
                .setAttribute(DURABLE_OPERATION_ID, id)
                .setAttribute(DURABLE_OPERATION_TYPE, type);
        if (linkedContext != null) {
            spanBuilder.addLink(linkedContext);
        }
        if (name != null) {
            spanBuilder.setAttribute(DURABLE_OPERATION_NAME, name);
        }
        if (subType != null) {
            spanBuilder.setAttribute(DURABLE_OPERATION_SUBTYPE, subType);
        }
        if (parentId != null) {
            spanBuilder.setAttribute(DURABLE_OPERATION_PARENT_ID, parentId);
        }
        return spanBuilder.startSpan();
    }

    // ─── User function hooks ─────────────────────────────────────────────

    /**
     * Starts an attempt span for a user function execution and makes it current on the calling thread.
     *
     * <p>The span is parented on the span context recorded for the operation with the same ID. When MDC is enabled,
     * trace context is injected for log correlation.
     *
     * @param info attempt details (operation ID, type, subtype, name, attempt number, optional start timestamp)
     */
    @Override
    public void onUserFunctionStart(UserFunctionStartInfo info) {
        var key = attemptKey(info.id(), info.attempt());

        // Use the operation span as parent for the attempt span
        var parentContext = resolveParentContext(info.id());
        var spanBuilder = tracer.spanBuilder(attemptSpanName(info.type(), info.subType(), info.name(), info.attempt()))
                .setParent(parentContext)
                .setStartTimestamp(info.startTimestamp() != null ? info.startTimestamp() : Instant.now());
        spanBuilder.setAttribute(DURABLE_EXECUTION_ARN, durableExecutionArn);
        spanBuilder.setAttribute(DURABLE_OPERATION_ID, info.id());
        if (info.type() != null) {
            spanBuilder.setAttribute(DURABLE_OPERATION_TYPE, info.type());
        }
        if (info.name() != null) {
            spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name());
        }
        if (info.attempt() != null) {
            spanBuilder.setAttribute(DURABLE_ATTEMPT_NUMBER, info.attempt().longValue());
        }

        var span = spanBuilder.startSpan();
        attemptSpans.put(key, span);

        // Make span current on this thread so auto-instrumented calls become children
        var scope = span.makeCurrent();
        attemptScopes.put(key, scope);

        // Inject trace context into MDC for log-trace correlation
        if (enableMdc) {
            MdcSpanEnricher.inject(durableExecutionArn);
        }
    }

    /**
     * Ends the attempt span started by {@link #onUserFunctionStart} and records its outcome.
     *
     * <p>The outcome attribute is {@code succeeded}, {@code pending} (the error is a
     * {@link SuspendExecutionException}), {@code failed} (any other error; the span is also marked {@code ERROR}) or
     * {@code unknown} (not succeeded and no error reported). MDC is cleared on the calling thread even when the
     * attempt span is no longer tracked, so stale trace IDs do not stay in the logging context.
     *
     * @param info attempt outcome (operation ID, attempt number, success flag, optional error and end timestamp)
     */
    @Override
    public void onUserFunctionEnd(UserFunctionEndInfo info) {
        var key = attemptKey(info.id(), info.attempt());
        try {
            // Close scope first (must happen on same thread as makeCurrent)
            var scope = attemptScopes.remove(key);
            if (scope != null) {
                scope.close();
            }

            var span = attemptSpans.remove(key);
            if (span == null) {
                return;
            }

            var error = info.error();
            String outcome;
            if (info.succeeded()) {
                outcome = "succeeded";
            } else if (error == null) {
                outcome = "unknown";
            } else if (error instanceof SuspendExecutionException) {
                // Suspension is not a failure: the attempt is merely not finished yet.
                outcome = "pending";
            } else {
                outcome = "failed";
                span.setStatus(StatusCode.ERROR, error.getMessage());
                span.recordException(error);
            }
            span.setAttribute(DURABLE_ATTEMPT_OUTCOME, outcome);

            if (info.endTimestamp() != null) {
                span.end(info.endTimestamp());
            } else {
                span.end();
            }
        } finally {
            // Clear MDC after user function completes
            if (enableMdc) {
                MdcSpanEnricher.clear();
            }
        }
    }

    // ─── Helpers ─────────────────────────────────────────────────────────

    /**
     * Resolves the context a new span should be parented on.
     *
     * @param parentId ID of the parent operation, or null to use the invocation span
     * @return the current context carrying the parent operation's span context (known from this invocation, or a
     *     placeholder built from deterministic IDs), else the current context carrying the invocation span, else the
     *     unmodified current context
     */
    private Context resolveParentContext(String parentId) {
        if (parentId != null) {
            var parentSpanContext = operationContexts.get(parentId);
            if (parentSpanContext != null) {
                return Context.current().with(Span.wrap(parentSpanContext));
            }
            // Parent operation from a prior invocation — create non-recording placeholder
            var deterministicParentSpanId = idGenerator.generateSpanIdForOperation(parentId);
            var traceId = idGenerator.generateTraceId();
            var placeholderContext = SpanContext.create(
                    traceId, deterministicParentSpanId, TraceFlags.getSampled(), TraceState.getDefault());
            return Context.current().with(Span.wrap(placeholderContext));
        }

        // Fall back to invocation span as parent. Read the volatile field once: onInvocationEnd may
        // reset it to null between a check and a second read.
        var currentInvocationSpan = invocationSpan;
        if (currentInvocationSpan != null) {
            return Context.current().with(currentInvocationSpan);
        }
        return Context.current();
    }

    /**
     * Builds an operation span name: {@code durable.<kind>} or {@code durable.<kind>:<name>}.
     *
     * <p>{@code <kind>} is the lower-cased subtype when present, otherwise the lower-cased type, otherwise
     * {@code unknown}. Lower-casing uses {@link Locale#ROOT} so names do not depend on the JVM's default locale.
     *
     * @param type operation type, may be null
     * @param subType operation subtype, may be null
     * @param name operation name, may be null
     * @return the span name
     */
    private static String spanName(String type, String subType, String name) {
        var kind = subType != null ? subType : type;
        var label = kind != null ? kind.toLowerCase(Locale.ROOT) : UNKNOWN_OPERATION_LABEL;
        if (name != null) {
            return "durable." + label + ":" + name;
        }
        return "durable." + label;
    }

    /**
     * Builds an attempt span name: the operation span name followed by {@code [attempt N]}, or by {@code [fn]} when
     * no attempt number is given.
     */
    private static String attemptSpanName(String type, String subType, String name, Integer attempt) {
        var base = spanName(type, subType, name);
        if (attempt != null) {
            return base + " [attempt " + attempt + "]";
        }
        return base + " [fn]";
    }

    /** Builds the map key for an attempt: {@code <operationId>-<attempt>}, or {@code <operationId>-ctx}. */
    private static String attemptKey(String operationId, Integer attempt) {
        return operationId + "-" + (attempt != null ? attempt : "ctx");
    }
}