-
-
Notifications
You must be signed in to change notification settings - Fork 467
Expand file tree
/
Copy pathSentryKafkaConsumerInterceptor.java
More file actions
95 lines (79 loc) · 3.56 KB
/
SentryKafkaConsumerInterceptor.java
File metadata and controls
95 lines (79 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package io.sentry.kafka;
import io.sentry.BaggageHeader;
import io.sentry.IScopes;
import io.sentry.ITransaction;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * Kafka {@link ConsumerInterceptor} that records a short-lived {@code queue.receive} transaction
 * for every non-empty poll batch, continuing any Sentry trace carried in the headers of the
 * batch's first record.
 *
 * <p>The consumed records are always returned unmodified, and any failure inside the
 * instrumentation is swallowed so the customer's poll loop is never disrupted.
 */
@ApiStatus.Internal
public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

  /** Trace origin attached to every transaction created by this interceptor. */
  public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer";

  private final @NotNull IScopes scopes;

  public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
    this.scopes = scopes;
  }

  @Override
  public @NotNull ConsumerRecords<K, V> onConsume(final @NotNull ConsumerRecords<K, V> records) {
    // Bail out early when queue tracing is disabled or there is nothing to report.
    if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) {
      return records;
    }
    // Only the first record's headers are inspected; a batch is reported as a single receive.
    final @NotNull ConsumerRecord<K, V> first = records.iterator().next();
    try {
      final @Nullable TransactionContext continuedContext = continueTrace(first);
      final @NotNull TransactionContext context;
      if (continuedContext == null) {
        context = new TransactionContext("queue.receive", "queue.receive");
      } else {
        context = continuedContext;
      }
      // Force the fixed name/operation even on a continued context.
      context.setName("queue.receive");
      context.setOperation("queue.receive");

      final @NotNull TransactionOptions options = new TransactionOptions();
      options.setOrigin(TRACE_ORIGIN);
      options.setBindToScope(false);

      final @NotNull ITransaction transaction = scopes.startTransaction(context, options);
      if (!transaction.isNoOp()) {
        transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
        transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, first.topic());
        transaction.setData("messaging.batch.message.count", records.count());
        transaction.setStatus(SpanStatus.OK);
        transaction.finish();
      }
    } catch (Throwable ignored) {
      // Instrumentation must never break the customer's Kafka poll loop.
    }
    return records;
  }

  @Override
  public void onCommit(final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets) {}

  @Override
  public void close() {}

  @Override
  public void configure(final @Nullable Map<String, ?> configs) {}

  // Continues an incoming trace from the sentry-trace / baggage headers of the given record,
  // or returns null when no trace information is present.
  private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord<K, V> record) {
    final @Nullable String traceHeader = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
    final @Nullable String baggageHeader = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
    return scopes.continueTrace(
        traceHeader, baggageHeader == null ? null : Collections.singletonList(baggageHeader));
  }

  // Decodes the last header with the given name as UTF-8, or returns null when absent.
  private @Nullable String headerValue(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
    final @Nullable Header header = record.headers().lastHeader(headerName);
    return header == null || header.value() == null
        ? null
        : new String(header.value(), StandardCharsets.UTF_8);
  }
}