Skip to content

Commit 2e9d25f

Browse files
sangeetashivaji, eric-weaver, claude
authored
[DBMON-6301] Adding support for query errors in ClickHouse (DataDog#23041)
* Adding query errors for ClickHouse
* Add integration tests
* Updating integration test with more detailed assertion
* Update clickhouse/datadog_checks/clickhouse/data/conf.yaml.example

  Co-authored-by: Eric Weaver <eweaver755@gmail.com>
* Regenerating files
* Fix instance.py formatting to match CI ddev model generator output

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* Exception handling
* Fixing linting issues
* Updating spec with regenerated config

---------

Co-authored-by: Eric Weaver <eweaver755@gmail.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b99bb44 commit 2e9d25f

15 files changed

Lines changed: 866 additions & 20 deletions

File tree

clickhouse/assets/configuration/spec.yaml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,50 @@ files:
236236
value:
237237
type: boolean
238238
example: false
239+
- name: query_errors
240+
description: Configure collection of query errors from system.query_log
241+
options:
242+
- name: enabled
243+
description: |
244+
Enable collection of query errors. Requires `dbm: true`.
245+
Collects ExceptionBeforeStart and ExceptionWhileProcessing events, which include
246+
exception message, error code, and stack trace.
247+
value:
248+
type: boolean
249+
example: true
250+
- name: collection_interval
251+
description: |
252+
Set the query errors collection interval (in seconds).
253+
value:
254+
type: number
255+
example: 10
256+
- name: samples_per_hour_per_query
257+
description: |
258+
Set the maximum number of error samples to collect per hour per unique query signature.
259+
value:
260+
type: number
261+
example: 60
262+
- name: seen_samples_cache_maxsize
263+
hidden: true
264+
description: |
265+
Set the max size of the cache used for rate limiting error samples.
266+
value:
267+
type: number
268+
default: 10000
269+
- name: max_samples_per_collection
270+
hidden: true
271+
description: |
272+
Maximum number of error samples to collect in a single run (applies LIMIT in SQL).
273+
value:
274+
type: number
275+
default: 1000
276+
- name: run_sync
277+
hidden: true
278+
description: |
279+
Run the query errors collection synchronously. For testing only.
280+
value:
281+
type: boolean
282+
example: false
239283
- template: instances/db
240284
overrides:
241285
custom_queries.value.example:

clickhouse/changelog.d/23041.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add query error collection from system.query_log for DBM

clickhouse/datadog_checks/clickhouse/clickhouse.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from .config import build_config, sanitize
1919
from .health import ClickhouseHealth, HealthEvent, HealthStatus
2020
from .query_completions import ClickhouseQueryCompletions
21+
from .query_errors import ClickhouseQueryErrors
2122
from .statement_samples import ClickhouseStatementSamples
2223
from .statements import ClickhouseStatementMetrics
2324
from .utils import ErrorSanitizer
@@ -120,6 +121,12 @@ def _init_dbm_components(self):
120121
else:
121122
self.query_completions = None
122123

124+
# Initialize query errors (from system.query_log - failed queries)
125+
if self._config.dbm and self._config.query_errors.enabled:
126+
self.query_errors = ClickhouseQueryErrors(self, self._config.query_errors)
127+
else:
128+
self.query_errors = None
129+
123130
@property
124131
def tags(self) -> list[str]:
125132
"""Return the current list of tags from the TagManager."""
@@ -244,6 +251,10 @@ def check(self, _):
244251
if self.query_completions:
245252
self.query_completions.run_job_loop(self.tags)
246253

254+
# Run query errors if DBM is enabled (from system.query_log - failed queries)
255+
if self.query_errors:
256+
self.query_errors.run_job_loop(self.tags)
257+
247258
@AgentCheck.metadata_entrypoint
248259
def collect_version(self):
249260
version = list(self.execute_query_raw('SELECT version()'))[0][0]
@@ -461,6 +472,8 @@ def cancel(self):
461472
self.statement_samples.cancel()
462473
if self.query_completions:
463474
self.query_completions.cancel()
475+
if self.query_errors:
476+
self.query_errors.cancel()
464477

465478
# Wait for job loops to finish
466479
if self.statement_metrics and self.statement_metrics._job_loop_future:
@@ -469,6 +482,8 @@ def cancel(self):
469482
self.statement_samples._job_loop_future.result()
470483
if self.query_completions and self.query_completions._job_loop_future:
471484
self.query_completions._job_loop_future.result()
485+
if self.query_errors and self.query_errors._job_loop_future:
486+
self.query_errors._job_loop_future.result()
472487

473488
# Close main client
474489
if self._client:

clickhouse/datadog_checks/clickhouse/config.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ def build_config(check: ClickhouseCheck) -> Tuple[InstanceConfig, ValidationResu
120120
**dict_defaults.instance_query_completions().model_dump(),
121121
**(instance.get('query_completions', {})),
122122
},
123+
"query_errors": {
124+
**dict_defaults.instance_query_errors().model_dump(),
125+
**(instance.get('query_errors', {})),
126+
},
123127
# Tags - ensure we have a list, not None
124128
"tags": list(instance.get('tags', [])),
125129
# Other settings
@@ -188,6 +192,27 @@ def _apply_validated_defaults(args: dict, instance: dict, validation_result: Val
188192
f"query_completions.collection_interval must be greater than 0, defaulting to {default_value} seconds."
189193
)
190194

195+
if _safefloat(args.get('query_completions', {}).get('samples_per_hour_per_query')) <= 0:
196+
default_value = dict_defaults.instance_query_completions().samples_per_hour_per_query
197+
args['query_completions']['samples_per_hour_per_query'] = default_value
198+
validation_result.add_warning(
199+
f"query_completions.samples_per_hour_per_query must be greater than 0, defaulting to {default_value}."
200+
)
201+
202+
if _safefloat(args.get('query_errors', {}).get('collection_interval')) <= 0:
203+
default_value = dict_defaults.instance_query_errors().collection_interval
204+
args['query_errors']['collection_interval'] = default_value
205+
validation_result.add_warning(
206+
f"query_errors.collection_interval must be greater than 0, defaulting to {default_value} seconds."
207+
)
208+
209+
if _safefloat(args.get('query_errors', {}).get('samples_per_hour_per_query')) <= 0:
210+
default_value = dict_defaults.instance_query_errors().samples_per_hour_per_query
211+
args['query_errors']['samples_per_hour_per_query'] = default_value
212+
validation_result.add_warning(
213+
f"query_errors.samples_per_hour_per_query must be greater than 0, defaulting to {default_value}."
214+
)
215+
191216

192217
def _validate_config(config: InstanceConfig, instance: dict, validation_result: ValidationResult):
193218
"""Validate the configuration and add warnings/errors."""
@@ -203,6 +228,7 @@ def _validate_config(config: InstanceConfig, instance: dict, validation_result:
203228
'query_completions',
204229
config.query_completions.enabled if config.query_completions else False,
205230
),
231+
('query_errors', config.query_errors.enabled if config.query_errors else False),
206232
]
207233
for feature_name, _is_enabled in dbm_features:
208234
if instance.get(feature_name, {}).get('enabled') and not config.dbm:
@@ -234,6 +260,11 @@ def _apply_features(config: InstanceConfig, validation_result: ValidationResult)
234260
config.query_completions.enabled and config.dbm,
235261
None if config.dbm else "Requires `dbm: true`",
236262
)
263+
validation_result.add_feature(
264+
FeatureKey.QUERY_ERRORS,
265+
config.query_errors.enabled and config.dbm,
266+
None if config.dbm else "Requires `dbm: true`",
267+
)
237268
validation_result.add_feature(FeatureKey.SINGLE_ENDPOINT_MODE, config.single_endpoint_mode)
238269

239270

clickhouse/datadog_checks/clickhouse/config_models/dict_defaults.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,14 @@ def instance_query_completions():
4242
max_samples_per_collection=1000,
4343
run_sync=False,
4444
)
45+
46+
47+
def instance_query_errors():
48+
return instance.QueryErrors(
49+
enabled=True,
50+
collection_interval=10,
51+
samples_per_hour_per_query=60,
52+
seen_samples_cache_maxsize=10000,
53+
max_samples_per_collection=1000,
54+
run_sync=False,
55+
)

clickhouse/datadog_checks/clickhouse/config_models/instance.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ class QueryCompletions(BaseModel):
6565
seen_samples_cache_maxsize: Optional[float] = None
6666

6767

68+
class QueryErrors(BaseModel):
69+
model_config = ConfigDict(
70+
arbitrary_types_allowed=True,
71+
frozen=True,
72+
)
73+
collection_interval: Optional[float] = None
74+
enabled: Optional[bool] = None
75+
max_samples_per_collection: Optional[float] = None
76+
run_sync: Optional[bool] = None
77+
samples_per_hour_per_query: Optional[float] = None
78+
seen_samples_cache_maxsize: Optional[float] = None
79+
80+
6881
class QueryMetrics(BaseModel):
6982
model_config = ConfigDict(
7083
arbitrary_types_allowed=True,
@@ -109,6 +122,7 @@ class InstanceConfig(BaseModel):
109122
password: Optional[str] = None
110123
port: Optional[int] = None
111124
query_completions: Optional[QueryCompletions] = None
125+
query_errors: Optional[QueryErrors] = None
112126
query_metrics: Optional[QueryMetrics] = None
113127
query_samples: Optional[QuerySamples] = None
114128
read_timeout: Optional[int] = None

clickhouse/datadog_checks/clickhouse/data/conf.yaml.example

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,27 @@ instances:
176176
#
177177
# samples_per_hour_per_query: 15
178178

179+
## Configure collection of query errors from system.query_log
180+
#
181+
# query_errors:
182+
183+
## @param enabled - boolean - optional - default: true
184+
## Enable collection of query errors. Requires `dbm: true`.
185+
## Collects ExceptionBeforeStart and ExceptionWhileProcessing events, which include
186+
## exception message, error code, and stack trace.
187+
#
188+
# enabled: true
189+
190+
## @param collection_interval - number - optional - default: 10
191+
## Set the query errors collection interval (in seconds).
192+
#
193+
# collection_interval: 10
194+
195+
## @param samples_per_hour_per_query - number - optional - default: 60
196+
## Set the maximum number of error samples to collect per hour per unique query signature.
197+
#
198+
# samples_per_hour_per_query: 60
199+
179200
## @param only_custom_queries - boolean - optional - default: false
180201
## Set this parameter to `true` if you want to skip the integration's default metrics collection.
181202
## Only metrics specified in `custom_queries` will be collected.

clickhouse/datadog_checks/clickhouse/features.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class FeatureKey(Enum):
2121
QUERY_METRICS = "query_metrics"
2222
QUERY_SAMPLES = "query_samples"
2323
QUERY_COMPLETIONS = "query_completions"
24+
QUERY_ERRORS = "query_errors"
2425
SINGLE_ENDPOINT_MODE = "single_endpoint_mode"
2526

2627

@@ -29,6 +30,7 @@ class FeatureKey(Enum):
2930
FeatureKey.QUERY_METRICS: 'Query Metrics',
3031
FeatureKey.QUERY_SAMPLES: 'Query Samples',
3132
FeatureKey.QUERY_COMPLETIONS: 'Query Completions',
33+
FeatureKey.QUERY_ERRORS: 'Query Errors',
3234
FeatureKey.SINGLE_ENDPOINT_MODE: 'Single Endpoint Mode',
3335
}
3436

clickhouse/datadog_checks/clickhouse/query_completions.py

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,15 @@ def _collect_and_submit(self):
115115
# Step 3: Submit payload
116116
payload_data = json.dumps(payload, default=default_json_event_encoding)
117117
num_completions = len(payload.get('clickhouse_query_completions', []))
118-
self._log.info(
118+
self._log.debug(
119119
"Submitting query completions payload: %d bytes, %d completions",
120120
len(payload_data),
121121
num_completions,
122122
)
123123
self._check.database_monitoring_query_activity(payload_data)
124124

125125
if self._current_checkpoint_microseconds is not None:
126-
self._log.info(
126+
self._log.debug(
127127
"Successfully submitted. Checkpoint: %d microseconds", self._current_checkpoint_microseconds
128128
)
129129

@@ -162,7 +162,7 @@ def _collect_completed_queries(self):
162162

163163
rows = self._execute_query(query, parameters=params)
164164

165-
self._log.info(
165+
self._log.debug(
166166
"Loaded %d completed queries from %s [%s]",
167167
len(rows),
168168
query_log_table,
@@ -206,6 +206,7 @@ def _collect_completed_queries(self):
206206

207207
row_dict = {
208208
'normalized_query_hash': str(normalized_query_hash),
209+
'hostname': str(server_node) if server_node else '',
209210
'query': str(query_text) if query_text else '',
210211
'user': str(user) if user else '',
211212
'query_type': str(query_type) if query_type else '',
@@ -238,7 +239,7 @@ def _collect_completed_queries(self):
238239
return result_rows
239240

240241
except Exception as e:
241-
self._log.exception("Failed to load completed queries from system.query_log: %s", e)
242+
self._log.warning("Failed to load completed queries from system.query_log: %s", e)
242243

243244
self._check.count(
244245
"dd.clickhouse.query_completions.error",
@@ -251,22 +252,6 @@ def _collect_completed_queries(self):
251252
# Checkpoint will still advance to avoid duplicates on retry.
252253
raise
253254

254-
def _normalize_query(self, row):
255-
"""
256-
Normalize and obfuscate a single query row
257-
"""
258-
obfuscation_result = self._obfuscate_query(row['query'])
259-
if obfuscation_result is None:
260-
return None
261-
262-
row['statement'] = obfuscation_result['query']
263-
row['query_signature'] = obfuscation_result['query_signature']
264-
row['dd_tables'] = obfuscation_result['dd_tables']
265-
row['dd_commands'] = obfuscation_result['dd_commands']
266-
row['dd_comments'] = obfuscation_result['dd_comments']
267-
268-
return row
269-
270255
def _create_batched_payload(self, rows):
271256
"""
272257
Create a batched payload following SQL Server query_completion pattern.
@@ -308,6 +293,7 @@ def _create_batched_payload(self, rows):
308293
'event_time_microseconds': row.get('event_time_microseconds', 0),
309294
'initial_query_id': row.get('initial_query_id', ''),
310295
'is_initial_query': row.get('is_initial_query', True),
296+
'hostname': row.get('hostname', ''),
311297
'metadata': {
312298
'tables': row.get('dd_tables'),
313299
'commands': row.get('dd_commands'),

0 commit comments

Comments
 (0)