Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions clickhouse/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,34 @@ files:
type: boolean
example: false
fleet_configurable: false
- name: schema_metrics
description: |
Configure per-table size and per-view refresh gauges derived from
`system.tables` and `system.view_refreshes`. Independent of
`collect_schemas` (which controls catalog structure collection)
so users can dashboard table sizes without enabling Schema Explorer.
Requires `dbm: true`.
options:
- name: enabled
description: |
Enable collection of per-table size and per-view refresh gauges.
value:
type: boolean
example: false
- name: collection_interval
description: |
Set the schema metrics collection interval (in seconds). These
gauges change continuously, so 60s is a reasonable default.
value:
type: number
example: 60
- name: run_sync
hidden: true
description: |
Run the schema metrics collection synchronously. For testing only.
value:
type: boolean
example: false
- name: collect_schemas
description: |
Configure collection of ClickHouse catalog metadata (databases,
Expand Down
1 change: 1 addition & 0 deletions clickhouse/changelog.d/23900.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ClickHouse schema metrics: per-table size gauges and per-view refresh status gauges under schema_metrics.
15 changes: 15 additions & 0 deletions clickhouse/datadog_checks/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .query_errors import ClickhouseQueryErrors
from .statement_samples import ClickhouseStatementSamples
from .statements import ClickhouseStatementMetrics
from .table_metrics import ClickhouseTableMetrics
from .utils import ErrorSanitizer

try:
Expand Down Expand Up @@ -123,6 +124,12 @@ def _init_dbm_components(self):
else:
self.query_errors = None

# Initialize schema metrics (per-table size and per-view refresh gauges)
if self._config.dbm and self._config.schema_metrics.enabled:
self.table_metrics = ClickhouseTableMetrics(self, self._config.schema_metrics)
else:
self.table_metrics = None

# Initialize schema collection (catalog metadata for Schema Explorer)
if self._config.dbm and self._config.collect_schemas.enabled:
self.metadata = ClickhouseMetadata(self)
Expand Down Expand Up @@ -267,6 +274,10 @@ def check(self, _):
if self.query_errors:
self.query_errors.run_job_loop(self.tags)

# Run schema metrics (per-table size and per-view refresh gauges) if enabled
if self.table_metrics:
self.table_metrics.run_job_loop(self.tags)

# Run schema collection if enabled
if self.metadata:
self.metadata.run_job_loop(self.tags)
Expand Down Expand Up @@ -540,6 +551,8 @@ def cancel(self):
self.query_completions.cancel()
if self.query_errors:
self.query_errors.cancel()
if self.table_metrics:
self.table_metrics.cancel()
if self.metadata:
self.metadata.cancel()
if self.parts_and_merges:
Expand All @@ -554,6 +567,8 @@ def cancel(self):
self.query_completions._job_loop_future.result()
if self.query_errors and self.query_errors._job_loop_future:
self.query_errors._job_loop_future.result()
if self.table_metrics and self.table_metrics._job_loop_future:
self.table_metrics._job_loop_future.result()
if self.metadata and self.metadata._job_loop_future:
self.metadata._job_loop_future.result()
if self.parts_and_merges and self.parts_and_merges._job_loop_future:
Expand Down
9 changes: 9 additions & 0 deletions clickhouse/datadog_checks/clickhouse/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ def build_config(check: ClickhouseCheck) -> Tuple[InstanceConfig, ValidationResu
**dict_defaults.instance_parts_and_merges().model_dump(),
**(instance.get('parts_and_merges', {})),
},
"schema_metrics": {
**dict_defaults.instance_schema_metrics().model_dump(),
**(instance.get('schema_metrics', {})),
},
"collect_schemas": {
**dict_defaults.instance_collect_schemas().model_dump(),
**(instance.get('collect_schemas', {})),
Expand Down Expand Up @@ -315,6 +319,11 @@ def _apply_features(config: InstanceConfig, validation_result: ValidationResult)
config.parts_and_merges.enabled and config.dbm,
None if config.dbm else "Requires `dbm: true`",
)
validation_result.add_feature(
FeatureKey.SCHEMA_METRICS,
config.schema_metrics.enabled and config.dbm,
None if config.dbm else "Requires `dbm: true`",
)
validation_result.add_feature(FeatureKey.SINGLE_ENDPOINT_MODE, config.single_endpoint_mode)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ def instance_query_errors():
)


def instance_schema_metrics():
    """Return the default ``schema_metrics`` settings as a ``SchemaMetrics`` model.

    Defaults: collection disabled, a 60 second collection interval, and
    ``run_sync`` off.
    """
    defaults = {
        'enabled': False,
        'collection_interval': 60,
        'run_sync': False,
    }
    return instance.SchemaMetrics(**defaults)


def instance_collect_schemas():
return instance.CollectSchemas(
enabled=False,
Expand Down
11 changes: 11 additions & 0 deletions clickhouse/datadog_checks/clickhouse/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ class QuerySamples(BaseModel):
run_sync: Optional[bool] = None


# Typed model for the `schema_metrics` instance option (per-table size and
# per-view refresh gauges). Instances are frozen, so fields cannot be
# reassigned after construction.
class SchemaMetrics(BaseModel):
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        frozen=True,
    )
    # Every field defaults to None at the model level.
    # NOTE(review): effective defaults appear to be supplied elsewhere (a
    # defaults helper merged in while building the config) - confirm.
    # Collection interval for the schema metrics job (documented as seconds).
    collection_interval: Optional[float] = None
    # Whether the schema metrics job is turned on.
    enabled: Optional[bool] = None
    # Run the collection synchronously (documented as "for testing only").
    run_sync: Optional[bool] = None


class InstanceConfig(BaseModel):
model_config = ConfigDict(
validate_default=True,
Expand Down Expand Up @@ -166,6 +176,7 @@ class InstanceConfig(BaseModel):
query_samples: Optional[QuerySamples] = None
read_timeout: Optional[int] = None
reported_hostname: Optional[str] = None
schema_metrics: Optional[SchemaMetrics] = None
server: str
service: Optional[str] = None
single_endpoint_mode: Optional[bool] = None
Expand Down
19 changes: 19 additions & 0 deletions clickhouse/datadog_checks/clickhouse/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,25 @@ instances:
#
# samples_per_hour_per_query: 60

## Configure per-table size and per-view refresh gauges derived from
## `system.tables` and `system.view_refreshes`. Independent of
## `collect_schemas` (which controls catalog structure collection)
## so users can dashboard table sizes without enabling Schema Explorer.
## Requires `dbm: true`.
#
# schema_metrics:

## @param enabled - boolean - optional - default: false
## Enable collection of per-table size and per-view refresh gauges.
#
# enabled: false

## @param collection_interval - number - optional - default: 60
## Set the schema metrics collection interval (in seconds). These
## gauges change continuously, so 60s is a reasonable default.
#
# collection_interval: 60

## Configure collection of ClickHouse catalog metadata (databases,
## tables, views, columns) for Database Monitoring's Schema Explorer.
## Requires `dbm: true`.
Expand Down
2 changes: 2 additions & 0 deletions clickhouse/datadog_checks/clickhouse/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class FeatureKey(Enum):
QUERY_COMPLETIONS = "query_completions"
EXPLAIN_PLANS = "explain_plans"
QUERY_ERRORS = "query_errors"
SCHEMA_METRICS = "schema_metrics"
COLLECT_SCHEMAS = "collect_schemas"
PARTS_AND_MERGES = "parts_and_merges"
SINGLE_ENDPOINT_MODE = "single_endpoint_mode"
Expand All @@ -35,6 +36,7 @@ class FeatureKey(Enum):
FeatureKey.QUERY_COMPLETIONS: 'Query Completions',
FeatureKey.QUERY_ERRORS: 'Query Errors',
FeatureKey.EXPLAIN_PLANS: 'Explain Plans',
FeatureKey.SCHEMA_METRICS: 'Schema Metrics',
FeatureKey.COLLECT_SCHEMAS: 'Collect Schemas',
FeatureKey.PARTS_AND_MERGES: 'Parts and Merges',
FeatureKey.SINGLE_ENDPOINT_MODE: 'Single Endpoint Mode',
Expand Down
170 changes: 170 additions & 0 deletions clickhouse/datadog_checks/clickhouse/table_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from typing import TYPE_CHECKING

from clickhouse_connect.driver.exceptions import OperationalError

if TYPE_CHECKING:
from datadog_checks.clickhouse import ClickhouseCheck
from datadog_checks.clickhouse.config_models.instance import SchemaMetrics

from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.db.utils import DBMAsyncJob
from datadog_checks.base.utils.tracking import tracked_method

# Fallback collection interval (seconds), used when the configured interval is
# missing or not strictly positive.
DEFAULT_COLLECTION_INTERVAL = 60

# Row and byte counts per table. `{tables_table}` is filled in by the caller with
# whatever the check's `get_system_table('tables')` returns. System and
# information-schema databases are excluded, and `LIMIT 1 BY` keeps a single row
# per (database, name).
# NOTE(review): the de-duplication presumably matters when the system table is
# read across several replicas - confirm against `get_system_table`.
_TABLE_SIZES_QUERY = """\
SELECT
database,
name,
toInt64(total_rows) AS total_rows,
toInt64(total_bytes) AS total_bytes
FROM {tables_table}
WHERE database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')
LIMIT 1 BY database, name
"""

# Refresh state per view and reporting host. `{view_refreshes_table}` is filled
# in by the caller. Note that the `last_refresh_time` output column is an alias
# for `last_success_time`, and both timestamps are converted to Unix seconds.
# The consumer unpacks rows positionally, so the column order matters.
_VIEW_REFRESHES_QUERY = """\
SELECT
database,
view,
hostName() AS host,
status,
exception,
toInt64(toUnixTimestamp(last_success_time)) AS last_refresh_time,
toInt64(toUnixTimestamp(next_refresh_time)) AS next_refresh_time,
toInt64(written_rows) AS written_rows,
toInt64(written_bytes) AS written_bytes
FROM {view_refreshes_table}
LIMIT 1 BY database, view, host
"""

# Maps the `status` column of the view-refresh query to the AgentCheck status
# constant that is emitted as the value of the `view.refresh.status` gauge.
# Statuses missing from this map are reported as AgentCheck.UNKNOWN by the
# lookup in the collector.
_VIEW_REFRESH_STATUS_MAP = {
    'Scheduled': AgentCheck.OK,
    'Running': AgentCheck.OK,
    'WaitingForDependencies': AgentCheck.WARNING,
    'Disabled': AgentCheck.UNKNOWN,
    'Error': AgentCheck.CRITICAL,
}


def agent_check_getter(self):
    """Return the check instance stored on a job object as ``_check``.

    Passed to ``tracked_method`` so the decorator can reach the owning check
    from the decorated job.
    """
    owning_check = self._check
    return owning_check


class ClickhouseTableMetrics(DBMAsyncJob):
    """DBM job that emits per-table size and per-view refresh gauges.

    Each run issues two queries through a client owned by this job: one against
    the check's ``tables`` system table (``table.rows`` / ``table.bytes``) and one
    against its ``view_refreshes`` system table (``view.refresh.*``). A failure
    in either query is logged and does not prevent the other from running.
    """

    def __init__(self, check: ClickhouseCheck, config: SchemaMetrics):
        """Configure the job from the ``schema_metrics`` settings.

        Args:
            check: The owning ClickHouse check; used to create the client,
                resolve system table names, read tags and submit gauges.
            config: The ``schema_metrics`` settings. A missing or non-positive
                ``collection_interval`` falls back to
                ``DEFAULT_COLLECTION_INTERVAL``.
        """
        collection_interval = config.collection_interval
        if collection_interval is None or collection_interval <= 0:
            collection_interval = DEFAULT_COLLECTION_INTERVAL

        super().__init__(
            check,
            rate_limit=1 / collection_interval,
            run_sync=config.run_sync,
            enabled=config.enabled,
            dbms='clickhouse',
            min_collection_interval=check._config.min_collection_interval,
            expected_db_exceptions=(Exception,),
            job_name='clickhouse-table-metrics',
        )
        self._check = check
        self._config = config
        self._collection_interval = collection_interval
        # Created lazily by _execute_query and dropped on connection errors.
        self._db_client = None
        # One-shot flags so each view_refreshes problem is logged a single time.
        self._view_refreshes_unsupported_logged = False
        self._view_refreshes_permission_logged = False
        # Once set, view refresh collection is skipped for the job's lifetime.
        self._view_refreshes_skip = False

    def cancel(self):
        """Cancel the job, then release the job's dedicated client."""
        super().cancel()
        self._close_db_client()

    def _close_db_client(self):
        """Close and forget the dedicated client; close errors are only logged at debug level."""
        if self._db_client:
            try:
                self._db_client.close()
            except Exception as e:
                self._log.debug("Error closing table-metrics client: %s", e)
            self._db_client = None

    def _execute_query(self, query: str) -> list:
        """Run ``query`` on the job's dedicated client and return its result rows.

        The client is created on first use via ``check.create_dbm_client()`` and
        has ``max_execution_time`` set to the collection interval.

        Raises:
            OperationalError: Re-raised after the client has been closed, so the
                next call reconnects.
            Exception: Any other error raised by the client is propagated as is.
        """
        if self._db_client is None:
            self._db_client = self._check.create_dbm_client()
            self._db_client.set_client_setting('max_execution_time', self._collection_interval)
        try:
            return self._db_client.query(query).result_rows
        except OperationalError as e:
            self._log.warning("Connection error on table-metrics query, will reconnect: %s", e)
            self._close_db_client()
            raise

    @tracked_method(agent_check_getter=agent_check_getter)
    def run_job(self):
        """Collect table size gauges, then view refresh gauges."""
        self._emit_table_size_gauges()
        self._collect_view_refresh_metrics()

    def _entity_base_tags(self) -> list:
        """Return the check's tags without any ``db:`` tag.

        The instance-level ``db:`` tag (the connection database) is dropped so
        each per-table or per-view series carries exactly one ``db:`` tag: the
        entity's own database.
        """
        return [t for t in self._check.tags if not t.startswith('db:')]

    def _emit_table_size_gauges(self) -> None:
        """Emit ``table.rows`` and ``table.bytes`` for every row of the table sizes query.

        Query failures are logged with a traceback and end this collection step
        without raising. Null counts are reported as 0.
        """
        query = _TABLE_SIZES_QUERY.format(tables_table=self._check.get_system_table('tables'))
        try:
            rows = self._execute_query(query)
        except Exception:
            self._log.exception("Failed to collect clickhouse table sizes")
            return

        base_tags = self._entity_base_tags()
        for database, name, total_rows, total_bytes in rows:
            entity_tags = base_tags + [f'db:{database}', f'table:{name}']
            self._check.gauge('table.rows', int(total_rows or 0), tags=entity_tags)
            self._check.gauge('table.bytes', int(total_bytes or 0), tags=entity_tags)

    def _collect_view_refresh_metrics(self) -> None:
        """Emit the ``view.refresh.*`` gauges for every row of the view refreshes query.

        Does nothing once the job has decided to skip view refreshes (missing
        table or missing privileges). Query failures are delegated to
        ``_handle_view_refreshes_error`` and never raised.
        """
        if self._view_refreshes_skip:
            return
        query = _VIEW_REFRESHES_QUERY.format(
            view_refreshes_table=self._check.get_system_table('view_refreshes')
        )
        try:
            # Use the job's dedicated client, exactly like the table sizes query,
            # so this query gets the same max_execution_time bound and reconnect
            # handling instead of going through the check's own query path.
            # NOTE(review): the check's client is presumably shared with the main
            # check run, which this job may overlap with - confirm.
            rows = self._execute_query(query)
        except Exception as e:
            self._handle_view_refreshes_error(e)
            return

        base_tags = self._entity_base_tags()
        for database, view_name, host, status, _exception, last_time, next_time, written_rows, written_bytes in rows:
            view_tags = base_tags + [f'db:{database}', f'view:{view_name}', f'host:{host}']
            # Statuses that are not in the map are reported as UNKNOWN.
            refresh_status = _VIEW_REFRESH_STATUS_MAP.get(status, AgentCheck.UNKNOWN)
            self._check.gauge('view.refresh.status', refresh_status, tags=view_tags)
            self._check.gauge('view.refresh.last_time', int(last_time or 0), tags=view_tags)
            self._check.gauge('view.refresh.next_time', int(next_time or 0), tags=view_tags)
            self._check.gauge('view.refresh.rows', int(written_rows or 0), tags=view_tags)
            self._check.gauge('view.refresh.bytes', int(written_bytes or 0), tags=view_tags)

    def _handle_view_refreshes_error(self, e: Exception) -> None:
        """Classify a view refreshes query failure by its message and log it.

        A missing table or missing privileges is logged once and disables view
        refresh collection for the rest of the job's lifetime. Any other error
        is logged with its traceback and collection is retried on the next run.
        """
        lowered = str(e).lower()
        if any(marker in lowered for marker in ('unknown table', 'unknowntable', 'unknown_table')):
            if not self._view_refreshes_unsupported_logged:
                self._log.info(
                    "system.view_refreshes not present (ClickHouse < 24.3); refresh status will not be populated."
                )
                self._view_refreshes_unsupported_logged = True
            self._view_refreshes_skip = True
        elif 'not enough privileges' in lowered or 'access_denied' in lowered:
            if not self._view_refreshes_permission_logged:
                self._log.warning(
                    "Agent user lacks SELECT on system.view_refreshes; refresh status will not be populated. "
                    "Grant with: GRANT SELECT ON system.view_refreshes TO <agent_user>. "
                    "Restart the agent after granting access."
                )
                self._view_refreshes_permission_logged = True
            self._view_refreshes_skip = True
        else:
            # Pass the exception explicitly so the traceback is logged even if
            # this method is ever called outside an active ``except`` block.
            self._log.exception("Unexpected error querying system.view_refreshes", exc_info=e)
6 changes: 6 additions & 0 deletions clickhouse/tests/test_config_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@
'max_samples_per_collection': 1000,
'run_sync': False,
},
# === DBM: Schema metrics ===
'schema_metrics': {
'enabled': False,
'collection_interval': 60,
'run_sync': False,
},
# === DBM: Schema collector ===
'collect_schemas': {
'enabled': False,
Expand Down
Loading
Loading