-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathInstrumenter.java
More file actions
405 lines (363 loc) · 17.4 KB
/
Copy pathInstrumenter.java
File metadata and controls
405 lines (363 loc) · 17.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter;
import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsLogs;
import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsSpanEvents;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.internal.HttpRouteState;
import io.opentelemetry.instrumentation.api.internal.InstrumenterAccess;
import io.opentelemetry.instrumentation.api.internal.InstrumenterContext;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.InternalExceptionEventExtractor;
import io.opentelemetry.instrumentation.api.internal.SupportabilityMetrics;
import java.time.Instant;
import javax.annotation.Nullable;
/**
* The {@link Instrumenter} encapsulates the entire logic for gathering telemetry, from collecting
* the data, to starting and ending spans, to recording values using metrics instruments.
*
* <p>An {@link Instrumenter} is called at the start and the end of a request/response lifecycle.
* When instrumenting a library, there will generally be four steps.
*
* <ul>
* <li>Create an {@link Instrumenter} using {@link InstrumenterBuilder}. Use the builder to
* configure any library-specific customizations, and also expose useful knobs to your user.
* <li>Call {@link Instrumenter#shouldStart(Context, Object)} and do not proceed if it returns
* {@code false}.
* <li>Call {@link Instrumenter#start(Context, Object)} at the beginning of a request.
* <li>Call {@link Instrumenter#end(Context, Object, Object, Throwable)} at the end of a request.
* </ul>
*
* <p>For more detailed information about using the {@link Instrumenter} see the <a
* href="https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/docs/contributing/using-instrumenter-api.md">Using
* the Instrumenter API</a> page.
*/
public class Instrumenter<REQUEST, RESPONSE> {
private static final ContextKey<OperationListener[]> START_OPERATION_LISTENERS =
ContextKey.named("instrumenter-start-operation-listeners");
/**
* Returns a new {@link InstrumenterBuilder}.
*
* <p>The {@code instrumentationName} indicates the instrumentation library name, not the
* instrument<b>ed</b> library name. The value passed in this parameter should uniquely identify
* the instrumentation library so that during troubleshooting it's possible to determine where the
* telemetry came from.
*
* <p>In OpenTelemetry instrumentations we use a convention to encode the minimum supported
* version of the instrument<b>ed</b> library into the instrumentation name, for example {@code
* io.opentelemetry.apache-httpclient-4.0}. This way, if there are different instrumentations for
* different library versions it's easy to find out which instrumentations produced the telemetry
* data.
*/
public static <REQUEST, RESPONSE> InstrumenterBuilder<REQUEST, RESPONSE> builder(
OpenTelemetry openTelemetry,
String instrumentationName,
SpanNameExtractor<? super REQUEST> spanNameExtractor) {
return new InstrumenterBuilder<>(openTelemetry, instrumentationName, spanNameExtractor);
}
private static final SupportabilityMetrics supportability = SupportabilityMetrics.instance();
private final String instrumentationName;
private final Tracer tracer;
@Nullable private final Logger logger;
private final SpanNameExtractor<? super REQUEST> spanNameExtractor;
private final SpanKindExtractor<? super REQUEST> spanKindExtractor;
private final SpanStatusExtractor<? super REQUEST, ? super RESPONSE> spanStatusExtractor;
private final SpanLinksExtractor<? super REQUEST>[] spanLinksExtractors;
private final AttributesExtractor<? super REQUEST, ? super RESPONSE>[] attributesExtractors;
private final ContextCustomizer<? super REQUEST>[] contextCustomizers;
private final OperationListener[] operationListeners;
private final AttributesExtractor<? super REQUEST, ? super RESPONSE>[]
operationListenerAttributesExtractors;
private final ErrorCauseExtractor errorCauseExtractor;
@Nullable private final InternalExceptionEventExtractor<? super REQUEST> exceptionEventExtractor;
private final boolean propagateOperationListenersToOnEnd;
private final boolean enabled;
private final SpanSuppressor spanSuppressor;
// to allow converting generic lists to arrays with toArray
@SuppressWarnings({"rawtypes", "unchecked"})
Instrumenter(InstrumenterBuilder<REQUEST, RESPONSE> builder) {
this.instrumentationName = builder.instrumentationName;
this.tracer = builder.buildTracer();
this.spanNameExtractor = builder.spanNameExtractor;
this.spanKindExtractor = builder.spanKindExtractor;
this.spanStatusExtractor = builder.spanStatusExtractor;
this.spanLinksExtractors = builder.spanLinksExtractors.toArray(new SpanLinksExtractor[0]);
this.attributesExtractors = builder.attributesExtractors.toArray(new AttributesExtractor[0]);
this.contextCustomizers = builder.contextCustomizers.toArray(new ContextCustomizer[0]);
this.operationListeners = builder.buildOperationListeners().toArray(new OperationListener[0]);
this.operationListenerAttributesExtractors =
builder.operationListenerAttributesExtractors.toArray(new AttributesExtractor[0]);
this.errorCauseExtractor = builder.errorCauseExtractor;
this.propagateOperationListenersToOnEnd = builder.propagateOperationListenersToOnEnd;
this.enabled = builder.enabled;
this.spanSuppressor = builder.buildSpanSuppressor();
if (emitExceptionAsLogs()) {
this.logger = builder.buildLogger();
this.exceptionEventExtractor =
builder.exceptionEventExtractor != null
? builder.exceptionEventExtractor
: defaultExceptionEventExtractor(this.spanKindExtractor);
} else {
this.logger = null;
this.exceptionEventExtractor = null;
}
}
/**
* Determines whether the operation should be instrumented for telemetry or not. If the return
* value is {@code true}, call {@link #start(Context, Object)} and {@link #end(Context, Object,
* Object, Throwable)} around the instrumented operation; if the return value is false {@code
* false} execute the operation directly without calling those methods.
*
* <p>The {@code parentContext} is the parent of the resulting instrumented operation and should
* usually be {@link Context#current() Context.current()}. The {@code request} is the request
* object of this operation.
*/
public boolean shouldStart(Context parentContext, REQUEST request) {
if (!enabled) {
return false;
}
SpanKind spanKind = spanKindExtractor.extract(request);
boolean suppressed = spanSuppressor.shouldSuppress(parentContext, spanKind);
if (suppressed) {
supportability.recordSuppressedSpan(spanKind, instrumentationName);
}
return !suppressed;
}
/**
* Starts a new instrumented operation. The returned {@link Context} should be propagated along
* with the operation and passed to the {@link #end(Context, Object, Object, Throwable)} method
* when it is finished.
*
* <p>The {@code parentContext} is the parent of the resulting instrumented operation and should
* usually be {@link Context#current() Context.current()}. The {@code request} is the request
* object of this operation.
*/
public Context start(Context parentContext, REQUEST request) {
return doStart(parentContext, request, null);
}
/**
* Ends an instrumented operation. It is of extreme importance for this method to be always called
* after {@link #start(Context, Object) start()}. Calling {@code start()} without later {@code
* end()} will result in inaccurate or wrong telemetry and context leaks.
*
* <p>The {@code context} must be the same value that was returned from {@link #start(Context,
* Object)}. The {@code request} parameter is the request object of the operation, {@code
* response} is the response object of the operation, and {@code error} is an exception that was
* thrown by the operation or {@code null} if no error occurred.
*/
public void end(
Context context, REQUEST request, @Nullable RESPONSE response, @Nullable Throwable error) {
doEnd(context, request, response, error, null);
}
/** Internal method for creating spans with given start/end timestamps. */
Context startAndEnd(
Context parentContext,
REQUEST request,
@Nullable RESPONSE response,
@Nullable Throwable error,
Instant startTime,
Instant endTime) {
Context context = doStart(parentContext, request, startTime);
doEnd(context, request, response, error, endTime);
return context;
}
private Context doStart(Context parentContext, REQUEST request, @Nullable Instant startTime) {
try {
return doStartImpl(parentContext, request, startTime);
} finally {
InstrumenterContext.reset();
}
}
private Context doStartImpl(Context parentContext, REQUEST request, @Nullable Instant startTime) {
SpanKind spanKind = spanKindExtractor.extract(request);
SpanBuilder spanBuilder =
tracer.spanBuilder(spanNameExtractor.extract(request)).setSpanKind(spanKind);
if (startTime != null) {
spanBuilder.setStartTimestamp(startTime);
}
SpanLinksBuilder spanLinksBuilder = new SpanLinksBuilderImpl(spanBuilder);
for (SpanLinksExtractor<? super REQUEST> spanLinksExtractor : spanLinksExtractors) {
spanLinksExtractor.extract(spanLinksBuilder, parentContext, request);
}
UnsafeAttributes attributes = new UnsafeAttributes();
for (AttributesExtractor<? super REQUEST, ? super RESPONSE> extractor : attributesExtractors) {
extractor.onStart(attributes, parentContext, request);
}
Context context = parentContext;
// context customizers run before span start, so that they can have access to the parent span
// context, and so that their additions to the context will be visible to span processors
for (ContextCustomizer<? super REQUEST> contextCustomizer : contextCustomizers) {
context = contextCustomizer.onStart(context, request, attributes);
}
boolean localRoot = LocalRootSpan.isLocalRoot(parentContext);
boolean hasLocalRoot = LocalRootSpan.fromContextOrNull(context) != null;
spanBuilder.setAllAttributes(attributes);
Span span = spanBuilder.setParent(context).startSpan();
context = context.with(span);
if (operationListeners.length != 0) {
if (operationListenerAttributesExtractors.length != 0) {
UnsafeAttributes operationAttributes = new UnsafeAttributes();
operationAttributes.putAll(attributes.asMap());
for (AttributesExtractor<? super REQUEST, ? super RESPONSE> extractor :
operationListenerAttributesExtractors) {
extractor.onStart(operationAttributes, parentContext, request);
}
attributes = operationAttributes;
}
// operation listeners run after span start, so that they have access to the current span
// for capturing exemplars
long startNanos = getNanos(startTime);
for (OperationListener operationListener : operationListeners) {
context = operationListener.onStart(context, attributes, startNanos);
}
}
if (propagateOperationListenersToOnEnd || context.get(START_OPERATION_LISTENERS) != null) {
// when start and end are not called on the same instrumenter we need to use the operation
// listeners that were used during start in end to correctly handle metrics like
// http.server.active_requests that is recorded both in start and end
//
// need to also add when there is already START_OPERATION_LISTENERS, otherwise this
// instrumenter will call its parent's operation listeners in doEnd
context = context.with(START_OPERATION_LISTENERS, operationListeners);
}
if (localRoot) {
context = LocalRootSpan.store(context, span);
}
if (!hasLocalRoot && spanKind == SpanKind.SERVER) {
HttpRouteState.updateSpan(context, span);
}
return spanSuppressor.storeInContext(context, spanKind, span);
}
private void doEnd(
Context context,
REQUEST request,
@Nullable RESPONSE response,
@Nullable Throwable error,
@Nullable Instant endTime) {
Span span = Span.fromContext(context);
if (error != null) {
error = errorCauseExtractor.extract(error);
if (emitExceptionAsSpanEvents()) {
span.recordException(error);
}
// Exception logs are intentionally emitted even when the span is not recording.
if (emitExceptionAsLogs() && exceptionEventExtractor != null) {
emitExceptionLog(context, error, request, endTime);
}
}
UnsafeAttributes attributes = new UnsafeAttributes();
for (AttributesExtractor<? super REQUEST, ? super RESPONSE> extractor : attributesExtractors) {
extractor.onEnd(attributes, context, request, response, error);
}
span.setAllAttributes(attributes);
OperationListener[] operationListeners = context.get(START_OPERATION_LISTENERS);
if (operationListeners == null) {
operationListeners = this.operationListeners;
}
SpanStatusBuilder spanStatusBuilder = new SpanStatusBuilderImpl(span);
spanStatusExtractor.extract(spanStatusBuilder, request, response, error);
if (operationListeners.length != 0) {
if (operationListenerAttributesExtractors.length != 0) {
UnsafeAttributes operationAttributes = new UnsafeAttributes();
operationAttributes.putAll(attributes.asMap());
for (AttributesExtractor<? super REQUEST, ? super RESPONSE> extractor :
operationListenerAttributesExtractors) {
extractor.onEnd(operationAttributes, context, request, response, error);
}
attributes = operationAttributes;
}
long endNanos = getNanos(endTime);
for (int i = operationListeners.length - 1; i >= 0; i--) {
operationListeners[i].onEnd(context, attributes, endNanos);
}
}
if (endTime != null) {
span.end(endTime);
} else {
span.end();
}
}
private void emitExceptionLog(
Context context, Throwable throwable, REQUEST request, @Nullable Instant endTime) {
if (logger == null || exceptionEventExtractor == null) {
// this condition is to keep nullaway happy
// doEnd already guards on exceptionEventExtractor != null, so this is unreachable
return;
}
LogRecordBuilder logRecordBuilder = logger.logRecordBuilder();
logRecordBuilder.setContext(context);
if (endTime != null) {
logRecordBuilder.setTimestamp(endTime);
}
exceptionEventExtractor.extract(logRecordBuilder, context, request);
logRecordBuilder.setException(throwable);
logRecordBuilder.emit();
}
// Per semconv
// (https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#errors-in-logs),
// SERVER and CONSUMER spans should record exceptions with ERROR severity, while CLIENT and
// PRODUCER spans should use WARN.
private static <REQUEST> InternalExceptionEventExtractor<REQUEST> defaultExceptionEventExtractor(
SpanKindExtractor<? super REQUEST> spanKindExtractor) {
return (logRecordBuilder, context, request) -> {
logRecordBuilder.setEventName("exception");
SpanKind spanKind = spanKindExtractor.extract(request);
Severity severity =
(spanKind == SpanKind.SERVER || spanKind == SpanKind.CONSUMER)
? Severity.ERROR
: Severity.WARN;
logRecordBuilder.setSeverity(severity);
};
}
private static long getNanos(@Nullable Instant time) {
if (time == null) {
return System.nanoTime();
}
return SECONDS.toNanos(time.getEpochSecond()) + time.getNano();
}
static {
InstrumenterUtil.setInstrumenterAccess(
new InstrumenterAccess() {
@Override
public <RQ, RS> Context startAndEnd(
Instrumenter<RQ, RS> instrumenter,
Context parentContext,
RQ request,
@Nullable RS response,
@Nullable Throwable error,
Instant startTime,
Instant endTime) {
return instrumenter.startAndEnd(
parentContext, request, response, error, startTime, endTime);
}
@Override
public <REQUEST, RESPONSE> Context suppressSpan(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context parentContext,
REQUEST request) {
return suppressSpan(
instrumenter, parentContext, instrumenter.spanKindExtractor.extract(request));
}
@Override
public <REQUEST, RESPONSE> Context suppressSpan(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context parentContext,
SpanKind spanKind) {
return instrumenter.spanSuppressor.storeInContext(
parentContext, spanKind, Span.getInvalid());
}
});
}
}