Skip to content

Commit 5d0e661

Browse files
Replace runtime regex with static cluster-aware query variants
Instead of rewriting system-table metric queries with a regex at runtime, define static cluster-aware query variants and select them in get_queries() based on single_endpoint_mode.

- Add a cluster_aware_query() helper in utils.py that reuses the base query's column definitions by reference (no duplication) and appends the clickhouse_node tag.
- Define the legacy variants in queries.py and the advanced variants in advanced_queries (the JSON-backed ones are built by load_match_query; SystemErrorsClusterAware is defined statically).
- Remove import re, SYSTEM_TABLE_FROM_CLAUSE, and make_cluster_aware() from clickhouse.py.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 611fb79 commit 5d0e661

5 files changed

Lines changed: 113 additions & 70 deletions

File tree

clickhouse/datadog_checks/clickhouse/advanced_queries/__init__.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,18 @@
5959
import os
6060
from typing import Any
6161

62-
__all__ = ['SystemAsynchronousMetrics', 'SystemErrors', 'SystemEvents', 'SystemMetrics']
62+
from datadog_checks.clickhouse.utils import cluster_aware_query
63+
64+
__all__ = [
65+
'SystemAsynchronousMetrics',
66+
'SystemAsynchronousMetricsClusterAware',
67+
'SystemErrors',
68+
'SystemErrorsClusterAware',
69+
'SystemEvents',
70+
'SystemEventsClusterAware',
71+
'SystemMetrics',
72+
'SystemMetricsClusterAware',
73+
]
6374

6475
DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data')
6576

@@ -69,6 +80,10 @@
6980
'SystemAsynchronousMetrics': 'system_async_metrics',
7081
}
7182

83+
# Suffix that selects the single-endpoint-mode variant of a bulk match query (e.g.
84+
# 'SystemEventsClusterAware') through __getattr__.
85+
CLUSTER_AWARE_SUFFIX = 'ClusterAware'
86+
7287
_match_query_cache: dict[str, dict[str, Any]] = {}
7388

7489
SystemErrors: dict[str, Any] = {
@@ -82,14 +97,18 @@
8297
],
8398
}
8499

100+
SystemErrorsClusterAware: dict[str, Any] = cluster_aware_query(
101+
SystemErrors, 'value, name, code, remote', 'errors', where='WHERE value > 0'
102+
)
103+
85104

86-
def load_match_query(name: str) -> dict[str, Any]:
105+
def load_match_query(name: str, cluster_aware: bool = False) -> dict[str, Any]:
87106
"""Read ``data/<name>.json`` and reconstitute the QueryManager-shaped dict."""
88107
try:
89108
with open(os.path.join(DATA_DIR, f'{name}.json'), encoding='utf-8') as f:
90109
spec = json.load(f)
91110
items = _expand_match_items(spec['items'], spec['prefix'])
92-
return {
111+
base = {
93112
'name': spec['name'],
94113
'query': spec['query'],
95114
'columns': [
@@ -102,13 +121,18 @@ def load_match_query(name: str) -> dict[str, Any]:
102121
},
103122
],
104123
}
124+
if cluster_aware:
125+
# value_column/match_column are logical labels, not SQL identifiers, so derive the
126+
# real SELECT list and table from the (fixed-shape) generated query string.
127+
select, _, tail = spec['query'].partition(' FROM system.')
128+
table, _, where = tail.partition(' ')
129+
return cluster_aware_query(base, select.removeprefix('SELECT '), table, where=where)
130+
return base
105131
except (OSError, json.JSONDecodeError, KeyError, TypeError, AttributeError) as exc:
106132
raise RuntimeError(f'failed to load advanced query {name!r}') from exc
107133

108134

109-
def _expand_match_items(
110-
compact: dict[str, list[str] | dict[str, str]], prefix: str
111-
) -> dict[str, dict[str, Any]]:
135+
def _expand_match_items(compact: dict[str, list[str] | dict[str, str]], prefix: str) -> dict[str, dict[str, Any]]:
112136
"""Expand the compact ``{type: keys | {key: scale}}`` map to the per-entry dict shape."""
113137
merged: dict[str, dict[str, Any]] = {}
114138
for type_name, group in compact.items():
@@ -129,8 +153,10 @@ def warm_cache() -> None:
129153

130154

131155
def __getattr__(name: str) -> dict[str, Any]:
132-
if name not in MATCH_QUERIES:
156+
cluster_aware = name.endswith(CLUSTER_AWARE_SUFFIX)
157+
base_name = name[: -len(CLUSTER_AWARE_SUFFIX)] if cluster_aware else name
158+
if base_name not in MATCH_QUERIES:
133159
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
134160
if name not in _match_query_cache:
135-
_match_query_cache[name] = load_match_query(MATCH_QUERIES[name])
161+
_match_query_cache[name] = load_match_query(MATCH_QUERIES[base_name], cluster_aware=cluster_aware)
136162
return _match_query_cache[name]

clickhouse/datadog_checks/clickhouse/clickhouse.py

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# (C) Datadog, Inc. 2019-present
22
# All rights reserved
33
# Licensed under a 3-clause BSD style license (see LICENSE)
4-
import re
54
from string import Template
65
from time import time
76

@@ -36,14 +35,6 @@
3635
# Database instance collection interval in seconds (not user-configurable)
3736
DATABASE_INSTANCE_COLLECTION_INTERVAL = 300
3837

39-
# Tag added to per-node metrics when collecting from all replicas in single endpoint mode.
40-
CLUSTER_NODE_TAG = 'clickhouse_node'
41-
42-
# Matches the FROM clause of the standard system-table metric queries so they can be retargeted at
43-
# clusterAllReplicas() and tagged per node in single endpoint mode. The leading whitespace is part of
44-
# the match so the per-node projection can be spliced in without leaving a gap before the comma.
45-
SYSTEM_TABLE_FROM_CLAUSE = re.compile(r'\s+FROM\s+system\.(?P<table>\w+)', re.IGNORECASE)
46-
4738

4839
class ClickhouseCheck(DatabaseCheck):
4940
__NAMESPACE__ = 'clickhouse'
@@ -299,13 +290,15 @@ def check(self, _):
299290
def get_queries(self) -> list[dict]:
300291
query_list = []
301292

293+
single = self._config.single_endpoint_mode
294+
302295
if self._config.use_legacy_queries:
303296
query_list.extend(
304297
[
305-
self.make_cluster_aware(queries.SystemMetrics),
306-
self.make_cluster_aware(queries.SystemEventsToDeprecate),
307-
self.make_cluster_aware(queries.SystemEvents),
308-
self.make_cluster_aware(queries.SystemAsynchronousMetrics),
298+
queries.SystemMetricsClusterAware if single else queries.SystemMetrics,
299+
queries.SystemEventsToDeprecateClusterAware if single else queries.SystemEventsToDeprecate,
300+
queries.SystemEventsClusterAware if single else queries.SystemEvents,
301+
queries.SystemAsynchronousMetricsClusterAware if single else queries.SystemAsynchronousMetrics,
309302
queries.SystemParts,
310303
queries.SystemReplicas,
311304
queries.SystemDictionaries,
@@ -315,13 +308,17 @@ def get_queries(self) -> list[dict]:
315308
if self._config.use_advanced_queries:
316309
query_list.extend(
317310
[
318-
self.make_cluster_aware(advanced_queries.SystemMetrics),
319-
self.make_cluster_aware(advanced_queries.SystemEvents),
320-
self.make_cluster_aware(advanced_queries.SystemAsynchronousMetrics),
311+
advanced_queries.SystemMetricsClusterAware if single else advanced_queries.SystemMetrics,
312+
advanced_queries.SystemEventsClusterAware if single else advanced_queries.SystemEvents,
313+
advanced_queries.SystemAsynchronousMetricsClusterAware
314+
if single
315+
else advanced_queries.SystemAsynchronousMetrics,
321316
]
322317
)
323318
if self.version_ge('21.3'):
324-
query_list.append(self.make_cluster_aware(advanced_queries.SystemErrors))
319+
query_list.append(
320+
advanced_queries.SystemErrorsClusterAware if single else advanced_queries.SystemErrors
321+
)
325322

326323
return query_list
327324

@@ -457,25 +454,6 @@ def get_system_table(self, table_name):
457454
# Direct connection: Query the local system table directly
458455
return f"system.{table_name}"
459456

460-
def make_cluster_aware(self, query: dict) -> dict:
461-
"""Retarget a system-table query at clusterAllReplicas and tag each row per node."""
462-
if not self._config.single_endpoint_mode:
463-
return query
464-
465-
def rewrite(match):
466-
cluster_table = self.get_system_table(match.group('table'))
467-
return f', hostName() AS {CLUSTER_NODE_TAG} FROM {cluster_table}'
468-
469-
new_query, replaced = SYSTEM_TABLE_FROM_CLAUSE.subn(rewrite, query['query'], count=1)
470-
if not replaced:
471-
return query
472-
473-
return {
474-
**query,
475-
'query': new_query,
476-
'columns': [*query['columns'], {'name': CLUSTER_NODE_TAG, 'type': 'tag'}],
477-
}
478-
479457
def ping_clickhouse(self):
480458
return self._client.ping()
481459

clickhouse/datadog_checks/clickhouse/queries.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# (C) Datadog, Inc. 2019-present
22
# All rights reserved
33
# Licensed under a 3-clause BSD style license (see LICENSE)
4-
from .utils import compact_query
4+
from .utils import cluster_aware_query, compact_query
55

66
# https://clickhouse.com/docs/operations/system-tables/metrics
77
SystemMetrics = {
@@ -2059,3 +2059,14 @@
20592059
{'name': 'dictionary.load', 'type': 'gauge'},
20602060
],
20612061
}
2062+
2063+
2064+
# Cluster-aware variants used in single endpoint mode: they read all replicas via
2065+
# clusterAllReplicas() and tag each row per node so per-node counters stay distinct.
2066+
# SystemParts/SystemReplicas/SystemDictionaries use GROUP BY and are intentionally excluded.
2067+
SystemMetricsClusterAware = cluster_aware_query(SystemMetrics, 'value, metric', 'metrics')
2068+
SystemEventsToDeprecateClusterAware = cluster_aware_query(SystemEventsToDeprecate, 'value, event', 'events')
2069+
SystemEventsClusterAware = cluster_aware_query(SystemEvents, 'value, event', 'events')
2070+
SystemAsynchronousMetricsClusterAware = cluster_aware_query(
2071+
SystemAsynchronousMetrics, 'value, metric', 'asynchronous_metrics'
2072+
)

clickhouse/datadog_checks/clickhouse/utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,22 @@ def compact_query(query):
2727
return re.sub(r'\n\s+', ' ', query.strip())
2828

2929

30+
# Tag added to per-node metrics when collecting from all replicas in single endpoint mode.
31+
CLUSTER_NODE_TAG = 'clickhouse_node'
32+
33+
34+
def cluster_aware_query(base: dict, select: str, table: str, where: str = '') -> dict:
35+
"""Build a cluster-aware variant that reads all replicas and tags each row per node."""
36+
tail = f' {where}' if where else ''
37+
return {
38+
'name': base['name'],
39+
'query': (
40+
f"SELECT {select}, hostName() AS {CLUSTER_NODE_TAG} "
41+
f"FROM clusterAllReplicas('default', system.{table}){tail}"
42+
),
43+
'columns': [*base['columns'], {'name': CLUSTER_NODE_TAG, 'type': 'tag'}],
44+
}
45+
46+
3047
def parse_version(version: str) -> list[int]:
3148
return [int(v) for v in version.split('.')]

clickhouse/tests/test_unit.py

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -383,42 +383,39 @@ def test_database_hostname_ignores_reported_hostname_override(reported_hostname,
383383
mock_resolve.assert_called_with(BASE_INSTANCE['server'])
384384

385385

386-
def test_make_cluster_aware_direct_connection_unchanged(instance):
387-
"""Direct connections keep the bare per-node query (SDBM-2746)."""
388-
instance = {**instance, 'single_endpoint_mode': False}
389-
check = ClickhouseCheck('clickhouse', {}, [instance])
390-
391-
assert check.make_cluster_aware(advanced_queries.SystemEvents) is advanced_queries.SystemEvents
392-
393-
394-
def test_make_cluster_aware_bulk_match_query(instance):
395-
"""Single endpoint mode tags system.events per node so counters don't flap across the LB (SDBM-2746)."""
396-
instance = {**instance, 'single_endpoint_mode': True}
397-
check = ClickhouseCheck('clickhouse', {}, [instance])
398-
399-
rewritten = check.make_cluster_aware(advanced_queries.SystemEvents)
386+
def test_cluster_aware_variant_bulk_match_query():
387+
"""The cluster-aware variant reads all replicas and tags system.events per node (SDBM-2746)."""
388+
variant = advanced_queries.SystemEventsClusterAware
400389

401-
assert rewritten['query'] == (
390+
assert variant['query'] == (
402391
"SELECT value, event, hostName() AS clickhouse_node FROM clusterAllReplicas('default', system.events)"
403392
)
404-
assert rewritten['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
405-
# The original cached query dict must not be mutated.
393+
assert variant['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
394+
# The base query dict must not be mutated by building the variant.
406395
assert advanced_queries.SystemEvents['query'] == 'SELECT value, event FROM system.events'
407396
assert all(column['name'] != 'clickhouse_node' for column in advanced_queries.SystemEvents['columns'])
408397

409398

410-
def test_make_cluster_aware_preserves_where_clause(instance):
411-
"""system.errors carries a WHERE clause that must survive the rewrite."""
412-
instance = {**instance, 'single_endpoint_mode': True}
413-
check = ClickhouseCheck('clickhouse', {}, [instance])
414-
415-
rewritten = check.make_cluster_aware(advanced_queries.SystemErrors)
399+
def test_cluster_aware_variant_preserves_where_clause():
400+
"""system.errors carries a WHERE clause that must survive in the cluster-aware variant."""
401+
variant = advanced_queries.SystemErrorsClusterAware
416402

417-
assert rewritten['query'] == (
403+
assert variant['query'] == (
418404
"SELECT value, name, code, remote, hostName() AS clickhouse_node "
419405
"FROM clusterAllReplicas('default', system.errors) WHERE value > 0"
420406
)
421-
assert rewritten['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
407+
assert variant['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
408+
409+
410+
def test_cluster_aware_variant_legacy_query():
411+
"""Legacy queries.py exposes matching static cluster-aware variants."""
412+
variant = queries.SystemMetricsClusterAware
413+
414+
assert variant['query'] == (
415+
"SELECT value, metric, hostName() AS clickhouse_node FROM clusterAllReplicas('default', system.metrics)"
416+
)
417+
assert variant['columns'][-1] == {'name': 'clickhouse_node', 'type': 'tag'}
418+
assert queries.SystemMetrics['query'] == 'SELECT value, metric FROM system.metrics'
422419

423420

424421
@pytest.mark.parametrize('use_advanced_queries', [True, False])
@@ -443,3 +440,17 @@ def test_get_queries_tags_system_tables_per_node_in_single_endpoint_mode(instanc
443440
# system.parts/replicas/dictionaries use GROUP BY and are intentionally left untouched here.
444441
if not use_advanced_queries:
445442
assert any(q is queries.SystemParts for q in check.get_queries())
443+
444+
445+
@pytest.mark.parametrize('use_advanced_queries', [True, False])
446+
def test_get_queries_uses_base_queries_for_direct_connection(instance, use_advanced_queries):
447+
instance = {
448+
**instance,
449+
'single_endpoint_mode': False,
450+
'use_advanced_queries': use_advanced_queries,
451+
'use_legacy_queries': not use_advanced_queries,
452+
}
453+
check = ClickhouseCheck('clickhouse', {}, [instance])
454+
check._server_version = '24.8'
455+
456+
assert all('clusterAllReplicas' not in q['query'] for q in check.get_queries())

0 commit comments

Comments
 (0)