@@ -109,6 +109,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
109109 activity ( UpdateMetricsInput {
110110 client_id,
111111 flavor,
112+ draining : state. drain_timeout_ts . is_some ( ) ,
112113 clear : false ,
113114 } ) ,
114115 ) )
@@ -125,6 +126,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
125126 activity ( UpdateMetricsInput {
126127 client_id,
127128 flavor,
129+ draining : state. drain_timeout_ts . is_some ( ) ,
128130 clear : false ,
129131 } ) ,
130132 ) )
@@ -254,6 +256,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
254256 ctx. activity ( UpdateMetricsInput {
255257 client_id : input. client_id ,
256258 flavor : input. flavor ,
259+ draining : false ,
257260 clear : true ,
258261 } )
259262 . await ?;
@@ -691,6 +694,7 @@ pub async fn handle_commands(
691694 activity ( UpdateMetricsInput {
692695 client_id,
693696 flavor,
697+ draining : drain_timeout_ts. is_some ( ) ,
694698 clear : false ,
695699 } ) ,
696700 ) )
@@ -933,24 +937,47 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe
933937struct UpdateMetricsInput {
934938 client_id : Uuid ,
935939 flavor : ClientFlavor ,
940+ #[ serde( default ) ]
941+ draining : bool ,
936942 clear : bool ,
937943}
938944
939945#[ activity( UpdateMetrics ) ]
940946async fn update_metrics ( ctx : & ActivityCtx , input : & UpdateMetricsInput ) -> GlobalResult < ( ) > {
941947 if input. clear {
942948 metrics:: CLIENT_MEMORY_ALLOCATED
943- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
949+ . with_label_values ( & [
950+ & input. client_id . to_string ( ) ,
951+ & input. flavor . to_string ( ) ,
952+ "active" ,
953+ ] )
954+ . set ( 0 ) ;
955+ metrics:: CLIENT_CPU_ALLOCATED
956+ . with_label_values ( & [
957+ & input. client_id . to_string ( ) ,
958+ & input. flavor . to_string ( ) ,
959+ "active" ,
960+ ] )
961+ . set ( 0 ) ;
962+ metrics:: CLIENT_MEMORY_ALLOCATED
963+ . with_label_values ( & [
964+ & input. client_id . to_string ( ) ,
965+ & input. flavor . to_string ( ) ,
966+ "draining" ,
967+ ] )
944968 . set ( 0 ) ;
945-
946969 metrics:: CLIENT_CPU_ALLOCATED
947- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
970+ . with_label_values ( & [
971+ & input. client_id . to_string ( ) ,
972+ & input. flavor . to_string ( ) ,
973+ "draining" ,
974+ ] )
948975 . set ( 0 ) ;
949976
950977 return Ok ( ( ) ) ;
951978 }
952979
953- let ( total_mem, total_cpu , remaining_mem , remaining_cpu) =
980+ let ( total_mem, remaining_mem , total_cpu , remaining_cpu) =
954981 ctx. fdb ( )
955982 . await ?
956983 . run ( |tx, _mc| async move {
@@ -992,35 +1019,80 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
9921019 )
9931020 . map_err ( |x| fdb:: FdbBindingError :: CustomError ( x. into ( ) ) ) ?;
9941021
995- Ok ( (
996- total_mem,
997- remaining_mem,
998- total_cpu,
999- remaining_cpu,
1000- ) )
1022+ Ok ( ( total_mem, remaining_mem, total_cpu, remaining_cpu) )
10011023 } )
10021024 . custom_instrument ( tracing:: info_span!( "client_update_metrics_tx" ) )
10031025 . await ?;
10041026
1027+ let ( state, other_state) = if input. draining {
1028+ ( "draining" , "active" )
1029+ } else {
1030+ ( "active" , "draining" )
1031+ } ;
1032+ let allocated_mem = total_mem. saturating_sub ( remaining_mem) ;
1033+ let allocated_cpu = total_cpu. saturating_sub ( remaining_cpu) ;
1034+
10051035 metrics:: CLIENT_MEMORY_TOTAL
1006- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1036+ . with_label_values ( & [
1037+ & input. client_id . to_string ( ) ,
1038+ & input. flavor . to_string ( ) ,
1039+ state,
1040+ ] )
10071041 . set ( total_mem. try_into ( ) ?) ;
1008-
10091042 metrics:: CLIENT_CPU_TOTAL
1010- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1043+ . with_label_values ( & [
1044+ & input. client_id . to_string ( ) ,
1045+ & input. flavor . to_string ( ) ,
1046+ state,
1047+ ] )
10111048 . set ( total_cpu. try_into ( ) ?) ;
10121049
1013- let allocated_mem = total_mem. saturating_sub ( remaining_mem) ;
1014- let allocated_cpu = total_cpu. saturating_sub ( remaining_cpu) ;
1015-
10161050 metrics:: CLIENT_MEMORY_ALLOCATED
1017- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1051+ . with_label_values ( & [
1052+ & input. client_id . to_string ( ) ,
1053+ & input. flavor . to_string ( ) ,
1054+ state,
1055+ ] )
10181056 . set ( allocated_mem. try_into ( ) ?) ;
1019-
10201057 metrics:: CLIENT_CPU_ALLOCATED
1021- . with_label_values ( & [ & input. client_id . to_string ( ) , & input. flavor . to_string ( ) ] )
1058+ . with_label_values ( & [
1059+ & input. client_id . to_string ( ) ,
1060+ & input. flavor . to_string ( ) ,
1061+ state,
1062+ ] )
10221063 . set ( allocated_cpu. try_into ( ) ?) ;
10231064
1065+ // Clear other state
1066+ metrics:: CLIENT_MEMORY_TOTAL
1067+ . with_label_values ( & [
1068+ & input. client_id . to_string ( ) ,
1069+ & input. flavor . to_string ( ) ,
1070+ other_state,
1071+ ] )
1072+ . set ( 0 ) ;
1073+ metrics:: CLIENT_CPU_TOTAL
1074+ . with_label_values ( & [
1075+ & input. client_id . to_string ( ) ,
1076+ & input. flavor . to_string ( ) ,
1077+ other_state,
1078+ ] )
1079+ . set ( 0 ) ;
1080+
1081+ metrics:: CLIENT_MEMORY_ALLOCATED
1082+ . with_label_values ( & [
1083+ & input. client_id . to_string ( ) ,
1084+ & input. flavor . to_string ( ) ,
1085+ other_state,
1086+ ] )
1087+ . set ( 0 ) ;
1088+ metrics:: CLIENT_CPU_ALLOCATED
1089+ . with_label_values ( & [
1090+ & input. client_id . to_string ( ) ,
1091+ & input. flavor . to_string ( ) ,
1092+ other_state,
1093+ ] )
1094+ . set ( 0 ) ;
1095+
10241096 Ok ( ( ) )
10251097}
10261098
0 commit comments