Skip to content

Commit ca69447

Browse files
adinauer authored and claude committed
ref(kafka): Extract sentry-kafka module from spring-jakarta
Move Kafka producer interceptor to a new sentry-kafka module and rename to SentryKafkaProducerInterceptor. Add SentryKafkaConsumerInterceptor for vanilla kafka-clients users. Spring integration now depends on sentry-kafka and passes a Spring-specific trace origin. This allows non-Spring applications to use Kafka queue instrumentation directly via kafka-clients interceptor config. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 24cff6d commit ca69447

File tree

23 files changed

+418
-178
lines changed

23 files changed

+418
-178
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Sentry SDK for Java and Android
3535
| sentry | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry?style=for-the-badge&logo=sentry&color=green) | 21 |
3636
| sentry-jul | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jul?style=for-the-badge&logo=sentry&color=green) |
3737
| sentry-jdbc | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jdbc?style=for-the-badge&logo=sentry&color=green) |
38+
| sentry-kafka | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-kafka?style=for-the-badge&logo=sentry&color=green) |
3839
| sentry-apollo | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo?style=for-the-badge&logo=sentry&color=green) | 21 |
3940
| sentry-apollo-3 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-3?style=for-the-badge&logo=sentry&color=green) | 21 |
4041
| sentry-apollo-4 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-4?style=for-the-badge&logo=sentry&color=green) | 21 |

buildSrc/src/main/java/Config.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ object Config {
8080
val SENTRY_JCACHE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jcache"
8181
val SENTRY_QUARTZ_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.quartz"
8282
val SENTRY_JDBC_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jdbc"
83+
val SENTRY_KAFKA_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.kafka"
8384
val SENTRY_OPENFEATURE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.openfeature"
8485
val SENTRY_LAUNCHDARKLY_SERVER_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.launchdarkly-server"
8586
val SENTRY_LAUNCHDARKLY_ANDROID_SDK_NAME = "$SENTRY_ANDROID_SDK_NAME.launchdarkly"

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-star
184184
springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" }
185185
springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" }
186186
spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" }
187+
kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" }
187188
springboot4-otel = { module = "io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter", version.ref = "otelInstrumentation" }
188189
springboot4-resttestclient = { module = "org.springframework.boot:spring-boot-resttestclient", version.ref = "springboot4" }
189190
springboot4-starter = { module = "org.springframework.boot:spring-boot-starter", version.ref = "springboot4" }

sentry-kafka/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# sentry-kafka
2+
3+
This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly.
4+
5+
Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.

sentry-kafka/api/sentry-kafka.api

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
public final class io/sentry/kafka/BuildConfig {
2+
public static final field SENTRY_KAFKA_SDK_NAME Ljava/lang/String;
3+
public static final field VERSION_NAME Ljava/lang/String;
4+
}
5+
6+
public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor {
7+
public static final field TRACE_ORIGIN Ljava/lang/String;
8+
public fun <init> (Lio/sentry/IScopes;)V
9+
public fun close ()V
10+
public fun configure (Ljava/util/Map;)V
11+
public fun onCommit (Ljava/util/Map;)V
12+
public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
13+
}
14+
15+
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
16+
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
17+
public static final field TRACE_ORIGIN Ljava/lang/String;
18+
public fun <init> (Lio/sentry/IScopes;)V
19+
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
20+
public fun close ()V
21+
public fun configure (Ljava/util/Map;)V
22+
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
23+
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
24+
}
25+

sentry-kafka/build.gradle.kts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import net.ltgt.gradle.errorprone.errorprone
2+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
3+
4+
plugins {
5+
`java-library`
6+
id("io.sentry.javadoc")
7+
alias(libs.plugins.kotlin.jvm)
8+
jacoco
9+
alias(libs.plugins.errorprone)
10+
alias(libs.plugins.gradle.versions)
11+
alias(libs.plugins.buildconfig)
12+
}
13+
14+
tasks.withType<KotlinCompile>().configureEach {
15+
compilerOptions.jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_1_8
16+
}
17+
18+
dependencies {
19+
api(projects.sentry)
20+
compileOnly(libs.kafka.clients)
21+
compileOnly(libs.jetbrains.annotations)
22+
compileOnly(libs.nopen.annotations)
23+
24+
errorprone(libs.errorprone.core)
25+
errorprone(libs.nopen.checker)
26+
errorprone(libs.nullaway)
27+
28+
// tests
29+
testImplementation(projects.sentryTestSupport)
30+
testImplementation(kotlin(Config.kotlinStdLib))
31+
testImplementation(libs.kotlin.test.junit)
32+
testImplementation(libs.mockito.kotlin)
33+
testImplementation(libs.mockito.inline)
34+
testImplementation(libs.kafka.clients)
35+
}
36+
37+
configure<SourceSetContainer> { test { java.srcDir("src/test/java") } }
38+
39+
jacoco { toolVersion = libs.versions.jacoco.get() }
40+
41+
tasks.jacocoTestReport {
42+
reports {
43+
xml.required.set(true)
44+
html.required.set(false)
45+
}
46+
}
47+
48+
tasks {
49+
jacocoTestCoverageVerification {
50+
violationRules { rule { limit { minimum = Config.QualityPlugins.Jacoco.minimumCoverage } } }
51+
}
52+
check {
53+
dependsOn(jacocoTestCoverageVerification)
54+
dependsOn(jacocoTestReport)
55+
}
56+
}
57+
58+
tasks.withType<JavaCompile>().configureEach {
59+
options.errorprone {
60+
check("NullAway", net.ltgt.gradle.errorprone.CheckSeverity.ERROR)
61+
option("NullAway:AnnotatedPackages", "io.sentry")
62+
}
63+
}
64+
65+
buildConfig {
66+
useJavaOutput()
67+
packageName("io.sentry.kafka")
68+
buildConfigField("String", "SENTRY_KAFKA_SDK_NAME", "\"${Config.Sentry.SENTRY_KAFKA_SDK_NAME}\"")
69+
buildConfigField("String", "VERSION_NAME", "\"${project.version}\"")
70+
}
71+
72+
tasks.jar {
73+
manifest {
74+
attributes(
75+
"Sentry-Version-Name" to project.version,
76+
"Sentry-SDK-Name" to Config.Sentry.SENTRY_KAFKA_SDK_NAME,
77+
"Sentry-SDK-Package-Name" to "maven:io.sentry:sentry-kafka",
78+
"Implementation-Vendor" to "Sentry",
79+
"Implementation-Title" to project.name,
80+
"Implementation-Version" to project.version,
81+
)
82+
}
83+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.sentry.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.IScopes;
5+
import io.sentry.ITransaction;
6+
import io.sentry.SentryTraceHeader;
7+
import io.sentry.SpanDataConvention;
8+
import io.sentry.SpanStatus;
9+
import io.sentry.TransactionContext;
10+
import io.sentry.TransactionOptions;
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.Map;
15+
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
16+
import org.apache.kafka.clients.consumer.ConsumerRecord;
17+
import org.apache.kafka.clients.consumer.ConsumerRecords;
18+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
19+
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.header.Header;
21+
import org.jetbrains.annotations.ApiStatus;
22+
import org.jetbrains.annotations.NotNull;
23+
import org.jetbrains.annotations.Nullable;
24+
25+
@ApiStatus.Internal
26+
public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
27+
28+
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer";
29+
30+
private final @NotNull IScopes scopes;
31+
32+
public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
33+
this.scopes = scopes;
34+
}
35+
36+
@Override
37+
public @NotNull ConsumerRecords<K, V> onConsume(final @NotNull ConsumerRecords<K, V> records) {
38+
if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) {
39+
return records;
40+
}
41+
42+
final @NotNull ConsumerRecord<K, V> firstRecord = records.iterator().next();
43+
44+
try {
45+
final @Nullable TransactionContext continued = continueTrace(firstRecord);
46+
final @NotNull TransactionContext txContext =
47+
continued != null ? continued : new TransactionContext("queue.receive", "queue.receive");
48+
txContext.setName("queue.receive");
49+
txContext.setOperation("queue.receive");
50+
51+
final @NotNull TransactionOptions txOptions = new TransactionOptions();
52+
txOptions.setOrigin(TRACE_ORIGIN);
53+
txOptions.setBindToScope(false);
54+
55+
final @NotNull ITransaction transaction = scopes.startTransaction(txContext, txOptions);
56+
if (!transaction.isNoOp()) {
57+
transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
58+
transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, firstRecord.topic());
59+
transaction.setData("messaging.batch.message.count", records.count());
60+
transaction.setStatus(SpanStatus.OK);
61+
transaction.finish();
62+
}
63+
} catch (Throwable ignored) {
64+
// Instrumentation must never break the customer's Kafka poll loop.
65+
}
66+
67+
return records;
68+
}
69+
70+
@Override
71+
public void onCommit(final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets) {}
72+
73+
@Override
74+
public void close() {}
75+
76+
@Override
77+
public void configure(final @Nullable Map<String, ?> configs) {}
78+
79+
private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord<K, V> record) {
80+
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
81+
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
82+
final @Nullable List<String> baggageHeaders =
83+
baggage != null ? Collections.singletonList(baggage) : null;
84+
return scopes.continueTrace(sentryTrace, baggageHeaders);
85+
}
86+
87+
private @Nullable String headerValue(
88+
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
89+
final @Nullable Header header = record.headers().lastHeader(headerName);
90+
if (header == null || header.value() == null) {
91+
return null;
92+
}
93+
return new String(header.value(), StandardCharsets.UTF_8);
94+
}
95+
}

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java renamed to sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.sentry.spring.jakarta.kafka;
1+
package io.sentry.kafka;
22

33
import io.sentry.BaggageHeader;
44
import io.sentry.DateUtils;
@@ -19,28 +19,23 @@
1919
import org.jetbrains.annotations.NotNull;
2020
import org.jetbrains.annotations.Nullable;
2121

22-
/**
23-
* A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
24-
* headers into outgoing records.
25-
*
26-
* <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
27-
* "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
28-
* #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
29-
*
30-
* <p>If the customer already has a {@link ProducerInterceptor}, the {@link
31-
* SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
32-
* org.springframework.kafka.support.CompositeProducerInterceptor}.
33-
*/
3422
@ApiStatus.Internal
35-
public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
23+
public final class SentryKafkaProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
3624

37-
static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer";
38-
static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
25+
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
26+
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
3927

4028
private final @NotNull IScopes scopes;
29+
private final @NotNull String traceOrigin;
4130

42-
public SentryProducerInterceptor(final @NotNull IScopes scopes) {
31+
public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) {
32+
this(scopes, TRACE_ORIGIN);
33+
}
34+
35+
public SentryKafkaProducerInterceptor(
36+
final @NotNull IScopes scopes, final @NotNull String traceOrigin) {
4337
this.scopes = scopes;
38+
this.traceOrigin = traceOrigin;
4439
}
4540

4641
@Override
@@ -56,7 +51,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) {
5651

5752
try {
5853
final @NotNull SpanOptions spanOptions = new SpanOptions();
59-
spanOptions.setOrigin(TRACE_ORIGIN);
54+
spanOptions.setOrigin(traceOrigin);
6055
final @NotNull ISpan span =
6156
activeSpan.startChild("queue.publish", record.topic(), spanOptions);
6257
if (span.isNoOp()) {
@@ -71,7 +66,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) {
7166
span.setStatus(SpanStatus.OK);
7267
span.finish();
7368
} catch (Throwable ignored) {
74-
// Instrumentation must never break the customer's Kafka send
69+
// Instrumentation must never break the customer's Kafka send.
7570
}
7671

7772
return record;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.sentry.kafka
2+
3+
import io.sentry.IScopes
4+
import io.sentry.ITransaction
5+
import io.sentry.SentryOptions
6+
import io.sentry.TransactionContext
7+
import io.sentry.TransactionOptions
8+
import kotlin.test.Test
9+
import kotlin.test.assertSame
10+
import org.apache.kafka.clients.consumer.ConsumerRecord
11+
import org.apache.kafka.clients.consumer.ConsumerRecords
12+
import org.apache.kafka.clients.consumer.OffsetAndMetadata
13+
import org.apache.kafka.common.TopicPartition
14+
import org.mockito.kotlin.any
15+
import org.mockito.kotlin.mock
16+
import org.mockito.kotlin.never
17+
import org.mockito.kotlin.verify
18+
import org.mockito.kotlin.whenever
19+
20+
class SentryKafkaConsumerInterceptorTest {
21+
22+
@Test
23+
fun `does nothing when queue tracing is disabled`() {
24+
val scopes = mock<IScopes>()
25+
val options = SentryOptions().apply { isEnableQueueTracing = false }
26+
whenever(scopes.options).thenReturn(options)
27+
28+
val interceptor = SentryKafkaConsumerInterceptor<String, String>(scopes)
29+
val records = singleRecordBatch()
30+
31+
val result = interceptor.onConsume(records)
32+
33+
assertSame(records, result)
34+
verify(scopes, never()).startTransaction(any<TransactionContext>(), any<TransactionOptions>())
35+
}
36+
37+
@Test
38+
fun `starts and finishes queue receive transaction for consumed batch`() {
39+
val scopes = mock<IScopes>()
40+
val options = SentryOptions().apply { isEnableQueueTracing = true }
41+
val transaction = mock<ITransaction>()
42+
43+
whenever(scopes.options).thenReturn(options)
44+
whenever(scopes.continueTrace(any(), any())).thenReturn(null)
45+
whenever(scopes.startTransaction(any<TransactionContext>(), any<TransactionOptions>()))
46+
.thenReturn(transaction)
47+
whenever(transaction.isNoOp).thenReturn(false)
48+
49+
val interceptor = SentryKafkaConsumerInterceptor<String, String>(scopes)
50+
51+
interceptor.onConsume(singleRecordBatch())
52+
53+
verify(scopes).startTransaction(any<TransactionContext>(), any<TransactionOptions>())
54+
verify(transaction).setData("messaging.system", "kafka")
55+
verify(transaction).setData("messaging.destination.name", "my-topic")
56+
verify(transaction).setData("messaging.batch.message.count", 1)
57+
verify(transaction).finish()
58+
}
59+
60+
@Test
61+
fun `commit callback is no-op`() {
62+
val interceptor = SentryKafkaConsumerInterceptor<String, String>(mock())
63+
64+
interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1)))
65+
}
66+
67+
private fun singleRecordBatch(): ConsumerRecords<String, String> {
68+
val partition = TopicPartition("my-topic", 0)
69+
val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")
70+
return ConsumerRecords(mapOf(partition to listOf(record)))
71+
}
72+
}

0 commit comments

Comments (0)