Skip to content

Commit e3eca3f

Browse files
committed
feat(opentelemetry): [Queue Instrumentation 33] Map OTel messaging spans to Sentry queue ops
Wire OTel messaging spans into the Sentry Queues product when `sentry.enable-queue-tracing=true` so OTel-only setups (e.g. the agentless Spring Boot Jakarta sample) populate queue dashboards without needing the Sentry-native Kafka interceptors. `SpanDescriptionExtractor` now recognizes spans carrying `messaging.system` and maps them to `queue.publish` / `queue.process` / `queue.receive` ops, using the destination name as the description and `TransactionNameSource.TASK`. Op selection prefers `messaging.operation.type` (current OTel semconv), falls back to the deprecated `messaging.operation`, and only as a last resort consults `SpanKind` — `SpanKind.CONSUMER` is overloaded for both `receive` and `process`, so attribute-driven mapping is required to disambiguate. The extractor takes `SentryOptions` so the mapping stays gated; when the flag is off, behavior is unchanged. `SentrySpanExporter` additionally transfers the messaging attributes (`system`, `destination.name`, `operation.type`, `message.id`, `message.body.size`, `message.envelope.size`) onto root transactions. Root transactions don't bulk-copy OTel attributes the way child spans do, but the Queues product reads `trace.data.messaging.*`, so consumer root transactions need them propagated explicitly. These are operational metadata only (no payload contents), so the transfer is unconditional. Add `MESSAGING_OPERATION_TYPE` and `MESSAGING_MESSAGE_ENVELOPE_SIZE` to `SpanDataConvention` for use by the exporter and downstream integrations. Document the OTel-mode behavior in the two Jakarta OTel sample `application-kafka.properties` so users know the flag activates the OTel remapping path here, not the Sentry-native Kafka auto-config (which stays suppressed by its `@ConditionalOnMissingClass` OTel guard).
1 parent bbed2d2 commit e3eca3f

9 files changed

Lines changed: 246 additions & 8 deletions

File tree

sentry-opentelemetry/sentry-opentelemetry-core/api/sentry-opentelemetry-core.api

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public final class io/sentry/opentelemetry/SentrySpanProcessor : io/opentelemetr
149149

150150
public final class io/sentry/opentelemetry/SpanDescriptionExtractor {
151151
public fun <init> ()V
152-
public fun extractSpanInfo (Lio/opentelemetry/sdk/trace/data/SpanData;Lio/sentry/opentelemetry/IOtelSpanWrapper;)Lio/sentry/opentelemetry/OtelSpanInfo;
152+
public fun extractSpanInfo (Lio/opentelemetry/sdk/trace/data/SpanData;Lio/sentry/opentelemetry/IOtelSpanWrapper;Lio/sentry/SentryOptions;)Lio/sentry/opentelemetry/OtelSpanInfo;
153153
}
154154

155155
public final class io/sentry/opentelemetry/SpanNode {

sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.opentelemetry.sdk.trace.data.StatusData;
1313
import io.opentelemetry.sdk.trace.export.SpanExporter;
1414
import io.opentelemetry.semconv.HttpAttributes;
15+
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
1516
import io.opentelemetry.semconv.incubating.ProcessIncubatingAttributes;
1617
import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes;
1718
import io.sentry.Baggage;
@@ -200,7 +201,7 @@ private void createAndFinishSpanForOtelSpan(
200201
final @Nullable IOtelSpanWrapper sentrySpanMaybe =
201202
spanStorage.getSentrySpan(spanData.getSpanContext());
202203
final @NotNull OtelSpanInfo spanInfo =
203-
spanDescriptionExtractor.extractSpanInfo(spanData, sentrySpanMaybe);
204+
spanDescriptionExtractor.extractSpanInfo(spanData, sentrySpanMaybe, scopes.getOptions());
204205

205206
scopes
206207
.getOptions()
@@ -294,7 +295,7 @@ private void transferSpanDetails(
294295
final @NotNull IScopes scopesToUse =
295296
scopesToUseBeforeForking.forkedCurrentScope("SentrySpanExporter.createTransaction");
296297
final @NotNull OtelSpanInfo spanInfo =
297-
spanDescriptionExtractor.extractSpanInfo(span, sentrySpanMaybe);
298+
spanDescriptionExtractor.extractSpanInfo(span, sentrySpanMaybe, scopesToUse.getOptions());
298299

299300
scopesToUse
300301
.getOptions()
@@ -361,6 +362,23 @@ private void transferSpanDetails(
361362
maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_ID);
362363
maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_NAME);
363364

365+
// Root transactions don't bulk-copy OTel attributes into span data (unlike child spans).
366+
// The Sentry Queues product reads `trace.data.messaging.*`, so messaging attributes must
367+
// be explicitly transferred for consumer root transactions to show up correctly. These are
368+
// operational metadata (no payload contents) and are safe to transfer unconditionally.
369+
maybeTransferOtelAttribute(
370+
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_SYSTEM);
371+
maybeTransferOtelAttribute(
372+
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME);
373+
maybeTransferOtelAttribute(
374+
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE);
375+
maybeTransferOtelAttribute(
376+
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID);
377+
maybeTransferOtelAttribute(
378+
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE);
379+
maybeTransferOtelAttribute(
380+
span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_MESSAGE_ENVELOPE_SIZE);
381+
364382
scopesToUse.configureScope(
365383
ScopeType.CURRENT,
366384
scope -> attributesExtractor.extract(span, scope, scopesToUse.getOptions()));

sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ private boolean isSentryRequest(final @NotNull ReadableSpan otelSpan) {
297297
private void updateTransactionWithOtelData(
298298
final @NotNull ITransaction sentryTransaction, final @NotNull ReadableSpan otelSpan) {
299299
final @NotNull OtelSpanInfo otelSpanInfo =
300-
spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null);
300+
spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null, scopes.getOptions());
301301
sentryTransaction.setOperation(otelSpanInfo.getOp());
302302
String transactionName = otelSpanInfo.getDescription();
303303
sentryTransaction.setName(
@@ -334,7 +334,7 @@ private void updateSpanWithOtelData(
334334
});
335335

336336
final @NotNull OtelSpanInfo otelSpanInfo =
337-
spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null);
337+
spanDescriptionExtractor.extractSpanInfo(otelSpan.toSpanData(), null, scopes.getOptions());
338338
sentrySpan.setOperation(otelSpanInfo.getOp());
339339
sentrySpan.setDescription(otelSpanInfo.getDescription());
340340
}

sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import io.opentelemetry.semconv.UrlAttributes;
88
import io.opentelemetry.semconv.incubating.DbIncubatingAttributes;
99
import io.opentelemetry.semconv.incubating.HttpIncubatingAttributes;
10+
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
11+
import io.sentry.SentryOptions;
1012
import io.sentry.protocol.TransactionNameSource;
1113
import org.jetbrains.annotations.ApiStatus;
1214
import org.jetbrains.annotations.NotNull;
@@ -17,7 +19,9 @@ public final class SpanDescriptionExtractor {
1719

1820
@SuppressWarnings("deprecation")
1921
public @NotNull OtelSpanInfo extractSpanInfo(
20-
final @NotNull SpanData otelSpan, final @Nullable IOtelSpanWrapper sentrySpan) {
22+
final @NotNull SpanData otelSpan,
23+
final @Nullable IOtelSpanWrapper sentrySpan,
24+
final @NotNull SentryOptions options) {
2125
final @NotNull Attributes attributes = otelSpan.getAttributes();
2226

2327
final @Nullable String httpMethod = attributes.get(HttpAttributes.HTTP_REQUEST_METHOD);
@@ -30,6 +34,14 @@ public final class SpanDescriptionExtractor {
3034
return descriptionForDbSystem(otelSpan);
3135
}
3236

37+
if (options.isEnableQueueTracing()) {
38+
final @Nullable String messagingSystem =
39+
attributes.get(MessagingIncubatingAttributes.MESSAGING_SYSTEM);
40+
if (messagingSystem != null) {
41+
return descriptionForMessagingSystem(otelSpan);
42+
}
43+
}
44+
3345
final @NotNull String name = otelSpan.getName();
3446
final @Nullable String maybeDescription =
3547
sentrySpan != null ? sentrySpan.getDescription() : name;
@@ -91,6 +103,54 @@ private static boolean isRootSpan(SpanData otelSpan) {
91103
return !otelSpan.getParentSpanContext().isValid() || otelSpan.getParentSpanContext().isRemote();
92104
}
93105

106+
@SuppressWarnings("deprecation")
107+
private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelSpan) {
108+
final @NotNull Attributes attributes = otelSpan.getAttributes();
109+
final @NotNull String op = opForMessaging(otelSpan);
110+
final @Nullable String destination =
111+
attributes.get(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME);
112+
final @NotNull String description = destination != null ? destination : otelSpan.getName();
113+
return new OtelSpanInfo(op, description, TransactionNameSource.TASK);
114+
}
115+
116+
@SuppressWarnings("deprecation")
117+
private @NotNull String opForMessaging(final @NotNull SpanData otelSpan) {
118+
final @NotNull Attributes attributes = otelSpan.getAttributes();
119+
// Prefer `messaging.operation.type` (current OTel semconv), fall back to legacy
120+
// `messaging.operation`. OTel's SpanKind.CONSUMER is overloaded for both `receive` and
121+
// `process`, so attribute-first mapping is required. SpanKind is used only as a last resort.
122+
@Nullable
123+
String operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE);
124+
if (operationType == null) {
125+
operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION);
126+
}
127+
if (operationType != null) {
128+
switch (operationType) {
129+
case "publish":
130+
case "send":
131+
case "create":
132+
return "queue.publish";
133+
case "receive":
134+
return "queue.receive";
135+
case "process":
136+
case "deliver":
137+
return "queue.process";
138+
default:
139+
// fall through to SpanKind mapping
140+
break;
141+
}
142+
}
143+
144+
final @NotNull SpanKind kind = otelSpan.getKind();
145+
if (SpanKind.PRODUCER.equals(kind)) {
146+
return "queue.publish";
147+
}
148+
if (SpanKind.CONSUMER.equals(kind)) {
149+
return "queue.process";
150+
}
151+
return "queue";
152+
}
153+
94154
@SuppressWarnings("deprecation")
95155
private OtelSpanInfo descriptionForDbSystem(final @NotNull SpanData otelSpan) {
96156
final @NotNull Attributes attributes = otelSpan.getAttributes();

sentry-opentelemetry/sentry-opentelemetry-core/src/test/kotlin/SpanDescriptionExtractorTest.kt

Lines changed: 139 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import io.opentelemetry.semconv.HttpAttributes
1111
import io.opentelemetry.semconv.UrlAttributes
1212
import io.opentelemetry.semconv.incubating.DbIncubatingAttributes
1313
import io.opentelemetry.semconv.incubating.HttpIncubatingAttributes
14+
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
15+
import io.sentry.SentryOptions
1416
import io.sentry.protocol.TransactionNameSource
1517
import kotlin.test.Test
1618
import kotlin.test.assertEquals
@@ -228,6 +230,140 @@ class SpanDescriptionExtractorTest {
228230
assertEquals(TransactionNameSource.TASK, info.transactionNameSource)
229231
}
230232

233+
@Test
234+
fun `ignores messaging system when queue tracing disabled`() {
235+
givenSpanName("my-topic publish")
236+
givenAttributes(
237+
mapOf(
238+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
239+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic",
240+
MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish",
241+
)
242+
)
243+
244+
val info = whenExtractingSpanInfo(queueTracingEnabled = false)
245+
246+
assertEquals("my-topic publish", info.op)
247+
assertEquals("my-topic publish", info.description)
248+
assertEquals(TransactionNameSource.CUSTOM, info.transactionNameSource)
249+
}
250+
251+
@Test
252+
fun `maps messaging publish operation type to queue publish op`() {
253+
givenAttributes(
254+
mapOf(
255+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
256+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic",
257+
MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish",
258+
)
259+
)
260+
261+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
262+
263+
assertEquals("queue.publish", info.op)
264+
assertEquals("my-topic", info.description)
265+
assertEquals(TransactionNameSource.TASK, info.transactionNameSource)
266+
}
267+
268+
@Test
269+
fun `maps messaging process operation type to queue process op`() {
270+
givenAttributes(
271+
mapOf(
272+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
273+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic",
274+
MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "process",
275+
)
276+
)
277+
278+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
279+
280+
assertEquals("queue.process", info.op)
281+
assertEquals("my-topic", info.description)
282+
assertEquals(TransactionNameSource.TASK, info.transactionNameSource)
283+
}
284+
285+
@Test
286+
fun `maps messaging receive operation type to queue receive op`() {
287+
givenAttributes(
288+
mapOf(
289+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
290+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic",
291+
MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "receive",
292+
)
293+
)
294+
295+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
296+
297+
assertEquals("queue.receive", info.op)
298+
assertEquals("my-topic", info.description)
299+
assertEquals(TransactionNameSource.TASK, info.transactionNameSource)
300+
}
301+
302+
@Test
303+
fun `falls back to legacy messaging operation attribute`() {
304+
@Suppress("DEPRECATION")
305+
givenAttributes(
306+
mapOf(
307+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "rabbitmq",
308+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "queue-name",
309+
MessagingIncubatingAttributes.MESSAGING_OPERATION to "publish",
310+
)
311+
)
312+
313+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
314+
315+
assertEquals("queue.publish", info.op)
316+
assertEquals("queue-name", info.description)
317+
}
318+
319+
@Test
320+
fun `falls back to PRODUCER span kind when no operation attribute`() {
321+
givenSpanKind(SpanKind.PRODUCER)
322+
givenAttributes(
323+
mapOf(
324+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
325+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic",
326+
)
327+
)
328+
329+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
330+
331+
assertEquals("queue.publish", info.op)
332+
assertEquals("my-topic", info.description)
333+
}
334+
335+
@Test
336+
fun `falls back to CONSUMER span kind when no operation attribute`() {
337+
givenSpanKind(SpanKind.CONSUMER)
338+
givenAttributes(
339+
mapOf(
340+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
341+
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME to "my-topic",
342+
)
343+
)
344+
345+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
346+
347+
assertEquals("queue.process", info.op)
348+
assertEquals("my-topic", info.description)
349+
}
350+
351+
@Test
352+
fun `falls back to span name as description when destination missing`() {
353+
givenSpanName("my-topic publish")
354+
givenAttributes(
355+
mapOf(
356+
MessagingIncubatingAttributes.MESSAGING_SYSTEM to "kafka",
357+
MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE to "publish",
358+
)
359+
)
360+
361+
val info = whenExtractingSpanInfo(queueTracingEnabled = true)
362+
363+
assertEquals("queue.publish", info.op)
364+
assertEquals("my-topic publish", info.description)
365+
}
366+
231367
@Test
232368
fun `uses span name as op and description if no relevant attributes`() {
233369
givenSpanName("span name")
@@ -289,9 +425,10 @@ class SpanDescriptionExtractorTest {
289425
builder.put(key as AttributeKey<Any>, value)
290426
}
291427

292-
private fun whenExtractingSpanInfo(): OtelSpanInfo {
428+
private fun whenExtractingSpanInfo(queueTracingEnabled: Boolean = false): OtelSpanInfo {
293429
fixture.setup()
294-
return SpanDescriptionExtractor().extractSpanInfo(fixture.otelSpan, fixture.sentrySpan)
430+
val options = SentryOptions().apply { isEnableQueueTracing = queueTracingEnabled }
431+
return SpanDescriptionExtractor().extractSpanInfo(fixture.otelSpan, fixture.sentrySpan, options)
295432
}
296433

297434
private fun givenParentContext(parentContext: SpanContext) {

sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application-kafka.properties

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
11
# Kafka — activate with: --spring.profiles.active=kafka
2+
3+
# In OTel mode, `sentry.enable-queue-tracing=true` enables the OTel->Sentry
4+
# messaging span remapping in `SpanDescriptionExtractor`/`SentrySpanExporter`:
5+
# it maps OTel messaging spans to `queue.publish`/`queue.process` ops with the
6+
# destination as description and transfers messaging attributes to root
7+
# transactions so the Sentry Queues product lights up. Sentry's Spring Kafka
8+
# auto-config (`SentryKafkaQueueConfiguration`) stays suppressed here because
9+
# `sentry-opentelemetry-agentless-spring` pulls in the OTel customizer that
10+
# its `@ConditionalOnMissingClass(...OpenTelemetry...)` guard looks for, so
11+
# the flag does NOT wire the Sentry-native Kafka interceptors in this sample.
212
sentry.enable-queue-tracing=true
313

414
spring.kafka.bootstrap-servers=localhost:9092

sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application-kafka.properties

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
# Kafka — activate with: --spring.profiles.active=kafka
2+
3+
# In OTel mode, `sentry.enable-queue-tracing=true` enables the OTel->Sentry
4+
# messaging span remapping in `SpanDescriptionExtractor`/`SentrySpanExporter`:
5+
# it maps OTel messaging spans to `queue.publish`/`queue.process` ops with the
6+
# destination as description and transfers messaging attributes to root
7+
# transactions so the Sentry Queues product lights up. Sentry's Spring Kafka
8+
# auto-config (`SentryKafkaQueueConfiguration`) stays suppressed here because
9+
# of its `@ConditionalOnMissingClass(...OpenTelemetry...)` guard, so the flag
10+
# does NOT wire the Sentry-native Kafka interceptors in this sample.
211
sentry.enable-queue-tracing=true
312

413
spring.kafka.bootstrap-servers=localhost:9092

sentry/api/sentry.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4398,9 +4398,11 @@ public abstract interface class io/sentry/SpanDataConvention {
43984398
public static final field HTTP_STATUS_CODE_KEY Ljava/lang/String;
43994399
public static final field MESSAGING_DESTINATION_NAME Ljava/lang/String;
44004400
public static final field MESSAGING_MESSAGE_BODY_SIZE Ljava/lang/String;
4401+
public static final field MESSAGING_MESSAGE_ENVELOPE_SIZE Ljava/lang/String;
44014402
public static final field MESSAGING_MESSAGE_ID Ljava/lang/String;
44024403
public static final field MESSAGING_MESSAGE_RECEIVE_LATENCY Ljava/lang/String;
44034404
public static final field MESSAGING_MESSAGE_RETRY_COUNT Ljava/lang/String;
4405+
public static final field MESSAGING_OPERATION_TYPE Ljava/lang/String;
44044406
public static final field MESSAGING_SYSTEM Ljava/lang/String;
44054407
public static final field PROFILER_ID Ljava/lang/String;
44064408
public static final field THREAD_ID Ljava/lang/String;

sentry/src/main/java/io/sentry/SpanDataConvention.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,7 @@ public interface SpanDataConvention {
3535
String MESSAGING_MESSAGE_ID = "messaging.message.id";
3636
String MESSAGING_MESSAGE_RETRY_COUNT = "messaging.message.retry.count";
3737
String MESSAGING_MESSAGE_BODY_SIZE = "messaging.message.body.size";
38+
String MESSAGING_MESSAGE_ENVELOPE_SIZE = "messaging.message.envelope.size";
3839
String MESSAGING_MESSAGE_RECEIVE_LATENCY = "messaging.message.receive.latency";
40+
String MESSAGING_OPERATION_TYPE = "messaging.operation.type";
3941
}

0 commit comments

Comments
 (0)