1+ import itertools
12from collections .abc import Generator , Iterable
3+ from datetime import timezone
24
35from prometheus_client import Metric
46from prometheus_client .parser import text_string_to_metric_families
79from sqlalchemy .ext .asyncio import AsyncSession
810from sqlalchemy .orm import joinedload
911
10- from dstack ._internal .core .models .runs import JobStatus
11- from dstack ._internal .server .models import JobModel , JobPrometheusMetrics , ProjectModel
12+ from dstack ._internal .core .models .instances import InstanceStatus
13+ from dstack ._internal .core .models .runs import JobStatus , RunSpec
14+ from dstack ._internal .server .models import (
15+ InstanceModel ,
16+ JobModel ,
17+ JobPrometheusMetrics ,
18+ ProjectModel ,
19+ RunModel ,
20+ )
21+ from dstack ._internal .server .services .instances import get_instance_offer
22+ from dstack ._internal .server .services .jobs import get_job_provisioning_data , get_job_runtime_data
23+ from dstack ._internal .utils .common import get_current_datetime
24+
# Prometheus series names exported by this module.
# "duration" series are counters (seconds since creation/submission, see the
# add_sample calls below); price and GPU-count series are gauges.
_INSTANCE_DURATION = "dstack_instance_duration_seconds_total"
_INSTANCE_PRICE = "dstack_instance_price_dollars_per_hour"
_INSTANCE_GPU_COUNT = "dstack_instance_gpu_count"
_JOB_DURATION = "dstack_job_duration_seconds_total"
_JOB_PRICE = "dstack_job_price_dollars_per_hour"
_JOB_GPU_COUNT = "dstack_job_gpu_count"
1231
1332
async def get_metrics(session: AsyncSession) -> str:
    """Render all server-level metrics as one Prometheus exposition-format text.

    Combines instance metrics, job metrics, and the stored per-job GPU
    metrics into a single newline-joined document ending with a newline.
    """
    instance_metrics = await get_instance_metrics(session)
    job_metrics = await get_job_metrics(session)
    job_gpu_metrics = await get_job_gpu_metrics(session)
    combined = itertools.chain(instance_metrics, job_metrics, job_gpu_metrics)
    rendered_lines = list(_render_metrics(combined))
    return "\n".join(rendered_lines) + "\n"
40+
41+
async def get_instance_metrics(session: AsyncSession) -> Iterable[Metric]:
    """Build Prometheus metric families for all live (non-deleted) instances.

    Returns three families — duration counter, price gauge, GPU-count
    gauge — each with one sample per instance, labeled with project,
    fleet, instance, backend, and GPU name.
    """
    res = await session.execute(
        select(InstanceModel)
        .join(ProjectModel)
        .where(
            # Only instances that currently exist in an active lifecycle state.
            InstanceModel.deleted == False,
            InstanceModel.status.in_(
                [
                    InstanceStatus.PROVISIONING,
                    InstanceStatus.IDLE,
                    InstanceStatus.BUSY,
                    InstanceStatus.TERMINATING,
                ]
            ),
        )
        .order_by(ProjectModel.name, InstanceModel.name)
        # Eager-load relations read below (instance.project.name, instance.fleet).
        .options(
            joinedload(InstanceModel.project),
            joinedload(InstanceModel.fleet),
        )
    )
    instances = res.unique().scalars().all()
    # One shared Metric family per series; per-instance samples are appended below.
    metrics: dict[str, Metric] = {
        _INSTANCE_DURATION: Metric(
            name=_INSTANCE_DURATION,
            documentation="Total seconds the instance is running",
            typ="counter",
        ),
        _INSTANCE_PRICE: Metric(
            name=_INSTANCE_PRICE, documentation="Instance price, USD/hour", typ="gauge"
        ),
        _INSTANCE_GPU_COUNT: Metric(
            name=_INSTANCE_GPU_COUNT, documentation="Instance GPU count", typ="gauge"
        ),
    }
    now = get_current_datetime()
    for instance in instances:
        fleet = instance.fleet
        offer = get_instance_offer(instance)
        gpu = ""
        gpu_count = 0
        if offer is not None and len(offer.instance.resources.gpus) > 0:
            # Label carries only the first GPU's name; presumably instances are
            # homogeneous in GPU model — TODO confirm.
            gpu = offer.instance.resources.gpus[0].name
            gpu_count = len(offer.instance.resources.gpus)
        # Empty-string labels for data that may be absent (no fleet/offer/backend).
        labels: dict[str, str] = {
            "dstack_project_name": instance.project.name,
            "dstack_fleet_name": fleet.name if fleet is not None else "",
            "dstack_fleet_id": str(fleet.id) if fleet is not None else "",
            "dstack_instance_name": str(instance.name),
            "dstack_instance_id": str(instance.id),
            "dstack_instance_type": offer.instance.name if offer is not None else "",
            "dstack_backend": instance.backend.value if instance.backend is not None else "",
            "dstack_gpu": gpu,
        }
        # created_at is treated as naive UTC: attach tzinfo so it can be
        # subtracted from the timezone-aware `now`.
        duration = (now - instance.created_at.replace(tzinfo=timezone.utc)).total_seconds()
        metrics[_INSTANCE_DURATION].add_sample(
            name=_INSTANCE_DURATION, labels=labels, value=duration
        )
        metrics[_INSTANCE_PRICE].add_sample(
            name=_INSTANCE_PRICE, labels=labels, value=instance.price or 0.0
        )
        metrics[_INSTANCE_GPU_COUNT].add_sample(
            name=_INSTANCE_GPU_COUNT, labels=labels, value=gpu_count
        )
    return metrics.values()
107+
108+
async def get_job_metrics(session: AsyncSession) -> Iterable[Metric]:
    """Build Prometheus metric families for jobs in active states.

    Returns duration counter, price gauge, and GPU-count gauge families,
    one sample per job that has provisioning data.
    """
    res = await session.execute(
        select(JobModel)
        .join(ProjectModel)
        .where(
            JobModel.status.in_(
                [
                    JobStatus.PROVISIONING,
                    JobStatus.PULLING,
                    JobStatus.RUNNING,
                    JobStatus.TERMINATING,
                ]
            )
        )
        .order_by(ProjectModel.name, JobModel.job_name)
        # Eager-load relations read by _get_job_labels (job.project, job.run.user).
        .options(
            joinedload(JobModel.project),
            joinedload(JobModel.run).joinedload(RunModel.user),
        )
    )
    jobs = res.scalars().all()
    # One shared Metric family per series; per-job samples are appended below.
    metrics: dict[str, Metric] = {
        _JOB_DURATION: Metric(
            name=_JOB_DURATION, documentation="Total seconds the job is running", typ="counter"
        ),
        _JOB_PRICE: Metric(
            name=_JOB_PRICE, documentation="Job instance price, USD/hour", typ="gauge"
        ),
        _JOB_GPU_COUNT: Metric(name=_JOB_GPU_COUNT, documentation="Job GPU count", typ="gauge"),
    }
    now = get_current_datetime()
    for job in jobs:
        jpd = get_job_provisioning_data(job)
        if jpd is None:
            # No instance assigned to the job yet — nothing to report.
            continue
        jrd = get_job_runtime_data(job)
        gpus = jpd.instance_type.resources.gpus
        price = jpd.price
        if jrd is not None and jrd.offer is not None:
            # Runtime data, when it carries an offer, overrides the
            # provisioning-time resources/price — presumably it reflects
            # what the job actually got; TODO confirm.
            gpus = jrd.offer.instance.resources.gpus
            price = jrd.offer.price
        run_spec = RunSpec.__response__.parse_raw(job.run.run_spec)
        labels = _get_job_labels(job)
        labels["dstack_run_type"] = run_spec.configuration.type
        labels["dstack_backend"] = jpd.get_base_backend().value
        labels["dstack_gpu"] = gpus[0].name if gpus else ""
        # submitted_at is treated as naive UTC: attach tzinfo so it can be
        # subtracted from the timezone-aware `now`.
        duration = (now - job.submitted_at.replace(tzinfo=timezone.utc)).total_seconds()
        metrics[_JOB_DURATION].add_sample(name=_JOB_DURATION, labels=labels, value=duration)
        metrics[_JOB_PRICE].add_sample(name=_JOB_PRICE, labels=labels, value=price)
        metrics[_JOB_GPU_COUNT].add_sample(name=_JOB_GPU_COUNT, labels=labels, value=len(gpus))
    return metrics.values()
160+
161+
async def get_job_gpu_metrics(session: AsyncSession) -> Iterable[Metric]:
    """Load the stored Prometheus metric text of running jobs, re-parsed
    and enriched with dstack job labels."""
    res = await session.execute(
        select(JobPrometheusMetrics)
        .join(JobModel)
        .join(ProjectModel)
        .where(JobModel.status.in_([JobStatus.RUNNING]))
        .order_by(ProjectModel.name, JobModel.job_name)
        # Eager-load everything _get_job_labels reads:
        # job.project.name and job.run.user.name.
        .options(
            joinedload(JobPrometheusMetrics.job).joinedload(JobModel.project),
            joinedload(JobPrometheusMetrics.job)
            .joinedload(JobModel.run)
            .joinedload(RunModel.user),
        )
    )
    metrics_models = res.scalars().all()
    return _parse_and_enrich_job_gpu_metrics(metrics_models)
25178
26179
27180async def get_project_metrics (session : AsyncSession , project : ProjectModel ) -> str :
@@ -33,20 +186,20 @@ async def get_project_metrics(session: AsyncSession, project: ProjectModel) -> s
33186 JobModel .status .in_ ([JobStatus .RUNNING ]),
34187 )
35188 .order_by (JobModel .job_name )
36- .options (joinedload (JobPrometheusMetrics .job ).joinedload (JobModel .project ))
189+ .options (
190+ joinedload (JobPrometheusMetrics .job ).joinedload (JobModel .project ),
191+ joinedload (JobPrometheusMetrics .job )
192+ .joinedload (JobModel .run )
193+ .joinedload (RunModel .user ),
194+ )
37195 )
38196 metrics_models = res .scalars ().all ()
39- return _process_metrics (metrics_models )
40-
41-
42- def _process_metrics (metrics_models : Iterable [JobPrometheusMetrics ]) -> str :
43- metrics = _parse_and_enrich_metrics (metrics_models )
44- if not metrics :
45- return ""
46- return "\n " .join (_render_metrics (metrics )) + "\n "
197+ return "\n " .join (_render_metrics (_parse_and_enrich_job_gpu_metrics (metrics_models ))) + "\n "
47198
48199
49- def _parse_and_enrich_metrics (metrics_models : Iterable [JobPrometheusMetrics ]) -> list [Metric ]:
200+ def _parse_and_enrich_job_gpu_metrics (
201+ metrics_models : Iterable [JobPrometheusMetrics ],
202+ ) -> Iterable [Metric ]:
50203 metrics : dict [str , Metric ] = {}
51204 for metrics_model in metrics_models :
52205 for metric in text_string_to_metric_families (metrics_model .text ):
@@ -56,31 +209,36 @@ def _parse_and_enrich_metrics(metrics_models: Iterable[JobPrometheusMetrics]) ->
56209 metric = metrics .setdefault (name , metric )
57210 for sample in samples :
58211 labels = sample .labels
59- labels .update (_get_dstack_labels (metrics_model .job ))
212+ labels .update (_get_job_labels (metrics_model .job ))
60213 # text_string_to_metric_families "fixes" counter names appending _total,
61214 # we rebuild Sample to revert this
62215 metric .samples .append (Sample (name , labels , * sample [2 :]))
63- return list ( metrics .values () )
216+ return metrics .values ()
64217
65218
def _get_job_labels(job: JobModel) -> dict[str, str]:
    """Return the common dstack identity labels attached to job-level samples.

    IDs and numeric fields are stringified, since Prometheus label values
    must be strings.
    """
    labels: dict[str, str] = {}
    labels["dstack_project_name"] = job.project.name
    labels["dstack_user_name"] = job.run.user.name
    labels["dstack_run_name"] = job.run_name
    labels["dstack_run_id"] = str(job.run_id)
    labels["dstack_job_name"] = job.job_name
    labels["dstack_job_id"] = str(job.id)
    labels["dstack_job_num"] = str(job.job_num)
    labels["dstack_replica_num"] = str(job.replica_num)
    return labels
74230
75231
76232def _render_metrics (metrics : Iterable [Metric ]) -> Generator [str , None , None ]:
77233 for metric in metrics :
234+ if not metric .samples :
235+ continue
78236 yield f"# HELP { metric .name } { metric .documentation } "
79237 yield f"# TYPE { metric .name } { metric .type } "
80238 for sample in metric .samples :
81239 parts : list [str ] = [f"{ sample .name } {{" ]
82240 parts .extend ("," .join (f'{ name } ="{ value } "' for name , value in sample .labels .items ()))
83- parts .append (f"}} { sample .value } " )
241+ parts .append (f"}} { float ( sample .value ) } " )
84242 # text_string_to_metric_families converts milliseconds to float seconds
85243 if isinstance (sample .timestamp , float ):
86244 parts .append (f" { int (sample .timestamp * 1000 )} " )
0 commit comments