Skip to content

Commit 72ef9bf

Browse files
committed
fix #4278: proxy all methods in ProxiedProducer/Consumer
Remove Producer/Consumer inheritance from ProxiedProducer and ProxiedConsumer. Add __getattr__ to delegate undefined methods to the wrapped client, fixing segfaults on calls like list_topics() or assignment(). Add regression tests to verify delegation of undefined methods on both proxy classes.
1 parent 196d088 commit 72ef9bf

4 files changed

Lines changed: 61 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1818

1919
### Fixed
2020

21+
- `opentelemetry-instrumentation-confluent-kafka`: Fix `ProxiedProducer` and `ProxiedConsumer` not delegating methods to the underlying producer/consumer instances
22+
([#4278](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4278))
2123
- Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone
2224
([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305))
2325

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def close(self): # pylint: disable=useless-super-delegation
151151
return super().close()
152152

153153

154-
class ProxiedProducer(Producer):
154+
class ProxiedProducer:
155155
def __init__(self, producer: Producer, tracer: Tracer):
156156
self._producer = producer
157157
self._tracer = tracer
@@ -177,8 +177,11 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
177177
def original_producer(self):
178178
return self._producer
179179

180+
def __getattr__(self, name):
181+
return getattr(self._producer, name)
180182

181-
class ProxiedConsumer(Consumer):
183+
184+
class ProxiedConsumer:
182185
def __init__(self, consumer: Consumer, tracer: Tracer):
183186
self._consumer = consumer
184187
self._tracer = tracer
@@ -224,6 +227,9 @@ def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs): # p
224227
def original_consumer(self):
225228
return self._consumer
226229

230+
def __getattr__(self, name):
231+
return getattr(self._consumer, name)
232+
227233

228234
class ConfluentKafkaInstrumentor(BaseInstrumentor):
229235
"""An instrumentor for confluent kafka module

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,3 +373,45 @@ def test_producer_flush(self) -> None:
373373
span_list = self.memory_exporter.get_finished_spans()
374374
self._assert_span_count(span_list, 1)
375375
self._assert_topic(span_list[0], "topic-1")
376+
377+
def test_proxied_producer_delegates_undefined_methods(
378+
self,
379+
) -> None:
380+
"""Regression test for #4278: methods not defined on
381+
ProxiedProducer must delegate to the underlying
382+
producer instead of hitting an uninitialized handle."""
383+
instrumentation = ConfluentKafkaInstrumentor()
384+
message_queue = []
385+
386+
producer = MockedProducer(
387+
message_queue,
388+
{
389+
"bootstrap.servers": "localhost:29092",
390+
},
391+
)
392+
393+
proxied = instrumentation.instrument_producer(producer)
394+
self.assertEqual(proxied.list_topics(), "producer_topics")
395+
396+
def test_proxied_consumer_delegates_undefined_methods(
397+
self,
398+
) -> None:
399+
"""Regression test for #4278: methods not defined on
400+
ProxiedConsumer must delegate to the underlying
401+
consumer instead of hitting an uninitialized handle."""
402+
instrumentation = ConfluentKafkaInstrumentor()
403+
404+
consumer = MockConsumer(
405+
[],
406+
{
407+
"bootstrap.servers": "localhost:29092",
408+
"group.id": "mygroup",
409+
"auto.offset.reset": "earliest",
410+
},
411+
)
412+
413+
proxied = instrumentation.instrument_consumer(consumer)
414+
self.assertEqual(
415+
proxied.list_topics(), "consumer_topics"
416+
)
417+
self.assertEqual(proxied.assignment(), [])

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ def poll(self, timeout=None):
1818
return self._queue.pop(0)
1919
return None
2020

21+
def list_topics(self, topic=None, timeout=-1):
22+
return "consumer_topics"
23+
24+
def assignment(self):
25+
return []
26+
2127

2228
class MockedMessage:
2329
def __init__(
@@ -77,3 +83,6 @@ def poll(self, *args, **kwargs):
7783

7884
def flush(self, *args, **kwargs):
7985
return len(self._queue)
86+
87+
def list_topics(self, topic=None, timeout=-1):
88+
return "producer_topics"

0 commit comments

Comments
 (0)