Skip to content

Commit d0cd5a4

Browse files
authored
Merge pull request #5368 from getsentry/fix/queue-instrumentation-review-changes
fix(queue): [Queue Instrumentation 42] Review Changes
2 parents 5a245f0 + c171d68 commit d0cd5a4

14 files changed

Lines changed: 54 additions & 36 deletions

File tree

sentry-kafka/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly.
44

5-
Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.
5+
Spring users should use the Sentry Spring (Boot) SDKs, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ private boolean isIgnored() {
143143
}
144144

145145
final @NotNull TransactionContext txContext =
146-
continued != null ? continued : new TransactionContext("queue.process", "queue.process");
147-
txContext.setName("queue.process");
146+
continued != null ? continued : new TransactionContext(record.topic(), "queue.process");
147+
txContext.setName(record.topic());
148148
txContext.setOperation("queue.process");
149149

150150
final @NotNull TransactionOptions txOptions = new TransactionOptions();
@@ -204,7 +204,6 @@ private void finishTransaction(
204204
}
205205
transaction.finish();
206206
} catch (Throwable t) {
207-
// Instrumentation must never break customer processing.
208207
scopes
209208
.getOptions()
210209
.getLogger()

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,8 @@
2929
import org.jetbrains.annotations.Nullable;
3030

3131
/**
32-
* Wraps a Kafka {@link Producer} via {@link Proxy} to record a {@code queue.publish} span around
33-
* each {@code send} and to inject Sentry trace propagation headers into the produced record.
34-
*
35-
* <p>Only the two {@code send} overloads are intercepted; every other {@link Producer} method is
36-
* forwarded directly to the delegate. Because the wrapper is a dynamic proxy, it is compatible with
37-
* any Kafka client version — new methods added to the {@link Producer} interface in future Kafka
38-
* releases are forwarded automatically without recompilation.
32+
* Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send}
33+
* and to inject Sentry trace propagation headers into the produced record.
3934
*
4035
* <p>For raw Kafka usage:
4136
*
@@ -44,9 +39,8 @@
4439
* SentryKafkaProducer.wrap(new KafkaProducer<>(props));
4540
* }</pre>
4641
*
47-
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
48-
* sentry-spring-jakarta} installs this wrapper automatically via {@code
49-
* ProducerFactory.addPostProcessor(...)}.
42+
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} installs this wrapper
43+
* automatically.
5044
*/
5145
@ApiStatus.Experimental
5246
public final class SentryKafkaProducer {
@@ -57,7 +51,7 @@ public final class SentryKafkaProducer {
5751
private SentryKafkaProducer() {}
5852

5953
/**
60-
* Wraps the given producer with Sentry instrumentation using the global scopes.
54+
* Wraps the given producer with Sentry instrumentation.
6155
*
6256
* @param delegate the Kafka producer to wrap
6357
* @return an instrumented producer that records {@code queue.publish} spans

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class SentryKafkaConsumerTracingTest {
9191
verify(forkedScopes).continueTrace(eq(sentryTraceValue), eq(listOf(baggageValue)))
9292
verify(forkedScopes).startTransaction(txContextCaptor.capture(), txOptionsCaptor.capture())
9393

94-
assertEquals("queue.process", txContextCaptor.firstValue.name)
94+
assertEquals("my-topic", txContextCaptor.firstValue.name)
9595
assertEquals("queue.process", txContextCaptor.firstValue.operation)
9696
assertEquals(SentryKafkaConsumerTracing.TRACE_ORIGIN, txOptionsCaptor.firstValue.origin)
9797
assertTrue(txOptionsCaptor.firstValue.isBindToScope)

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ class SentryKafkaProducerTest {
7373
Sentry.close()
7474
}
7575

76-
private fun createTransaction(): SentryTracer {
77-
val tx = SentryTracer(TransactionContext("tx", "op"), scopes)
78-
whenever(scopes.span).thenReturn(tx)
79-
return tx
80-
}
81-
8276
@Test
8377
fun `creates queue publish span and injects headers`() {
8478
val tx = createTransaction()
@@ -358,4 +352,10 @@ class SentryKafkaProducerTest {
358352
val producer = SentryKafkaProducer.wrap(delegate, scopes)
359353
assertTrue(producer.toString().startsWith("SentryKafkaProducer[delegate="))
360354
}
355+
356+
private fun createTransaction(): SentryTracer {
357+
val tx = SentryTracer(TransactionContext("tx", "op"), scopes)
358+
whenever(scopes.span).thenReturn(tx)
359+
return tx
360+
}
361361
}

sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,6 @@ private void transferSpanDetails(
362362
maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_ID);
363363
maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_NAME);
364364

365-
// Root transactions don't bulk-copy OTel attributes into span data (unlike child spans).
366-
// The Sentry Queues product reads `trace.data.messaging.*`, so messaging attributes must
367-
// be explicitly transferred for consumer root transactions to show up correctly. These are
368-
// operational metadata (no payload contents) and are safe to transfer unconditionally.
369365
maybeTransferOtelAttribute(
370366
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_SYSTEM);
371367
maybeTransferOtelAttribute(

sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,6 @@ private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelS
116116
@SuppressWarnings("deprecation")
117117
private @NotNull String opForMessaging(final @NotNull SpanData otelSpan) {
118118
final @NotNull Attributes attributes = otelSpan.getAttributes();
119-
// Prefer `messaging.operation.type` (current OTel semconv), fall back to legacy
120-
// `messaging.operation`. OTel's SpanKind.CONSUMER is overloaded for both `receive` and
121-
// `process`, so attribute-first mapping is required. SpanKind is used only as a last resort.
122119
@Nullable
123120
String operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE);
124121
if (operationType == null) {
@@ -139,7 +136,6 @@ private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelS
139136
case "settle":
140137
return "queue.settle";
141138
default:
142-
// fall through to SpanKind mapping
143139
break;
144140
}
145141
}

sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ private boolean isIgnored() {
159159
final @NotNull TransactionContext txContext =
160160
transactionContext != null
161161
? transactionContext
162-
: new TransactionContext("queue.process", "queue.process");
163-
txContext.setName("queue.process");
162+
: new TransactionContext(record.topic(), "queue.process");
163+
txContext.setName(record.topic());
164164
txContext.setOperation("queue.process");
165165

166166
final @NotNull TransactionOptions txOptions = new TransactionOptions();

sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest {
136136

137137
verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor")
138138
verify(forkedScopes).makeCurrent()
139+
verify(forkedScopes)
140+
.startTransaction(
141+
org.mockito.kotlin.check<TransactionContext> {
142+
assertEquals("my-topic", it.name)
143+
assertEquals("queue.process", it.operation)
144+
},
145+
any(),
146+
)
139147
}
140148

141149
@Test

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ private boolean isIgnored() {
159159
final @NotNull TransactionContext txContext =
160160
transactionContext != null
161161
? transactionContext
162-
: new TransactionContext("queue.process", "queue.process");
163-
txContext.setName("queue.process");
162+
: new TransactionContext(record.topic(), "queue.process");
163+
txContext.setName(record.topic());
164164
txContext.setOperation("queue.process");
165165

166166
final @NotNull TransactionOptions txOptions = new TransactionOptions();

0 commit comments

Comments
 (0)