Skip to content

Commit 6379fe4

Browse files
JMGalvaoclaude
andcommitted
fix #4278: use wrapt.ObjectProxy to preserve isinstance() compatibility
Replace __getattr__ composition with wrapt.ObjectProxy so that ProxiedProducer and ProxiedConsumer remain valid isinstance() targets for the wrapped types. Internal span-tracking state uses _self_ prefix (wrapt convention) exposed via properties to keep utils.py interface unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5f515d0 commit 6379fe4

2 files changed

Lines changed: 40 additions & 51 deletions

File tree

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 31 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -151,84 +151,68 @@ def close(self): # pylint: disable=useless-super-delegation
151151
return super().close()
152152

153153

154-
class ProxiedProducer:
154+
class ProxiedProducer(wrapt.ObjectProxy):
155155
def __init__(self, producer: Producer, tracer: Tracer):
156-
self._producer = producer
157-
self._tracer = tracer
158-
159-
def flush(self, timeout=-1):
160-
return self._producer.flush(timeout)
161-
162-
def poll(self, timeout=-1):
163-
return self._producer.poll(timeout)
164-
165-
def purge(self, in_queue=True, in_flight=True, blocking=True):
166-
self._producer.purge(in_queue, in_flight, blocking)
156+
super().__init__(producer)
157+
self._self_tracer = tracer
167158

168159
def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
169160
new_kwargs = kwargs.copy()
170161
new_kwargs["topic"] = topic
171162
new_kwargs["value"] = value
172163

173164
return ConfluentKafkaInstrumentor.wrap_produce(
174-
self._producer.produce, self, self._tracer, args, new_kwargs
165+
self.__wrapped__.produce, self, self._self_tracer, args, new_kwargs
175166
)
176167

177168
def original_producer(self):
178-
return self._producer
169+
return self.__wrapped__
179170

180-
def __getattr__(self, name):
181-
return getattr(self._producer, name)
182171

183-
184-
class ProxiedConsumer:
172+
class ProxiedConsumer(wrapt.ObjectProxy):
185173
def __init__(self, consumer: Consumer, tracer: Tracer):
186-
self._consumer = consumer
187-
self._tracer = tracer
188-
self._current_consume_span = None
189-
self._current_context_token = None
174+
super().__init__(consumer)
175+
self._self_tracer = tracer
176+
self._self_current_consume_span = None
177+
self._self_current_context_token = None
178+
179+
@property
180+
def _current_consume_span(self):
181+
return self._self_current_consume_span
182+
183+
@_current_consume_span.setter
184+
def _current_consume_span(self, value):
185+
self._self_current_consume_span = value
186+
187+
@property
188+
def _current_context_token(self):
189+
return self._self_current_context_token
190+
191+
@_current_context_token.setter
192+
def _current_context_token(self, value):
193+
self._self_current_context_token = value
190194

191195
def close(self, *args, **kwargs):
192196
return ConfluentKafkaInstrumentor.wrap_close(
193-
self._consumer.close, self, args, kwargs
197+
self.__wrapped__.close, self, args, kwargs
194198
)
195199

196-
def committed(self, partitions, timeout=-1):
197-
return self._consumer.committed(partitions, timeout)
198-
199-
def commit(self, *args, **kwargs):
200-
return self._consumer.commit(*args, **kwargs)
201-
202200
def consume(self, *args, **kwargs):
203201
return ConfluentKafkaInstrumentor.wrap_consume(
204-
self._consumer.consume,
202+
self.__wrapped__.consume,
205203
self,
206-
self._tracer,
204+
self._self_tracer,
207205
args,
208206
kwargs,
209207
)
210208

211-
def get_watermark_offsets(self, partition, timeout=-1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
212-
return self._consumer.get_watermark_offsets(
213-
partition, timeout, *args, **kwargs
214-
)
215-
216-
def offsets_for_times(self, partitions, timeout=-1):
217-
return self._consumer.offsets_for_times(partitions, timeout)
218-
219209
def poll(self, timeout=-1):
220210
return ConfluentKafkaInstrumentor.wrap_poll(
221-
self._consumer.poll, self, self._tracer, [timeout], {}
211+
self.__wrapped__.poll, self, self._self_tracer, [timeout], {}
222212
)
223213

224-
def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
225-
self._consumer.subscribe(topics, on_assign, *args, **kwargs)
226-
227214
def original_consumer(self):
228-
return self._consumer
229-
230-
def __getattr__(self, name):
231-
return getattr(self._consumer, name)
215+
return self.__wrapped__
232216

233217

234218
class ConfluentKafkaInstrumentor(BaseInstrumentor):

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def test_instrument_api(self) -> None:
4848
producer = Producer({"bootstrap.servers": "localhost:29092"})
4949
producer = instrumentation.instrument_producer(producer)
5050

51-
self.assertEqual(producer.__class__, ProxiedProducer)
51+
self.assertIsInstance(producer, ProxiedProducer)
52+
self.assertIsInstance(producer, Producer)
5253

5354
producer = instrumentation.uninstrument_producer(producer)
5455
self.assertEqual(producer.__class__, Producer)
@@ -62,7 +63,8 @@ def test_instrument_api(self) -> None:
6263
)
6364

6465
consumer = instrumentation.instrument_consumer(consumer)
65-
self.assertEqual(consumer.__class__, ProxiedConsumer)
66+
self.assertIsInstance(consumer, ProxiedConsumer)
67+
self.assertIsInstance(consumer, Consumer)
6668

6769
consumer = instrumentation.uninstrument_consumer(consumer)
6870
self.assertEqual(consumer.__class__, Consumer)
@@ -76,7 +78,7 @@ def test_instrument_api(self) -> None:
7678
)
7779

7880
consumer = instrumentation.instrument_consumer(consumer)
79-
self.assertEqual(consumer.__class__, ProxiedConsumer)
81+
self.assertIsInstance(consumer, ProxiedConsumer)
8082

8183
consumer = instrumentation.uninstrument_consumer(consumer)
8284
self.assertEqual(consumer.__class__, Consumer)
@@ -123,7 +125,8 @@ def test_consumer_commit_method_exists(self) -> None:
123125
)
124126

125127
consumer = instrumentation.instrument_consumer(consumer)
126-
self.assertEqual(consumer.__class__, ProxiedConsumer)
128+
self.assertIsInstance(consumer, ProxiedConsumer)
129+
self.assertIsInstance(consumer, Consumer)
127130
self.assertTrue(hasattr(consumer, "commit"))
128131

129132
def test_context_setter(self) -> None:
@@ -465,6 +468,7 @@ def test_proxied_producer_delegates_undefined_methods(
465468
)
466469

467470
proxied = instrumentation.instrument_producer(producer)
471+
self.assertIsInstance(proxied, MockedProducer)
468472
self.assertEqual(proxied.list_topics(), "producer_topics")
469473

470474
def test_proxied_consumer_delegates_undefined_methods(
@@ -485,5 +489,6 @@ def test_proxied_consumer_delegates_undefined_methods(
485489
)
486490

487491
proxied = instrumentation.instrument_consumer(consumer)
492+
self.assertIsInstance(proxied, MockConsumer)
488493
self.assertEqual(proxied.list_topics(), "consumer_topics")
489494
self.assertEqual(proxied.assignment(), [])

0 commit comments

Comments
 (0)