Commit 733ffda

kafka_consumer: always fetch highwater offsets when cluster monitoring is enabled (#24149)
* kafka_consumer: always fetch highwater offsets when cluster monitoring is enabled

  When enable_cluster_monitoring is true the consumer context count can easily exceed the default max_partition_contexts (500), causing the check to skip highwater offset collection entirely. This silently zeros out kafka.topic.message_rate and stops kafka.broker_offset from being emitted, because _collect_topic_metadata receives an empty highwater_offsets dict.

  Bypass the context limit guard when cluster monitoring is active so that highwater offsets are always fetched; the existing per-metric context caps in report_highwater_offsets and report_consumer_offsets_and_lag still apply.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: add changelog entry for PR #24149

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_consumer: bypass context reporting limit when cluster monitoring is enabled

  When enable_cluster_monitoring is true, report all consumer lag and highwater offset metrics without capping at max_partition_contexts. Cluster monitoring users need full cluster visibility by design, so capping metric reporting makes no sense in that mode.

  Uses float('inf') as the reporting limit, which works correctly with the existing int comparisons in report_highwater_offsets and report_consumer_offsets_and_lag (int == inf is False, int >= inf is False, int < inf is True).

  Also suppresses the misleading "narrow your target" warning in cluster monitoring mode.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9b2a03c commit 733ffda
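
The commit message's claim about float('inf') and int comparisons can be checked in isolation. The sketch below is only an illustration: report_capped is a hypothetical stand-in for a loop that stops once a running count reaches a limit, and it is not the body of report_highwater_offsets or report_consumer_offsets_and_lag. The topic name and offsets are made up.

```python
import math


def report_capped(offsets, limit):
    """Hypothetical capped loop: collect items until the count reaches `limit`."""
    reported = []
    for key, value in offsets.items():
        reported.append((key, value))
        # An int count never equals float('inf'), so an infinite limit never stops the loop.
        if len(reported) == limit:
            break
    return reported


# Made-up data: five partitions of a hypothetical "orders" topic.
offsets = {("orders", partition): 100 + partition for partition in range(5)}
unlimited = float("inf")

capped = report_capped(offsets, 2)
uncapped = report_capped(offsets, unlimited)

# Subtracting an int from inf still yields inf, so a derived budget such as
# `limit - len(highwater_offsets)` stays unlimited as well.
remaining = unlimited - len(offsets)

print(len(capped), len(uncapped), remaining)
```

One consequence worth noting is that the limit becomes a float in cluster monitoring mode, so any code that treats it as an integer (for example, passing it to range) would need separate handling.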

2 files changed

Lines changed: 10 additions & 4 deletions

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fix kafka.broker_offset and kafka.topic.message_rate not being collected when enable_cluster_monitoring is true and the consumer context count exceeds max_partition_contexts.

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 9 additions & 4 deletions
@@ -67,7 +67,9 @@ def check(self, _):
         persistent_cache_key = "broker_timestamps_"
         consumer_contexts_count = self.count_consumer_contexts(consumer_offsets)
         try:
-            if consumer_contexts_count < self._context_limit:
+            # Cluster monitoring always requires highwater offsets (for topic.message_rate and other
+            # cluster metadata metrics), so bypass the consumer context limit in that case.
+            if consumer_contexts_count < self._context_limit or self.config._cluster_monitoring_enabled:
                 # Fetch highwater offsets
                 # Build partitions list or use all if configured
                 # If cluster monitoring is enabled, always fetch all broker highwater marks
@@ -100,7 +102,10 @@ def check(self, _):
             consumer_offsets,
             highwater_offsets,
         )
-        if total_contexts >= self._context_limit:
+        # When cluster monitoring is enabled, all offsets and lag metrics are reported regardless
+        # of context count so that the full cluster picture is always available.
+        reporting_limit = float('inf') if self.config._cluster_monitoring_enabled else self._context_limit
+        if total_contexts >= self._context_limit and not self.config._cluster_monitoring_enabled:
             self.warning(
                 """Discovered %s metric contexts - this exceeds the maximum number of %s contexts permitted by the
                 check. Please narrow your target by specifying in your kafka_consumer.yaml the consumer groups, topics
@@ -113,11 +118,11 @@ def check(self, _):
         if self.config._kafka_cluster_id_override:
             cluster_id = self.config._kafka_cluster_id_override
 
-        self.report_highwater_offsets(highwater_offsets, self._context_limit, cluster_id)
+        self.report_highwater_offsets(highwater_offsets, reporting_limit, cluster_id)
         self.report_consumer_offsets_and_lag(
             consumer_offsets,
             highwater_offsets,
-            self._context_limit - len(highwater_offsets),
+            reporting_limit - len(highwater_offsets),
             broker_timestamps,
             cluster_id,
         )
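
The three decisions this diff changes (whether to fetch highwater offsets, which reporting limit to pass on, and whether to warn) can be restated as standalone functions. This is a rough sketch for reasoning about the change, not code from the check: the function names and the sample counts are hypothetical, and only the boolean expressions and the default limit of 500 are taken from the diff and commit message above.

```python
def should_fetch_highwater(consumer_contexts_count, context_limit, cluster_monitoring_enabled):
    """Mirror of the new guard: fetch when under the limit or when cluster monitoring is on."""
    return consumer_contexts_count < context_limit or cluster_monitoring_enabled


def pick_reporting_limit(context_limit, cluster_monitoring_enabled):
    """Mirror of the new reporting_limit expression."""
    return float("inf") if cluster_monitoring_enabled else context_limit


def should_warn(total_contexts, context_limit, cluster_monitoring_enabled):
    """Mirror of the new warning condition: never warn in cluster monitoring mode."""
    return total_contexts >= context_limit and not cluster_monitoring_enabled


CONTEXT_LIMIT = 500  # default max_partition_contexts, per the commit message
consumer_contexts = 1200  # hypothetical count that exceeds the limit

# Without cluster monitoring the old behaviour is preserved: skip the fetch and warn.
legacy = (
    should_fetch_highwater(consumer_contexts, CONTEXT_LIMIT, False),
    pick_reporting_limit(CONTEXT_LIMIT, False),
    should_warn(consumer_contexts, CONTEXT_LIMIT, False),
)

# With cluster monitoring the fetch always happens, the cap is lifted, and no warning is raised.
cluster = (
    should_fetch_highwater(consumer_contexts, CONTEXT_LIMIT, True),
    pick_reporting_limit(CONTEXT_LIMIT, True),
    should_warn(consumer_contexts, CONTEXT_LIMIT, True),
)

print(legacy, cluster)
```

Keeping the non-cluster path identical to the previous expressions is what limits the behaviour change to users who set enable_cluster_monitoring.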
