Skip to content

Commit ed51a88

Browse files
kafka_consumer: add broker list to cluster monitoring heartbeat (DataDog#23898)
* Add broker list to kafka_consumer cluster monitoring heartbeat Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Add changelog entry for PR DataDog#23898 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Address review comments on heartbeat broker list - Extract _get_broker_list() helper to separate topology-transform from payload-assembly - Use broker_meta.id instead of dict key; cast to str to match broker config event convention - Add tests for heartbeat brokers payload (populated and empty cases) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix _cluster_metadata not set on create_autospec mock in tests seed_mock_kafka_client uses create_autospec(KafkaClient), which does not expose instance attributes set in __init__. Setting _cluster_metadata directly on the mock fixture makes it available to _get_broker_list(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a93d4c6 commit ed51a88

3 files changed

Lines changed: 50 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add broker list to the cluster monitoring heartbeat payload.

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,15 @@ def check(self, _):
131131
def count_consumer_contexts(self, consumer_offsets):
132132
return sum(len(offsets) for offsets in consumer_offsets.values())
133133

134+
def _get_broker_list(self) -> list[dict]:
135+
cluster_metadata = self.client._cluster_metadata
136+
if not (cluster_metadata and hasattr(cluster_metadata, 'brokers')):
137+
return []
138+
return [
139+
{'id': str(broker_meta.id), 'host': broker_meta.host, 'port': broker_meta.port}
140+
for broker_meta in cluster_metadata.brokers.values()
141+
]
142+
134143
def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: str) -> None:
135144
payload = {
136145
'collection_timestamp': int(time() * 1000),
@@ -139,6 +148,7 @@ def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: st
139148
'contexts': total_contexts,
140149
'contexts_limit': self._context_limit,
141150
'bootstrap_servers': self.config._kafka_connect_str,
151+
'brokers': self._get_broker_list(),
142152
}
143153
if self.config._kafka_cluster_id_override:
144154
payload['original_kafka_cluster_id'] = self.config._auto_detected_cluster_id

kafka_consumer/tests/test_cluster_metadata.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def mock_describe_configs(resources):
146146

147147
# Set kafka_client as an attribute (not a property mock)
148148
client.kafka_client = mock_admin_client
149+
client._cluster_metadata = metadata
149150
client.get_topic_partitions.return_value = {'test-topic': [0, 1]}
150151

151152
def mock_offsets_for_times(partitions, offset=-1):
@@ -1419,3 +1420,41 @@ def test_partition_out_of_sync_broker_id_tag(
14191420
'kafka.partition.offline',
14201421
):
14211422
aggregator.assert_metric(metric, tags=expected_tags)
1423+
1424+
1425+
def test_heartbeat_brokers_populated(check):
1426+
"""Heartbeat payload includes the broker list when metadata is available."""
1427+
instance = {'kafka_connect_str': 'localhost:9092', 'enable_cluster_monitoring': True}
1428+
kafka_consumer_check = check(instance)
1429+
mock_kafka_client = seed_mock_kafka_client()
1430+
kafka_consumer_check.client = mock_kafka_client
1431+
kafka_consumer_check.event_platform_event = mock.Mock()
1432+
1433+
kafka_consumer_check._send_cluster_monitoring_heartbeat(total_contexts=5, cluster_id='test-cluster-id')
1434+
1435+
calls = kafka_consumer_check.event_platform_event.call_args_list
1436+
hb_events = [json.loads(c[0][0]) for c in calls if c[0][1] == 'data-streams-message']
1437+
hb_events = [e for e in hb_events if e.get('config_type') == 'heartbeat']
1438+
assert len(hb_events) == 1
1439+
assert hb_events[0]['brokers'] == [
1440+
{'id': '1', 'host': 'broker1', 'port': 9092},
1441+
{'id': '2', 'host': 'broker2', 'port': 9092},
1442+
]
1443+
1444+
1445+
def test_heartbeat_brokers_empty_when_no_metadata(check):
1446+
"""Heartbeat payload has an empty broker list when _cluster_metadata is None."""
1447+
instance = {'kafka_connect_str': 'localhost:9092', 'enable_cluster_monitoring': True}
1448+
kafka_consumer_check = check(instance)
1449+
mock_kafka_client = seed_mock_kafka_client()
1450+
mock_kafka_client._cluster_metadata = None
1451+
kafka_consumer_check.client = mock_kafka_client
1452+
kafka_consumer_check.event_platform_event = mock.Mock()
1453+
1454+
kafka_consumer_check._send_cluster_monitoring_heartbeat(total_contexts=0, cluster_id='test-cluster-id')
1455+
1456+
calls = kafka_consumer_check.event_platform_event.call_args_list
1457+
hb_events = [json.loads(c[0][0]) for c in calls if c[0][1] == 'data-streams-message']
1458+
hb_events = [e for e in hb_events if e.get('config_type') == 'heartbeat']
1459+
assert len(hb_events) == 1
1460+
assert hb_events[0]['brokers'] == []

0 commit comments

Comments
 (0)