@@ -116,7 +116,27 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
116116from .version import __version__
117117
118118
119+ def _capture_config (args , kwargs ):
120+ """Return the config dict that was passed to a Producer/Consumer
121+ constructor, regardless of whether it was supplied positionally, as
122+ ``conf=`` kwarg, or (for Consumer) expanded as **kwargs."""
123+ if args and isinstance (args [0 ], dict ):
124+ return args [0 ]
125+ conf = kwargs .get ("conf" )
126+ if isinstance (conf , dict ):
127+ return conf
128+ # confluent_kafka.Consumer also supports Consumer(**conf) — in that case
129+ # the kwargs themselves are the config.
130+ if kwargs :
131+ return dict (kwargs )
132+ return None
133+
134+
119135class AutoInstrumentedProducer (Producer ):
136+ def __init__ (self , * args , ** kwargs ):
137+ super ().__init__ (* args , ** kwargs )
138+ self .config = _capture_config (args , kwargs )
139+
120140 # This method is deliberately implemented in order to allow wrapt to wrap this function
121141 def produce (self , topic , value = None , * args , ** kwargs ): # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
122142 super ().produce (topic , value , * args , ** kwargs )
@@ -125,6 +145,7 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
125145class AutoInstrumentedConsumer (Consumer ):
126146 def __init__ (self , * args , ** kwargs ):
127147 super ().__init__ (* args , ** kwargs )
148+ self .config = _capture_config (args , kwargs )
128149 self ._current_consume_span = None
129150
130151 # This method is deliberately implemented in order to allow wrapt to wrap this function
@@ -144,6 +165,10 @@ class ProxiedProducer(Producer):
144165 def __init__ (self , producer : Producer , tracer : Tracer ):
145166 self ._producer = producer
146167 self ._tracer = tracer
168+ # Surface the wrapped producer's config (if any) so that
169+ # KafkaPropertiesExtractor.extract_bootstrap_servers can read it
170+ # through this proxy.
171+ self .config = getattr (producer , "config" , None )
147172
148173 def flush (self , timeout = - 1 ):
149174 return self ._producer .flush (timeout )
@@ -173,6 +198,8 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
173198 self ._tracer = tracer
174199 self ._current_consume_span = None
175200 self ._current_context_token = None
201+ # See ProxiedProducer.__init__ for rationale.
202+ self .config = getattr (consumer , "config" , None )
176203
177204 def close (self , * args , ** kwargs ):
178205 return ConfluentKafkaInstrumentor .wrap_close (
@@ -355,11 +382,15 @@ def wrap_produce(func, instance, tracer, args, kwargs):
355382 topic = KafkaPropertiesExtractor .extract_produce_topic (
356383 args , kwargs
357384 )
385+ bootstrap_servers = (
386+ KafkaPropertiesExtractor .extract_bootstrap_servers (instance )
387+ )
358388 _enrich_span (
359389 span ,
360390 topic ,
361- operation = MessagingOperationTypeValues .RECEIVE ,
362- ) # Replace
391+ operation = MessagingOperationTypeValues .PUBLISH ,
392+ bootstrap_servers = bootstrap_servers ,
393+ ) # Publish
363394 propagate .inject (
364395 headers ,
365396 setter = _kafka_setter ,
@@ -373,6 +404,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
373404
374405 record = func (* args , ** kwargs )
375406 if record :
407+ bootstrap_servers = (
408+ KafkaPropertiesExtractor .extract_bootstrap_servers (instance )
409+ )
376410 with tracer .start_as_current_span (
377411 "recv" , end_on_exit = True , kind = trace .SpanKind .CONSUMER
378412 ):
@@ -383,6 +417,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
383417 record .partition (),
384418 record .offset (),
385419 operation = MessagingOperationTypeValues .PROCESS ,
420+ bootstrap_servers = bootstrap_servers ,
386421 )
387422 instance ._current_context_token = context .attach (
388423 trace .set_span_in_context (instance ._current_consume_span )
@@ -397,6 +432,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):
397432
398433 records = func (* args , ** kwargs )
399434 if len (records ) > 0 :
435+ bootstrap_servers = (
436+ KafkaPropertiesExtractor .extract_bootstrap_servers (instance )
437+ )
400438 with tracer .start_as_current_span (
401439 "recv" , end_on_exit = True , kind = trace .SpanKind .CONSUMER
402440 ):
@@ -405,6 +443,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
405443 instance ._current_consume_span ,
406444 records [0 ].topic (),
407445 operation = MessagingOperationTypeValues .PROCESS ,
446+ bootstrap_servers = bootstrap_servers ,
408447 )
409448 instance ._current_context_token = context .attach (
410449 trace .set_span_in_context (instance ._current_consume_span )
0 commit comments