@@ -3,157 +3,148 @@ package metrics
33import (
44 "context"
55 "fmt"
6+ "log"
7+ "time"
8+
69 "github.com/prometheus/common/model"
710 "github.com/serverledge-faas/serverledge/internal/config"
811 "github.com/serverledge-faas/serverledge/internal/node"
9- "log"
10- "time"
1112
1213 promapi "github.com/prometheus/client_golang/api"
1314 v1 "github.com/prometheus/client_golang/api/prometheus/v1"
1415)
1516
1617var retrievedMetrics RetrievedMetrics
1718
18- func retrieveSingleValue (query string , api v1.API , ctx context.Context ) (float64 , error ) {
19+ type metricSample struct {
20+ Value float64
21+ Labels map [string ]string
22+ }
23+ type metricProcessor [T any ] func (samples []metricSample ) (T , error )
1924
25+ func executeQuery (query string , api v1.API , ctx context.Context ) (model.Vector , error ) {
2026 result , warnings , err := api .Query (ctx , query , time .Now ())
2127 if err != nil {
22- return 0.0 , fmt .Errorf ("Failed query : %v\n " , err )
28+ return nil , fmt .Errorf ("failed query: %v" , err )
2329 }
2430
2531 if len (warnings ) > 0 {
26- log .Printf ("Received warnings in the execution of the : %v\n " , warnings )
32+ log .Printf ("received warnings in the execution: %v" , warnings )
2733 }
2834
2935 vector , ok := result .(model.Vector )
3036 if ! ok {
31- return 0.0 , fmt .Errorf ("Could not convert the result of the query : %v\n " , result )
32- }
33- if vector .Len () != 1 {
34- return 0.0 , fmt .Errorf ("Expected 1 result; found %d\n " , vector .Len ())
37+ return nil , fmt .Errorf ("could not convert the result of the query: %v" , result )
3538 }
3639
37- sample := vector [0 ]
38- return float64 (sample .Value ), nil
40+ return vector , nil
3941}
4042
41- func retrieveByFunction (query string , api v1.API , ctx context.Context ) (map [string ]float64 , error ) {
43+ func extractSampleWithLabels (sample * model.Sample , requiredLabels []string ) (* metricSample , error ) {
44+ labels := make (map [string ]string )
4245
43- result , warnings , err := api .Query (ctx , query , time .Now ())
44- if err != nil {
45- return nil , fmt .Errorf ("Failed query : %v\n " , err )
46- }
47-
48- if len (warnings ) > 0 {
49- log .Printf ("Received warnings in the execution of the : %v\n " , warnings )
50- }
51-
52- functionValues := make (map [string ]float64 )
53- if vector , ok := result .(model.Vector ); ok {
54- for _ , sample := range vector {
55- value := float64 (sample .Value )
56- functionName , found := sample .Metric [model .LabelName ("function" )]
57- if ! found {
58- log .Printf ("Could not find the function name in the result : %v\n " , sample )
59- continue
60- } else {
61- functionValues [string (functionName )] = value
62- }
46+ for _ , labelName := range requiredLabels {
47+ labelValue , found := sample .Metric [model .LabelName (labelName )]
48+ if ! found {
49+ return nil , fmt .Errorf ("could not find the %s label in the result: %v" , labelName , sample )
6350 }
64- } else {
65- return nil , fmt .Errorf ("Unexpected Result %v\n " , result )
51+ labels [labelName ] = string (labelValue )
6652 }
6753
68- return functionValues , nil
54+ return & metricSample {
55+ Value : float64 (sample .Value ),
56+ Labels : labels ,
57+ }, nil
6958}
7059
71- func retrieveByFunctionAndNode (query string , api v1.API , ctx context.Context ) (map [string ]map [string ]float64 , error ) {
60+ func retrieveMetrics [T any ](query string , api v1.API , ctx context.Context , requiredLabels []string , processor metricProcessor [T ]) (T , error ) {
61+ var zero T
7262
73- result , warnings , err := api . Query ( ctx , query , time . Now () )
63+ vector , err := executeQuery ( query , api , ctx )
7464 if err != nil {
75- return nil , fmt . Errorf ( "Failed query : %v \n " , err )
65+ return zero , err
7666 }
7767
78- if len (warnings ) > 0 {
79- log .Printf ("Received warnings in the execution of the : %v\n " , warnings )
68+ var samples []metricSample
69+ for _ , sample := range vector {
70+ extracted , err := extractSampleWithLabels (sample , requiredLabels )
71+ if err != nil {
72+ log .Printf ("skipping sample: %v" , err )
73+ continue
74+ }
75+ samples = append (samples , * extracted )
8076 }
8177
82- values := make (map [string ]map [string ]float64 )
83- if vector , ok := result .(model.Vector ); ok {
84- for _ , sample := range vector {
85- value := float64 (sample .Value )
86- functionName , found := sample .Metric [model .LabelName ("function" )]
87- if ! found {
88- log .Printf ("Could not find the function name in the result : %v\n " , sample )
89- continue
90- }
91- nodeName , found := sample .Metric [model .LabelName ("node" )]
92- if ! found {
93- log .Printf ("Could not find the node name in the result : %v\n " , sample )
94- continue
95- }
96- _ , foundInner := values [string (nodeName )]
97- if ! foundInner {
98- values [string (nodeName )] = make (map [string ]float64 )
99- }
100- values [string (nodeName )][string (functionName )] = value
78+ return processor (samples )
79+ }
80+
81+ func retrieveSingleValue (query string , api v1.API , ctx context.Context ) (float64 , error ) {
82+ return retrieveMetrics (query , api , ctx , []string {}, func (samples []metricSample ) (float64 , error ) {
83+ if len (samples ) != 1 {
84+ // This will cause the function to return zero value, but we should handle this better
85+ return 0.0 , fmt .Errorf ("Expected 1 result; found %d\n " , len (samples ))
10186 }
102- } else {
103- return nil , fmt . Errorf ( "Unexpected Result %v \n " , result )
104- }
87+ return samples [ 0 ]. Value , nil
88+ } )
89+ }
10590
106- return values , nil
91+ func retrieveByFunction (query string , api v1.API , ctx context.Context ) (map [string ]float64 , error ) {
92+ return retrieveMetrics (query , api , ctx , []string {"function" }, func (samples []metricSample ) (map [string ]float64 , error ) {
93+ result := make (map [string ]float64 )
94+ for _ , sample := range samples {
95+ result [sample .Labels ["function" ]] = sample .Value
96+ }
97+ return result , nil
98+ })
10799}
108100
109- func retrieveByTaskAndNextTask (query string , api v1.API , ctx context.Context ) (map [string ]map [string ]float64 , error ) {
101+ func retrieveByFunctionAndNode (query string , api v1.API , ctx context.Context ) (map [string ]map [string ]float64 , error ) {
102+ return retrieveMetrics (query , api , ctx , []string {"function" , "node" }, func (samples []metricSample ) (map [string ]map [string ]float64 , error ) {
103+ result := make (map [string ]map [string ]float64 )
104+ for _ , sample := range samples {
105+ nodeName := sample .Labels ["node" ]
106+ functionName := sample .Labels ["function" ]
107+
108+ if _ , exists := result [nodeName ]; ! exists {
109+ result [nodeName ] = make (map [string ]float64 )
110+ }
111+ result [nodeName ][functionName ] = sample .Value
112+ }
113+ return result , nil
114+ })
115+ }
110116
111- result , warnings , err := api .Query (ctx , query , time .Now ())
112- if err != nil {
113- return nil , fmt .Errorf ("Failed query : %v\n " , err )
114- }
117+ func retrieveByTaskAndNextTask (query string , api v1.API , ctx context.Context ) (map [string ]map [string ]float64 , error ) {
118+ return retrieveMetrics (query , api , ctx , []string {"task" , "next_task" }, func (samples []metricSample ) (map [string ]map [string ]float64 , error ) {
119+ values := make (map [string ]map [string ]float64 )
115120
116- if len (warnings ) > 0 {
117- log .Printf ("Received warnings in the execution of the : %v\n " , warnings )
118- }
121+ // Build the raw values map
122+ for _ , sample := range samples {
123+ taskId := sample .Labels ["task" ]
124+ nextTaskId := sample .Labels ["next_task" ]
119125
120- values := make (map [string ]map [string ]float64 )
121- if vector , ok := result .(model.Vector ); ok {
122- for _ , sample := range vector {
123- value := float64 (sample .Value )
124- taskId , found := sample .Metric [model .LabelName ("task" )]
125- if ! found {
126- log .Printf ("Could not find the task in the result : %v\n " , sample )
127- continue
128- }
129- nextTaskId , found := sample .Metric [model .LabelName ("next_task" )]
130- if ! found {
131- log .Printf ("Could not find the output branch name in the result : %v\n " , sample )
132- continue
133- }
134- _ , foundInner := values [string (taskId )]
135- if ! foundInner {
136- values [string (taskId )] = make (map [string ]float64 )
126+ if _ , exists := values [taskId ]; ! exists {
127+ values [taskId ] = make (map [string ]float64 )
137128 }
138- values [string ( taskId )][ string ( nextTaskId ) ] = value
129+ values [taskId ][ nextTaskId ] = sample . Value
139130 }
140- } else {
141- return nil , fmt .Errorf ("Unexpected Result %v\n " , result )
142- }
143131
144- // estimate branch probability
145- for taskId , innerMap := range values {
146- sum := 0.0
147- for _ , value := range innerMap {
148- sum += value
149- }
132+ // Normalize to probabilities
133+ for taskId , innerMap := range values {
134+ sum := 0.0
135+ for _ , value := range innerMap {
136+ sum += value
137+ }
150138
151- for nextTaskId , value := range innerMap {
152- values [taskId ][nextTaskId ] = value / sum
139+ if sum > 0 {
140+ for nextTaskId , value := range innerMap {
141+ values [taskId ][nextTaskId ] = value / sum
142+ }
143+ }
153144 }
154- }
155145
156- return values , nil
146+ return values , nil
147+ })
157148}
158149
159150func MetricsRetriever () {
0 commit comments