@@ -151,84 +151,68 @@ def close(self): # pylint: disable=useless-super-delegation
151151 return super ().close ()
152152
153153
154- class ProxiedProducer :
154+ class ProxiedProducer ( wrapt . ObjectProxy ) :
155155 def __init__ (self , producer : Producer , tracer : Tracer ):
156- self ._producer = producer
157- self ._tracer = tracer
158-
159- def flush (self , timeout = - 1 ):
160- return self ._producer .flush (timeout )
161-
162- def poll (self , timeout = - 1 ):
163- return self ._producer .poll (timeout )
164-
165- def purge (self , in_queue = True , in_flight = True , blocking = True ):
166- self ._producer .purge (in_queue , in_flight , blocking )
156+ super ().__init__ (producer )
157+ self ._self_tracer = tracer
167158
168159 def produce (self , topic , value = None , * args , ** kwargs ): # pylint: disable=keyword-arg-before-vararg
169160 new_kwargs = kwargs .copy ()
170161 new_kwargs ["topic" ] = topic
171162 new_kwargs ["value" ] = value
172163
173164 return ConfluentKafkaInstrumentor .wrap_produce (
174- self ._producer .produce , self , self ._tracer , args , new_kwargs
165+ self .__wrapped__ .produce , self , self ._self_tracer , args , new_kwargs
175166 )
176167
177168 def original_producer (self ):
178- return self ._producer
169+ return self .__wrapped__
179170
180- def __getattr__ (self , name ):
181- return getattr (self ._producer , name )
182171
183-
184- class ProxiedConsumer :
172+ class ProxiedConsumer (wrapt .ObjectProxy ):
185173 def __init__ (self , consumer : Consumer , tracer : Tracer ):
186- self ._consumer = consumer
187- self ._tracer = tracer
188- self ._current_consume_span = None
189- self ._current_context_token = None
174+ super ().__init__ (consumer )
175+ self ._self_tracer = tracer
176+ self ._self_current_consume_span = None
177+ self ._self_current_context_token = None
178+
179+ @property
180+ def _current_consume_span (self ):
181+ return self ._self_current_consume_span
182+
183+ @_current_consume_span .setter
184+ def _current_consume_span (self , value ):
185+ self ._self_current_consume_span = value
186+
187+ @property
188+ def _current_context_token (self ):
189+ return self ._self_current_context_token
190+
191+ @_current_context_token .setter
192+ def _current_context_token (self , value ):
193+ self ._self_current_context_token = value
190194
191195 def close (self , * args , ** kwargs ):
192196 return ConfluentKafkaInstrumentor .wrap_close (
193- self ._consumer .close , self , args , kwargs
197+ self .__wrapped__ .close , self , args , kwargs
194198 )
195199
196- def committed (self , partitions , timeout = - 1 ):
197- return self ._consumer .committed (partitions , timeout )
198-
199- def commit (self , * args , ** kwargs ):
200- return self ._consumer .commit (* args , ** kwargs )
201-
202200 def consume (self , * args , ** kwargs ):
203201 return ConfluentKafkaInstrumentor .wrap_consume (
204- self ._consumer .consume ,
202+ self .__wrapped__ .consume ,
205203 self ,
206- self ._tracer ,
204+ self ._self_tracer ,
207205 args ,
208206 kwargs ,
209207 )
210208
211- def get_watermark_offsets (self , partition , timeout = - 1 , * args , ** kwargs ): # pylint: disable=keyword-arg-before-vararg
212- return self ._consumer .get_watermark_offsets (
213- partition , timeout , * args , ** kwargs
214- )
215-
216- def offsets_for_times (self , partitions , timeout = - 1 ):
217- return self ._consumer .offsets_for_times (partitions , timeout )
218-
219209 def poll (self , timeout = - 1 ):
220210 return ConfluentKafkaInstrumentor .wrap_poll (
221- self ._consumer .poll , self , self ._tracer , [timeout ], {}
211+ self .__wrapped__ .poll , self , self ._self_tracer , [timeout ], {}
222212 )
223213
224- def subscribe (self , topics , on_assign = lambda * args : None , * args , ** kwargs ): # pylint: disable=keyword-arg-before-vararg
225- self ._consumer .subscribe (topics , on_assign , * args , ** kwargs )
226-
227214 def original_consumer (self ):
228- return self ._consumer
229-
230- def __getattr__ (self , name ):
231- return getattr (self ._consumer , name )
215+ return self .__wrapped__
232216
233217
234218class ConfluentKafkaInstrumentor (BaseInstrumentor ):
0 commit comments