11use axum:: extract:: State ;
22use axum:: response:: IntoResponse ;
33use std:: collections:: HashMap ;
4- use std:: fmt:: Write ;
4+ use std:: fmt:: { Display , Write } ;
55use std:: sync:: Arc ;
66
7- use super :: state:: { AppState , ReviewStatus } ;
7+ use super :: state:: { AppState , ReviewStatus , MAX_CONCURRENT_REVIEWS } ;
8+
9+ #[ derive( Clone , Copy ) ]
10+ struct LongRunningJobMetrics {
11+ job_type : & ' static str ,
12+ queue_depth : u64 ,
13+ worker_capacity : u64 ,
14+ workers_active : u64 ,
15+ workers_available : u64 ,
16+ }
17+
18+ impl LongRunningJobMetrics {
19+ const fn new (
20+ job_type : & ' static str ,
21+ queue_depth : u64 ,
22+ worker_capacity : u64 ,
23+ workers_active : u64 ,
24+ workers_available : u64 ,
25+ ) -> Self {
26+ Self {
27+ job_type,
28+ queue_depth,
29+ worker_capacity,
30+ workers_active,
31+ workers_available,
32+ }
33+ }
34+
35+ fn worker_saturation_ratio ( self ) -> f64 {
36+ if self . worker_capacity == 0 {
37+ 0.0
38+ } else {
39+ self . workers_active as f64 / self . worker_capacity as f64
40+ }
41+ }
42+ }
843
944pub async fn get_metrics ( State ( state) : State < Arc < AppState > > ) -> impl IntoResponse {
1045 let reviews = state. reviews . read ( ) . await ;
@@ -69,6 +104,23 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoRespons
69104
70105 drop ( reviews) ;
71106
107+ let review_worker_capacity = MAX_CONCURRENT_REVIEWS as u64 ;
108+ let review_workers_available = state
109+ . review_semaphore
110+ . available_permits ( )
111+ . min ( MAX_CONCURRENT_REVIEWS ) as u64 ;
112+ let review_workers_active = review_worker_capacity. saturating_sub ( review_workers_available) ;
113+ let long_running_job_metrics = [
114+ LongRunningJobMetrics :: new (
115+ "review" ,
116+ pending,
117+ review_worker_capacity,
118+ review_workers_active,
119+ review_workers_available,
120+ ) ,
121+ LongRunningJobMetrics :: new ( "eval" , 0 , 0 , 0 , 0 ) ,
122+ ] ;
123+
72124 let mut buf = String :: with_capacity ( 4096 ) ;
73125
74126 write_metric (
@@ -170,6 +222,81 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoRespons
170222 total_completion_tokens,
171223 ) ;
172224
225+ write_metric_header (
226+ & mut buf,
227+ "diffscope_job_queue_depth" ,
228+ "Current queue depth for long-running jobs by job type" ,
229+ "gauge" ,
230+ ) ;
231+ for metric in & long_running_job_metrics {
232+ write_labeled_metric (
233+ & mut buf,
234+ "diffscope_job_queue_depth" ,
235+ & [ ( "job_type" , metric. job_type ) ] ,
236+ metric. queue_depth ,
237+ ) ;
238+ }
239+
240+ write_metric_header (
241+ & mut buf,
242+ "diffscope_job_worker_capacity" ,
243+ "Worker capacity for long-running jobs by job type" ,
244+ "gauge" ,
245+ ) ;
246+ for metric in & long_running_job_metrics {
247+ write_labeled_metric (
248+ & mut buf,
249+ "diffscope_job_worker_capacity" ,
250+ & [ ( "job_type" , metric. job_type ) ] ,
251+ metric. worker_capacity ,
252+ ) ;
253+ }
254+
255+ write_metric_header (
256+ & mut buf,
257+ "diffscope_job_workers_active" ,
258+ "Active workers handling long-running jobs by job type" ,
259+ "gauge" ,
260+ ) ;
261+ for metric in & long_running_job_metrics {
262+ write_labeled_metric (
263+ & mut buf,
264+ "diffscope_job_workers_active" ,
265+ & [ ( "job_type" , metric. job_type ) ] ,
266+ metric. workers_active ,
267+ ) ;
268+ }
269+
270+ write_metric_header (
271+ & mut buf,
272+ "diffscope_job_workers_available" ,
273+ "Available workers for long-running jobs by job type" ,
274+ "gauge" ,
275+ ) ;
276+ for metric in & long_running_job_metrics {
277+ write_labeled_metric (
278+ & mut buf,
279+ "diffscope_job_workers_available" ,
280+ & [ ( "job_type" , metric. job_type ) ] ,
281+ metric. workers_available ,
282+ ) ;
283+ }
284+
285+ write_metric_header (
286+ & mut buf,
287+ "diffscope_job_worker_saturation_ratio" ,
288+ "Worker saturation ratio for long-running jobs by job type" ,
289+ "gauge" ,
290+ ) ;
291+ for metric in & long_running_job_metrics {
292+ write_labeled_metric (
293+ & mut buf,
294+ "diffscope_job_worker_saturation_ratio" ,
295+ & [ ( "job_type" , metric. job_type ) ] ,
296+ metric. worker_saturation_ratio ( ) ,
297+ ) ;
298+ }
299+
173300 // Per-severity comment counts
174301 let _ = writeln ! (
175302 buf,
@@ -217,15 +344,38 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoRespons
217344 )
218345}
219346
220- fn write_metric ( buf : & mut String , name : & str , help : & str , metric_type : & str , value : u64 ) {
347+ fn write_metric_header ( buf : & mut String , name : & str , help : & str , metric_type : & str ) {
221348 let _ = writeln ! ( buf, "# HELP {name} {help}" ) ;
222349 let _ = writeln ! ( buf, "# TYPE {name} {metric_type}" ) ;
350+ }
351+
352+ fn write_metric < T : Display > ( buf : & mut String , name : & str , help : & str , metric_type : & str , value : T ) {
353+ write_metric_header ( buf, name, help, metric_type) ;
223354 let _ = writeln ! ( buf, "{name} {value}" ) ;
224355}
225356
357+ fn write_labeled_metric < T : Display > (
358+ buf : & mut String ,
359+ name : & str ,
360+ labels : & [ ( & str , & str ) ] ,
361+ value : T ,
362+ ) {
363+ let mut label_buf = String :: new ( ) ;
364+ for ( index, ( key, raw_value) ) in labels. iter ( ) . enumerate ( ) {
365+ if index > 0 {
366+ label_buf. push ( ',' ) ;
367+ }
368+ let _ = write ! ( label_buf, "{key}=\" {raw_value}\" " ) ;
369+ }
370+
371+ let _ = writeln ! ( buf, "{name}{{{label_buf}}} {value}" ) ;
372+ }
373+
226374#[ cfg( test) ]
227375mod tests {
228376 use super :: * ;
377+ use crate :: server:: state:: ReviewSession ;
378+ use axum:: body:: to_bytes;
229379
230380 #[ test]
231381 fn test_write_metric_format ( ) {
@@ -235,4 +385,85 @@ mod tests {
235385 assert ! ( buf. contains( "# TYPE test_metric gauge\n " ) ) ;
236386 assert ! ( buf. contains( "test_metric 42\n " ) ) ;
237387 }
388+
389+ #[ test]
390+ fn test_write_labeled_metric_format ( ) {
391+ let mut buf = String :: new ( ) ;
392+ write_metric_header ( & mut buf, "test_metric" , "A labeled metric" , "gauge" ) ;
393+ write_labeled_metric ( & mut buf, "test_metric" , & [ ( "job_type" , "review" ) ] , 0.4 ) ;
394+
395+ assert ! ( buf. contains( "# HELP test_metric A labeled metric\n " ) ) ;
396+ assert ! ( buf. contains( "# TYPE test_metric gauge\n " ) ) ;
397+ assert ! ( buf. contains( "test_metric{job_type=\" review\" } 0.4\n " ) ) ;
398+ }
399+
400+ fn make_session ( id : & str , status : ReviewStatus ) -> ReviewSession {
401+ ReviewSession {
402+ id : id. to_string ( ) ,
403+ status,
404+ diff_source : "head" . to_string ( ) ,
405+ github_head_sha : None ,
406+ github_post_results_requested : None ,
407+ started_at : 1 ,
408+ completed_at : None ,
409+ comments : Vec :: new ( ) ,
410+ summary : None ,
411+ files_reviewed : 0 ,
412+ error : None ,
413+ pr_summary_text : None ,
414+ diff_content : None ,
415+ event : None ,
416+ progress : None ,
417+ }
418+ }
419+
420+ #[ tokio:: test]
421+ async fn metrics_include_queue_depth_and_worker_saturation_gauges ( ) {
422+ let storage_path = std:: path:: PathBuf :: from ( "metrics-test-reviews.json" ) ;
423+ let config_path = std:: path:: PathBuf :: from ( "metrics-test-config.json" ) ;
424+ let storage = crate :: server:: storage_json:: JsonStorageBackend :: new ( & storage_path) ;
425+
426+ let mut reviews = HashMap :: new ( ) ;
427+ reviews. insert (
428+ "running" . to_string ( ) ,
429+ make_session ( "running" , ReviewStatus :: Running ) ,
430+ ) ;
431+ reviews. insert (
432+ "pending-1" . to_string ( ) ,
433+ make_session ( "pending-1" , ReviewStatus :: Pending ) ,
434+ ) ;
435+ reviews. insert (
436+ "pending-2" . to_string ( ) ,
437+ make_session ( "pending-2" , ReviewStatus :: Pending ) ,
438+ ) ;
439+
440+ let state = Arc :: new ( AppState {
441+ config : Arc :: new ( tokio:: sync:: RwLock :: new ( crate :: config:: Config :: default ( ) ) ) ,
442+ repo_path : std:: path:: PathBuf :: from ( "." ) ,
443+ reviews : Arc :: new ( tokio:: sync:: RwLock :: new ( reviews) ) ,
444+ storage : Arc :: new ( storage) ,
445+ storage_path,
446+ config_path,
447+ http_client : reqwest:: Client :: new ( ) ,
448+ review_semaphore : Arc :: new ( tokio:: sync:: Semaphore :: new ( MAX_CONCURRENT_REVIEWS ) ) ,
449+ last_reviewed_shas : Arc :: new ( tokio:: sync:: RwLock :: new ( HashMap :: new ( ) ) ) ,
450+ } ) ;
451+ let _permit = state
452+ . review_semaphore
453+ . clone ( )
454+ . acquire_owned ( )
455+ . await
456+ . unwrap ( ) ;
457+
458+ let response = get_metrics ( State ( state) ) . await . into_response ( ) ;
459+ let body = to_bytes ( response. into_body ( ) , usize:: MAX ) . await . unwrap ( ) ;
460+ let text = String :: from_utf8 ( body. to_vec ( ) ) . unwrap ( ) ;
461+
462+ assert ! ( text. contains( "diffscope_job_queue_depth{job_type=\" review\" } 2\n " ) ) ;
463+ assert ! ( text. contains( "diffscope_job_queue_depth{job_type=\" eval\" } 0\n " ) ) ;
464+ assert ! ( text. contains( "diffscope_job_worker_capacity{job_type=\" review\" } 5\n " ) ) ;
465+ assert ! ( text. contains( "diffscope_job_workers_active{job_type=\" review\" } 1\n " ) ) ;
466+ assert ! ( text. contains( "diffscope_job_workers_available{job_type=\" review\" } 4\n " ) ) ;
467+ assert ! ( text. contains( "diffscope_job_worker_saturation_ratio{job_type=\" review\" } 0.2\n " ) ) ;
468+ }
238469}
0 commit comments