Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/21221.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve check efficiency with many topics or partitions per consumer group when `collect_consumer_group_state` is enabled.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def report_consumer_offsets_and_lag(
reported_contexts = 0
self.log.debug("Reporting consumer offsets and lag metrics")
for consumer_group, offsets in consumer_offsets.items():
consumer_group_state = None
for (topic, partition), consumer_offset in offsets.items():
if reported_contexts >= contexts_limit:
self.log.debug(
Expand All @@ -259,7 +260,8 @@ def report_consumer_offsets_and_lag(
'kafka_cluster_id:%s' % cluster_id,
]
if self.config._collect_consumer_group_state:
consumer_group_state = self.get_consumer_group_state(consumer_group)
if consumer_group_state is None:
consumer_group_state = self.get_consumer_group_state(consumer_group)
consumer_group_tags.append(f'consumer_group_state:{consumer_group_state}')
consumer_group_tags.extend(self.config._custom_tags)

Expand Down
37 changes: 37 additions & 0 deletions kafka_consumer/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,3 +999,40 @@ def test_build_schema_none_handling():
# Test Protobuf schema with None - should raise TypeError or base64.binascii.Error
with pytest.raises((TypeError, base64.binascii.Error)):
build_protobuf_schema(None)


def test_consumer_group_state_fetched_once_per_group(check, kafka_instance, dd_run_check, aggregator):
mock_client = seed_mock_client()
# Set up two partitions for same topic to check multiple contexts in same consumer group
partitions = ['partition1', 'partition2']
offsets = [2, 3]
topic = 'topic1'
consumer_group = 'consumer_group1'
mock_client.consumer_get_cluster_id_and_list_topics.return_value = (
'cluster_id',
[(topic, partitions)],
)
mock_client.get_partitions_for_topic.return_value = partitions
consumer_group_offsets = [(topic, p, o) for p, o in zip(partitions, offsets)]
mock_client.list_consumer_group_offsets.return_value = [
(
consumer_group,
consumer_group_offsets,
)
]
kafka_instance["collect_consumer_group_state"] = True
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.client = mock_client

dd_run_check(kafka_consumer_check)

# Check that the consumer group state is fetched only once
assert mock_client.describe_consumer_group.call_count == 1

# Check that both partitions include the state tag
for metric in ("kafka.consumer_offset", "kafka.consumer_lag"):
for partition in partitions:
aggregator.assert_metric_has_tags(
metric,
tags=[f'partition:{partition}', 'consumer_group_state:STABLE'],
)
34 changes: 11 additions & 23 deletions postgres/tests/test_pg_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,10 @@ def test_activity_metrics_no_aggregations(aggregator, integration_check, pg_inst
def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance):
pg_instance['collect_activity_metrics'] = True
check = integration_check(pg_instance)
app = 'test_activity_vacuum_excluded'

# Run vacuum in a thread
thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING, ANALYZE) persons', application_name='test')
thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING, ANALYZE) persons', application_name=app)

# Wait for vacuum to be running
_wait_for_value(
Expand All @@ -451,7 +452,7 @@ def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance):
query="SELECT count(*) from pg_stat_activity WHERE backend_type = 'client backend' AND query ~* '^vacuum';",
)

conn_increase_txid = _get_conn(pg_instance, user=USER_ADMIN, password=PASSWORD_ADMIN, application_name='test')
conn_increase_txid = _get_conn(pg_instance, user=USER_ADMIN, password=PASSWORD_ADMIN, application_name=app)
cur = conn_increase_txid.cursor()
# Increase txid counter
_increase_txid(cur)
Expand All @@ -463,7 +464,7 @@ def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance):
# Gather metrics
check.run()

expected_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app='test', user=USER_ADMIN)
expected_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app=app, user=USER_ADMIN)
aggregator.assert_metric('postgresql.waiting_queries', value=1, count=1, tags=expected_tags)
# Vacuum process with 3 xmin age should not be reported
aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=1, tags=expected_tags)
Expand All @@ -477,25 +478,21 @@ def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance):
thread.join()


@pytest.mark.flaky(max_runs=10)
def test_backend_transaction_age(aggregator, integration_check, pg_instance):
pg_instance['collect_activity_metrics'] = True
check = integration_check(pg_instance)

check.run()

dd_agent_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app='datadog-agent', user='datadog')
test_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app='test', user='datadog')
# No transaction in progress, we have 0
if float(POSTGRES_VERSION) >= 9.6:
aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=0, count=1, tags=dd_agent_tags)
else:
aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=dd_agent_tags)
aggregator.assert_metric('postgresql.activity.xact_start_age', count=1, tags=dd_agent_tags)

conn1 = _get_conn(pg_instance)
app = 'test_backend_transaction_age'
conn1 = _get_conn(pg_instance, application_name=app)
cur = conn1.cursor()

test_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app=app, user='datadog')
# No transaction in progress, nothing should be reported for test app
aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=test_tags)
aggregator.assert_metric('postgresql.activity.xact_start_age', count=0, tags=test_tags)

# Start a transaction in repeatable read to force pinning of backend_xmin
cur.execute('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;')
# Force assignement of a txid and keep the transaction opened
Expand All @@ -510,16 +507,10 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance):
if float(POSTGRES_VERSION) >= 9.6:
aggregator.assert_metric('postgresql.activity.backend_xid_age', value=1, count=1, tags=test_tags)
aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=1, count=1, tags=test_tags)

aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=dd_agent_tags)
aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=1, count=1, tags=dd_agent_tags)
else:
aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=test_tags)
aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=test_tags)

aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=dd_agent_tags)
aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=dd_agent_tags)

aggregator.assert_metric('postgresql.activity.xact_start_age', count=1, tags=test_tags)

with _get_conn(pg_instance) as conn2:
Expand All @@ -536,9 +527,6 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance):
aggregator.assert_metric('postgresql.activity.backend_xid_age', value=2, count=1, tags=test_tags)
aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=2, count=1, tags=test_tags)

aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=dd_agent_tags)
aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=2, count=1, tags=dd_agent_tags)

# Check that xact_start_age has a value greater than the trasaction_age lower bound
aggregator.assert_metric('postgresql.activity.xact_start_age', count=1, tags=test_tags)
assert_metric_at_least(
Expand Down
2 changes: 1 addition & 1 deletion postgres/tests/test_progress_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


def _check_analyze_progress(check, pg_instance, table):
thread = run_vacuum_thread(pg_instance, f'VACUUM ANALYZE {table}')
thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}')

# Wait for vacuum to be reported
_wait_for_value(
Expand Down
Loading