Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/23898.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add broker list to the cluster monitoring heartbeat payload.
10 changes: 10 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ def check(self, _):
def count_consumer_contexts(self, consumer_offsets):
return sum(len(offsets) for offsets in consumer_offsets.values())

def _get_broker_list(self) -> list[dict]:
cluster_metadata = self.client._cluster_metadata
if not (cluster_metadata and hasattr(cluster_metadata, 'brokers')):
return []
return [
{'id': str(broker_meta.id), 'host': broker_meta.host, 'port': broker_meta.port}
for broker_meta in cluster_metadata.brokers.values()
]

def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: str) -> None:
payload = {
'collection_timestamp': int(time() * 1000),
Expand All @@ -139,6 +148,7 @@ def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: st
'contexts': total_contexts,
'contexts_limit': self._context_limit,
'bootstrap_servers': self.config._kafka_connect_str,
'brokers': self._get_broker_list(),
}
if self.config._kafka_cluster_id_override:
payload['original_kafka_cluster_id'] = self.config._auto_detected_cluster_id
Expand Down
39 changes: 39 additions & 0 deletions kafka_consumer/tests/test_cluster_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def mock_describe_configs(resources):

# Set kafka_client as an attribute (not a property mock)
client.kafka_client = mock_admin_client
client._cluster_metadata = metadata
client.get_topic_partitions.return_value = {'test-topic': [0, 1]}

def mock_offsets_for_times(partitions, offset=-1):
Expand Down Expand Up @@ -1419,3 +1420,41 @@ def test_partition_out_of_sync_broker_id_tag(
'kafka.partition.offline',
):
aggregator.assert_metric(metric, tags=expected_tags)


def test_heartbeat_brokers_populated(check):
"""Heartbeat payload includes the broker list when metadata is available."""
instance = {'kafka_connect_str': 'localhost:9092', 'enable_cluster_monitoring': True}
kafka_consumer_check = check(instance)
mock_kafka_client = seed_mock_kafka_client()
kafka_consumer_check.client = mock_kafka_client
kafka_consumer_check.event_platform_event = mock.Mock()

kafka_consumer_check._send_cluster_monitoring_heartbeat(total_contexts=5, cluster_id='test-cluster-id')

calls = kafka_consumer_check.event_platform_event.call_args_list
hb_events = [json.loads(c[0][0]) for c in calls if c[0][1] == 'data-streams-message']
hb_events = [e for e in hb_events if e.get('config_type') == 'heartbeat']
assert len(hb_events) == 1
assert hb_events[0]['brokers'] == [
{'id': '1', 'host': 'broker1', 'port': 9092},
{'id': '2', 'host': 'broker2', 'port': 9092},
]


def test_heartbeat_brokers_empty_when_no_metadata(check):
"""Heartbeat payload has an empty broker list when _cluster_metadata is None."""
instance = {'kafka_connect_str': 'localhost:9092', 'enable_cluster_monitoring': True}
kafka_consumer_check = check(instance)
mock_kafka_client = seed_mock_kafka_client()
mock_kafka_client._cluster_metadata = None
kafka_consumer_check.client = mock_kafka_client
kafka_consumer_check.event_platform_event = mock.Mock()

kafka_consumer_check._send_cluster_monitoring_heartbeat(total_contexts=0, cluster_id='test-cluster-id')

calls = kafka_consumer_check.event_platform_event.call_args_list
hb_events = [json.loads(c[0][0]) for c in calls if c[0][1] == 'data-streams-message']
hb_events = [e for e in hb_events if e.get('config_type') == 'heartbeat']
assert len(hb_events) == 1
assert hb_events[0]['brokers'] == []
Loading