3535 read_bytes,
3636 result_rows,
3737 written_rows,
38+ written_bytes,
3839 exception,
3940 exception_code,
4041 query,
4142 type,
43+ query_kind,
44+ current_database,
4245 arrayStringConcat(tables, ', ') AS tables,
4346 client_name
4447FROM system.query_log
45- WHERE type IN ('QueryFinish', 'ExceptionWhileProcessing')
48+ WHERE type IN ('QueryFinish', 'ExceptionWhileProcessing', 'ExceptionBeforeStart')
49+ AND is_initial_query = 1
4650 AND event_time_microseconds > fromUnixTimestamp64Micro({last_cursor})
51+ AND query NOT LIKE '%system.query_log%'
52+ AND query NOT LIKE '%system.text_log%'
53+ {internal_user_filter}
4754ORDER BY event_time_microseconds ASC
4855LIMIT {batch_size}
4956"""
5057
# Filter clause injected into QUERY_LOG_SQL when exclude_internal_users is True.
# ClickHouse Cloud runs health checks, backups, and observability queries under
# service accounts whose usernames follow several patterns:
#   1. "*-internal" suffix (monitoring-internal, operator-internal,
#      backups-internal, etc.)
#   2. "clickhouse-cloud-*" prefix (clickhouse-cloud-monitor — the primary
#      metrics scraper)
#   3. "prometheus-exporter" — Prometheus metrics collection
#   4. Empty string "" — internal system-level metrics queries (SELECT from
#      system.dimensional_metrics, system.histogram_metrics, etc.) that run
#      under a blank user and generate heavy query_log volume.
# Together these generate ~99% of query_log volume on an idle cluster with zero
# operational value for application teams.
#
# The clause begins with "AND" because it is spliced into an existing WHERE
# chain through the {internal_user_filter} placeholder of QUERY_LOG_SQL.
INTERNAL_USER_FILTER = (
    "AND user NOT LIKE '%-internal'"
    " AND user NOT LIKE 'clickhouse-cloud-%'"
    " AND user != 'prometheus-exporter'"
    " AND user != ''"
)
75+
# Incremental fetch of Warning-and-above rows from system.text_log.
#
# Placeholders that must be substituted before the statement is executed:
#   {last_cursor} - microsecond Unix timestamp; only rows strictly newer than
#                   this value are returned.
#   {batch_size}  - maximum number of rows returned per call.
#
# cursor_us is the row's event time in microseconds and is emitted so the
# caller can advance {last_cursor}; rows are ordered oldest-first.
# Level literals are written without padding so they match the level names
# exactly (the same names used as keys of TEXT_LOG_LEVEL_MAP).
# QueryProfiler / GlobalProfiler loggers are excluded from the result.
TEXT_LOG_SQL = """
SELECT
    event_time,
    toUnixTimestamp64Micro(event_time_microseconds) AS cursor_us,
    level,
    logger_name,
    message,
    thread_id,
    query_id
FROM system.text_log
WHERE level IN ('Fatal', 'Critical', 'Error', 'Warning')
  AND event_time_microseconds > fromUnixTimestamp64Micro({last_cursor})
  AND logger_name NOT IN ('QueryProfiler', 'GlobalProfiler')
ORDER BY event_time_microseconds ASC
LIMIT {batch_size}
"""
76103
# Values of the `type` column in system.query_log selected by QUERY_LOG_SQL.
TYPE_QUERY_FINISH = "QueryFinish"
TYPE_QUERY_EXCEPTION = "ExceptionWhileProcessing"
# Treated like TYPE_QUERY_EXCEPTION when the query-log payload picks its level.
TYPE_QUERY_EXCEPTION_BEFORE_START = "ExceptionBeforeStart"
79107
80108# ---------------------------------------------------------------------------
81109# Datadog metric / service-check names
92120
# Maps a text_log level name to a lowercase severity string.
# Both "Fatal" and "Critical" collapse to "critical".
# NOTE(review): presumably the value becomes the log "status" field — confirm
# in _build_text_log_payload.
TEXT_LOG_LEVEL_MAP: dict[str, str] = {
    "Fatal": "critical",
    "Critical": "critical",
    "Error": "error",
    "Warning": "warning",
}
@@ -128,6 +157,7 @@ def __init__(
128157 # Feature toggles
129158 self .collect_query_logs : bool = inst .get ("collect_query_logs" , True )
130159 self .collect_text_logs : bool = inst .get ("collect_text_logs" , True )
160+ self .exclude_internal_users : bool = inst .get ("exclude_internal_users" , True )
131161
132162 # Tuning — validate all numeric config to prevent SQL injection and
133163 # catch misconfiguration early (e.g. log_batch_size: "all").
@@ -371,8 +401,10 @@ def _collect_logs(
371401
372402 def _collect_query_logs (self ) -> None :
373403 """Fetch new rows from system.query_log and send as Datadog logs."""
404+ user_filter = INTERNAL_USER_FILTER if self .exclude_internal_users else ""
405+ sql = QUERY_LOG_SQL .replace ("{internal_user_filter}" , user_filter )
374406 self ._collect_logs (
375- sql_template = QUERY_LOG_SQL ,
407+ sql_template = sql ,
376408 cursor_key = CURSOR_QUERY_LOG ,
377409 check_name = SC_QUERY_LOG_CONNECT ,
378410 gauge_name = GAUGE_QUERY_LOG_ROWS ,
@@ -384,7 +416,7 @@ def _build_query_log_payload(self, row: dict[str, Any]) -> dict[str, Any]:
384416 query_type = row .get ("type" , "" )
385417
386418 # Determine log level
387- if query_type == TYPE_QUERY_EXCEPTION :
419+ if query_type in ( TYPE_QUERY_EXCEPTION , TYPE_QUERY_EXCEPTION_BEFORE_START ) :
388420 level = "error"
389421 type_label = "exception"
390422 else :
@@ -407,9 +439,12 @@ def _build_query_log_payload(self, row: dict[str, Any]) -> dict[str, Any]:
407439 "clickhouse.read_bytes" : int (row .get ("read_bytes" , 0 )),
408440 "clickhouse.result_rows" : int (row .get ("result_rows" , 0 )),
409441 "clickhouse.written_rows" : int (row .get ("written_rows" , 0 )),
442+ "clickhouse.written_bytes" : int (row .get ("written_bytes" , 0 )),
410443 "clickhouse.exception" : row .get ("exception" , "" ),
411444 "clickhouse.exception_code" : int (row .get ("exception_code" , 0 )),
412445 "clickhouse.query_type" : type_label ,
446+ "clickhouse.query_kind" : row .get ("query_kind" , "" ),
447+ "clickhouse.database" : row .get ("current_database" , "" ),
413448 "clickhouse.tables" : row .get ("tables" , "" ),
414449 "clickhouse.client" : row .get ("client_name" , "" ),
415450 }
@@ -441,6 +476,7 @@ def _build_text_log_payload(self, row: dict[str, Any]) -> dict[str, Any]:
441476 "status" : level ,
442477 "clickhouse.logger" : row .get ("logger_name" , "" ),
443478 "clickhouse.thread_id" : str (row .get ("thread_id" , "" )),
479+ "clickhouse.query_id" : row .get ("query_id" , "" ),
444480 }
445481
446482 # ------------------------------------------------------------------
0 commit comments