1717import net .bytebuddy .description .method .MethodDescription ;
1818import net .bytebuddy .description .type .TypeDescription ;
1919import net .bytebuddy .matcher .ElementMatcher ;
20+ import org .apache .kafka .clients .ApiVersions ;
2021import org .apache .kafka .clients .producer .Callback ;
2122import org .apache .kafka .clients .producer .ProducerRecord ;
2223import org .apache .kafka .clients .producer .RecordMetadata ;
24+ import org .apache .kafka .common .record .RecordBatch ;
2325
2426@ AutoService (Instrumenter .class )
2527public final class KafkaProducerInstrumentation extends Instrumenter .Default {
@@ -61,6 +63,7 @@ public static class ProducerAdvice {
6163
6264 @ Advice .OnMethodEnter (suppress = Throwable .class )
6365 public static Scope startSpan (
66+ @ Advice .FieldValue ("apiVersions" ) final ApiVersions apiVersions ,
6467 @ Advice .Argument (value = 0 , readOnly = false ) ProducerRecord record ,
6568 @ Advice .Argument (value = 1 , readOnly = false ) Callback callback ) {
6669 final Scope scope = GlobalTracer .get ().buildSpan ("kafka.produce" ).startActive (false );
@@ -69,28 +72,33 @@ public static Scope startSpan(
6972
7073 callback = new ProducerCallback (callback , scope );
7174
72- try {
73- GlobalTracer .get ()
74- .inject (
75- scope .span ().context (),
76- Format .Builtin .TEXT_MAP ,
77- new TextMapInjectAdapter (record .headers ()));
78- } catch (final IllegalStateException e ) {
79- // headers must be read-only from reused record. try again with new one.
80- record =
81- new ProducerRecord <>(
82- record .topic (),
83- record .partition (),
84- record .timestamp (),
85- record .key (),
86- record .value (),
87- record .headers ());
88-
89- GlobalTracer .get ()
90- .inject (
91- scope .span ().context (),
92- Format .Builtin .TEXT_MAP ,
93- new TextMapInjectAdapter (record .headers ()));
75+ // Do not inject headers for batch versions below 2
76+ // This is how similar check is being done in Kafka client itself:
77+ // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
78+ if (apiVersions .maxUsableProduceMagic () >= RecordBatch .MAGIC_VALUE_V2 ) {
79+ try {
80+ GlobalTracer .get ()
81+ .inject (
82+ scope .span ().context (),
83+ Format .Builtin .TEXT_MAP ,
84+ new TextMapInjectAdapter (record .headers ()));
85+ } catch (final IllegalStateException e ) {
86+ // headers must be read-only from reused record. try again with new one.
87+ record =
88+ new ProducerRecord <>(
89+ record .topic (),
90+ record .partition (),
91+ record .timestamp (),
92+ record .key (),
93+ record .value (),
94+ record .headers ());
95+
96+ GlobalTracer .get ()
97+ .inject (
98+ scope .span ().context (),
99+ Format .Builtin .TEXT_MAP ,
100+ new TextMapInjectAdapter (record .headers ()));
101+ }
94102 }
95103
96104 return scope ;
0 commit comments