Skip to content

Commit fe620f8

Browse files
authored
Tag Kueue queues from workloadmeta (#23999)
* Tag Kueue queues from workloadmeta * Add Kueue tagger changelog * test fixes * Fix linter * Tag LocalQueue resource metrics from workloadmeta. * Cover LocalQueue resource tag enrichment. * Reset Kueue tagger stub between tests. * Cover scoped Kueue tagger enrichment.
1 parent 1a752a9 commit fe620f8

4 files changed

Lines changed: 93 additions & 0 deletions

File tree

kueue/changelog.d/23999.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add Kueue queue and resource flavor tag enrichment from the Agent tagger.

kueue/datadog_checks/kueue/check.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import re
55

66
from datadog_checks.base import OpenMetricsBaseCheckV2
7+
from datadog_checks.base.checks.openmetrics.v2.scraper import OpenMetricsScraper
78
from datadog_checks.base.checks.openmetrics.v2.transform import get_native_dynamic_transformer
9+
from datadog_checks.base.utils.tagging import tagger
810

911
from .config_models import ConfigMixin
1012
from .metrics import LOCAL_QUEUE_METRIC_MAP, METRIC_MAP, RESOURCE_METRIC_MAP
@@ -19,6 +21,8 @@
1921
}
2022

2123
OTHER_RESOURCE_NAME = 'other'
24+
KUEUE_QUEUE_ENTITY_PREFIX = 'kubernetes_kueue_queue://'
25+
KUEUE_RESOURCE_FLAVOR_ENTITY_PREFIX = 'kueue_resource_flavor://'
2226

2327
DEFAULT_RENAME_LABELS = {
2428
'cluster_queue': 'kueue_cluster_queue',
@@ -38,6 +42,9 @@ def __init__(self, name, init_config, instances):
3842
def get_default_config(self):
3943
return {'metrics': [METRIC_MAP]}
4044

45+
def create_scraper(self, config):
46+
return KueueOpenMetricsScraper(self, self.get_config_with_defaults(config))
47+
4148
def configure_scrapers(self):
4249
super().configure_scrapers()
4350

@@ -108,3 +115,33 @@ def rename_local_queue_tag(tags: list[str]) -> list[str]:
108115
@staticmethod
109116
def normalize_resource_name(resource_name: str) -> str:
110117
return resource_name.replace('/', '.').replace('-', '_')
118+
119+
120+
class KueueOpenMetricsScraper(OpenMetricsScraper):
121+
def generate_sample_data(self, metric):
122+
for sample, tags, hostname in super().generate_sample_data(metric):
123+
tags.extend(self.get_queue_tagger_tags(metric, sample.labels))
124+
yield sample, tags, hostname
125+
126+
@staticmethod
127+
def get_queue_tagger_tags(metric, labels) -> list[str]:
128+
tags = []
129+
130+
if cluster_queue := labels.get('cluster_queue'):
131+
tags.extend(
132+
tagger.tag(f'{KUEUE_QUEUE_ENTITY_PREFIX}clusterqueue//{cluster_queue}', tagger.ORCHESTRATOR) or []
133+
)
134+
135+
if metric.name in LOCAL_QUEUE_METRIC_MAP or RESOURCE_METRIC_MAP.get(metric.name, '').startswith('local_queue.'):
136+
namespace = labels.get('namespace')
137+
local_queue = labels.get('name')
138+
if namespace and local_queue:
139+
tags.extend(
140+
tagger.tag(f'{KUEUE_QUEUE_ENTITY_PREFIX}localqueue/{namespace}/{local_queue}', tagger.ORCHESTRATOR)
141+
or []
142+
)
143+
144+
if flavor := labels.get('flavor'):
145+
tags.extend(tagger.tag(f'{KUEUE_RESOURCE_FLAVOR_ENTITY_PREFIX}{flavor}', tagger.ORCHESTRATOR) or [])
146+
147+
return tags

kueue/tests/conftest.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import pytest
99

10+
from datadog_checks.base.stubs import tagger
1011
from datadog_checks.dev import get_here
1112
from datadog_checks.dev.kind import kind_run
1213
from datadog_checks.dev.kube_port_forward import port_forward
@@ -19,6 +20,13 @@
1920
KUEUE_NAMESPACE = 'kueue-system' # hardcoded in the Kueue manifests
2021

2122

23+
@pytest.fixture(autouse=True)
24+
def reset_tagger():
25+
tagger.reset()
26+
yield
27+
tagger.reset()
28+
29+
2230
def wait_for_controller():
2331
run_command(
2432
[

kueue/tests/test_unit.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66

7+
from datadog_checks.base.stubs import tagger
78
from datadog_checks.dev.utils import get_metadata_metrics
89
from datadog_checks.kueue import KueueCheck
910
from datadog_checks.kueue.check import OTHER_RESOURCE_NAME, RESOURCE_NAME_MAP
@@ -64,6 +65,48 @@ def test_check(dd_run_check, aggregator, instance, mock_http_response):
6465
)
6566

6667

68+
def test_queue_tagger_tags(dd_run_check, aggregator, instance, mock_http_response):
69+
mock_http_response(file_path=get_fixture_path('metrics.txt'))
70+
tagger.set_tags(
71+
{
72+
'kubernetes_kueue_queue://clusterqueue//cluster-queue': ['cluster_queue_tag:value'],
73+
'kubernetes_kueue_queue://localqueue/default/user-queue': ['local_queue_tag:value'],
74+
'kueue_resource_flavor://default-flavor': ['resource_flavor_tag:value'],
75+
}
76+
)
77+
78+
check = KueueCheck('kueue', {}, [instance])
79+
dd_run_check(check)
80+
81+
aggregator.assert_metric_has_tag('kueue.pending_workloads', 'cluster_queue_tag:value')
82+
aggregator.assert_metric_has_tag('kueue.cluster_queue.resource_usage.gpu', 'cluster_queue_tag:value')
83+
aggregator.assert_metric_has_tag('kueue.cluster_queue.resource_usage.gpu', 'resource_flavor_tag:value')
84+
aggregator.assert_metric_has_tag('kueue.local_queue.pending_workloads', 'cluster_queue_tag:value')
85+
aggregator.assert_metric_has_tag('kueue.local_queue.pending_workloads', 'local_queue_tag:value')
86+
aggregator.assert_metric_has_tag('kueue.local_queue.resource_reservation.cpu', 'local_queue_tag:value')
87+
aggregator.assert_metric_has_tag('kueue.local_queue.resource_usage.cpu', 'local_queue_tag:value')
88+
tagger.assert_called('kubernetes_kueue_queue://clusterqueue//cluster-queue', tagger.ORCHESTRATOR)
89+
tagger.assert_called('kubernetes_kueue_queue://localqueue/default/user-queue', tagger.ORCHESTRATOR)
90+
tagger.assert_called('kueue_resource_flavor://default-flavor', tagger.ORCHESTRATOR)
91+
92+
93+
def test_queue_tagger_tags_are_scoped(dd_run_check, aggregator, instance, mock_http_response):
94+
mock_http_response(file_path=get_fixture_path('metrics.txt'))
95+
tagger.set_tags(
96+
{
97+
'kubernetes_kueue_queue://clusterqueue//cluster-queue': ['cluster_queue_tag:value'],
98+
}
99+
)
100+
101+
check = KueueCheck('kueue', {}, [instance])
102+
dd_run_check(check)
103+
104+
go_goroutines_tags = _get_metric_tags(aggregator, 'kueue.go.goroutines')
105+
local_queue_tags = _get_metric_tags(aggregator, 'kueue.local_queue.pending_workloads')
106+
assert 'cluster_queue_tag:value' not in go_goroutines_tags
107+
assert 'local_queue_tag:value' not in local_queue_tags
108+
109+
67110
def test_resource_name_map(dd_run_check, aggregator, instance, mock_http_response):
68111
mock_http_response(file_path=get_fixture_path('metrics.txt'))
69112
instance = {
@@ -95,3 +138,7 @@ def test_empty_instance(dd_run_check):
95138
):
96139
check = KueueCheck('kueue', {}, [{}])
97140
dd_run_check(check)
141+
142+
143+
def _get_metric_tags(aggregator, metric_name):
144+
return {tag for metric in aggregator.metrics(metric_name) for tag in metric.tags}

0 commit comments

Comments
 (0)