Skip to content

Commit 31ae610

Browse files
[kafka_consumer] Fix consumer leak when offsets_for_times times out (DataDog#23241)
* [kafka_consumer] Fix consumer leak when offsets_for_times times out Wrap Consumer usage in get_watermark_offsets() in try/finally to ensure close_consumer() is always called, even when offsets_for_times() throws. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add changelog entry for kafka_consumer consumer leak fix Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Remove unnecessary debug log and keep fix minimal Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Restore pre-existing debug log Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 61fcf15 commit 31ae610

2 files changed

Lines changed: 15 additions & 13 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix consumer leak when offsets_for_times() times out, preventing a potential librdkafka crash

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -408,21 +408,22 @@ def get_watermark_offsets(self, partitions=None, mode=HIGH_WATERMARK):
408408

409409
# Open consumer once for both cluster_id and offset fetching
410410
self.client.open_consumer(dd_consumer_group)
411-
cluster_id, _ = self.client.consumer_get_cluster_id_and_list_topics(dd_consumer_group)
412-
413-
self.log.debug(
414-
'Querying %s %s offsets',
415-
len(topic_partitions_to_check),
416-
'highwater' if mode == HIGH_WATERMARK else 'lowwater',
417-
)
411+
try:
412+
cluster_id, _ = self.client.consumer_get_cluster_id_and_list_topics(dd_consumer_group)
418413

419-
result = {}
420-
for topic, partition, offset in self.client.consumer_offsets_for_times(
421-
partitions=topic_partitions_to_check, offset=mode
422-
):
423-
result[(topic, partition)] = offset
414+
self.log.debug(
415+
'Querying %s %s offsets',
416+
len(topic_partitions_to_check),
417+
'highwater' if mode == HIGH_WATERMARK else 'lowwater',
418+
)
424419

425-
self.client.close_consumer()
420+
result = {}
421+
for topic, partition, offset in self.client.consumer_offsets_for_times(
422+
partitions=topic_partitions_to_check, offset=mode
423+
):
424+
result[(topic, partition)] = offset
425+
finally:
426+
self.client.close_consumer()
426427

427428
self.log.debug('Got %s %s offsets', len(result), 'highwater' if mode == HIGH_WATERMARK else 'lowwater')
428429
return result, cluster_id

0 commit comments

Comments
 (0)