11# (c) Copyright IBM Corp. 2025
22
3+
34try :
5+ import contextvars
46 import inspect
57 from typing import TYPE_CHECKING , Any , Callable , Dict , List , Optional , Tuple
68
79 import kafka # noqa: F401
810 import wrapt
11+ from opentelemetry import context , trace
912 from opentelemetry .trace import SpanKind
1013
1114 from instana .log import logger
1215 from instana .propagators .format import Format
16+ from instana .singletons import get_tracer
1317 from instana .util .traceutils import (
1418 get_tracer_tuple ,
1519 tracing_is_off ,
1620 )
21+ from instana .span .span import InstanaSpan
1722
1823 if TYPE_CHECKING :
1924 from kafka .producer .future import FutureRecordMetadata
2025
26+ consumer_token = None
27+ consumer_span = contextvars .ContextVar ("kafka_python_consumer_span" )
28+
2129 @wrapt .patch_function_wrapper ("kafka" , "KafkaProducer.send" )
2230 def trace_kafka_send (
2331 wrapped : Callable [..., "kafka.KafkaProducer.send" ],
@@ -59,35 +67,86 @@ def trace_kafka_send(
5967 kwargs ["headers" ] = headers
6068 try :
6169 res = wrapped (* args , ** kwargs )
70+ return res
6271 except Exception as exc :
6372 span .record_exception (exc )
64- else :
65- return res
6673
6774 def create_span (
6875 span_type : str ,
6976 topic : Optional [str ],
7077 headers : Optional [List [Tuple [str , bytes ]]] = [],
71- exception : Optional [str ] = None ,
78+ exception : Optional [Exception ] = None ,
7279 ) -> None :
73- tracer , parent_span , _ = get_tracer_tuple ()
74- parent_context = (
75- parent_span .get_span_context ()
76- if parent_span
77- else tracer .extract (
78- Format .KAFKA_HEADERS ,
79- headers ,
80- disable_w3c_trace_context = True ,
80+ try :
81+ span = consumer_span .get (None )
82+ if span is not None :
83+ close_consumer_span (span )
84+
85+ tracer , parent_span , _ = get_tracer_tuple ()
86+
87+ if not tracer :
88+ tracer = get_tracer ()
89+
90+ is_suppressed = False
91+ if topic :
92+ is_suppressed = tracer .exporter ._HostAgent__is_endpoint_ignored (
93+ "kafka" ,
94+ span_type ,
95+ topic ,
96+ )
97+
98+ if not is_suppressed and headers :
99+ for header_name , header_value in headers :
100+ if header_name == "x_instana_l_s" and header_value == b"0" :
101+ is_suppressed = True
102+ break
103+
104+ if is_suppressed :
105+ return
106+
107+ parent_context = (
108+ parent_span .get_span_context ()
109+ if parent_span
110+ else tracer .extract (
111+ Format .KAFKA_HEADERS ,
112+ headers ,
113+ disable_w3c_trace_context = True ,
114+ )
115+ )
116+ span = tracer .start_span (
117+ "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
81118 )
82- )
83- with tracer .start_as_current_span (
84- "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
85- ) as span :
86119 if topic :
87120 span .set_attribute ("kafka.service" , topic )
88121 span .set_attribute ("kafka.access" , span_type )
89122 if exception :
90123 span .record_exception (exception )
124+ span .end ()
125+
126+ save_consumer_span_into_context (span )
127+ except Exception :
128+ pass
129+
130+ def save_consumer_span_into_context (span : "InstanaSpan" ) -> None :
131+ global consumer_token
132+ ctx = trace .set_span_in_context (span )
133+ consumer_token = context .attach (ctx )
134+ consumer_span .set (span )
135+
136+ def close_consumer_span (span : "InstanaSpan" ) -> None :
137+ global consumer_token
138+ if span .is_recording ():
139+ span .end ()
140+ consumer_span .set (None )
141+ if consumer_token is not None :
142+ context .detach (consumer_token )
143+ consumer_token = None
144+
145+ def clear_context () -> None :
146+ global consumer_token
147+ context .attach (trace .set_span_in_context (None ))
148+ consumer_token = None
149+ consumer_span .set (None )
91150
92151 @wrapt .patch_function_wrapper ("kafka" , "KafkaConsumer.__next__" )
93152 def trace_kafka_consume (
@@ -96,29 +155,41 @@ def trace_kafka_consume(
96155 args : Tuple [int , str , Tuple [Any , ...]],
97156 kwargs : Dict [str , Any ],
98157 ) -> "FutureRecordMetadata" :
99- if tracing_is_off ():
100- return wrapped (* args , ** kwargs )
101-
102158 exception = None
103159 res = None
104160
105161 try :
106162 res = wrapped (* args , ** kwargs )
163+ create_span (
164+ "consume" ,
165+ res .topic if res else list (instance .subscription ())[0 ],
166+ res .headers ,
167+ )
168+ return res
169+ except StopIteration :
170+ pass
107171 except Exception as exc :
108172 exception = exc
109- finally :
110- if res :
111- create_span (
112- "consume" ,
113- res .topic if res else list (instance .subscription ())[0 ],
114- res .headers ,
115- )
116- else :
117- create_span (
118- "consume" , list (instance .subscription ())[0 ], exception = exception
119- )
173+ create_span (
174+ "consume" , list (instance .subscription ())[0 ], exception = exception
175+ )
120176
121- return res
177+ @wrapt .patch_function_wrapper ("kafka" , "KafkaConsumer.close" )
178+ def trace_kafka_close (
179+ wrapped : Callable [..., None ],
180+ instance : "kafka.KafkaConsumer" ,
181+ args : Tuple [Any , ...],
182+ kwargs : Dict [str , Any ],
183+ ) -> None :
184+ try :
185+ span = consumer_span .get (None )
186+ if span is not None :
187+ close_consumer_span (span )
188+ except Exception as e :
189+ logger .debug (
190+ f"Error while closing kafka-consumer span: { e } "
191+ ) # pragma: no cover
192+ return wrapped (* args , ** kwargs )
122193
123194 @wrapt .patch_function_wrapper ("kafka" , "KafkaConsumer.poll" )
124195 def trace_kafka_poll (
@@ -127,9 +198,6 @@ def trace_kafka_poll(
127198 args : Tuple [int , str , Tuple [Any , ...]],
128199 kwargs : Dict [str , Any ],
129200 ) -> Optional [Dict [str , Any ]]:
130- if tracing_is_off ():
131- return wrapped (* args , ** kwargs )
132-
133201 # The KafkaConsumer.consume() from the kafka-python-ng call the
134202 # KafkaConsumer.poll() internally, so we do not consider it here.
135203 if any (
@@ -143,23 +211,17 @@ def trace_kafka_poll(
143211
144212 try :
145213 res = wrapped (* args , ** kwargs )
214+ for partition , consumer_records in res .items ():
215+ for message in consumer_records :
216+ create_span (
217+ "poll" ,
218+ partition .topic ,
219+ message .headers if hasattr (message , "headers" ) else [],
220+ )
221+ return res
146222 except Exception as exc :
147223 exception = exc
148- finally :
149- if res :
150- for partition , consumer_records in res .items ():
151- for message in consumer_records :
152- create_span (
153- "poll" ,
154- partition .topic ,
155- message .headers if hasattr (message , "headers" ) else [],
156- )
157- else :
158- create_span (
159- "poll" , list (instance .subscription ())[0 ], exception = exception
160- )
161-
162- return res
224+ create_span ("poll" , list (instance .subscription ())[0 ], exception = exception )
163225
164226 logger .debug ("Instrumenting Kafka (kafka-python)" )
165227except ImportError :
0 commit comments