diff --git a/kafka_consumer/changelog.d/21221.fixed b/kafka_consumer/changelog.d/21221.fixed new file mode 100644 index 0000000000000..8989c5c776b6b --- /dev/null +++ b/kafka_consumer/changelog.d/21221.fixed @@ -0,0 +1 @@ +Improve check efficiency with many topics or partitions per consumer group when `collect_consumer_group_state` is enabled. \ No newline at end of file diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index aa240eed08df6..3cfce2a6d22bb 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -243,6 +243,7 @@ def report_consumer_offsets_and_lag( reported_contexts = 0 self.log.debug("Reporting consumer offsets and lag metrics") for consumer_group, offsets in consumer_offsets.items(): + consumer_group_state = None for (topic, partition), consumer_offset in offsets.items(): if reported_contexts >= contexts_limit: self.log.debug( @@ -259,7 +260,8 @@ def report_consumer_offsets_and_lag( 'kafka_cluster_id:%s' % cluster_id, ] if self.config._collect_consumer_group_state: - consumer_group_state = self.get_consumer_group_state(consumer_group) + if consumer_group_state is None: + consumer_group_state = self.get_consumer_group_state(consumer_group) consumer_group_tags.append(f'consumer_group_state:{consumer_group_state}') consumer_group_tags.extend(self.config._custom_tags) diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index f9e11fb3a909c..2903cf3e0bf6b 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -999,3 +999,40 @@ def test_build_schema_none_handling(): # Test Protobuf schema with None - should raise TypeError or base64.binascii.Error with pytest.raises((TypeError, base64.binascii.Error)): build_protobuf_schema(None) + + +def test_consumer_group_state_fetched_once_per_group(check, kafka_instance, dd_run_check, aggregator): + mock_client = seed_mock_client() + # Set up two partitions for same topic to check multiple contexts in same consumer group + partitions = ['partition1', 'partition2'] + offsets = [2, 3] + topic = 'topic1' + consumer_group = 'consumer_group1' + mock_client.consumer_get_cluster_id_and_list_topics.return_value = ( + 'cluster_id', + [(topic, partitions)], + ) + mock_client.get_partitions_for_topic.return_value = partitions + consumer_group_offsets = [(topic, p, o) for p, o in zip(partitions, offsets)] + mock_client.list_consumer_group_offsets.return_value = [ + ( + consumer_group, + consumer_group_offsets, + ) + ] + kafka_instance["collect_consumer_group_state"] = True + kafka_consumer_check = check(kafka_instance) + kafka_consumer_check.client = mock_client + + dd_run_check(kafka_consumer_check) + + # Check that the consumer group state is fetched only once + assert mock_client.describe_consumer_group.call_count == 1 + + # Check that both partitions include the state tag + for metric in ("kafka.consumer_offset", "kafka.consumer_lag"): + for partition in partitions: + aggregator.assert_metric_has_tags( + metric, + tags=[f'partition:{partition}', 'consumer_group_state:STABLE'], + ) diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py index 7fc9b4792487b..1bdc917202d5c 100644 --- a/postgres/tests/test_pg_integration.py +++ b/postgres/tests/test_pg_integration.py @@ -440,9 +440,10 @@ def test_activity_metrics_no_aggregations(aggregator, integration_check, pg_inst def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance): pg_instance['collect_activity_metrics'] = True check = integration_check(pg_instance) + app = 'test_activity_vacuum_excluded' # Run vacuum in a thread - thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING, ANALYZE) persons', application_name='test') + thread = run_vacuum_thread(pg_instance, 'VACUUM (DISABLE_PAGE_SKIPPING, ANALYZE) persons', application_name=app) # Wait for vacuum to be running _wait_for_value( @@ -451,7 +452,7 @@ def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance): query="SELECT count(*) from pg_stat_activity WHERE backend_type = 'client backend' AND query ~* '^vacuum';", ) - conn_increase_txid = _get_conn(pg_instance, user=USER_ADMIN, password=PASSWORD_ADMIN, application_name='test') + conn_increase_txid = _get_conn(pg_instance, user=USER_ADMIN, password=PASSWORD_ADMIN, application_name=app) cur = conn_increase_txid.cursor() # Increase txid counter _increase_txid(cur) @@ -463,7 +464,7 @@ def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance): # Gather metrics check.run() - expected_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app='test', user=USER_ADMIN) + expected_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app=app, user=USER_ADMIN) aggregator.assert_metric('postgresql.waiting_queries', value=1, count=1, tags=expected_tags) # Vacuum process with 3 xmin age should not be reported aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=1, tags=expected_tags) @@ -477,25 +478,21 @@ def test_activity_vacuum_excluded(aggregator, integration_check, pg_instance): thread.join() -@pytest.mark.flaky(max_runs=10) def test_backend_transaction_age(aggregator, integration_check, pg_instance): pg_instance['collect_activity_metrics'] = True check = integration_check(pg_instance) check.run() - dd_agent_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app='datadog-agent', user='datadog') - test_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app='test', user='datadog') - # No transaction in progress, we have 0 - if float(POSTGRES_VERSION) >= 9.6: - aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=0, count=1, tags=dd_agent_tags) - else: - aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=dd_agent_tags) - aggregator.assert_metric('postgresql.activity.xact_start_age', count=1, tags=dd_agent_tags) - - conn1 = _get_conn(pg_instance) + app = 'test_backend_transaction_age' + conn1 = _get_conn(pg_instance, application_name=app) cur = conn1.cursor() + test_tags = _get_expected_tags(check, pg_instance, db=DB_NAME, app=app, user='datadog') + # No transaction in progress, nothing should be reported for test app + aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=test_tags) + aggregator.assert_metric('postgresql.activity.xact_start_age', count=0, tags=test_tags) + # Start a transaction in repeatable read to force pinning of backend_xmin cur.execute('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;') # Force assignement of a txid and keep the transaction opened @@ -510,16 +507,10 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance): if float(POSTGRES_VERSION) >= 9.6: aggregator.assert_metric('postgresql.activity.backend_xid_age', value=1, count=1, tags=test_tags) aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=1, count=1, tags=test_tags) - - aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=dd_agent_tags) - aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=1, count=1, tags=dd_agent_tags) else: aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=test_tags) aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=test_tags) - aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=dd_agent_tags) - aggregator.assert_metric('postgresql.activity.backend_xmin_age', count=0, tags=dd_agent_tags) - aggregator.assert_metric('postgresql.activity.xact_start_age', count=1, tags=test_tags) with _get_conn(pg_instance) as conn2: @@ -536,9 +527,6 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance): aggregator.assert_metric('postgresql.activity.backend_xid_age', value=2, count=1, tags=test_tags) aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=2, count=1, tags=test_tags) - aggregator.assert_metric('postgresql.activity.backend_xid_age', count=0, tags=dd_agent_tags) - aggregator.assert_metric('postgresql.activity.backend_xmin_age', value=2, count=1, tags=dd_agent_tags) - # Check that xact_start_age has a value greater than the trasaction_age lower bound aggregator.assert_metric('postgresql.activity.xact_start_age', count=1, tags=test_tags) assert_metric_at_least( diff --git a/postgres/tests/test_progress_stats.py b/postgres/tests/test_progress_stats.py index af7d09e377f0f..444de384b7fd3 100644 --- a/postgres/tests/test_progress_stats.py +++ b/postgres/tests/test_progress_stats.py @@ -31,7 +31,7 @@ def _check_analyze_progress(check, pg_instance, table): - thread = run_vacuum_thread(pg_instance, f'VACUUM ANALYZE {table}') + thread = run_vacuum_thread(pg_instance, f'ANALYZE {table}') # Wait for vacuum to be reported _wait_for_value(