@@ -450,12 +450,7 @@ impl Lambda {
450450 }
451451 }
452452
453- #[ allow( unreachable_code) ]
454- #[ allow( unused_variables) ]
455- #[ allow( unused_mut) ]
456453 pub fn set_tmp_enhanced_metrics ( & self , mut send_metrics : Receiver < ( ) > ) {
457- // Temporarily disabled
458- return ;
459454 if !self . config . enhanced_metrics {
460455 return ;
461456 }
@@ -475,7 +470,7 @@ impl Lambda {
475470 let tmp_max = bsize * blocks;
476471 let mut tmp_used = bsize * ( blocks - bavail) ;
477472
478- let mut interval = interval ( Duration :: from_millis ( 10 ) ) ;
473+ let mut interval = interval ( Duration :: from_millis ( constants :: MONITORING_INTERVAL ) ) ;
479474 loop {
480475 tokio:: select! {
481476 biased;
@@ -502,9 +497,11 @@ impl Lambda {
502497 } ) ;
503498 }
504499
505- pub fn generate_fd_enhanced_metrics (
500+ pub fn generate_process_metrics (
506501 fd_max : f64 ,
507502 fd_use : f64 ,
503+ threads_max : f64 ,
504+ threads_use : f64 ,
508505 aggr : & mut std:: sync:: MutexGuard < Aggregator > ,
509506 tags : Option < SortedTags > ,
510507 ) {
@@ -522,20 +519,15 @@ impl Lambda {
522519 let metric = Metric :: new (
523520 constants:: FD_USE_METRIC . into ( ) ,
524521 MetricValue :: distribution ( fd_use) ,
525- tags,
522+ tags. clone ( ) ,
526523 ) ;
527524 if let Err ( e) = aggr. insert ( metric) {
528525 error ! ( "Failed to insert fd_use metric: {}" , e) ;
529526 }
527+ } else {
528+ debug ! ( "Could not get file descriptor usage data." ) ;
530529 }
531- }
532530
533- pub fn generate_threads_enhanced_metrics (
534- threads_max : f64 ,
535- threads_use : f64 ,
536- aggr : & mut std:: sync:: MutexGuard < Aggregator > ,
537- tags : Option < SortedTags > ,
538- ) {
539531 let metric = Metric :: new (
540532 constants:: THREADS_MAX_METRIC . into ( ) ,
541533 MetricValue :: distribution ( threads_max) ,
@@ -555,15 +547,12 @@ impl Lambda {
555547 if let Err ( e) = aggr. insert ( metric) {
556548 error ! ( "Failed to insert threads_use metric: {}" , e) ;
557549 }
550+ } else {
551+ debug ! ( "Could not get thread usage data." ) ;
558552 }
559553 }
560554
561- #[ allow( unreachable_code) ]
562- #[ allow( unused_variables) ]
563- #[ allow( unused_mut) ]
564555 pub fn set_process_enhanced_metrics ( & self , mut send_metrics : Receiver < ( ) > ) {
565- // Temporarily disabled
566- return ;
567556 if !self . config . enhanced_metrics {
568557 return ;
569558 }
@@ -573,46 +562,36 @@ impl Lambda {
573562
574563 tokio:: spawn ( async move {
575564 // get list of all process ids
576- let pids = proc:: get_pid_list ( ) ;
565+ let mut pids = proc:: get_pid_list ( ) ;
577566
578- // Set fd_max and initial value for fd_use to -1
567+ // Set fd_max and initial value for fd_use
579568 let fd_max = proc:: get_fd_max_data ( & pids) ;
580- let mut fd_use = -1_f64 ;
569+ let mut fd_use = proc :: get_fd_use_data ( & pids ) . unwrap_or_else ( |_| -1_f64 ) ;
581570
582- // Set threads_max and initial value for threads_use to -1
571+ // Set threads_max and initial value for threads_use
583572 let threads_max = proc:: get_threads_max_data ( & pids) ;
584- let mut threads_use = -1_f64 ;
573+ let mut threads_use = proc :: get_threads_use_data ( & pids ) . unwrap_or_else ( |_| -1_f64 ) ;
585574
586- let mut interval = interval ( Duration :: from_millis ( 1 ) ) ;
575+ let mut interval = interval ( Duration :: from_millis ( constants :: MONITORING_INTERVAL ) ) ;
587576 loop {
588577 tokio:: select! {
589578 biased;
590579 // When the stop signal is received, generate final metrics
591580 _ = send_metrics. changed( ) => {
592581 let mut aggr: std:: sync:: MutexGuard <Aggregator > =
593582 aggr. lock( ) . expect( "lock poisoned" ) ;
594- Self :: generate_fd_enhanced_metrics( fd_max, fd_use, & mut aggr, tags. clone( ) ) ;
595- Self :: generate_threads_enhanced_metrics( threads_max, threads_use, & mut aggr, tags) ;
583+ Self :: generate_process_metrics( fd_max, fd_use, threads_max, threads_use, & mut aggr, tags. clone( ) ) ;
596584 return ;
597585 }
598586 // Otherwise keep monitoring file descriptor and thread usage periodically
599587 _ = interval. tick( ) => {
600- match proc:: get_fd_use_data( & pids) {
601- Ok ( fd_use_curr) => {
602- fd_use = fd_use. max( fd_use_curr) ;
603- } ,
604- Err ( _) => {
605- debug!( "Could not update file descriptor use enhanced metric." ) ;
606- }
607- } ;
608- match proc:: get_threads_use_data( & pids) {
609- Ok ( threads_use_curr) => {
610- threads_use = threads_use. max( threads_use_curr) ;
611- } ,
612- Err ( _) => {
613- debug!( "Could not update threads use enhanced metric." ) ;
614- }
615- } ;
588+ pids = proc:: get_pid_list( ) ;
589+ if let Ok ( fd_use_curr) = proc:: get_fd_use_data( & pids) {
590+ fd_use = fd_use. max( fd_use_curr) ;
591+ }
592+ if let Ok ( threads_use_curr) = proc:: get_threads_use_data( & pids) {
593+ threads_use = threads_use. max( threads_use_curr) ;
594+ }
616595 }
617596 }
618597 }
@@ -1028,84 +1007,56 @@ mod tests {
10281007 }
10291008
10301009 #[ test]
1031- fn test_set_fd_enhanced_metrics_valid_fd_use ( ) {
1010+ fn test_set_process_enhanced_metrics_valid_use ( ) {
10321011 let ( metrics_aggr, my_config) = setup ( ) ;
10331012 let lambda = Lambda :: new ( metrics_aggr. clone ( ) , my_config) ;
10341013
10351014 let fd_max = 1024.0 ;
10361015 let fd_use = 175.0 ;
1016+ let threads_max = 1024.0 ;
1017+ let threads_use = 40.0 ;
10371018
1038- Lambda :: generate_fd_enhanced_metrics (
1019+ Lambda :: generate_process_metrics (
10391020 fd_max,
10401021 fd_use,
1022+ threads_max,
1023+ threads_use,
10411024 & mut lambda. aggregator . lock ( ) . expect ( "lock poisoned" ) ,
10421025 None ,
10431026 ) ;
10441027
10451028 assert_sketch ( & metrics_aggr, constants:: FD_MAX_METRIC , 1024.0 ) ;
10461029 assert_sketch ( & metrics_aggr, constants:: FD_USE_METRIC , 175.0 ) ;
1030+ assert_sketch ( & metrics_aggr, constants:: THREADS_MAX_METRIC , 1024.0 ) ;
1031+ assert_sketch ( & metrics_aggr, constants:: THREADS_USE_METRIC , 40.0 ) ;
10471032 }
10481033
10491034 #[ test]
1050- fn test_set_fd_enhanced_metrics_invalid_fd_use ( ) {
1035+ fn test_set_process_enhanced_metrics_invalid_use ( ) {
10511036 let ( metrics_aggr, my_config) = setup ( ) ;
10521037 let lambda = Lambda :: new ( metrics_aggr. clone ( ) , my_config) ;
10531038
10541039 let fd_max = 1024.0 ;
10551040 let fd_use = -1.0 ;
1041+ let threads_max = 1024.0 ;
1042+ let threads_use = -1.0 ;
10561043
1057- Lambda :: generate_fd_enhanced_metrics (
1044+ Lambda :: generate_process_metrics (
10581045 fd_max,
10591046 fd_use,
1047+ threads_max,
1048+ threads_use,
10601049 & mut lambda. aggregator . lock ( ) . expect ( "lock poisoned" ) ,
10611050 None ,
10621051 ) ;
10631052
10641053 assert_sketch ( & metrics_aggr, constants:: FD_MAX_METRIC , 1024.0 ) ;
1054+ assert_sketch ( & metrics_aggr, constants:: THREADS_MAX_METRIC , 1024.0 ) ;
10651055
10661056 let aggr = lambda. aggregator . lock ( ) . expect ( "lock poisoned" ) ;
10671057 assert ! ( aggr
10681058 . get_entry_by_id( constants:: FD_USE_METRIC . into( ) , & None )
10691059 . is_none( ) ) ;
1070- }
1071-
1072- #[ test]
1073- fn test_set_threads_enhanced_metrics_valid_threads_use ( ) {
1074- let ( metrics_aggr, my_config) = setup ( ) ;
1075- let lambda = Lambda :: new ( metrics_aggr. clone ( ) , my_config) ;
1076-
1077- let threads_max = 1024.0 ;
1078- let threads_use = 40.0 ;
1079-
1080- Lambda :: generate_threads_enhanced_metrics (
1081- threads_max,
1082- threads_use,
1083- & mut lambda. aggregator . lock ( ) . expect ( "lock poisoned" ) ,
1084- None ,
1085- ) ;
1086-
1087- assert_sketch ( & metrics_aggr, constants:: THREADS_MAX_METRIC , 1024.0 ) ;
1088- assert_sketch ( & metrics_aggr, constants:: THREADS_USE_METRIC , 40.0 ) ;
1089- }
1090-
1091- #[ test]
1092- fn test_set_threads_enhanced_metrics_invalid_threads_use ( ) {
1093- let ( metrics_aggr, my_config) = setup ( ) ;
1094- let lambda = Lambda :: new ( metrics_aggr. clone ( ) , my_config) ;
1095-
1096- let threads_max = 1024.0 ;
1097- let threads_use = -1.0 ;
1098-
1099- Lambda :: generate_threads_enhanced_metrics (
1100- threads_max,
1101- threads_use,
1102- & mut lambda. aggregator . lock ( ) . expect ( "lock poisoned" ) ,
1103- None ,
1104- ) ;
1105-
1106- assert_sketch ( & metrics_aggr, constants:: THREADS_MAX_METRIC , 1024.0 ) ;
1107-
1108- let aggr = lambda. aggregator . lock ( ) . expect ( "lock poisoned" ) ;
11091060 assert ! ( aggr
11101061 . get_entry_by_id( constants:: THREADS_USE_METRIC . into( ) , & None )
11111062 . is_none( ) ) ;
0 commit comments