Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clickhouse/changelog.d/23882.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add CPU time metrics (cpu_us, cpu_wait_us) to ClickHouse query metrics, query completions, and query errors.
8 changes: 8 additions & 0 deletions clickhouse/datadog_checks/clickhouse/query_completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
result_rows,
result_bytes,
memory_usage,
ProfileEvents['OSCPUVirtualTimeMicroseconds'] as cpu_us,
ProfileEvents['OSCPUWaitMicroseconds'] as cpu_wait_us,
query_start_time_microseconds,
event_time_microseconds,
query_id,
Expand Down Expand Up @@ -200,6 +202,8 @@ def _collect_completed_queries(self):
result_rows_count,
result_bytes,
memory_usage,
cpu_us,
cpu_wait_us,
query_start_time_microseconds,
event_time_microseconds,
query_id,
Expand Down Expand Up @@ -231,6 +235,8 @@ def _collect_completed_queries(self):
'result_rows': int(result_rows_count) if result_rows_count else 0,
'result_bytes': int(result_bytes) if result_bytes else 0,
'memory_usage': int(memory_usage) if memory_usage else 0,
'cpu_us': int(cpu_us) if cpu_us else 0,
'cpu_wait_us': int(cpu_wait_us) if cpu_wait_us else 0,
'query_start_time_microseconds': self.to_microseconds(query_start_time_microseconds),
'event_time_microseconds': event_time_int,
'query_id': str(query_id) if query_id else '',
Expand Down Expand Up @@ -300,6 +306,8 @@ def _create_batched_payload(self, rows):
'result_rows': row.get('result_rows', 0),
'result_bytes': row.get('result_bytes', 0),
'memory_usage': row.get('memory_usage', 0),
'cpu_us': row.get('cpu_us', 0),
'cpu_wait_us': row.get('cpu_wait_us', 0),
'query_start_time_microseconds': row.get('query_start_time_microseconds', 0),
'event_time_microseconds': row.get('event_time_microseconds', 0),
'initial_query_id': row.get('initial_query_id', ''),
Expand Down
8 changes: 8 additions & 0 deletions clickhouse/datadog_checks/clickhouse/query_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
result_rows,
result_bytes,
memory_usage,
ProfileEvents['OSCPUVirtualTimeMicroseconds'] as cpu_us,
ProfileEvents['OSCPUWaitMicroseconds'] as cpu_wait_us,
query_start_time_microseconds,
event_time_microseconds,
query_id,
Expand Down Expand Up @@ -178,6 +180,8 @@ def _collect_query_errors(self):
result_rows_count,
result_bytes,
memory_usage,
cpu_us,
cpu_wait_us,
query_start_time_microseconds,
event_time_microseconds,
query_id,
Expand Down Expand Up @@ -219,6 +223,8 @@ def _collect_query_errors(self):
'result_rows': result_rows_count or 0,
'result_bytes': result_bytes or 0,
'memory_usage': memory_usage or 0,
'cpu_us': int(cpu_us) if cpu_us else 0,
'cpu_wait_us': int(cpu_wait_us) if cpu_wait_us else 0,
'query_start_time_microseconds': self.to_microseconds(query_start_time_microseconds),
'event_time_microseconds': event_time_int,
'query_id': query_id or '',
Expand Down Expand Up @@ -284,6 +290,8 @@ def _create_batched_payload(self, rows: list) -> dict | None:
'result_rows': row.get('result_rows', 0),
'result_bytes': row.get('result_bytes', 0),
'memory_usage': row.get('memory_usage', 0),
'cpu_us': row.get('cpu_us', 0),
'cpu_wait_us': row.get('cpu_wait_us', 0),
'query_start_time_microseconds': row.get('query_start_time_microseconds', 0),
'event_time_microseconds': row.get('event_time_microseconds', 0),
'initial_query_id': row.get('initial_query_id', ''),
Expand Down
8 changes: 8 additions & 0 deletions clickhouse/datadog_checks/clickhouse/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
sum(result_rows) as total_result_rows,
sum(result_bytes) as total_result_bytes,
sum(memory_usage) as total_memory_usage,
sum(ProfileEvents['OSCPUVirtualTimeMicroseconds']) as total_cpu_us,
sum(ProfileEvents['OSCPUWaitMicroseconds']) as total_cpu_wait_us,
max(memory_usage) as peak_memory_usage,
max(event_time_microseconds) as max_event_time_microseconds
FROM {query_log_table}
Expand Down Expand Up @@ -256,6 +258,8 @@ def _load_query_log_statements(self):
total_result_rows,
total_result_bytes,
total_memory_usage,
total_cpu_us,
total_cpu_wait_us,
peak_memory_usage,
max_event_time_microseconds,
) = row
Expand Down Expand Up @@ -287,6 +291,8 @@ def _load_query_log_statements(self):
'written_bytes': int(total_written_bytes) if total_written_bytes else 0,
'result_bytes': int(total_result_bytes) if total_result_bytes else 0,
'memory_usage': int(total_memory_usage) if total_memory_usage else 0,
'cpu_us': int(total_cpu_us) if total_cpu_us else 0,
'cpu_wait_us': int(total_cpu_wait_us) if total_cpu_wait_us else 0,
'peak_memory_usage': int(peak_memory_usage) if peak_memory_usage else 0,
}
result_rows.append(result_row)
Expand Down Expand Up @@ -343,6 +349,8 @@ def _merge_rows_across_nodes(node_rows):
'result_rows',
'result_bytes',
'memory_usage',
'cpu_us',
'cpu_wait_us',
]
for field in sum_fields:
merged[field] = sum(r.get(field, 0) for r in rows)
Expand Down
6 changes: 6 additions & 0 deletions clickhouse/tests/test_dbm_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
'result_rows',
'result_bytes',
'memory_usage',
'cpu_us',
'cpu_wait_us',
'peak_memory_usage',
}

Expand Down Expand Up @@ -359,6 +361,8 @@ def test_query_completions_data(aggregator, instance, dd_run_check, datadog_agen
assert details['username'] is not None
assert details['read_rows'] >= 0
assert details['event_time_microseconds'] > 0
assert details['cpu_us'] >= 0
assert details['cpu_wait_us'] >= 0


def test_explain_plan_collected(aggregator, instance, dd_run_check, datadog_agent):
Expand Down Expand Up @@ -621,3 +625,5 @@ def test_query_errors_data(aggregator, instance, dd_run_check):
assert 'UNKNOWN_TABLE' in details['exception']
assert details['exception_code'] == 60
assert 'stack_trace' in details
assert details['cpu_us'] >= 0
assert details['cpu_wait_us'] >= 0
8 changes: 8 additions & 0 deletions clickhouse/tests/test_query_completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ def test_create_batched_payload_query_details(check_with_dbm):
'result_rows': 100,
'result_bytes': 10240,
'memory_usage': 5242880,
'cpu_us': 1872000,
'cpu_wait_us': 35000,
'event_time_microseconds': 1746205423150500,
'query_start_time_microseconds': 1746205423000000,
'initial_query_id': 'test-query-id-123',
Expand Down Expand Up @@ -165,6 +167,8 @@ def test_create_batched_payload_query_details(check_with_dbm):
assert query_details['query_id'] == 'test-query-id-123'
assert query_details['read_rows'] == 1000
assert query_details['memory_usage'] == 5242880
assert query_details['cpu_us'] == 1872000
assert query_details['cpu_wait_us'] == 35000

# Verify metadata is included
assert query_details['metadata']['tables'] == ['users']
Expand Down Expand Up @@ -253,6 +257,10 @@ def test_completed_queries_query_format():
assert 'event_time_microseconds' in COMPLETED_QUERIES_QUERY
assert 'query_start_time_microseconds' in COMPLETED_QUERIES_QUERY

# CPU fields read from the ProfileEvents map
assert "ProfileEvents['OSCPUVirtualTimeMicroseconds']" in COMPLETED_QUERIES_QUERY
assert "ProfileEvents['OSCPUWaitMicroseconds']" in COMPLETED_QUERIES_QUERY


def test_normalize_query_with_obfuscation(check_with_dbm):
"""Test that query normalization properly obfuscates and extracts metadata"""
Expand Down
10 changes: 9 additions & 1 deletion clickhouse/tests/test_query_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_normalize_query(check_with_dbm):


def test_create_batched_payload_error_fields(check_with_dbm):
"""Test that batched payload includes exception, exception_code, and stack_trace"""
"""Test that batched payload includes exception, exception_code, stack_trace, and CPU time fields."""
query_errors = check_with_dbm.query_errors
query_errors._tags_no_db = ['test:clickhouse']

Expand All @@ -134,6 +134,8 @@ def test_create_batched_payload_error_fields(check_with_dbm):
'result_rows': 0,
'result_bytes': 0,
'memory_usage': 0,
'cpu_us': 1872000,
'cpu_wait_us': 35000,
'event_time_microseconds': 1746205423150500,
'query_start_time_microseconds': 1746205423000000,
'initial_query_id': 'err-query-id-456',
Expand All @@ -159,6 +161,8 @@ def test_create_batched_payload_error_fields(check_with_dbm):
assert query_details['exception'] == 'Table default.nonexistent_table does not exist. (UNKNOWN_TABLE)'
assert query_details['exception_code'] == 60
assert query_details['stack_trace'] == 'DB::Exception::Exception at 0x1234...'
assert query_details['cpu_us'] == 1872000
assert query_details['cpu_wait_us'] == 35000


def test_create_batched_payload_structure(check_with_dbm):
Expand Down Expand Up @@ -224,6 +228,10 @@ def test_query_errors_sql_query_format():
assert 'memory_usage' in QUERY_ERRORS_QUERY
assert 'event_time_microseconds' in QUERY_ERRORS_QUERY

# CPU time fields
assert "ProfileEvents['OSCPUVirtualTimeMicroseconds']" in QUERY_ERRORS_QUERY
assert "ProfileEvents['OSCPUWaitMicroseconds']" in QUERY_ERRORS_QUERY


def test_rate_limiting(check_with_dbm):
"""Test that query error rate limiting works correctly"""
Expand Down
25 changes: 25 additions & 0 deletions clickhouse/tests/test_statement_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ def test_normalize_queries(check_with_dbm):
'result_bytes': 5000,
'memory_usage': 1000000,
'peak_memory_usage': 1500000,
'cpu_us': 80000,
'cpu_wait_us': 8000,
}
]

Expand Down Expand Up @@ -189,6 +191,8 @@ def test_query_signature_matches_samples_pipeline(check_with_dbm):
'result_bytes': 0,
'memory_usage': 0,
'peak_memory_usage': 0,
'cpu_us': 1000,
'cpu_wait_us': 100,
}
]
normalized = metrics._normalize_queries(rows)
Expand Down Expand Up @@ -452,6 +456,8 @@ def test_normalize_queries_handles_obfuscation_failure(check_with_dbm):
'result_bytes': 0,
'memory_usage': 0,
'peak_memory_usage': 0,
'cpu_us': 1000,
'cpu_wait_us': 100,
},
{
'normalized_query_hash': '67890',
Expand All @@ -472,6 +478,8 @@ def test_normalize_queries_handles_obfuscation_failure(check_with_dbm):
'result_bytes': 0,
'memory_usage': 0,
'peak_memory_usage': 0,
'cpu_us': 500,
'cpu_wait_us': 50,
},
]

Expand Down Expand Up @@ -500,6 +508,9 @@ def test_statements_query_format():
# Filters
assert 'event_time_microseconds <= now64(6)' in STATEMENTS_QUERY

assert "ProfileEvents['OSCPUVirtualTimeMicroseconds']" in STATEMENTS_QUERY
assert "ProfileEvents['OSCPUWaitMicroseconds']" in STATEMENTS_QUERY


def test_merge_rows_across_nodes_single_node():
"""When a query only appears on one node, it should pass through unchanged."""
Expand All @@ -520,13 +531,17 @@ def test_merge_rows_across_nodes_single_node():
'result_bytes': 500,
'memory_usage': 5000,
'peak_memory_usage': 8000,
'cpu_us': 80000,
'cpu_wait_us': 8000,
}
]

result = ClickhouseStatementMetrics._merge_rows_across_nodes(rows)
assert len(result) == 1
assert result[0]['count'] == 10
assert result[0]['total_time'] == 100.0
assert result[0]['cpu_us'] == 80000
assert result[0]['cpu_wait_us'] == 8000


def test_merge_rows_across_nodes_sums_counts():
Expand All @@ -548,6 +563,8 @@ def test_merge_rows_across_nodes_sums_counts():
'result_bytes': 5000,
'memory_usage': 50000,
'peak_memory_usage': 80000,
'cpu_us': 500000,
'cpu_wait_us': 50000,
},
{
'normalized_query_hash': 'hash1',
Expand All @@ -563,6 +580,8 @@ def test_merge_rows_across_nodes_sums_counts():
'result_bytes': 2500,
'memory_usage': 30000,
'peak_memory_usage': 90000,
'cpu_us': 300000,
'cpu_wait_us': 30000,
},
]

Expand All @@ -580,6 +599,8 @@ def test_merge_rows_across_nodes_sums_counts():
assert merged['result_bytes'] == 7500
assert merged['memory_usage'] == 80000
assert merged['peak_memory_usage'] == 90000
assert merged['cpu_us'] == 800000
assert merged['cpu_wait_us'] == 80000
assert merged['mean_time'] == 800.0 / 150


Expand All @@ -600,6 +621,8 @@ def test_merge_rows_across_nodes_different_queries():
'result_bytes': 0,
'memory_usage': 0,
'peak_memory_usage': 0,
'cpu_us': 80000,
'cpu_wait_us': 8000,
}

rows = [
Expand Down Expand Up @@ -636,6 +659,8 @@ def test_metrics_row_with_empty_values(check_with_dbm):
'result_bytes': 0,
'memory_usage': 0,
'peak_memory_usage': 0,
'cpu_us': 0, # Zero CPU
'cpu_wait_us': 0,
}
]

Expand Down
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/23849.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ``resolve_db_host`` treating loopback IP literals (e.g. ``::1``) as DNS resolution failures, which caused database checks to submit metrics with the wrong host tag and miss agent host tags.
20 changes: 18 additions & 2 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import time
from concurrent.futures.thread import ThreadPoolExecutor
from enum import Enum, auto
from ipaddress import IPv4Address
from ipaddress import IPv4Address, IPv6Address, ip_address
from typing import Any, Callable, Dict, List, Optional, Tuple, Union # noqa: F401

from cachetools import TTLCache
Expand Down Expand Up @@ -162,12 +162,28 @@ def acquire(self, key):
return True


def _try_parse_db_host_ip(db_host: str) -> IPv4Address | IPv6Address | None:
"""Try to parse db_host as an IP address."""
try:
return ip_address(db_host.strip())
except ValueError:
return None


def _is_local_db_host(db_host: str | None) -> bool:
"""Return True when the DB is reached via localhost, a unix socket, or a loopback IP."""
if not db_host or db_host == 'localhost' or db_host.startswith('/'):
return True
addr = _try_parse_db_host_ip(db_host)
return addr is not None and addr.is_loopback


def resolve_db_host(db_host):
if db_host and db_host.endswith('.local'):
return db_host

agent_hostname = datadog_agent.get_hostname()
if not db_host or db_host in {'localhost', '127.0.0.1'} or db_host.startswith('/'):
if _is_local_db_host(db_host):
return agent_hostname

try:
Expand Down
18 changes: 16 additions & 2 deletions datadog_checks_base/tests/base/utils/db/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,25 @@
@pytest.mark.parametrize(
"db_host, agent_hostname, want",
[
# Unset host or local hostname
(None, "agent_hostname", "agent_hostname"),
("localhost", "agent_hostname", "agent_hostname"),
# Unix socket
("/var/run/mysqld.sock", "agent_hostname", "agent_hostname"),
# Loopback IP literals (incl. zone-scoped IPv6)
("127.0.0.1", "agent_hostname", "agent_hostname"),
("::1", "agent_hostname", "agent_hostname"),
("::1%lo0", "agent_hostname", "agent_hostname"),
# Non-loopback IP literals keep the configured host
("169.254.169.254", "agent_hostname", "169.254.169.254"),
("fe80::1", "agent_hostname", "fe80::1"),
("fe80::1%eth0", "agent_hostname", "fe80::1%eth0"),
("192.0.2.1", "agent_hostname", "192.0.2.1"),
("2001:db8::1", "agent_hostname", "2001:db8::1"),
# Resolved DB host shares the agent host IP
("192.0.2.1", "192.0.2.1", "192.0.2.1"),
("192.0.2.1", "192.0.2.254", "192.0.2.1"),
# Hostname resolution failures fall back to the configured db_host
("socket.gaierror", "agent_hostname", "socket.gaierror"),
(
"greater-than-or-equal-to-64-characters-causes-unicode-error-----",
Expand All @@ -44,8 +59,7 @@
),
("192.0.2.1", "socket.gaierror", "192.0.2.1"),
("192.0.2.1", "greater-than-or-equal-to-64-characters-causes-unicode-error-----", "192.0.2.1"),
("192.0.2.1", "192.0.2.1", "192.0.2.1"),
("192.0.2.1", "192.0.2.254", "192.0.2.1"),
# mDNS .local names are passed through unchanged
("postgres.svc.local", "some-pod", "postgres.svc.local"),
],
)
Expand Down
Loading
Loading