Skip to content

Commit 6bc2f2f

Browse files
grussorusso and matnar committed
Prometheus integration complete
Co-authored-by: Matteo Nardelli <matnar@gmail.com>
1 parent ca3caf7 commit 6bc2f2f

4 files changed

Lines changed: 145 additions & 186 deletions

File tree

internal/config/keys.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,15 @@ const LISTEN_UDP_PORT = "registry.udp.port"
5959
// enable metrics system
6060
const METRICS_ENABLED = "metrics.enabled"
6161

62-
// HTTP port used for Prometheus scraping
62+
// Port used by Prometheus server
6363
const METRICS_PROMETHEUS_PORT = "metrics.prometheus.port"
6464

65+
// Prometheus IP address / hostname
66+
const METRICS_PROMETHEUS_HOST = "metrics.prometheus.host"
67+
68+
// Interval (in seconds) for metrics retriever
69+
const METRICS_RETRIEVER_INTERVAL = "metrics.retriever.interval"
70+
6571
// Scheduling policy to use
6672
// Possible values: "qosaware", "default", "cloudonly"
6773
const SCHEDULING_POLICY = "scheduler.policy"

internal/metrics/metrics.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@ var registry = prometheus.NewRegistry()
1818
var ScrapingHandler http.Handler = nil
1919
var durationBuckets = []float64{0.002, 0.005, 0.010, 0.02, 0.03, 0.05, 0.1, 0.15, 0.3, 0.6, 1.0}
2020

21+
// Names of the Prometheus metrics exposed by this package. They are used both
// when registering the metrics and when building the queries that read them
// back from the Prometheus server.
const (
22+
// Counter of completed function invocations.
COMPLETIONS = "completed_total"
23+
// Histogram of function durations.
EXECUTION_TIME = "execution_time"
24+
)
25+
2126
var (
22-
CompletedInvocations = promauto.NewCounterVec(prometheus.CounterOpts{
23-
Name: "completed_total",
27+
metricCompletions = promauto.NewCounterVec(prometheus.CounterOpts{
28+
Name: COMPLETIONS,
2429
Help: "Number of completed function invocations",
2530
}, []string{"node", "function"})
26-
ExecutionTimes = promauto.NewHistogramVec(prometheus.HistogramOpts{
27-
Name: "execution_time",
31+
metricExecutionTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
32+
Name: EXECUTION_TIME,
2833
Help: "Function duration",
2934
Buckets: durationBuckets,
3035
}, []string{"node", "function"})
3136
)
3237

38+
// RetrievedMetrics holds the most recent values read back from the Prometheus
// server by the metrics retriever. Both maps are keyed by function name.
type RetrievedMetrics struct {
39+
// Completions is the value of the completed-invocations counter per function.
Completions map[string]float64
40+
// AvgExecutionTime is the execution-time histogram sum divided by its count,
// per function.
AvgExecutionTime map[string]float64
41+
}
42+
3343
func Init() {
3444
if config.GetBool(config.METRICS_ENABLED, false) {
3545
log.Println("Metrics enabled.")
@@ -39,16 +49,18 @@ func Init() {
3949
return
4050
}
4151

42-
registry.MustRegister(CompletedInvocations)
43-
registry.MustRegister(ExecutionTimes)
52+
registry.MustRegister(metricCompletions)
53+
registry.MustRegister(metricExecutionTime)
4454

4555
ScrapingHandler = promhttp.HandlerFor(registry, promhttp.HandlerOpts{
4656
EnableOpenMetrics: true})
57+
58+
go MetricsRetriever()
4759
}
4860

4961
func AddCompletedInvocation(funcName string) {
50-
CompletedInvocations.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Inc()
62+
metricCompletions.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Inc()
5163
}
5264
func AddFunctionDurationValue(funcName string, duration float64) {
53-
ExecutionTimes.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Observe(duration)
65+
metricExecutionTime.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Observe(duration)
5466
}

internal/metrics/query.go

Lines changed: 0 additions & 177 deletions
This file was deleted.

internal/metrics/retriever.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/prometheus/common/model"
7+
"github.com/serverledge-faas/serverledge/internal/config"
8+
"github.com/serverledge-faas/serverledge/internal/node"
9+
"log"
10+
"time"
11+
12+
promapi "github.com/prometheus/client_golang/api"
13+
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
14+
)
15+
16+
var retrievedMetrics RetrievedMetrics
17+
18+
func retrieveSingleValue(query string, api v1.API, ctx context.Context) (float64, error) {
19+
20+
result, warnings, err := api.Query(ctx, query, time.Now())
21+
if err != nil {
22+
return 0.0, fmt.Errorf("Failed query : %v\n", err)
23+
}
24+
25+
if len(warnings) > 0 {
26+
log.Printf("Received warnings in the execution of the : %v\n", warnings)
27+
}
28+
29+
vector, ok := result.(model.Vector)
30+
if !ok {
31+
return 0.0, fmt.Errorf("Could not convert the result of the query : %v\n", result)
32+
}
33+
if vector.Len() != 1 {
34+
return 0.0, fmt.Errorf("Expected 1 result; found %d\n", vector.Len())
35+
}
36+
37+
sample := vector[0]
38+
return float64(sample.Value), nil
39+
}
40+
41+
func retrieveByFunction(query string, api v1.API, ctx context.Context) (map[string]float64, error) {
42+
43+
result, warnings, err := api.Query(ctx, query, time.Now())
44+
if err != nil {
45+
return nil, fmt.Errorf("Failed query : %v\n", err)
46+
}
47+
48+
if len(warnings) > 0 {
49+
log.Printf("Received warnings in the execution of the : %v\n", warnings)
50+
}
51+
52+
functionValues := make(map[string]float64)
53+
if vector, ok := result.(model.Vector); ok {
54+
for _, sample := range vector {
55+
value := float64(sample.Value)
56+
functionName, found := sample.Metric[model.LabelName("function")]
57+
if !found {
58+
log.Printf("Could not find the function name in the result : %v\n", sample)
59+
continue
60+
} else {
61+
functionValues[string(functionName)] = value
62+
}
63+
}
64+
} else {
65+
return nil, fmt.Errorf("Unexpected Result %v\n", result)
66+
}
67+
68+
return functionValues, nil
69+
}
70+
71+
func MetricsRetriever() {
72+
prometheusHost := config.GetString(config.METRICS_PROMETHEUS_HOST, "127.0.0.1")
73+
prometheusPort := config.GetInt(config.METRICS_PROMETHEUS_PORT, 9090)
74+
client, err := promapi.NewClient(promapi.Config{
75+
Address: fmt.Sprintf("http://%s:%d", prometheusHost, prometheusPort),
76+
})
77+
if err != nil {
78+
log.Printf("Error in Prometheus client creation: %v\n", err)
79+
return
80+
}
81+
82+
// API of Prometheus
83+
api := v1.NewAPI(client)
84+
ctx := context.Background()
85+
86+
ticker := time.NewTicker(time.Duration(config.GetInt(config.METRICS_RETRIEVER_INTERVAL, 10)) * time.Second)
87+
defer ticker.Stop()
88+
89+
for {
90+
select {
91+
case <-ticker.C:
92+
93+
query := fmt.Sprintf("%s{node=\"%s\"}", COMPLETIONS, node.NodeIdentifier)
94+
completionsPerFunction, err := retrieveByFunction(query, api, ctx)
95+
if err != nil {
96+
log.Printf("Error in retrieveByFunction: %v\n", err)
97+
}
98+
retrievedMetrics.Completions = completionsPerFunction
99+
100+
query = fmt.Sprintf("%s_sum{node=\"%s\"}/%s_count{node=\"%s\"}",
101+
EXECUTION_TIME, node.NodeIdentifier, EXECUTION_TIME, node.NodeIdentifier)
102+
avgFunDuration, err := retrieveByFunction(query, api, ctx)
103+
if err != nil {
104+
log.Printf("Error in retrieveByFunction: %v\n", err)
105+
}
106+
retrievedMetrics.AvgExecutionTime = avgFunDuration
107+
108+
fmt.Println("All queries completed")
109+
110+
}
111+
}
112+
113+
}
114+
115+
func GetMetrics() RetrievedMetrics {
116+
// TODO: deep copy?
117+
return retrievedMetrics
118+
}

0 commit comments

Comments
 (0)