Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/20948.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data streams: Don't retrieve messages for untracked topics, and clean up the consumer groups used by the Data Streams messages feature.
13 changes: 11 additions & 2 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,22 @@ def list_consumer_group_offsets(self, groups):
offsets.append((response_offset_info.group_id, tpo))
return offsets

def start_collecting_messages(self, start_offsets):
self.open_consumer('datadog_live_messages')
def start_collecting_messages(self, start_offsets, consumer_group):
    """Open a consumer for ``consumer_group`` and assign it ``start_offsets``.

    The consumer is opened first; ``self._consumer`` is only read afterwards,
    then handed the offsets through ``assign``.
    """
    self.open_consumer(consumer_group)
    live_consumer = self._consumer
    live_consumer.assign(start_offsets)

def get_next_message(self):
    """Poll the current consumer once (``timeout=1``) and return the result."""
    polled = self._consumer.poll(timeout=1)
    return polled

def delete_consumer_group(self, consumer_group):
    """Delete ``consumer_group`` through the admin client, best effort.

    The deletion is requested for a single group and its result is awaited
    for at most ``self.config._request_timeout``. Any exception raised along
    the way is logged as a warning instead of being propagated.
    """
    try:
        # The admin call is indexed by group name to reach this group's pending result.
        pending_by_group = self.kafka_client.delete_consumer_groups([consumer_group])
        pending = pending_by_group[consumer_group]
        pending.result(timeout=self.config._request_timeout)
    except Exception as e:
        self.log.warning("Failed to delete consumer group %s: %s", consumer_group, e)
        return
    self.log.debug("Successfully deleted consumer group: %s", consumer_group)

def describe_consumer_group(self, consumer_group):
    """Return the name of the state reported for ``consumer_group``.

    The group is described through the admin client and the call blocks on
    the result before reading ``state.name`` from the description.
    """
    pending_by_group = self.kafka_client.describe_consumer_groups([consumer_group])
    description = pending_by_group[consumer_group].result()
    return description.state.name
Expand Down
96 changes: 54 additions & 42 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ def check(self, _):
broker_timestamps,
cluster_id,
)
self.data_streams_live_message(highwater_offsets or {}, cluster_id)
if self.config._close_admin_client:
self.client.close_admin_client()
self.data_streams_live_message(highwater_offsets or {}, cluster_id)

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
Expand Down Expand Up @@ -414,7 +414,9 @@ def send_event(self, title, text, tags, event_type, aggregation_key, severity='i
self.event(event_dict)

def data_streams_live_message(self, highwater_offsets, cluster_id):
monitored_topics = None
for cfg in self.config.live_messages_configs:
monitored_topics = monitored_topics or {topic.lower() for (topic, _) in highwater_offsets.keys()}
kafka = cfg['kafka']
topic = kafka["topic"]
partition = kafka["partition"]
Expand All @@ -430,6 +432,9 @@ def data_streams_live_message(self, highwater_offsets, cluster_id):
continue
if not cluster or not cluster_id or cluster.lower() != cluster_id.lower():
continue
if topic.lower() not in monitored_topics:
self.log.debug('Skipping live messages for topic %s because it is not monitored by this check', topic)
continue
start_offsets = resolve_start_offsets(highwater_offsets, topic, partition, start_offset, n_messages)

if not start_offsets:
Expand All @@ -443,6 +448,7 @@ def data_streams_live_message(self, highwater_offsets, cluster_id):
'topic': str(topic),
'live_messages_error': 'Unable to list partitions to read from',
'message': "Unable to list partitions to read from",
'feature': 'data_streams_messages',
}
)
continue
Expand Down Expand Up @@ -471,48 +477,54 @@ def data_streams_live_message(self, highwater_offsets, cluster_id):
)
continue

self.client.start_collecting_messages(start_offsets)
for _ in range(n_messages):
message = self.client.get_next_message()
if message is None:
self.log.debug('Live messages: no message to retrieve')
self.send_log(
{
'timestamp': int(time()),
'config_id': config_id,
'technology': 'kafka',
'cluster': str(cluster),
'topic': str(topic),
'live_messages_error': 'No more messages to retrieve',
'message': "No more messages to retrieve",
}
consumer_group = f"datadog_messages_{config_id}"
self.client.start_collecting_messages(start_offsets, consumer_group)
try:
for _ in range(n_messages):
message = self.client.get_next_message()
if message is None:
self.log.debug('Live messages: no message to retrieve')
self.send_log(
{
'timestamp': int(time()),
'config_id': config_id,
'technology': 'kafka',
'cluster': str(cluster),
'topic': str(topic),
'live_messages_error': 'No more messages to retrieve',
'message': "No more messages to retrieve",
'feature': 'data_streams_messages',
}
)
break
data = {
'timestamp': int(time()),
'technology': 'kafka',
'cluster': str(cluster),
'config_id': config_id,
'topic': str(topic),
'partition': str(message.partition()),
'offset': str(message.offset()),
'feature': 'data_streams_messages',
}
decoded_value, value_schema_id, decoded_key, key_schema_id = deserialize_message(
message, value_format, value_schema, key_format, key_schema
)
break
data = {
'timestamp': int(time()),
'technology': 'kafka',
'cluster': str(cluster),
'config_id': config_id,
'topic': str(topic),
'partition': str(message.partition()),
'offset': str(message.offset()),
}
decoded_value, value_schema_id, decoded_key, key_schema_id = deserialize_message(
message, value_format, value_schema, key_format, key_schema
)
if decoded_value:
data['message_value'] = decoded_value
else:
data['message'] = "Message format not supported"
data['live_messages_error'] = 'Message format not supported'
if value_schema_id:
data['value_schema_id'] = str(value_schema_id)
if decoded_key:
data['message_key'] = decoded_key
if key_schema_id:
data['key_schema_id'] = str(key_schema_id)
self.send_log(data)
self.client.close_consumer()
if decoded_value:
data['message_value'] = decoded_value
else:
data['message'] = "Message format not supported"
data['live_messages_error'] = 'Message format not supported'
if value_schema_id:
data['value_schema_id'] = str(value_schema_id)
if decoded_key:
data['message_key'] = decoded_key
if key_schema_id:
data['key_schema_id'] = str(key_schema_id)
self.send_log(data)
finally:
self.client.close_consumer()
self.client.delete_consumer_group(consumer_group)
self._mark_messages_retrieved(config_id)


Expand Down
29 changes: 29 additions & 0 deletions kafka_consumer/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import mock
import pytest
from confluent_kafka.admin import AdminClient

from datadog_checks.dev.utils import get_metadata_metrics

Expand All @@ -34,6 +35,27 @@ def mocked_time():
return 400


def get_all_consumer_groups(kafka_instance):
    """Return the set of group IDs for every valid consumer group listed.

    An admin client is built from the instance's ``kafka_connect_str`` plus
    the authentication settings derived from the instance. If listing the
    groups raises, the error is printed and the IDs gathered so far (possibly
    none) are returned.
    """
    client_settings = {
        "bootstrap.servers": kafka_instance['kafka_connect_str'],
        "socket.timeout.ms": 1000,
        "topic.metadata.refresh.interval.ms": 2000,
    }
    client_settings.update(common.get_authentication_configuration(kafka_instance))
    admin_client = AdminClient(client_settings)

    final_groups = set()
    try:
        listing = admin_client.list_consumer_groups().result()
        # Only groups reported as valid are collected.
        final_groups.update(group.group_id for group in listing.valid)
    except Exception as e:
        print(f"Error getting final consumer groups: {e}")

    return final_groups


def test_check_kafka(aggregator, check, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
Expand Down Expand Up @@ -486,6 +508,11 @@ def test_data_streams_live_messages(dd_run_check, check, kafka_instance, datadog
]
kafka_check = check(kafka_instance)
dd_run_check(kafka_check)

# Verify that live messages is not leaving behind any new consumer groups
final_groups = get_all_consumer_groups(kafka_instance)
assert final_groups == {'my_consumer'}

expected_logs = [
{
'timestamp': 400 * 1000,
Expand All @@ -495,6 +522,7 @@ def test_data_streams_live_messages(dd_run_check, check, kafka_instance, datadog
'topic': 'marvel',
'partition': '0',
'offset': '0',
'feature': 'data_streams_messages',
'message_value': '{"name": "Peter Parker", "age": 18, "transaction_amount": 123, "currency": "dollar"}',
'ddtags': 'optional:tag1',
},
Expand All @@ -506,6 +534,7 @@ def test_data_streams_live_messages(dd_run_check, check, kafka_instance, datadog
'topic': 'marvel',
'partition': '0',
'offset': '1',
'feature': 'data_streams_messages',
'message_value': '{"name": "Bruce Banner", "age": 45,\
"transaction_amount": 456, "currency": "dollar"}',
'value_schema_id': '350',
Expand Down
23 changes: 15 additions & 8 deletions kafka_consumer/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,9 +721,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'partition': '0',
'offset': '12',
'feature': 'data_streams_messages',
'message_value': '{"name": "Peter Parker", "age": 18, \
"transaction_amount": 123, "currency": "dollar"}',
'message_key': '{"name": "Peter Parker"}',
Expand All @@ -733,9 +734,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'partition': '0',
'offset': '13',
'feature': 'data_streams_messages',
'message_value': '{"name": "Bruce Banner", "age": 45, \
"transaction_amount": 456, "currency": "dollar"}',
},
Expand All @@ -744,9 +746,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'message': 'No more messages to retrieve',
'live_messages_error': 'No more messages to retrieve',
'feature': 'data_streams_messages',
},
],
id='Retrieves messages from Kafka',
Expand Down Expand Up @@ -778,9 +781,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'partition': '0',
'offset': '12',
'feature': 'data_streams_messages',
'message_value': (
'{\n "isbn": "9780134190440",\n "title": "The Go Programming Language",\n '
'"author": "Alan Donovan"\n}'
Expand All @@ -792,9 +796,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'message': 'No more messages to retrieve',
'live_messages_error': 'No more messages to retrieve',
'feature': 'data_streams_messages',
},
],
id='Retrieves Protobuf messages from Kafka',
Expand Down Expand Up @@ -822,9 +827,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'partition': '0',
'offset': '12',
'feature': 'data_streams_messages',
'message_value': (
'{"isbn": 9780134190440, "title": "The Go Programming Language", "author": "Alan Donovan"}'
),
Expand All @@ -835,9 +841,10 @@ def mocked_time():
'technology': 'kafka',
'cluster': 'cluster_id',
'config_id': 'config_1_id',
'topic': 'marvel',
'topic': 'topic1',
'message': 'No more messages to retrieve',
'live_messages_error': 'No more messages to retrieve',
'feature': 'data_streams_messages',
},
],
id='Retrieves Avro messages from Kafka',
Expand All @@ -864,7 +871,7 @@ def test_data_streams_messages(
{
'kafka': {
'cluster': 'cluster_id',
'topic': 'marvel',
'topic': 'topic1',
'partition': 0,
'start_offset': 0,
'n_messages': 3,
Expand Down
Loading
Loading