Skip to content

Commit 15c3e7a

Browse files
[kafka_consumer] Send cluster monitoring heartbeat via data streams messages (DataDog#23281)
* [kafka_consumer] Send heartbeat via data streams messages when cluster monitoring is enabled Report context count and limit on every check run so we can tell if a customer is hitting the context ceiling. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add changelog entry for cluster monitoring heartbeat Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Fall back to admin client metadata for cluster_id in heartbeat When highwater offset collection is skipped (context limit exceeded), cluster_id is empty. Use the cluster metadata from request_metadata_update() which runs unconditionally. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Derive cluster_id internally in heartbeat method Always read cluster_id from admin client metadata instead of accepting it as a parameter. Simpler and removes dependency on caller state. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Use getattr for _cluster_metadata access to handle mocked clients Tests mock self.client without _cluster_metadata attribute, causing AttributeError. Use getattr with a default to be safe. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Accept cluster_id from caller, use _auto_detected_cluster_id for original The check() method already resolves cluster_id (with override) and stores the auto-detected value in config._auto_detected_cluster_id. Reuse that rather than trying to read _cluster_metadata directly, which breaks with mocked clients in tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7cd9465 commit 15c3e7a

2 files changed

Lines changed: 14 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Send cluster monitoring heartbeat via data streams messages with context count and limit.

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def check(self, _):
130130

131131
# Collect cluster metadata if enabled
132132
if self.config._cluster_monitoring_enabled:
133+
self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id)
133134
try:
134135
self.metadata_collector.collect_all_metadata(highwater_offsets)
135136
except Exception as e:
@@ -141,6 +142,18 @@ def check(self, _):
141142
def count_consumer_contexts(self, consumer_offsets):
142143
return sum(len(offsets) for offsets in consumer_offsets.values())
143144

145+
def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: str) -> None:
146+
payload = {
147+
'collection_timestamp': int(time() * 1000),
148+
'kafka_cluster_id': cluster_id,
149+
'config_type': 'heartbeat',
150+
'contexts': total_contexts,
151+
'contexts_limit': self._context_limit,
152+
}
153+
if self.config._kafka_cluster_id_override:
154+
payload['original_kafka_cluster_id'] = self.config._auto_detected_cluster_id
155+
self.event_platform_event(json.dumps(payload), "data-streams-message")
156+
144157
def get_consumer_offsets(self):
145158
# {(consumer_group, topic, partition): offset}
146159
self.log.debug('Getting consumer offsets')

0 commit comments

Comments
 (0)