55
66package io .opentelemetry .instrumentation .kafkaclients .common .v0_11 .internal ;
77
8+ import static io .opentelemetry .instrumentation .api .incubator .semconv .messaging .internal .MessagingExceptionEventExtractors .setMessagingProcessExceptionEventExtractor ;
9+ import static io .opentelemetry .instrumentation .api .incubator .semconv .messaging .internal .MessagingExceptionEventExtractors .setMessagingReceiveExceptionEventExtractor ;
10+ import static io .opentelemetry .instrumentation .api .incubator .semconv .messaging .internal .MessagingExceptionEventExtractors .setMessagingSendExceptionEventExtractor ;
811import static java .util .Collections .emptyList ;
912
1013import com .google .errorprone .annotations .CanIgnoreReturnValue ;
@@ -91,6 +94,7 @@ public Instrumenter<KafkaProducerRequest, RecordMetadata> createProducerInstrume
9194 if (captureExperimentalSpanAttributes ) {
9295 builder .addAttributesExtractor (new KafkaProducerExperimentalAttributesExtractor ());
9396 }
97+ setMessagingSendExceptionEventExtractor (builder );
9498 return builder .buildInstrumenter (SpanKindExtractor .alwaysProducer ());
9599 }
96100
@@ -103,17 +107,19 @@ public Instrumenter<KafkaReceiveRequest, Void> createConsumerReceiveInstrumenter
103107 KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter ();
104108 MessageOperation operation = MessageOperation .RECEIVE ;
105109
106- return Instrumenter .<KafkaReceiveRequest , Void >builder (
107- openTelemetry ,
108- instrumentationName ,
109- MessagingSpanNameExtractor .create (getter , operation ))
110- .addAttributesExtractor (
111- buildMessagingAttributesExtractor (getter , operation , capturedHeaders ))
112- .addAttributesExtractor (new KafkaReceiveAttributesExtractor ())
113- .addAttributesExtractors (extractors )
114- .setErrorCauseExtractor (errorCauseExtractor )
115- .setEnabled (messagingReceiveInstrumentationEnabled )
116- .buildInstrumenter (SpanKindExtractor .alwaysConsumer ());
110+ InstrumenterBuilder <KafkaReceiveRequest , Void > builder =
111+ Instrumenter .<KafkaReceiveRequest , Void >builder (
112+ openTelemetry ,
113+ instrumentationName ,
114+ MessagingSpanNameExtractor .create (getter , operation ))
115+ .addAttributesExtractor (
116+ buildMessagingAttributesExtractor (getter , operation , capturedHeaders ))
117+ .addAttributesExtractor (new KafkaReceiveAttributesExtractor ())
118+ .addAttributesExtractors (extractors )
119+ .setErrorCauseExtractor (errorCauseExtractor )
120+ .setEnabled (messagingReceiveInstrumentationEnabled );
121+ setMessagingReceiveExceptionEventExtractor (builder );
122+ return builder .buildInstrumenter (SpanKindExtractor .alwaysConsumer ());
117123 }
118124
119125 public Instrumenter <KafkaProcessRequest , Void > createConsumerProcessInstrumenter () {
@@ -138,6 +144,7 @@ public Instrumenter<KafkaProcessRequest, Void> createConsumerProcessInstrumenter
138144 if (captureExperimentalSpanAttributes ) {
139145 builder .addAttributesExtractor (new KafkaConsumerExperimentalAttributesExtractor ());
140146 }
147+ setMessagingProcessExceptionEventExtractor (builder );
141148
142149 if (messagingReceiveInstrumentationEnabled ) {
143150 builder .addSpanLinksExtractor (
@@ -154,18 +161,20 @@ public Instrumenter<KafkaReceiveRequest, Void> createBatchProcessInstrumenter()
154161 KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter ();
155162 MessageOperation operation = MessageOperation .PROCESS ;
156163
157- return Instrumenter .<KafkaReceiveRequest , Void >builder (
158- openTelemetry ,
159- instrumentationName ,
160- MessagingSpanNameExtractor .create (getter , operation ))
161- .addAttributesExtractor (
162- buildMessagingAttributesExtractor (getter , operation , capturedHeaders ))
163- .addAttributesExtractor (new KafkaReceiveAttributesExtractor ())
164- .addSpanLinksExtractor (
165- new KafkaBatchProcessSpanLinksExtractor (
166- openTelemetry .getPropagators ().getTextMapPropagator ()))
167- .setErrorCauseExtractor (errorCauseExtractor )
168- .buildInstrumenter (SpanKindExtractor .alwaysConsumer ());
164+ InstrumenterBuilder <KafkaReceiveRequest , Void > builder =
165+ Instrumenter .<KafkaReceiveRequest , Void >builder (
166+ openTelemetry ,
167+ instrumentationName ,
168+ MessagingSpanNameExtractor .create (getter , operation ))
169+ .addAttributesExtractor (
170+ buildMessagingAttributesExtractor (getter , operation , capturedHeaders ))
171+ .addAttributesExtractor (new KafkaReceiveAttributesExtractor ())
172+ .addSpanLinksExtractor (
173+ new KafkaBatchProcessSpanLinksExtractor (
174+ openTelemetry .getPropagators ().getTextMapPropagator ()))
175+ .setErrorCauseExtractor (errorCauseExtractor );
176+ setMessagingProcessExceptionEventExtractor (builder );
177+ return builder .buildInstrumenter (SpanKindExtractor .alwaysConsumer ());
169178 }
170179
171180 private static <REQUEST , RESPONSE >
0 commit comments