diff --git a/harbor/assets/configuration/spec.yaml b/harbor/assets/configuration/spec.yaml index 2e80862f49f40..6c7bf6fa96bb4 100644 --- a/harbor/assets/configuration/spec.yaml +++ b/harbor/assets/configuration/spec.yaml @@ -31,33 +31,3 @@ files: type: file path: /var/log/harbor/*.log source: harbor -- name: auto_conf.yaml - options: - - template: ad_identifiers - overrides: - value.example: - - nginx-photon - - template: init_config - options: [] - - template: instances - options: - - name: url - required: true - description: The Harbor HTTP url. - value: - example: 'http://%%host%%/' - type: string - - name: username - required: true - description: | - The username to use for authentication against the Harbor API. - Note: Some metrics and service checks requires an admin account to be collected. - This includes chartmuseum.status, replication registries data, volume information. - value: - type: string - - name: password - required: true - description: The password used together with the username for authentication against the Harbor API. - secret: true - value: - type: string diff --git a/harbor/changelog.d/23595.fixed b/harbor/changelog.d/23595.fixed new file mode 100644 index 0000000000000..53014f7ef1d82 --- /dev/null +++ b/harbor/changelog.d/23595.fixed @@ -0,0 +1 @@ +Remove the default auto configuration that used literal placeholder credentials. \ No newline at end of file diff --git a/harbor/datadog_checks/harbor/data/auto_conf.yaml b/harbor/datadog_checks/harbor/data/auto_conf.yaml deleted file mode 100644 index a89b5a9e07d56..0000000000000 --- a/harbor/datadog_checks/harbor/data/auto_conf.yaml +++ /dev/null @@ -1,32 +0,0 @@ -## @param ad_identifiers - list of strings - required -## A list of container identifiers that are used by Autodiscovery to identify -## which container the check should be run against. For more information, see: -## https://docs.datadoghq.com/agent/guide/ad_identifiers/ -# -ad_identifiers: - - nginx-photon - -## All options defined here are available to all instances. -# -init_config: - -## Every instance is scheduled independently of the others. -# -instances: - - ## @param url - string - required - ## The Harbor HTTP url. - # - - url: http://%%host%%/ - - ## @param username - string - required - ## The username to use for authentication against the Harbor API. - ## Note: Some metrics and service checks requires an admin account to be collected. - ## This includes chartmuseum.status, replication registries data, volume information. - # - username: - - ## @param password - string - required - ## The password used together with the username for authentication against the Harbor API. - # - password: diff --git a/postgres/assets/configuration/spec.yaml b/postgres/assets/configuration/spec.yaml index 133804c73bcca..6355f3f0ce86b 100644 --- a/postgres/assets/configuration/spec.yaml +++ b/postgres/assets/configuration/spec.yaml @@ -1065,6 +1065,97 @@ files: example: - "customer.*" + - name: collect_column_statistics + description: | + Enable collection of column statistics from pg_stats. + Requires the `datadog.column_statistics()` function to be created in the database. + options: + - name: enabled + description: | + Enable collection of column statistics. Requires `dbm: true`. + value: + type: boolean + example: false + - name: collection_interval + description: | + Set the column statistics collection interval (in seconds). + value: + type: number + example: 3600 + display_default: 3600 + - name: max_tables + description: | + Maximum number of tables to collect column statistics for. + value: + type: number + example: 500 + display_default: 500 + - name: include_databases + description: | + A list of regex patterns to include databases. + Any database whose name matches any one of these patterns will be included. + If empty, all databases matching other filters are included. + value: + type: array + items: + type: string + example: + - "mydb" + - name: exclude_databases + description: | + A list of regex patterns to exclude databases. + Any database whose name matches any one of these patterns will be excluded. + If empty, all databases matching other filters are included. + value: + type: array + items: + type: string + example: + - "privatedb.*" + - name: include_schemas + description: | + A list of regex patterns to include schemas. + Any schema whose name matches any one of these patterns will be included. + If empty, all schemas matching other filters are included. + value: + type: array + items: + type: string + example: + - "myschema" + - name: exclude_schemas + description: | + A list of regex patterns to exclude schemas. + Any schema whose name matches any one of these patterns will be excluded. + If empty, all schemas matching other filters are included. + value: + type: array + items: + type: string + example: + - "privateschema.*" + - name: include_tables + description: | + A list of regex patterns to include tables. + Any table whose name matches any one of these patterns will be included. + If empty, all tables matching other filters are included. + value: + type: array + items: + type: string + example: + - "users.*" + - name: exclude_tables + description: | + A list of regex patterns to exclude tables. + Any table whose name matches any one of these patterns will be excluded. + If empty, all tables matching other filters are included. + value: + type: array + items: + type: string + example: + - "temp_.*" - name: aws display_priority: 0 description: | diff --git a/postgres/changelog.d/23364.added b/postgres/changelog.d/23364.added new file mode 100644 index 0000000000000..724d3fb8c8fb4 --- /dev/null +++ b/postgres/changelog.d/23364.added @@ -0,0 +1 @@ +Add column statistics collection from pg_stats for Database Monitoring. \ No newline at end of file diff --git a/postgres/datadog_checks/postgres/column_statistics.py b/postgres/datadog_checks/postgres/column_statistics.py new file mode 100644 index 0000000000000..df1bebb583171 --- /dev/null +++ b/postgres/datadog_checks/postgres/column_statistics.py @@ -0,0 +1,342 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import re +import time + +import psycopg +from psycopg.rows import dict_row + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from datadog_checks.postgres import PostgreSql + +from datadog_checks.base.utils.db.health import HealthStatus +from datadog_checks.base.utils.db.utils import default_json_event_encoding +from datadog_checks.base.utils.serialization import json +from datadog_checks.base.utils.tracking import tracked_method + +from .filters import regex_exclude_clauses, regex_include_clause +from .health import PostgresHealthEvent +from .util import payload_pg_version + + +def agent_check_getter(self): + return self._check + + +COLUMN_STATISTICS_QUERY = """\ +WITH tables AS ( + SELECT n.nspname AS schemaname, c.relname AS tablename + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname NOT IN ('pg_catalog', 'information_schema') + AND c.relkind = 'r' + {filters} + ORDER BY n.nspname, c.relname + LIMIT %s +), +column_data AS ( + SELECT + t.schemaname, + t.tablename, + json_build_object( + 'name', s.attname, + 'avg_width', s.avg_width, + 'n_distinct', s.n_distinct, + 'null_frac', s.null_frac, + 'inherited', s.inherited, + 'correlation', s.correlation, + 'most_common_freqs', s.most_common_freqs + ) AS col, + EXTRACT(EPOCH FROM (NOW() - st.last_analyze))::bigint AS last_analyze_age, + EXTRACT(EPOCH FROM (NOW() - st.last_autoanalyze))::bigint AS last_autoanalyze_age, + EXTRACT(EPOCH FROM (NOW() - st.last_vacuum))::bigint AS last_vacuum_age, + EXTRACT(EPOCH FROM (NOW() - st.last_autovacuum))::bigint AS last_autovacuum_age, + EXTRACT(EPOCH FROM (NOW() - GREATEST(st.last_analyze, st.last_autoanalyze)))::bigint AS stats_age + FROM datadog.column_statistics() s + JOIN tables t ON t.schemaname = s.schemaname AND t.tablename = s.tablename + JOIN pg_stat_all_tables st ON st.schemaname = t.schemaname AND st.relname = t.tablename +) +SELECT + schemaname, + tablename, + json_agg(col ORDER BY col->>'name') AS columns, + (array_agg(last_analyze_age))[1] AS last_analyze_age, + (array_agg(last_autoanalyze_age))[1] AS last_autoanalyze_age, + (array_agg(last_vacuum_age))[1] AS last_vacuum_age, + (array_agg(last_autovacuum_age))[1] AS last_autovacuum_age, + (array_agg(stats_age))[1] AS stats_age +FROM column_data +GROUP BY schemaname, tablename +ORDER BY schemaname, tablename +""" + + +PAYLOAD_MAX_COLUMNS = 5_000 +MAX_QUERY_DURATION_SECONDS = 60 + + +class PostgresColumnStatisticsCollectorConfig: + def __init__(self): + self.collection_interval = 3600 + self.max_tables = 500 + self.include_databases: list[str] = [] + self.exclude_databases: list[str] = [] + self.include_schemas: list[str] = [] + self.exclude_schemas: list[str] = [] + self.include_tables: list[str] = [] + self.exclude_tables: list[str] = [] + + +class PostgresColumnStatisticsCollector: + """Collects column statistics from pg_stats via the datadog.column_statistics() function.""" + + def __init__(self, check: PostgreSql, cancel_event): + self._check = check + self._log = check.log + self._cancel_event = cancel_event + self._config = PostgresColumnStatisticsCollectorConfig() + self._config.collection_interval = check._config.collect_column_statistics.collection_interval + self._config.max_tables = check._config.collect_column_statistics.max_tables + self._config.include_databases = list(check._config.collect_column_statistics.include_databases or []) + self._config.exclude_databases = list(check._config.collect_column_statistics.exclude_databases or []) + self._config.include_schemas = list(check._config.collect_column_statistics.include_schemas or []) + self._config.exclude_schemas = list(check._config.collect_column_statistics.exclude_schemas or []) + self._config.include_tables = list(check._config.collect_column_statistics.include_tables or []) + self._config.exclude_tables = list(check._config.collect_column_statistics.exclude_tables or []) + self._function_not_found_dbs: set[str] = set() + self._insufficient_privilege_dbs: set[str] = set() + self._reset() + + def _reset(self): + self._collection_started_at = None + self._total_tables_count = 0 + self._total_columns_count = 0 + self._payloads_count = 0 + self._queued_tables = [] + self._queued_columns_count = 0 + + @property + def _base_event(self): + return { + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "dbms": "postgres", + "dbms_version": payload_pg_version(self._check.version), + "cloud_metadata": self._check.cloud_metadata, + "dbm_type": "column_statistics", + "collection_interval": self._config.collection_interval, + } + + def _build_filters(self) -> tuple[str, list[str]]: + query = "" + params: list[str] = [] + + query += regex_exclude_clauses("n.nspname", self._config.exclude_schemas) + params.extend(self._config.exclude_schemas) + + query += regex_include_clause("n.nspname", self._config.include_schemas) + params.extend(self._config.include_schemas) + + query += regex_exclude_clauses("c.relname", self._config.exclude_tables) + params.extend(self._config.exclude_tables) + + query += regex_include_clause("c.relname", self._config.include_tables) + params.extend(self._config.include_tables) + + return query, params + + def _get_databases(self): + if self._check.autodiscovery: + databases = list(self._check.autodiscovery.get_items() or []) + else: + databases = [self._check._config.dbname] + if self._config.exclude_databases: + databases = [db for db in databases if not any(re.search(p, db) for p in self._config.exclude_databases)] + if self._config.include_databases: + databases = [db for db in databases if any(re.search(p, db) for p in self._config.include_databases)] + return databases + + @tracked_method(agent_check_getter=agent_check_getter) + def collect_column_statistics(self, tags_no_db: list[str]) -> bool: + """Collect column statistics across all discovered databases.""" + status = "success" + try: + self._collection_started_at = time.time() * 1000 + + if self._cancel_event.is_set(): + self._log.debug("Column stats collection cancelled") + return True + + databases = self._get_databases() + self._log.debug("Collecting column stats for %d databases", len(databases)) + + for db_name in databases: + if self._cancel_event.is_set(): + self._log.debug("Column stats collection cancelled") + break + self._collect_for_database(db_name, tags_no_db) + + self._log.debug( + "Submitted column stats: %d tables, %d columns, %d payloads across %d databases", + self._total_tables_count, + self._total_columns_count, + self._payloads_count, + len(databases), + ) + return True + except Exception as e: + status = "error" + self._log.error("Error collecting column stats: %s", e) + raise + finally: + elapsed = (time.time() * 1000) - self._collection_started_at if self._collection_started_at else 0 + tags = self._check.tags + ["status:" + status] + self._check.histogram( + "dd.postgres.column_statistics.time", + elapsed, + tags=tags, + hostname=self._check.reported_hostname, + raw=True, + ) + self._check.gauge( + "dd.postgres.column_statistics.tables_count", + self._total_tables_count, + tags=tags, + hostname=self._check.reported_hostname, + raw=True, + ) + self._check.gauge( + "dd.postgres.column_statistics.columns_count", + self._total_columns_count, + tags=tags, + hostname=self._check.reported_hostname, + raw=True, + ) + self._check.gauge( + "dd.postgres.column_statistics.payloads_count", + self._payloads_count, + tags=tags, + hostname=self._check.reported_hostname, + raw=True, + ) + self._reset() + + @tracked_method(agent_check_getter=agent_check_getter) + def _collect_for_database(self, db_name: str, tags_no_db: list[str]): + try: + with self._check.db_pool.get_connection(db_name) as conn: + with conn.cursor(row_factory=dict_row) as cursor: + filters_sql, filter_params = self._build_filters() + query = COLUMN_STATISTICS_QUERY.format(filters=filters_sql) + filter_params.append(self._config.max_tables) + cursor.execute("SET statement_timeout = %s", (MAX_QUERY_DURATION_SECONDS * 1000,)) + try: + cursor.execute(query, filter_params) + + self._handle_recovery(db_name) + + row = cursor.fetchone() + while row: + table_data = self._map_row(db_name, row) + table_data['version'] = 1 + num_columns = len(table_data['columns']) + self._queued_tables.append(table_data) + self._queued_columns_count += num_columns + self._total_tables_count += 1 + self._total_columns_count += num_columns + row = cursor.fetchone() + if self._queued_columns_count >= PAYLOAD_MAX_COLUMNS: + self._flush(tags_no_db) + finally: + try: + cursor.execute("RESET statement_timeout") + except Exception: + pass + + # Flush at database boundary + if self._queued_tables: + self._flush(tags_no_db) + + except psycopg.errors.DatabaseError as e: + if isinstance(e, psycopg.errors.UndefinedFunction): + if db_name not in self._function_not_found_dbs: + self._function_not_found_dbs.add(db_name) + self._log.warning( + "datadog.column_statistics() function not found in database '%s'. " + "Please create the function as described in the documentation: " + "https://docs.datadoghq.com/database_monitoring/setup_postgres/", + db_name, + ) + self._check.health.submit_health_event( + name=PostgresHealthEvent.COLUMN_STATISTICS_FUNCTION_NOT_FOUND, + status=HealthStatus.WARNING, + ) + elif isinstance(e, psycopg.errors.InsufficientPrivilege): + if db_name not in self._insufficient_privilege_dbs: + self._insufficient_privilege_dbs.add(db_name) + self._log.warning( + "Insufficient privileges to execute datadog.column_statistics() in database '%s'. " + "Please check the function permissions.", + db_name, + ) + self._check.health.submit_health_event( + name=PostgresHealthEvent.COLUMN_STATISTICS_INSUFFICIENT_PRIVILEGE, + status=HealthStatus.WARNING, + ) + else: + self._log.exception("Error collecting column stats for database '%s'", db_name) + except Exception: + self._log.exception("Error collecting column stats for database '%s'", db_name) + + def _handle_recovery(self, db_name: str): + if db_name in self._function_not_found_dbs: + self._function_not_found_dbs.discard(db_name) + self._log.info("datadog.column_statistics() function is now available in database '%s'", db_name) + if not self._function_not_found_dbs: + self._check.health.submit_health_event( + name=PostgresHealthEvent.COLUMN_STATISTICS_FUNCTION_NOT_FOUND, + status=HealthStatus.OK, + ) + if db_name in self._insufficient_privilege_dbs: + self._insufficient_privilege_dbs.discard(db_name) + self._log.info("datadog.column_statistics() privileges restored in database '%s'", db_name) + if not self._insufficient_privilege_dbs: + self._check.health.submit_health_event( + name=PostgresHealthEvent.COLUMN_STATISTICS_INSUFFICIENT_PRIVILEGE, + status=HealthStatus.OK, + ) + + def _map_row(self, db_name: str, row): + return { + 'db': db_name, + 'schema': row['schemaname'], + 'table': row['tablename'], + 'last_analyze_age': row.get('last_analyze_age'), + 'last_autoanalyze_age': row.get('last_autoanalyze_age'), + 'last_vacuum_age': row.get('last_vacuum_age'), + 'last_autovacuum_age': row.get('last_autovacuum_age'), + 'stats_age': row.get('stats_age'), + 'columns': row.get('columns') or [], + } + + def _flush(self, tags_no_db: list[str]): + if self._queued_tables: + event = self._base_event + event["tags"] = tags_no_db + event["timestamp"] = time.time() * 1000 + event["column_statistics"] = self._queued_tables + self._payloads_count += 1 + self._check.database_monitoring_column_statistics(json.dumps(event, default=default_json_event_encoding)) + self._queued_tables = [] + self._queued_columns_count = 0 diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index c31b8e0f9be8d..74ff0dc2054c3 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -160,6 +160,10 @@ def build_config(check: PostgreSql) -> Tuple[InstanceConfig, ValidationResult]: **dict_defaults.instance_collect_schemas().model_dump(), **(instance.get('collect_schemas', {})), }, + "collect_column_statistics": { + **dict_defaults.instance_collect_column_statistics().model_dump(), + **(instance.get('collect_column_statistics', {})), + }, # Cloud "aws": { **Aws(managed_authentication=ManagedAuthentication()).model_dump(), @@ -424,7 +428,14 @@ def validate_config(config: InstanceConfig, instance: dict, validation_result: V validation_result.add_warning(f'Invalid regex pattern in autodiscovery exclude: {exclude_pattern}') # If the user provided config explicitly enables these features, we add a warning if dbm is not enabled - dbm_required = ['query_activity', 'query_samples', 'query_metrics', 'collect_settings', 'collect_schemas'] + dbm_required = [ + 'query_activity', + 'query_samples', + 'query_metrics', + 'collect_settings', + 'collect_schemas', + 'collect_column_statistics', + ] for feature in dbm_required: if instance.get(feature, {}).get('enabled') and not config.dbm: validation_result.add_warning(f'The `{feature}` feature requires the `dbm` option to be enabled.') @@ -443,6 +454,9 @@ def apply_features(config: InstanceConfig, validation_result: ValidationResult): validation_result.add_feature(FeatureKey.COLLECT_SETTINGS, config.collect_settings.enabled and config.dbm) validation_result.add_feature(FeatureKey.COLLECT_SCHEMAS, config.collect_schemas.enabled and config.dbm) validation_result.add_feature(FeatureKey.DATA_OBSERVABILITY, config.data_observability.enabled) + validation_result.add_feature( + FeatureKey.COLLECT_COLUMN_STATISTICS, config.collect_column_statistics.enabled and config.dbm + ) METRIC_TYPES = { diff --git a/postgres/datadog_checks/postgres/config_models/dict_defaults.py b/postgres/datadog_checks/postgres/config_models/dict_defaults.py index 75820983e7eaf..bcefe54a8fff7 100644 --- a/postgres/datadog_checks/postgres/config_models/dict_defaults.py +++ b/postgres/datadog_checks/postgres/config_models/dict_defaults.py @@ -84,6 +84,20 @@ def instance_collect_settings(): ) +def instance_collect_column_statistics(): + return instance.CollectColumnStatistics( + enabled=False, + max_tables=500, + collection_interval=3600, + include_databases=[], + exclude_databases=[], + include_schemas=[], + exclude_schemas=[], + include_tables=[], + exclude_tables=[], + ) + + def instance_collect_schemas(): return instance.CollectSchemas( enabled=True, diff --git a/postgres/datadog_checks/postgres/config_models/instance.py b/postgres/datadog_checks/postgres/config_models/instance.py index 4cb722cc11751..08e42a047b31e 100644 --- a/postgres/datadog_checks/postgres/config_models/instance.py +++ b/postgres/datadog_checks/postgres/config_models/instance.py @@ -78,6 +78,22 @@ class Azure(BaseModel): managed_authentication: Optional[ManagedAuthentication1] = None +class CollectColumnStatistics(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + exclude_databases: Optional[tuple[str, ...]] = None + exclude_schemas: Optional[tuple[str, ...]] = None + exclude_tables: Optional[tuple[str, ...]] = None + include_databases: Optional[tuple[str, ...]] = None + include_schemas: Optional[tuple[str, ...]] = None + include_tables: Optional[tuple[str, ...]] = None + max_tables: Optional[float] = None + + class CollectRawQueryStatement(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -315,6 +331,7 @@ class InstanceConfig(BaseModel): collect_bloat_metrics: Optional[bool] = None collect_buffercache_metrics: Optional[bool] = None collect_checksum_metrics: Optional[bool] = None + collect_column_statistics: Optional[CollectColumnStatistics] = None collect_count_metrics: Optional[bool] = None collect_database_size_metrics: Optional[bool] = None collect_default_database: Optional[bool] = None diff --git a/postgres/datadog_checks/postgres/data/conf.yaml.example b/postgres/datadog_checks/postgres/data/conf.yaml.example index 081d386b8cf40..0c63f38d5feca 100644 --- a/postgres/datadog_checks/postgres/data/conf.yaml.example +++ b/postgres/datadog_checks/postgres/data/conf.yaml.example @@ -666,6 +666,74 @@ instances: # exclude_tables: # - customer.* + ## Enable collection of column statistics from pg_stats. + ## Requires the `datadog.column_statistics()` function to be created in the database. + # + # collect_column_statistics: + + ## @param enabled - boolean - optional - default: false + ## Enable collection of column statistics. Requires `dbm: true`. + # + # enabled: false + + ## @param collection_interval - number - optional - default: 3600 + ## Set the column statistics collection interval (in seconds). + # + # collection_interval: 3600 + + ## @param max_tables - number - optional - default: 500 + ## Maximum number of tables to collect column statistics for. + # + # max_tables: 500 + + ## @param include_databases - list of strings - optional + ## A list of regex patterns to include databases. + ## Any database whose name matches any one of these patterns will be included. + ## If empty, all databases matching other filters are included. + # + # include_databases: + # - mydb + + ## @param exclude_databases - list of strings - optional + ## A list of regex patterns to exclude databases. + ## Any database whose name matches any one of these patterns will be excluded. + ## If empty, all databases matching other filters are included. + # + # exclude_databases: + # - privatedb.* + + ## @param include_schemas - list of strings - optional + ## A list of regex patterns to include schemas. + ## Any schema whose name matches any one of these patterns will be included. + ## If empty, all schemas matching other filters are included. + # + # include_schemas: + # - myschema + + ## @param exclude_schemas - list of strings - optional + ## A list of regex patterns to exclude schemas. + ## Any schema whose name matches any one of these patterns will be excluded. + ## If empty, all schemas matching other filters are included. + # + # exclude_schemas: + # - privateschema.* + + ## @param include_tables - list of strings - optional + ## A list of regex patterns to include tables. + ## Any table whose name matches any one of these patterns will be included. + ## If empty, all tables matching other filters are included. + # + # include_tables: + # - users.* + + ## @param exclude_tables - list of strings - optional + ## A list of regex patterns to exclude tables. + ## Any table whose name matches any one of these patterns will be excluded. + ## If empty, all tables matching other filters are included. + # + # exclude_tables: + # - temp_.* + ## This block defines the configuration for AWS RDS and Aurora instances. ## ## Complete this section if you have installed the Datadog AWS Integration diff --git a/postgres/datadog_checks/postgres/diagnose.py b/postgres/datadog_checks/postgres/diagnose.py index 7520e0f34c185..4b7cdf86afe4c 100644 --- a/postgres/datadog_checks/postgres/diagnose.py +++ b/postgres/datadog_checks/postgres/diagnose.py @@ -110,7 +110,11 @@ def _run_main_dbm_probes(self, conn): def _needs_per_database_probing(self): """True when the running check fans out connections across multiple DBs.""" - return self._needs_autodiscovery_connectivity_probing() or self._needs_query_sample_setup_probing() + return ( + self._needs_autodiscovery_connectivity_probing() + or self._needs_query_sample_setup_probing() + or self._needs_column_statistics_probing() + ) def _needs_autodiscovery_connectivity_probing(self): """True when runtime autodiscovery opens per-database connections.""" @@ -122,12 +126,19 @@ def _needs_query_sample_setup_probing(self): config = self._check._config return config.dbm and config.query_samples.enabled + def _needs_column_statistics_probing(self): + """True when column-statistics will read `datadog.column_statistics()` per database.""" + config = self._check._config + return config.dbm and config.collect_column_statistics.enabled + def _run_per_database_probes(self, main_conn): """Run per-database probes along the same fan-out paths as runtime collection.""" if self._needs_autodiscovery_connectivity_probing(): self._run_autodiscovery_connectivity_probes(main_conn) if self._needs_query_sample_setup_probing(): self._run_query_sample_setup_probes(main_conn) + if self._needs_column_statistics_probing(): + self._run_column_statistics_probes(main_conn) def _run_autodiscovery_connectivity_probes(self, main_conn): """Validate connectivity to databases selected by runtime autodiscovery.""" @@ -159,6 +170,22 @@ def _run_query_sample_setup_probes(self, main_conn): if probe_conn is not main_conn: _safe_close(probe_conn) + def _run_column_statistics_probes(self, main_conn): + """Validate `datadog.column_statistics()` in every database the collector scans.""" + dbnames = self._get_autodiscovered_probe_databases() + if dbnames is None: + dbnames = [self._check._config.dbname] + for dbname in dbnames: + probe_conn = self._probe_connection_for_db(main_conn, dbname) + if probe_conn is None: + continue + try: + failed = set() + self._diagnose_column_statistics_function(probe_conn, dbname, failed) + finally: + if probe_conn is not main_conn: + _safe_close(probe_conn) + def _probe_connection_for_db(self, main_conn, dbname): """Return the main probe connection or open one for another database.""" return main_conn if dbname == self._check._config.dbname else self._open_probe_connection(dbname) @@ -689,6 +716,72 @@ def _diagnose_explain_function(self, conn, dbname=None, failed=None): failed_codes=failed, ) + def _diagnose_column_statistics_function(self, conn, dbname=None, failed=None): + exists_code = DatabaseConfigurationError.column_statistics_function_undefined + priv_code = DatabaseConfigurationError.column_statistics_function_insufficient_privilege + dbname = dbname or self._check._config.dbname + failed = self._failed if failed is None else failed + if not _schema_exists(conn, "datadog"): + if DatabaseConfigurationError.missing_datadog_schema.value not in failed: + self._diagnose_datadog_schema(conn, dbname, failed) + return + if _schema_usage_failed_key("datadog") in failed: + return + + try: + with conn.cursor() as cursor: + cursor.execute('SELECT 1 FROM "datadog"."column_statistics"() LIMIT 1') + cursor.fetchone() + except psycopg.errors.UndefinedFunction as e: + self._fail( + exists_code, + diagnosis="datadog.column_statistics() does not exist in {}: {}".format(dbname, e), + category=self._category, + description=DIAGNOSTIC_METADATA[exists_code]["description"], + remediation=build_remediation(exists_code), + rawerror=str(e), + failed_codes=failed, + ) + return + except psycopg.errors.InsufficientPrivilege as e: + self._check.diagnosis.success( + name=exists_code.value, + diagnosis="datadog.column_statistics() exists in {}.".format(dbname), + category=self._category, + ) + self._fail( + priv_code, + diagnosis="datadog user lacks EXECUTE on datadog.column_statistics() in {}: {}".format(dbname, e), + category=self._category, + description=DIAGNOSTIC_METADATA[priv_code]["description"], + remediation=build_remediation(priv_code), + rawerror=str(e), + failed_codes=failed, + ) + return + except psycopg.Error as e: + self._fail( + exists_code, + diagnosis="datadog.column_statistics() in {} returned an error: {}".format(dbname, e), + category=self._category, + description=DIAGNOSTIC_METADATA[exists_code]["description"], + remediation=build_remediation(exists_code), + rawerror=str(e), + failed_codes=failed, + ) + return + + self._check.diagnosis.success( + name=exists_code.value, + diagnosis="datadog.column_statistics() exists in {}.".format(dbname), + category=self._category, + ) + self._check.diagnosis.success( + name=priv_code.value, + diagnosis="datadog user can execute datadog.column_statistics() in {}.".format(dbname), + category=self._category, + ) + def _diagnose_config_validation(self): """Report the check's config-validation state to `agent diagnose`. diff --git a/postgres/datadog_checks/postgres/features.py b/postgres/datadog_checks/postgres/features.py index 16bd1976b84da..1e19bf82a20c8 100644 --- a/postgres/datadog_checks/postgres/features.py +++ b/postgres/datadog_checks/postgres/features.py @@ -18,6 +18,7 @@ class FeatureKey(Enum): QUERY_ACTIVITY = "query_activity" QUERY_METRICS = "query_metrics" DATA_OBSERVABILITY = "data_observability" + COLLECT_COLUMN_STATISTICS = "collect_column_statistics" FeatureNames = { @@ -28,6 +29,7 @@ class FeatureKey(Enum): FeatureKey.QUERY_ACTIVITY: 'Query Activity', FeatureKey.QUERY_METRICS: 'Query Metrics', FeatureKey.DATA_OBSERVABILITY: 'Data Observability', + FeatureKey.COLLECT_COLUMN_STATISTICS: 'Collect Column Statistics', } diff --git a/postgres/datadog_checks/postgres/filters.py b/postgres/datadog_checks/postgres/filters.py new file mode 100644 index 0000000000000..005fd5ca0833d --- /dev/null +++ b/postgres/datadog_checks/postgres/filters.py @@ -0,0 +1,25 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + + +def regex_exclude_clauses(column: str, patterns: list[str] | None) -> str: + """Build SQL fragment of `AND col !~ %s` clauses, one per pattern. + + Caller is responsible for binding `patterns` as parameters when executing. + """ + if not patterns: + return "" + return "".join(" AND {} !~ %s".format(column) for _ in patterns) + + +def regex_include_clause(column: str, patterns: list[str] | None) -> str: + """Build SQL fragment `AND (col ~ %s OR col ~ %s ...)` matching any pattern. + + Caller is responsible for binding `patterns` as parameters when executing. + """ + if not patterns: + return "" + or_clause = " OR ".join("{} ~ %s".format(column) for _ in patterns) + return f" AND ({or_clause})" diff --git a/postgres/datadog_checks/postgres/health.py b/postgres/datadog_checks/postgres/health.py index e900ad9d55032..bcd9403115ac7 100644 --- a/postgres/datadog_checks/postgres/health.py +++ b/postgres/datadog_checks/postgres/health.py @@ -19,6 +19,8 @@ class PostgresHealthEvent(Enum): """ EXPLAIN_PLAN_ERROR = 'explain_plan_error' + COLUMN_STATISTICS_FUNCTION_NOT_FOUND = 'column_statistics_function_not_found' + COLUMN_STATISTICS_INSUFFICIENT_PRIVILEGE = 'column_statistics_insufficient_privilege' class PostgresHealth(Health): diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index d8a2959754291..4e5c8040cfd6e 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -10,7 +10,9 @@ import psycopg from psycopg.rows import dict_row +from .column_statistics import PostgresColumnStatisticsCollector from .schemas import PostgresSchemaCollector +from .util import collection_interval_gcd try: import datadog_agent # type: ignore @@ -80,6 +82,8 @@ class PostgresMetadata(DBMAsyncJob): Collects database metadata. Supports: 1. cloud metadata collection for resource creations 2. collection of pg_settings + 3. schema collection + 4. column statistics collection """ def __init__(self, check: PostgreSql, config: InstanceConfig): @@ -88,19 +92,22 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): # Extensions currently doesn't have a separate collection interval option self.pg_extensions_collection_interval = self.pg_settings_collection_interval self.schemas_collection_interval = config.collect_schemas.collection_interval + self.column_statistics_collection_interval = config.collect_column_statistics.collection_interval - # by default, send resources every 10 minutes - self.collection_interval = min( + self.collection_interval = collection_interval_gcd( self.pg_extensions_collection_interval, self.pg_settings_collection_interval, self.schemas_collection_interval, + self.column_statistics_collection_interval, ) super(PostgresMetadata, self).__init__( check, rate_limit=1 / float(self.collection_interval), run_sync=config.collect_settings.run_sync, - enabled=config.collect_settings.enabled or config.collect_schemas.enabled, + enabled=config.collect_settings.enabled + or config.collect_schemas.enabled + or config.collect_column_statistics.enabled, dbms="postgres", min_collection_interval=config.min_collection_interval, expected_db_exceptions=(psycopg.errors.DatabaseError,), @@ -112,10 +119,17 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): self._collect_extensions_enabled = self._collect_pg_settings_enabled self._collect_schemas_enabled = config.collect_schemas.enabled self._schema_collector = PostgresSchemaCollector(check) if config.collect_schemas.enabled else None + self._collect_column_statistics_enabled = config.collect_column_statistics.enabled and config.dbm + self._column_statistics_collector = ( + PostgresColumnStatisticsCollector(check, self._cancel_event) + if self._collect_column_statistics_enabled + else None + ) self._compiled_patterns_cache = {} self._time_since_last_extension_query = 0 self._time_since_last_settings_query = 0 self._last_schemas_query_time = 0 + self._last_column_statistics_query_time = 0 self.column_buffer_size = 100_000 self._conn_ttl_ms = self._config.idle_connection_timeout self._tags_no_db = None @@ -124,6 +138,7 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): def _shutdown(self): self._check = None self._schema_collector = None + self._column_statistics_collector = None self._compiled_patterns_cache = None def _dbtags(self, db, *extra_tags): @@ -216,6 +231,12 @@ def report_postgres_metadata(self): ): self._collect_postgres_schemas() + if ( + self._collect_column_statistics_enabled + and time.time() - self._last_column_statistics_query_time > self.column_statistics_collection_interval + ): + self._collect_column_statistics() + @tracked_method(agent_check_getter=agent_check_getter) def _collect_postgres_schemas(self): started = self._schema_collector.collect_schemas() @@ -268,3 +289,10 @@ def _collect_postgres_settings(self): self._log.debug("Loaded %s rows from pg_settings", rows) self._log.debug("Loaded %s rows from pg_settings", len(rows)) return [dict(row) for row in rows] + + @tracked_method(agent_check_getter=agent_check_getter) + def _collect_column_statistics(self): + try: + self._column_statistics_collector.collect_column_statistics(self._tags_no_db) + finally: + self._last_column_statistics_query_time = time.time() diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index caab366180792..dac424e3f9c5f 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -194,6 +194,9 @@ def __init__(self, name, init_config, instances): self.diagnosis.register(functools.partial(run_diagnostics, self)) + def database_monitoring_column_statistics(self, raw_event: str): + self.event_platform_event(raw_event, "dbm-column-statistics") + def _submit_initialization_health_event(self): try: # Handle the config validation result after we've set tags so those tags are included in the health event diff --git a/postgres/datadog_checks/postgres/schemas.py b/postgres/datadog_checks/postgres/schemas.py index 31f5b388dcd9e..b263060dbd59a 100644 --- a/postgres/datadog_checks/postgres/schemas.py +++ b/postgres/datadog_checks/postgres/schemas.py @@ -13,6 +13,7 @@ from datadog_checks.postgres import PostgreSql from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig +from datadog_checks.postgres.filters import regex_exclude_clauses, regex_include_clause from datadog_checks.postgres.version_utils import V10, V11, VersionUtils @@ -68,8 +69,8 @@ class DatabaseObject(TypedDict): FROM pg_namespace nsp LEFT JOIN pg_roles r on nsp.nspowner = r.oid WHERE nspname NOT IN ( 'information_schema', 'pg_catalog' ) - AND nspname NOT LIKE 'pg_toast%' - AND nspname NOT LIKE 'pg_temp_%' + AND nspname NOT LIKE 'pg_toast%%' + AND nspname NOT LIKE 'pg_temp_%%' """ COLUMNS_QUERY = """ @@ -204,14 +205,11 @@ def _get_databases(self): query = DATABASE_INFORMATION_QUERY params: list[str] = [] - for exclude_regex in self._config.exclude_databases: - query += " AND datname !~ %s" - params.append(exclude_regex) + query += regex_exclude_clauses("datname", self._config.exclude_databases) + params.extend(self._config.exclude_databases) - if self._config.include_databases: - or_clause = " OR ".join(["datname ~ %s"] * len(self._config.include_databases)) - query += f" AND ({or_clause})" - params.extend(self._config.include_databases) + query += regex_include_clause("datname", self._config.include_databases) + params.extend(self._config.include_databases) # Autodiscovery trumps exclude and include autodiscovery_databases = self._check.autodiscovery.get_items() if self._check.autodiscovery else [] @@ -229,41 +227,45 @@ def _get_databases(self): def _get_cursor(self, database_name): with self._check.db_pool.get_connection(database_name) as conn: with conn.cursor(row_factory=dict_row) as cursor: - query = self.get_rows_query() + query, params = self.get_rows_query() cursor.execute(f"SET statement_timeout = '{self._config.max_query_duration}s';") - cursor.execute(query) + cursor.execute(query, params) yield cursor def _get_schemas_query(self): query = SCHEMA_QUERY - for exclude_regex in self._config.exclude_schemas: - query += " AND nspname !~ '{}'".format(exclude_regex) - if self._config.include_schemas: - query += f" AND ({ - ' OR '.join(f"nspname ~ '{include_regex}'" for include_regex in self._config.include_schemas) - })" + params: list[str] = [] + + query += regex_exclude_clauses("nspname", self._config.exclude_schemas) + params.extend(self._config.exclude_schemas) + + query += regex_include_clause("nspname", self._config.include_schemas) + params.extend(self._config.include_schemas) + if self._check._config.ignore_schemas_owned_by: query += " AND nspowner :: regrole :: text not IN ({})".format( ", ".join(f"'{owner}'" for owner in self._check._config.ignore_schemas_owned_by) ) - return query + return query, params def _get_tables_query(self): if VersionUtils.parse_version(str(self._check.version)) < V10: query = PG_TABLES_QUERY_V9 else: query = PG_TABLES_QUERY_V10_PLUS - for exclude_regex in self._config.exclude_tables: - query += " AND c.relname !~ '{}'".format(exclude_regex) - if self._config.include_tables: - query += f" AND ({ - ' OR '.join(f"c.relname ~ '{include_regex}'" for include_regex in self._config.include_tables) - })" - return query + params: list[str] = [] + + query += regex_exclude_clauses("c.relname", self._config.exclude_tables) + params.extend(self._config.exclude_tables) + + query += regex_include_clause("c.relname", self._config.include_tables) + params.extend(self._config.include_tables) + + return query, params def get_rows_query(self): - schemas_query = self._get_schemas_query() - tables_query = self._get_tables_query() + schemas_query, schemas_params = self._get_schemas_query() + tables_query, tables_params = self._get_tables_query() columns_query = COLUMNS_QUERY indexes_query = PG_INDEXES_QUERY constraints_query = PG_CONSTRAINTS_QUERY @@ -347,7 +349,7 @@ def get_rows_query(self): ; """ - return query + return query, schemas_params + tables_params def _get_next(self, cursor): return cursor.fetchone() diff --git a/postgres/datadog_checks/postgres/util.py b/postgres/datadog_checks/postgres/util.py index e556fa3fa45a0..395bc73ff9123 100644 --- a/postgres/datadog_checks/postgres/util.py +++ b/postgres/datadog_checks/postgres/util.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import math import re import string from enum import Enum @@ -49,6 +50,8 @@ class DatabaseConfigurationError(Enum): pg_stat_statements_not_readable = 'pg-stat-statements-not-readable' pg_stat_database_not_readable = 'pg-stat-database-not-readable' config_validation = 'config-validation' + column_statistics_function_undefined = 'column-statistics-function-undefined' + column_statistics_function_insufficient_privilege = 'column-statistics-function-insufficient-privilege' # Docs anchor is appended to the troubleshooting URL to land the user on the right section. @@ -196,6 +199,22 @@ class DatabaseConfigurationError(Enum): ), "docs_anchor": DatabaseConfigurationError.config_validation.value, }, + DatabaseConfigurationError.column_statistics_function_undefined: { + "description": "The `datadog.column_statistics` function is required to collect column statistics.", + "remediation": ( + "Create the `datadog.column_statistics` function in every monitored database as documented in the " + "Postgres DBM setup guide." + ), + "docs_anchor": DatabaseConfigurationError.column_statistics_function_undefined.value, + }, + DatabaseConfigurationError.column_statistics_function_insufficient_privilege: { + "description": "The datadog user lacks EXECUTE on `datadog.column_statistics`.", + "remediation": ( + "Run `GRANT EXECUTE ON FUNCTION datadog.column_statistics() TO ;` in every monitored " + "database." + ), + "docs_anchor": DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value, + }, } @@ -344,6 +363,11 @@ def get_list_chunks(lst, n): yield lst[i : i + n] +def collection_interval_gcd(*intervals: float | int) -> int: + """GCD of collection intervals (seconds); outer-loop tick for jobs with multiple sub-schedules.""" + return math.gcd(*(int(i) for i in intervals)) + + SET_TRIM_PATTERN = re.compile( r""" ^ diff --git a/postgres/tests/compose/resources/03_setup.sh b/postgres/tests/compose/resources/03_setup.sh index 88440f73808c2..b48d12a0ef291 100755 --- a/postgres/tests/compose/resources/03_setup.sh +++ b/postgres/tests/compose/resources/03_setup.sh @@ -62,8 +62,20 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" "$DBNAME" <<-'EOSQL' LANGUAGE sql SECURITY DEFINER; + CREATE OR REPLACE FUNCTION datadog.column_statistics() + RETURNS TABLE ( + schemaname name, tablename name, attname name, + n_distinct real, avg_width integer, null_frac real, + inherited boolean, correlation real, most_common_freqs real[] + ) AS + $$ SELECT schemaname, tablename, attname, n_distinct, avg_width, null_frac, + inherited, correlation, most_common_freqs FROM pg_stats; $$ + LANGUAGE sql + SECURITY DEFINER; + ALTER FUNCTION datadog.explain_statement(l_query text, out explain json) OWNER TO postgres; ALTER FUNCTION datadog.pg_stat_activity() owner to postgres; + ALTER FUNCTION datadog.column_statistics() OWNER TO postgres; -- datadog.explain_statement_noaccess is not part of the standard setup -- it's added only for the purpose of testing an explain function owned by a user with inadequate permissions @@ -104,7 +116,8 @@ EOSQL done psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" dogs_nofunc <<-'EOSQL' - DROP FUNCTION datadog.explain_statement(l_query text, out explain JSON) + DROP FUNCTION datadog.explain_statement(l_query text, out explain JSON); + DROP FUNCTION datadog.column_statistics(); EOSQL # Somehow, on old postgres version (11 and 12), wal_level is incorrectly set despite diff --git a/postgres/tests/test_column_statistics.py b/postgres/tests/test_column_statistics.py new file mode 100644 index 0000000000000..95cdb39d53e57 --- /dev/null +++ b/postgres/tests/test_column_statistics.py @@ -0,0 +1,850 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from concurrent.futures.thread import ThreadPoolExecutor +from contextlib import contextmanager +from unittest.mock import MagicMock, patch + +import psycopg +import pytest + +from datadog_checks.base.utils.db.utils import DBMAsyncJob + +from .utils import _get_superconn, run_one_check + +pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] + + +@pytest.fixture +def dbm_instance(pg_instance): + pg_instance['dbm'] = True + pg_instance['min_collection_interval'] = 0.1 + pg_instance['query_samples'] = {'enabled': False} + pg_instance['query_activity'] = {'enabled': False} + pg_instance['query_metrics'] = {'enabled': False} + pg_instance['collect_settings'] = {'enabled': False, 'run_sync': True} + pg_instance['collect_schemas'] = {'enabled': False} + pg_instance['collect_column_statistics'] = { + 'enabled': True, + 'collection_interval': 36000, + 'max_tables': 100, + } + return pg_instance + + +@pytest.fixture +def nofunc_instance(pg_instance): + """Instance pointing at dogs_nofunc, which has the datadog schema but no column_statistics() function.""" + pg_instance['dbm'] = True + pg_instance['min_collection_interval'] = 0.1 + pg_instance['dbname'] = 'dogs_nofunc' + pg_instance['query_samples'] = {'enabled': False} + pg_instance['query_activity'] = {'enabled': False} + pg_instance['query_metrics'] = {'enabled': False} + pg_instance['collect_settings'] = {'enabled': False, 'run_sync': True} + pg_instance['collect_schemas'] = {'enabled': False} + pg_instance['collect_column_statistics'] = { + 'enabled': True, + 'collection_interval': 36000, + 'max_tables': 100, + } + return pg_instance + + +@pytest.fixture(autouse=True) +def stop_orphaned_threads(): + DBMAsyncJob.executor.shutdown(wait=True) + DBMAsyncJob.executor = ThreadPoolExecutor() + + +@pytest.fixture +def check_runner(integration_check, aggregator): + """Build + initialize a check via check.run(), reset the aggregator, and tear down the metadata job on exit. + + Tests that exercise the collector directly (rather than via run_one_check) use this to avoid + repeating the check.run() / aggregator.reset() / cancel+future.result() boilerplate. + """ + checks = [] + + def _runner(instance, reset_aggregator=True): + check = integration_check(instance) + check.run() + if reset_aggregator: + aggregator.reset() + checks.append(check) + return check + + yield _runner + + for c in checks: + c.cancel() + if c.metadata_samples._job_loop_future is not None: + c.metadata_samples._job_loop_future.result() + + +def _analyze_tables(pg_instance): + """Run ANALYZE on test tables so they appear in the collector's time window.""" + conn = _get_superconn(pg_instance) + with conn.cursor() as cur: + cur.execute("ANALYZE persons") + cur.execute("ANALYZE cities") + cur.execute("ANALYZE pgtable") + conn.close() + + +def _fake_pool_raising(exc): + """Return a context-managed fake pool connection whose cursor.execute raises exc on the main query. + + SET/RESET statement_timeout calls pass through so RESET cleanup in _collect_for_database still runs. + """ + + @contextmanager + def get_connection(dbname, **kwargs): + cursor = MagicMock() + cursor.__enter__.return_value = cursor + cursor.__exit__.return_value = None + + def execute(sql, *args, **kwargs): + if 'statement_timeout' in sql: + return + raise exc + + cursor.execute.side_effect = execute + conn = MagicMock() + conn.cursor.return_value = cursor + yield conn + + return get_connection + + +def test_collect_column_statistics_happy_path(integration_check, dbm_instance, pg_instance, aggregator): + """Happy-path smoke test: analyzed tables are collected and emitted with well-formed event + row + column shape.""" + _analyze_tables(pg_instance) + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected at least one column stats event" + event = events[0] + assert event['dbm_type'] == 'column_statistics' + assert event['dbms'] == 'postgres' + assert event['host'] == 'stubbed.hostname' + assert 'timestamp' in event + assert 'collection_interval' in event + assert isinstance(event['column_statistics'], list) + assert len(event['column_statistics']) > 0 + + # Verify table structure including metadata fields + table_entry = event['column_statistics'][0] + assert 'schema' in table_entry + assert 'table' in table_entry + assert 'columns' in table_entry + assert 'version' in table_entry + assert table_entry['version'] == 1 + assert len(table_entry['columns']) > 0 + assert 'last_analyze_age' in table_entry + assert 'last_autoanalyze_age' in table_entry + assert 'last_vacuum_age' in table_entry + assert 'stats_age' in table_entry + # We just ran ANALYZE, so stats_age should be recent (within 60s) + assert table_entry['stats_age'] is not None + assert table_entry['stats_age'] < 60 + + # Verify column structure + col = table_entry['columns'][0] + assert 'name' in col + assert 'avg_width' in col + assert 'n_distinct' in col + assert 'null_frac' in col + + # Verify we got all 3 analyzed tables + all_tables = set() + for evt in events: + for entry in evt['column_statistics']: + all_tables.add(entry['table']) + assert 'persons' in all_tables, f"Expected 'persons' in collected tables, got {all_tables}" + assert 'cities' in all_tables, f"Expected 'cities' in collected tables, got {all_tables}" + assert 'pgtable' in all_tables, f"Expected 'pgtable' in collected tables, got {all_tables}" + + +def test_collect_column_statistics_include_filter(integration_check, dbm_instance, pg_instance, aggregator): + """include_tables restricts the result set to tables whose name matches the regex.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['include_tables'] = ['persons'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0 + for event in events: + for table_entry in event['column_statistics']: + assert 'persons' in table_entry['table'], f"Expected only 'persons' tables but got '{table_entry['table']}'" + + +def test_collect_column_statistics_exclude_filter(integration_check, dbm_instance, pg_instance, aggregator): + """exclude_tables removes matching tables from the result set while allowing others through.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['exclude_tables'] = ['persons', 'pgtable', 'cities'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + # personsdup* tables exist but are not manually analyzed; depending on autovacuum they may or + # may not appear in pg_stats. The contract we check is: no excluded table appears anywhere. + for event in events: + for table_entry in event['column_statistics']: + assert table_entry['table'] not in ('persons', 'pgtable', 'cities'), ( + f"Excluded table '{table_entry['table']}' should not have been collected" + ) + + +def test_collect_column_statistics_max_tables(integration_check, dbm_instance, pg_instance, aggregator): + """max_tables caps the number of collected tables at the configured limit.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['max_tables'] = 1 + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + total_tables = sum(len(e['column_statistics']) for e in events) + assert total_tables == 1, f"Expected exactly 1 table with max_tables=1, got {total_tables}" + + +def test_collect_column_statistics_disabled(integration_check, dbm_instance, aggregator): + """collect_column_statistics.enabled=False suppresses collection entirely.""" + dbm_instance['collect_column_statistics']['enabled'] = False + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events when column stats collection is disabled" + + +def test_collect_column_statistics_function_not_found(integration_check, nofunc_instance, aggregator): + """Test that missing datadog.column_statistics() emits a health WARNING and no column stats.""" + check = integration_check(nofunc_instance) + run_one_check(check) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no column stats events when function doesn't exist" + + health_events = aggregator.get_event_platform_events("dbm-health") + function_not_found_events = [e for e in health_events if e['name'] == 'column_statistics_function_not_found'] + assert len(function_not_found_events) == 1 + assert function_not_found_events[0]['status'] == 'warning' + + +def test_collect_column_statistics_function_not_found_logs_once(integration_check, nofunc_instance, aggregator): + """Test that repeated failures only emit a single health WARNING.""" + nofunc_instance['collect_column_statistics']['collection_interval'] = 0.1 + check = integration_check(nofunc_instance) + run_one_check(check) + run_one_check(check) + run_one_check(check) + + health_events = aggregator.get_event_platform_events("dbm-health") + function_not_found_events = [e for e in health_events if e['name'] == 'column_statistics_function_not_found'] + assert len(function_not_found_events) == 1, ( + f"Expected exactly 1 health event but got {len(function_not_found_events)}" + ) + + +def test_collect_column_statistics_recovery(check_runner, dbm_instance, pg_instance, aggregator): + """Recovery from a prior function_not_found state emits an OK health event and clears the error set.""" + _analyze_tables(pg_instance) + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + collector._function_not_found_dbs.add('datadog_test') + + # Call collector directly — the function exists, so it succeeds and triggers recovery + collector.collect_column_statistics([]) + + health_events = aggregator.get_event_platform_events("dbm-health") + ok_events = [ + e for e in health_events if e['name'] == 'column_statistics_function_not_found' and e['status'] == 'ok' + ] + assert len(ok_events) == 1, f"Expected OK health event after recovery, got {health_events}" + assert 'datadog_test' not in collector._function_not_found_dbs, "Expected datadog_test removed from error set" + + +def test_collect_column_statistics_collection_interval_in_event( + integration_check, dbm_instance, pg_instance, aggregator +): + """Test that the configured collection_interval appears in the emitted event.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['collection_interval'] = 7200 + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected at least one column stats event" + assert events[0]['collection_interval'] == 7200 + + +def test_collect_column_statistics_include_and_exclude(integration_check, dbm_instance, pg_instance, aggregator): + """When both include and exclude match, exclude takes precedence.""" + _analyze_tables(pg_instance) + # Include all tables starting with 'p', but exclude 'pgtable' + dbm_instance['collect_column_statistics']['include_tables'] = ['^p.*'] + dbm_instance['collect_column_statistics']['exclude_tables'] = ['^pgtable$'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected events for analyzed tables starting with 'p' (e.g. 'persons')" + for event in events: + for table_entry in event['column_statistics']: + assert table_entry['table'] != 'pgtable', "pgtable should be excluded" + assert table_entry['table'].startswith('p'), ( + f"Expected only tables starting with 'p' but got '{table_entry['table']}'" + ) + + +def test_collect_column_statistics_default_config(integration_check, pg_instance, aggregator): + """Test that minimal config (just enabled) uses correct defaults.""" + _analyze_tables(pg_instance) + pg_instance['dbm'] = True + pg_instance['min_collection_interval'] = 0.1 + pg_instance['query_samples'] = {'enabled': False} + pg_instance['query_activity'] = {'enabled': False} + pg_instance['query_metrics'] = {'enabled': False} + pg_instance['collect_settings'] = {'enabled': False, 'run_sync': True} + pg_instance['collect_schemas'] = {'enabled': False} + pg_instance['collect_column_statistics'] = { + 'enabled': True, + } + check = integration_check(pg_instance) + # Capture the collector before run_one_check — cancel() nulls it on shutdown. + collector = check.metadata_samples._column_statistics_collector + run_one_check(check) + # Verify defaults + assert collector._config.collection_interval == 3600 + assert collector._config.max_tables == 500 + assert collector._config.include_databases == [] + assert collector._config.exclude_databases == [] + assert collector._config.include_schemas == [] + assert collector._config.exclude_schemas == [] + assert collector._config.include_tables == [] + assert collector._config.exclude_tables == [] + + +def test_collect_column_statistics_no_health_event_on_success(integration_check, dbm_instance, pg_instance, aggregator): + """Test that a successful collection does not emit any column stats health events.""" + _analyze_tables(pg_instance) + check = integration_check(dbm_instance) + run_one_check(check) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected column stats events" + + health_events = aggregator.get_event_platform_events("dbm-health") + column_stats_health = [ + e + for e in health_events + if e['name'] in ('column_statistics_function_not_found', 'column_statistics_insufficient_privilege') + ] + assert len(column_stats_health) == 0, f"Expected no column stats health events, got {column_stats_health}" + + +def test_collect_column_statistics_respects_interval(integration_check, dbm_instance, pg_instance, aggregator): + """Test that the second run within the collection interval does not emit a new event.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['collection_interval'] = 36000 + check = integration_check(dbm_instance) + + run_one_check(check) + first_count = len(aggregator.get_event_platform_events("dbm-column-statistics")) + assert first_count > 0, "Expected at least one event on first run" + + run_one_check(check) + second_count = len(aggregator.get_event_platform_events("dbm-column-statistics")) + assert second_count == first_count, ( + f"Expected no new events on second run (interval not elapsed), got {second_count - first_count} new" + ) + + +@patch('datadog_checks.postgres.column_statistics.PAYLOAD_MAX_COLUMNS', 5) +def test_collect_column_statistics_payload_chunking(integration_check, dbm_instance, pg_instance, aggregator): + """Test that large collections are split into multiple payloads.""" + _analyze_tables(pg_instance) + check = integration_check(dbm_instance) + run_one_check(check) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 1, f"Expected multiple payloads with low chunk threshold, got {len(events)}" + + # Every event should have valid structure + for event in events: + assert event['dbm_type'] == 'column_statistics' + assert isinstance(event['column_statistics'], list) + assert len(event['column_statistics']) > 0 + + # All 3 tables should appear across the payloads + all_tables = set() + for event in events: + for entry in event['column_statistics']: + all_tables.add(entry['table']) + assert 'persons' in all_tables + assert 'cities' in all_tables + assert 'pgtable' in all_tables + + +def test_collect_column_statistics_multi_database(check_runner, dbm_instance, pg_instance, aggregator): + """When autodiscovery returns multiple databases, the collector visits each and emits rows tagged by db.""" + from .common import PASSWORD_ADMIN, USER_ADMIN + from .utils import _get_conn + + _analyze_tables(pg_instance) + # Analyze tables in dogs database + dogs_conn = _get_conn(pg_instance, dbname='dogs', user=USER_ADMIN, password=PASSWORD_ADMIN) + with dogs_conn.cursor() as cur: + cur.execute("ANALYZE breed") + cur.execute("ANALYZE kennel") + dogs_conn.close() + + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + collector._get_databases = lambda: ['datadog_test', 'dogs'] + collector.collect_column_statistics([]) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected column stats events" + + dbs_seen = set() + for event in events: + for entry in event['column_statistics']: + dbs_seen.add(entry['db']) + assert 'datadog_test' in dbs_seen, f"Expected datadog_test in collected dbs, got {dbs_seen}" + assert 'dogs' in dbs_seen, f"Expected dogs in collected dbs, got {dbs_seen}" + + +def test_collect_column_statistics_multi_database_error_isolation(check_runner, dbm_instance, pg_instance, aggregator): + """A missing function in one db does not block other dbs; the failed db emits a health warning.""" + _analyze_tables(pg_instance) + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + collector._get_databases = lambda: ['datadog_test', 'dogs_nofunc'] + collector.collect_column_statistics([]) + + # Should still get events from datadog_test + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected column stats events from working database" + + dbs_seen = set() + for event in events: + for entry in event['column_statistics']: + dbs_seen.add(entry['db']) + assert 'datadog_test' in dbs_seen, "Expected datadog_test results despite dogs_nofunc failure" + assert 'dogs_nofunc' not in dbs_seen, "Should not have results from dogs_nofunc" + + # Should have emitted a health warning for the missing function + health_events = aggregator.get_event_platform_events("dbm-health") + function_not_found_events = [e for e in health_events if e['name'] == 'column_statistics_function_not_found'] + assert len(function_not_found_events) == 1 + + +def test_collect_column_statistics_exclude_all(integration_check, dbm_instance, pg_instance, aggregator): + """Test that excluding all tables produces no events.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['exclude_tables'] = ['.*'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events when all tables are excluded" + + +def test_collect_column_statistics_include_nonexistent(integration_check, dbm_instance, pg_instance, aggregator): + """Test that including only nonexistent tables produces no events.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['include_tables'] = ['this_table_does_not_exist'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events when no tables match include filter" + + +def test_collect_column_statistics_max_tables_zero(integration_check, dbm_instance, pg_instance, aggregator): + """Test that max_tables=0 produces no events and no error.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['max_tables'] = 0 + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events with max_tables=0" + + +def test_collect_column_statistics_insufficient_privilege(check_runner, dbm_instance, aggregator): + """InsufficientPrivilege on datadog.column_statistics() emits a dedicated health WARNING.""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + exc = psycopg.errors.InsufficientPrivilege("permission denied for function datadog.column_statistics") + with patch.object(check.db_pool, 'get_connection', _fake_pool_raising(exc)): + collector.collect_column_statistics([]) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events when privileges are insufficient" + + health_events = aggregator.get_event_platform_events("dbm-health") + priv_events = [e for e in health_events if e['name'] == 'column_statistics_insufficient_privilege'] + assert len(priv_events) == 1, f"Expected 1 insufficient-privilege health event, got {health_events}" + assert priv_events[0]['status'] == 'warning' + assert 'datadog_test' in collector._insufficient_privilege_dbs + + +def test_collect_column_statistics_insufficient_privilege_logs_once(check_runner, dbm_instance, aggregator): + """Repeated privilege failures on the same database emit a single health WARNING (dedup).""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + exc = psycopg.errors.InsufficientPrivilege("permission denied") + with patch.object(check.db_pool, 'get_connection', _fake_pool_raising(exc)): + collector.collect_column_statistics([]) + collector.collect_column_statistics([]) + collector.collect_column_statistics([]) + + health_events = aggregator.get_event_platform_events("dbm-health") + priv_events = [e for e in health_events if e['name'] == 'column_statistics_insufficient_privilege'] + assert len(priv_events) == 1, f"Expected exactly 1 privilege health event, got {len(priv_events)}" + + +def test_collect_column_statistics_insufficient_privilege_recovery(check_runner, dbm_instance, pg_instance, aggregator): + """When privileges are restored, the collector emits an OK health event and clears the error set.""" + _analyze_tables(pg_instance) + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + collector._insufficient_privilege_dbs.add('datadog_test') + + collector.collect_column_statistics([]) + + health_events = aggregator.get_event_platform_events("dbm-health") + ok_events = [ + e for e in health_events if e['name'] == 'column_statistics_insufficient_privilege' and e['status'] == 'ok' + ] + assert len(ok_events) == 1, f"Expected 1 OK health event after recovery, got {health_events}" + assert 'datadog_test' not in collector._insufficient_privilege_dbs + + +def test_collect_column_statistics_disabled_when_dbm_false(integration_check, pg_instance, aggregator): + """Column stats should not be collected when dbm=False, even if collect_column_statistics.enabled=True.""" + _analyze_tables(pg_instance) + pg_instance['dbm'] = False + pg_instance['collect_settings'] = {'enabled': False, 'run_sync': True} + pg_instance['collect_schemas'] = {'enabled': False} + pg_instance['collect_column_statistics'] = {'enabled': True, 'collection_interval': 36000, 'max_tables': 100} + check = integration_check(pg_instance) + run_one_check(check) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events when dbm=False" + + +def test_collect_column_statistics_statement_timeout_handled(check_runner, dbm_instance, aggregator): + """A statement-timeout cancellation is logged and does not raise out of the collector.""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + exc = psycopg.errors.QueryCanceled("canceling statement due to statement timeout") + with patch.object(check.db_pool, 'get_connection', _fake_pool_raising(exc)): + collector.collect_column_statistics([]) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) == 0, "Expected no events when the query times out" + # No unhandled exception means the outer handler in _collect_for_database absorbed it. + + +def test_collect_column_statistics_cancel_stops_multi_db_iteration(check_runner, dbm_instance, pg_instance): + """Setting _cancel_event between databases stops further iteration.""" + _analyze_tables(pg_instance) + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + collector._get_databases = lambda: ['datadog_test', 'dogs'] + + original = collector._collect_for_database + calls = [] + + def tracking_collect(db_name, tags_no_db): + calls.append(db_name) + if db_name == 'datadog_test': + collector._cancel_event.set() + return original(db_name, tags_no_db) + + collector._collect_for_database = tracking_collect + collector.collect_column_statistics([]) + + assert calls == ['datadog_test'], f"Expected iteration to stop after cancel, got {calls}" + + +def test_collect_column_statistics_event_payload_fields(integration_check, dbm_instance, pg_instance, aggregator): + """Event payload includes all routing and context fields required by downstream consumers.""" + _analyze_tables(pg_instance) + dbm_instance['tags'] = ['foo:bar', 'env:test'] + check = integration_check(dbm_instance) + run_one_check(check) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0 + event = events[0] + + for field in ( + 'host', + 'database_instance', + 'ddagentversion', + 'dbms', + 'dbms_version', + 'cloud_metadata', + 'dbm_type', + 'collection_interval', + 'tags', + 'timestamp', + 'column_statistics', + ): + assert field in event, f"Expected '{field}' in event payload, got keys={sorted(event.keys())}" + + assert event['dbms'] == 'postgres' + assert event['dbm_type'] == 'column_statistics' + assert event['host'] == 'stubbed.hostname' + assert event['database_instance'] + # tags_no_db must not include a `db:` tag (column_statistics aggregates across databases) + assert not any(t.startswith('db:') for t in event['tags']), ( + f"Expected no db: tag in event tags (tags_no_db), got {event['tags']}" + ) + assert 'foo:bar' in event['tags'] + + +def _assert_metric_with_tag(aggregator, metric_name, required_tag): + """Assert at least one emitted sample of metric_name carries required_tag (tag-subset match).""" + samples = aggregator.metrics(metric_name) + matches = [s for s in samples if required_tag in s.tags] + assert matches, ( + f"Expected '{metric_name}' with tag '{required_tag}' — found {len(samples)} sample(s), none matching" + ) + + +def test_collect_column_statistics_metrics_emitted_on_success(integration_check, dbm_instance, pg_instance, aggregator): + """A successful collection emits the full set of operational metrics tagged status:success.""" + _analyze_tables(pg_instance) + check = integration_check(dbm_instance) + run_one_check(check) + + for metric in ( + 'dd.postgres.column_statistics.time', + 'dd.postgres.column_statistics.tables_count', + 'dd.postgres.column_statistics.columns_count', + 'dd.postgres.column_statistics.payloads_count', + ): + _assert_metric_with_tag(aggregator, metric, 'status:success') + + +def test_collect_column_statistics_metrics_emitted_on_error(check_runner, dbm_instance, aggregator): + """When an unexpected exception escapes the per-database handler, operational metrics are tagged status:error.""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + collector._get_databases = MagicMock(side_effect=RuntimeError("simulated autodiscovery failure")) + + with pytest.raises(RuntimeError): + collector.collect_column_statistics([]) + + for metric in ( + 'dd.postgres.column_statistics.time', + 'dd.postgres.column_statistics.tables_count', + 'dd.postgres.column_statistics.columns_count', + 'dd.postgres.column_statistics.payloads_count', + ): + _assert_metric_with_tag(aggregator, metric, 'status:error') + + +def test_collect_column_statistics_special_chars_in_pattern_handled_safely( + integration_check, dbm_instance, pg_instance, aggregator +): + """Patterns with SQL-special chars (e.g. quotes) bind safely as parameters and don't break the query.""" + _analyze_tables(pg_instance) + # `bad'pattern` is a valid regex (literal `bad`, then any char, then `pattern`); matches no real table. + dbm_instance['collect_column_statistics']['exclude_tables'] = ["bad'pattern"] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + tables_seen = {entry['table'] for evt in events for entry in evt['column_statistics']} + assert {'persons', 'cities', 'pgtable'}.issubset(tables_seen) + + +def test_collect_column_statistics_invalid_regex_does_not_crash(check_runner, dbm_instance, pg_instance, aggregator): + """Invalid regex is caught at the per-database boundary; no events emitted, no exception escapes.""" + _analyze_tables(pg_instance) + dbm_instance['collect_column_statistics']['include_tables'] = ["[invalid"] + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + + # Must not raise — the outer per-database except catches the regex error and logs it. + collector.collect_column_statistics([]) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert events == [], f"Expected no events when pattern is invalid regex, got {len(events)}" + + +def test_collect_column_statistics_handles_missing_version(check_runner, dbm_instance, pg_instance, aggregator): + """Collection succeeds when check.version is None (payload_pg_version returns an empty string).""" + _analyze_tables(pg_instance) + check = check_runner(dbm_instance) + check.version = None + + collector = check.metadata_samples._column_statistics_collector + collector.collect_column_statistics([]) + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0, "Expected events even when version is unavailable" + assert events[0]['dbms_version'] == '', ( + f"Expected empty dbms_version when check.version is None, got {events[0]['dbms_version']!r}" + ) + + +def test_collect_column_statistics_uses_autodiscovered_databases(check_runner, dbm_instance): + """_get_databases returns the autodiscovered set when autodiscovery is active.""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + + autodiscovery = MagicMock() + autodiscovery.get_items.return_value = ['db_one', 'db_two'] + check.autodiscovery = autodiscovery + + assert collector._get_databases() == ['db_one', 'db_two'] + + +def test_collect_column_statistics_pre_cancelled_skips_collection(check_runner, dbm_instance, aggregator): + """If _cancel_event is already set before collection starts, no databases are visited and no events emitted.""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + visited = [] + collector._collect_for_database = lambda db, tags: visited.append(db) + + collector._cancel_event.set() + try: + collector.collect_column_statistics([]) + finally: + collector._cancel_event.clear() + + assert visited == [], f"Expected no databases visited when pre-cancelled, got {visited}" + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert events == [], "Expected no events when collection is pre-cancelled" + + +def test_collect_column_statistics_partial_recovery_waits_for_all_dbs( + check_runner, dbm_instance, pg_instance, aggregator +): + """Recovery OK event fires only after every failing db has recovered; partial recovery stays silent.""" + _analyze_tables(pg_instance) + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + + # Seed two dbs into each error set; only recover one of them. + collector._function_not_found_dbs.update({'datadog_test', 'still_broken_db'}) + collector._insufficient_privilege_dbs.update({'datadog_test', 'still_broken_db'}) + collector._get_databases = lambda: ['datadog_test'] + + collector.collect_column_statistics([]) + + health_events = aggregator.get_event_platform_events("dbm-health") + ok_events = [ + e + for e in health_events + if e['name'] in ('column_statistics_function_not_found', 'column_statistics_insufficient_privilege') + and e['status'] == 'ok' + ] + assert ok_events == [], f"Expected no OK events while other dbs remain in error, got {ok_events}" + assert 'datadog_test' not in collector._function_not_found_dbs + assert 'still_broken_db' in collector._function_not_found_dbs + assert 'datadog_test' not in collector._insufficient_privilege_dbs + assert 'still_broken_db' in collector._insufficient_privilege_dbs + + +def test_collect_column_statistics_non_database_error_is_absorbed(check_runner, dbm_instance, aggregator): + """Non-DatabaseError exceptions from the cursor are logged per-db and do not propagate out of the collector.""" + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + + # ValueError is not a psycopg DatabaseError, so it exercises the catch-all except Exception branch. + exc = ValueError("unexpected non-DB failure") + with patch.object(check.db_pool, 'get_connection', _fake_pool_raising(exc)): + collector.collect_column_statistics([]) # must not raise + + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert events == [], "Expected no events when the cursor raises a non-DatabaseError" + + +def _analyze_public2_cities(pg_instance): + """Insert a row into public2.cities and ANALYZE so it appears in pg_stats for cross-schema tests.""" + conn = _get_superconn(pg_instance) + with conn.cursor() as cur: + cur.execute("INSERT INTO public2.cities VALUES ('Tokyo', 'Japan') ON CONFLICT DO NOTHING") + cur.execute("ANALYZE public2.cities") + conn.close() + + +def test_collect_column_statistics_include_schemas(integration_check, dbm_instance, pg_instance, aggregator): + """include_schemas restricts the result set to tables in matching schemas.""" + _analyze_tables(pg_instance) + _analyze_public2_cities(pg_instance) + dbm_instance['collect_column_statistics']['include_schemas'] = ['^public2$'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0 + schemas_seen = {entry['schema'] for evt in events for entry in evt['column_statistics']} + assert schemas_seen == {'public2'} + + +def test_collect_column_statistics_exclude_schemas(integration_check, dbm_instance, pg_instance, aggregator): + """exclude_schemas removes matching schemas from the result set.""" + _analyze_tables(pg_instance) + _analyze_public2_cities(pg_instance) + dbm_instance['collect_column_statistics']['exclude_schemas'] = ['^public$'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0 + schemas_seen = {entry['schema'] for evt in events for entry in evt['column_statistics']} + assert 'public' not in schemas_seen + assert 'public2' in schemas_seen + + +def test_collect_column_statistics_include_and_exclude_schemas( + integration_check, dbm_instance, pg_instance, aggregator +): + """When both include_schemas and exclude_schemas match a schema, exclude wins.""" + _analyze_tables(pg_instance) + _analyze_public2_cities(pg_instance) + dbm_instance['collect_column_statistics']['include_schemas'] = ['^public'] + dbm_instance['collect_column_statistics']['exclude_schemas'] = ['^public2$'] + check = integration_check(dbm_instance) + run_one_check(check) + events = aggregator.get_event_platform_events("dbm-column-statistics") + assert len(events) > 0 + schemas_seen = {entry['schema'] for evt in events for entry in evt['column_statistics']} + assert schemas_seen == {'public'} + + +def test_collect_column_statistics_include_databases(check_runner, dbm_instance, aggregator): + """include_databases restricts the database list to names matching the regex.""" + dbm_instance['collect_column_statistics']['include_databases'] = ['^dogs$'] + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + autodiscovery = MagicMock() + autodiscovery.get_items.return_value = ['datadog_test', 'dogs', 'dogs_3'] + collector._check.autodiscovery = autodiscovery + + assert collector._get_databases() == ['dogs'] + + +def test_collect_column_statistics_exclude_databases(check_runner, dbm_instance, aggregator): + """exclude_databases removes matching databases from the list.""" + dbm_instance['collect_column_statistics']['exclude_databases'] = ['^dogs'] + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + autodiscovery = MagicMock() + autodiscovery.get_items.return_value = ['datadog_test', 'dogs', 'dogs_3'] + collector._check.autodiscovery = autodiscovery + + assert collector._get_databases() == ['datadog_test'] + + +def test_collect_column_statistics_include_and_exclude_databases(check_runner, dbm_instance, aggregator): + """When both filters apply, exclude takes precedence.""" + dbm_instance['collect_column_statistics']['include_databases'] = ['^dogs'] + dbm_instance['collect_column_statistics']['exclude_databases'] = ['^dogs_[345]$'] + check = check_runner(dbm_instance) + collector = check.metadata_samples._column_statistics_collector + autodiscovery = MagicMock() + autodiscovery.get_items.return_value = ['datadog_test', 'dogs', 'dogs_2', 'dogs_3', 'dogs_5', 'dogs_9'] + collector._check.autodiscovery = autodiscovery + + assert collector._get_databases() == ['dogs', 'dogs_2', 'dogs_9'] diff --git a/postgres/tests/test_config.py b/postgres/tests/test_config.py index 6ee33b245e583..f0b81d4231cf3 100644 --- a/postgres/tests/test_config.py +++ b/postgres/tests/test_config.py @@ -87,6 +87,7 @@ def test_initialize_features_enabled_and_disabled(mock_check, minimal_instance): 'query_samples': {'enabled': True}, 'collect_settings': {'enabled': True}, 'collect_schemas': {'enabled': True}, + 'collect_column_statistics': {'enabled': True}, 'query_activity': {'enabled': True}, 'query_metrics': {'enabled': True}, 'data_observability': {'enabled': True}, @@ -101,6 +102,7 @@ def test_initialize_features_enabled_and_disabled(mock_check, minimal_instance): FeatureKey.QUERY_SAMPLES, FeatureKey.COLLECT_SETTINGS, FeatureKey.COLLECT_SCHEMAS, + FeatureKey.COLLECT_COLUMN_STATISTICS, FeatureKey.QUERY_ACTIVITY, FeatureKey.QUERY_METRICS, FeatureKey.DATA_OBSERVABILITY, @@ -118,6 +120,7 @@ def test_initialize_features_disabled_by_default(mock_check, minimal_instance): assert features[FeatureKey.QUERY_SAMPLES]['enabled'] is False assert features[FeatureKey.COLLECT_SETTINGS]['enabled'] is False assert features[FeatureKey.COLLECT_SCHEMAS]['enabled'] is False + assert features[FeatureKey.COLLECT_COLUMN_STATISTICS]['enabled'] is False assert features[FeatureKey.QUERY_ACTIVITY]['enabled'] is False assert features[FeatureKey.QUERY_METRICS]['enabled'] is False diff --git a/postgres/tests/test_config_defaults.py b/postgres/tests/test_config_defaults.py index 01331add617d8..ef36c929ad40f 100644 --- a/postgres/tests/test_config_defaults.py +++ b/postgres/tests/test_config_defaults.py @@ -113,6 +113,18 @@ 'run_sync': False, 'ignored_settings_patterns': ['plpgsql%'], }, + # === DBM: Column stats collection === + 'collect_column_statistics': { + 'enabled': False, + 'max_tables': 500, + 'collection_interval': 3600, + 'include_databases': [], + 'exclude_databases': [], + 'include_schemas': [], + 'exclude_schemas': [], + 'include_tables': [], + 'exclude_tables': [], + }, # === DBM: Schema collection === 'collect_schemas': { 'enabled': True, diff --git a/postgres/tests/test_diagnose.py b/postgres/tests/test_diagnose.py index 416f937666894..57f293c0f1da2 100644 --- a/postgres/tests/test_diagnose.py +++ b/postgres/tests/test_diagnose.py @@ -518,6 +518,8 @@ def test_dbm_disabled_skips_dbm_diagnostics(integration_check, pg_instance): DatabaseConfigurationError.pg_stat_statements_not_created.value, DatabaseConfigurationError.pg_stat_statements_not_readable.value, DatabaseConfigurationError.undefined_explain_function.value, + DatabaseConfigurationError.column_statistics_function_undefined.value, + DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value, } assert not any(d['name'] in dbm_names for d in diagnoses) @@ -1341,3 +1343,162 @@ def test_config_validation_strings_are_neutral(integration_check, pg_instance): text = (entry.get(field) or "").lower() for token in forbidden: assert token not in text, "{} leaked forbidden token {!r}: {!r}".format(field, token, entry.get(field)) + + +# -- Column-statistics setup probes ------------------------------------------ + + +COLUMN_STATISTICS_CALL = 'SELECT 1 FROM "datadog"."column_statistics"() LIMIT 1' + + +def _happy_column_statistics_responses(): + return [(COLUMN_STATISTICS_CALL, [(1,)])] + + +def _column_stats_check(integration_check, pg_instance, **overrides): + return integration_check(dict(pg_instance, dbm=True, collect_column_statistics={'enabled': True}, **overrides)) + + +def test_column_statistics_not_run_when_feature_disabled(integration_check, pg_instance): + check = _dbm_check(integration_check, pg_instance) + diagnoses = _run(check, _happy_server_responses() + _happy_dbm_responses()) + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value) + + +def test_column_statistics_not_run_without_dbm(integration_check, pg_instance): + check = integration_check(dict(pg_instance, collect_column_statistics={'enabled': True})) + diagnoses = _run(check, _happy_server_responses()) + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + + +def test_column_statistics_happy_path(integration_check, pg_instance): + check = _column_stats_check(integration_check, pg_instance) + diagnoses = _run( + check, + _happy_server_responses() + _happy_dbm_responses() + _happy_column_statistics_responses(), + ) + _assert_all_succeed( + diagnoses, + [ + DatabaseConfigurationError.column_statistics_function_undefined, + DatabaseConfigurationError.column_statistics_function_insufficient_privilege, + ], + ) + + +def test_column_statistics_undefined_function_fails(integration_check, pg_instance): + check = _column_stats_check(integration_check, pg_instance) + cs_responses = _override( + _happy_column_statistics_responses(), + COLUMN_STATISTICS_CALL, + psycopg.errors.UndefinedFunction('function datadog.column_statistics() does not exist'), + ) + diagnoses = _run(check, _happy_server_responses() + _happy_dbm_responses() + cs_responses) + fails = _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + assert len(fails) == 1 and fails[0]['result'] == Diagnosis.DIAGNOSIS_FAIL + assert 'column_statistics' in fails[0]['diagnosis'] + # Privilege row is suppressed when the function doesn't exist. + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value) + + +def test_column_statistics_insufficient_privilege_fails(integration_check, pg_instance): + check = _column_stats_check(integration_check, pg_instance) + cs_responses = _override( + _happy_column_statistics_responses(), + COLUMN_STATISTICS_CALL, + psycopg.errors.InsufficientPrivilege('permission denied for function column_statistics'), + ) + diagnoses = _run(check, _happy_server_responses() + _happy_dbm_responses() + cs_responses) + exists = _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + assert len(exists) == 1 and exists[0]['result'] == Diagnosis.DIAGNOSIS_SUCCESS + priv = _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value) + assert len(priv) == 1 and priv[0]['result'] == Diagnosis.DIAGNOSIS_FAIL + assert 'EXECUTE' in priv[0]['diagnosis'] or 'permission denied' in priv[0]['diagnosis'] + + +def test_column_statistics_other_error_fails_on_exists_code(integration_check, pg_instance): + """Generic psycopg errors (broken function body, wrong signature) surface on the exists code.""" + check = _column_stats_check(integration_check, pg_instance) + cs_responses = _override( + _happy_column_statistics_responses(), + COLUMN_STATISTICS_CALL, + psycopg.errors.DatabaseError('function returned wrong number of columns'), + ) + diagnoses = _run(check, _happy_server_responses() + _happy_dbm_responses() + cs_responses) + fails = _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + assert len(fails) == 1 and fails[0]['result'] == Diagnosis.DIAGNOSIS_FAIL + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value) + + +def test_column_statistics_skipped_when_datadog_schema_missing(integration_check, pg_instance): + """Schema-missing cascade replaces the column-statistics row with the root-cause row.""" + check = _column_stats_check( + integration_check, + pg_instance, + query_samples={'enabled': False}, + query_metrics={'enabled': False}, + query_activity={'enabled': False}, + ) + cs_responses = [(_schema('datadog'), [])] + diagnoses = _run(check, _happy_server_responses() + cs_responses) + schema_fails = [ + d + for d in _by_name(diagnoses, DatabaseConfigurationError.missing_datadog_schema.value) + if d['result'] == Diagnosis.DIAGNOSIS_FAIL + ] + assert schema_fails + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + assert not _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_insufficient_privilege.value) + + +def test_column_statistics_iterates_autodiscovered_databases(integration_check, pg_instance): + check = _column_stats_check( + integration_check, + pg_instance, + query_samples={'enabled': False}, + query_metrics={'enabled': False}, + query_activity={'enabled': False}, + database_autodiscovery={'enabled': True, 'include': ['app_.*']}, + ) + happy_cs = [ + (_schema('datadog'), [(1,)]), + (COLUMN_STATISTICS_CALL, [(1,)]), + ] + broken_cs = [ + (_schema('datadog'), [(1,)]), + (COLUMN_STATISTICS_CALL, psycopg.errors.UndefinedFunction('function does not exist')), + ] + connections = { + pg_instance['dbname']: FakeConn(_happy_server_responses() + happy_cs), + 'app_a': FakeConn(happy_cs), + 'app_b': FakeConn(broken_cs), + } + with mock.patch.object(check.autodiscovery, 'get_items', return_value=['app_a', 'app_b']): + with _patch_per_db(connections): + diagnoses = _get_diagnoses(check) + + fails = [ + d + for d in _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + if d['result'] == Diagnosis.DIAGNOSIS_FAIL + ] + assert len(fails) == 1 and 'app_b' in fails[0]['diagnosis'] + + +def test_column_statistics_uses_configured_dbname_when_no_autodiscovery(integration_check, pg_instance): + check = _column_stats_check( + integration_check, + pg_instance, + query_samples={'enabled': False}, + query_metrics={'enabled': False}, + query_activity={'enabled': False}, + ) + responses = _happy_server_responses() + [(_schema('datadog'), [(1,)])] + _happy_column_statistics_responses() + with _patch_connection(check, FakeConn(responses)) as connect: + diagnoses = _get_diagnoses(check) + opened_dbnames = {call.kwargs['dbname'] for call in connect.call_args_list} + assert opened_dbnames == {pg_instance['dbname']} + rows = _by_name(diagnoses, DatabaseConfigurationError.column_statistics_function_undefined.value) + assert len(rows) == 1 and rows[0]['result'] == Diagnosis.DIAGNOSIS_SUCCESS + assert pg_instance['dbname'] in rows[0]['diagnosis'] diff --git a/postgres/tests/test_filters.py b/postgres/tests/test_filters.py new file mode 100644 index 0000000000000..cc39c51771e35 --- /dev/null +++ b/postgres/tests/test_filters.py @@ -0,0 +1,52 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import pytest + +from datadog_checks.postgres.filters import regex_exclude_clauses, regex_include_clause + + +@pytest.mark.unit +class TestRegexExcludeClauses: + def test_empty_returns_empty_string(self): + assert regex_exclude_clauses("c.relname", []) == "" + + def test_none_returns_empty_string(self): + assert regex_exclude_clauses("c.relname", None) == "" + + def test_single_pattern(self): + assert regex_exclude_clauses("c.relname", ["temp_.*"]) == " AND c.relname !~ %s" + + def test_multiple_patterns_each_get_own_clause(self): + result = regex_exclude_clauses("nspname", ["pg_temp_.*", "_partitions$"]) + assert result == " AND nspname !~ %s AND nspname !~ %s" + + def test_pattern_value_does_not_appear_in_sql(self): + """Helper produces only placeholders; pattern values flow as separate cursor.execute params.""" + assert regex_exclude_clauses("c.relname", ["^p[0-9]+$"]) == " AND c.relname !~ %s" + + def test_different_columns(self): + assert regex_exclude_clauses("datname", ["dogs_[345]"]) == " AND datname !~ %s" + + +@pytest.mark.unit +class TestRegexIncludeClause: + def test_empty_returns_empty_string(self): + assert regex_include_clause("c.relname", []) == "" + + def test_none_returns_empty_string(self): + assert regex_include_clause("c.relname", None) == "" + + def test_single_pattern_still_wrapped_in_parens(self): + assert regex_include_clause("c.relname", ["users.*"]) == " AND (c.relname ~ %s)" + + def test_multiple_patterns_or_joined(self): + result = regex_include_clause("nspname", ["^app_.*", "^reports$"]) + assert result == " AND (nspname ~ %s OR nspname ~ %s)" + + def test_pattern_value_does_not_appear_in_sql(self): + """Helper produces only placeholders; pattern values flow as separate cursor.execute params.""" + assert regex_include_clause("c.relname", ["^p[0-9]+$"]) == " AND (c.relname ~ %s)" + + def test_different_columns(self): + assert regex_include_clause("datname", ["^prod_"]) == " AND (datname ~ %s)" diff --git a/postgres/tests/test_schemas.py b/postgres/tests/test_schemas.py index fd885fcae2661..16b5c47abec9f 100644 --- a/postgres/tests/test_schemas.py +++ b/postgres/tests/test_schemas.py @@ -51,6 +51,37 @@ def test_databases_filters(dbm_instance, integration_check): assert 'nope' not in datbase_names +def test_databases_include_filter(dbm_instance, integration_check): + dbm_instance['collect_schemas']['include_databases'] = ['^dogs_[0-9]$'] + check = integration_check(dbm_instance) + collector = PostgresSchemaCollector(check) + + databases = collector._get_databases() + database_names = [database['name'] for database in databases] + for n in range(10): + assert f'dogs_{n}' in database_names + assert 'dogs' not in database_names + assert 'postgres' not in database_names + assert 'datadog_test' not in database_names + + +def test_databases_include_and_exclude_filter(dbm_instance, integration_check): + dbm_instance['collect_schemas']['exclude_databases'] = ['dogs_[345]'] + dbm_instance['collect_schemas']['include_databases'] = ['^dogs_[0-9]$', '^postgres$'] + check = integration_check(dbm_instance) + collector = PostgresSchemaCollector(check) + + databases = collector._get_databases() + database_names = [database['name'] for database in databases] + assert 'postgres' in database_names + for n in (0, 1, 2, 6, 7, 8, 9): + assert f'dogs_{n}' in database_names + for n in (3, 4, 5): + assert f'dogs_{n}' not in database_names + assert 'dogs' not in database_names + assert 'datadog_test' not in database_names + + def test_get_cursor(dbm_instance, integration_check): check = integration_check(dbm_instance) check.version = POSTGRES_VERSION @@ -80,6 +111,37 @@ def test_schemas_filters(dbm_instance, integration_check): assert set(schemas) == {'datadog', 'hstore'} +def test_schemas_include_filter(dbm_instance, integration_check): + dbm_instance['collect_schemas']['include_schemas'] = ['^datadog$'] + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + schemas = [] + for row in cursor: + schemas.append(row['schema_name']) + + assert set(schemas) == {'datadog'} + + +def test_schemas_include_and_exclude_filter(dbm_instance, integration_check): + dbm_instance['collect_schemas']['exclude_schemas'] = ['^hstore$'] + dbm_instance['collect_schemas']['include_schemas'] = ['^datadog$', '^public2$'] + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + schemas = [] + for row in cursor: + schemas.append(row['schema_name']) + + assert set(schemas) == {'datadog', 'public2'} + + def test_tables(dbm_instance, integration_check): check = integration_check(dbm_instance) check.version = POSTGRES_VERSION @@ -119,6 +181,41 @@ def test_tables(dbm_instance, integration_check): assert set(tables) == expected_tables +def test_tables_include_filter(dbm_instance, integration_check): + dbm_instance['collect_schemas']['include_tables'] = ['^persons$'] + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + tables = [row['table_name'] for row in cursor if row['table_name']] + + assert set(tables) == {'persons'} + + +def test_tables_include_and_exclude_filter(dbm_instance, integration_check): + dbm_instance['collect_schemas']['include_tables'] = ['^persons'] + dbm_instance['collect_schemas']['exclude_tables'] = ['^personsdup[1-9]$'] + check = integration_check(dbm_instance) + check.version = POSTGRES_VERSION + collector = PostgresSchemaCollector(check) + + with collector._get_cursor('datadog_test') as cursor: + assert cursor is not None + tables = [row['table_name'] for row in cursor if row['table_name']] + + expected = { + 'persons', + 'persons_indexed', + 'personsdup10', + 'personsdup11', + 'personsdup12', + 'personsdup13', + } + assert set(tables) == expected + + # def test_columns(dbm_instance, integration_check): # check = integration_check(dbm_instance) # check.version = POSTGRES_VERSION diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index f2983eae8befe..1c5d216e00cf9 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -430,6 +430,7 @@ def dbm_instance(pg_instance): pg_instance['query_samples'] = {'enabled': True, 'run_sync': True, 'collection_interval': 0.2} pg_instance['query_activity'] = {'enabled': True, 'collection_interval': 0.2} pg_instance['collect_settings'] = {'enabled': False} + pg_instance['collect_column_statistics'] = {'enabled': False} # Set collection_interval close to 0. This is needed if the test runs the check multiple times. # This prevents DBMAsync from skipping job executions, as it is designed # to not execute jobs more frequently than their collection period. diff --git a/postgres/tests/test_unit.py b/postgres/tests/test_unit.py index decd10285d07d..7efdb71479139 100644 --- a/postgres/tests/test_unit.py +++ b/postgres/tests/test_unit.py @@ -242,6 +242,22 @@ def test_trim_set_stmts(query, expected_trimmed_query): assert trimmed_query == expected_trimmed_query +@pytest.mark.unit +@pytest.mark.parametrize( + 'intervals, expected', + [ + pytest.param((600, 600, 600, 3600), 600, id='all-multiples-of-min'), + pytest.param((600, 600, 600, 4500), 300, id='min-not-equal-to-gcd'), + pytest.param((600,), 600, id='single-interval'), + pytest.param((600, 0, 3600), 600, id='zero-does-not-constrain-gcd'), + pytest.param((600.0, 3600.0), 600, id='float-inputs-cast-to-int'), + pytest.param((900, 1500), 300, id='gcd-smaller-than-any-input'), + ], +) +def test_collection_interval_gcd(intervals, expected): + assert util.collection_interval_gcd(*intervals) == expected + + @pytest.mark.unit @pytest.mark.parametrize( 'exclude_hostname, expected_hostname', @@ -402,6 +418,7 @@ def test_check_gc_after_cancel(pg_instance): pg_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 10} pg_instance['query_activity'] = {'enabled': True, 'collection_interval': 1} pg_instance['data_observability'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1} + pg_instance['collect_column_statistics'] = {'enabled': True, 'collection_interval': 60} check = PostgreSql('postgres', {}, [pg_instance]) ref = weakref.ref(check) @@ -424,3 +441,24 @@ def test_check_gc_after_cancel(pg_instance): fail(f"Check still alive after cancel() + del -- pinned by: {referrers}") finally: gc.enable() + + +def test_collect_column_statistics_updates_timestamp_on_failure(pg_instance): + pg_instance['dbm'] = True + pg_instance['collect_column_statistics'] = {'enabled': True, 'collection_interval': 60} + + check = PostgreSql('postgres', {}, [pg_instance]) + metadata = check.metadata_samples + metadata._tags_no_db = [] + + with mock.patch.object( + metadata._column_statistics_collector, + 'collect_column_statistics', + side_effect=RuntimeError('boom'), + ): + before = metadata._last_column_statistics_query_time + with pytest.raises(RuntimeError): + metadata._collect_column_statistics() + after = metadata._last_column_statistics_query_time + + assert after > before