Skip to content

Commit cdeb9c2

Browse files
committed
fix: add metrics for new logs
Previously, metrics only reflected logs that were present at the initial start of the server. In order to track all the logs watched by certstream, each new log watcher must register with the prometheus metrics. The whole prometheus metrics logic was rewritten to be centered around a PrometheusExporter struct. The logmetrics file is moved to the metrics package in order to remove import dependencies. Also the prometheus file now relies stronger on dependency injection/callback functions. That prevents issues regarding circular imports. Also that prevents expensive nested loops to obtain certain data, since each metric now calls a single callback function to obtain the relevant data. fixes #96
1 parent 7e24888 commit cdeb9c2

7 files changed

Lines changed: 115 additions & 111 deletions

File tree

internal/certificatetransparency/ct-watcher.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/google/trillian/client/backoff"
1717

1818
"github.com/d-Rickyy-b/certstream-server-go/internal/config"
19+
"github.com/d-Rickyy-b/certstream-server-go/internal/metrics"
1920
"github.com/d-Rickyy-b/certstream-server-go/internal/models"
2021
"github.com/d-Rickyy-b/certstream-server-go/internal/web"
2122

@@ -65,9 +66,9 @@ func (w *Watcher) Start() {
6566
return
6667
}
6768
// Load Saved CT Indexes
68-
metrics.LoadCTIndex(ctIndexFilePath)
69+
metrics.Metrics.LoadCTIndex(ctIndexFilePath)
6970
// Save CTIndexes at regular intervals
70-
go metrics.SaveCertIndexesAtInterval(time.Second*30, ctIndexFilePath) // save indexes every X seconds
71+
go metrics.Metrics.SaveCertIndexesAtInterval(time.Second*30, ctIndexFilePath) // save indexes every X seconds
7172
}
7273

7374
// initialize the watcher with currently available logs
@@ -187,7 +188,7 @@ func (w *Watcher) addLogIfNew(operatorName, description, url string, isTiled boo
187188
// Log is not being watched, so add it
188189
w.wg.Add(1)
189190

190-
lastCTIndex := metrics.GetCTIndex(normURL)
191+
lastCTIndex := metrics.Metrics.GetCTIndex(normURL)
191192
ctWorker := worker{
192193
name: description,
193194
operatorName: operatorName,
@@ -197,7 +198,7 @@ func (w *Watcher) addLogIfNew(operatorName, description, url string, isTiled boo
197198
isTiled: isTiled,
198199
}
199200
w.workers = append(w.workers, &ctWorker)
200-
metrics.Init(operatorName, normURL)
201+
metrics.Metrics.Init(operatorName, normURL)
201202

202203
// Start a goroutine for each worker
203204
go func() {
@@ -232,7 +233,7 @@ func (w *Watcher) Stop() {
232233
if config.AppConfig.General.Recovery.Enabled {
233234
// Store current CT Indexes before shutting down
234235
filePath := config.AppConfig.General.Recovery.CTIndexFile
235-
metrics.SaveCertIndexes(filePath)
236+
metrics.Metrics.SaveCertIndexes(filePath)
236237
}
237238

238239
w.cancelFunc()
@@ -251,7 +252,7 @@ func (w *Watcher) CreateIndexFile(filePath string) error {
251252
// Iterate over each log of the operator
252253
for _, transparencyLog := range operator.Logs {
253254
// Check if the log is already being watched
254-
metrics.Init(operator.Name, normalizeCtlogURL(transparencyLog.URL))
255+
metrics.Metrics.Init(operator.Name, normalizeCtlogURL(transparencyLog.URL))
255256
log.Println("Fetching STH for", normalizeCtlogURL(transparencyLog.URL))
256257

257258
hc := http.Client{Timeout: 5 * time.Second}
@@ -268,12 +269,12 @@ func (w *Watcher) CreateIndexFile(filePath string) error {
268269
continue
269270
}
270271

271-
metrics.SetCTIndex(normalizeCtlogURL(transparencyLog.URL), sth.TreeSize)
272+
metrics.Metrics.SetCTIndex(normalizeCtlogURL(transparencyLog.URL), sth.TreeSize)
272273
}
273274
}
274275
w.cancelFunc()
275276

276-
metrics.SaveCertIndexes(filePath)
277+
metrics.Metrics.SaveCertIndexes(filePath)
277278
log.Println("Index file saved to", filePath)
278279

279280
return nil
@@ -544,7 +545,7 @@ func (w *worker) foundCertCallback(rawEntry *ct.RawLogEntry) {
544545
entry.Data.UpdateType = "X509LogEntry"
545546
w.entryChan <- entry
546547

547-
atomic.AddInt64(&processedCerts, 1)
548+
atomic.AddInt64(&metrics.ProcessedCerts, 1)
548549
}
549550

550551
// foundPrecertCallback is the callback that handles cases where new precerts are found.
@@ -558,7 +559,7 @@ func (w *worker) foundPrecertCallback(rawEntry *ct.RawLogEntry) {
558559
entry.Data.UpdateType = "PrecertLogEntry"
559560
w.entryChan <- entry
560561

561-
atomic.AddInt64(&processedPrecerts, 1)
562+
atomic.AddInt64(&metrics.ProcessedPrecerts, 1)
562563
}
563564

564565
// certHandler takes the entries out of the entryChan channel and broadcasts them to all clients.
@@ -584,7 +585,7 @@ func certHandler(entryChan chan models.Entry) {
584585
operator := entry.Data.Source.Operator
585586
index := entry.Data.CertIndex
586587

587-
metrics.Inc(operator, url, index)
588+
metrics.Metrics.Inc(operator, url, index)
588589
}
589590
}
590591

internal/certstream/certstream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ func (cs *Certstream) setupMetrics(webserver *web.WebServer) {
6262
if (cs.config.Prometheus.ListenAddr == "" || cs.config.Prometheus.ListenAddr == cs.config.Webserver.ListenAddr) &&
6363
(cs.config.Prometheus.ListenPort == 0 || cs.config.Prometheus.ListenPort == cs.config.Webserver.ListenPort) {
6464
log.Println("Starting prometheus server on same interface as webserver")
65-
webserver.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.WritePrometheus)
65+
webserver.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.Prometheus.Write)
6666
} else {
6767
log.Println("Starting prometheus server on new interface")
6868
cs.metricsServer = web.NewMetricsServer(cs.config.Prometheus.ListenAddr, cs.config.Prometheus.ListenPort, cs.config.Prometheus.CertPath, cs.config.Prometheus.CertKeyPath)
69-
cs.metricsServer.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.WritePrometheus)
69+
cs.metricsServer.RegisterPrometheus(cs.config.Prometheus.MetricsURL, metrics.Prometheus.Write)
7070
}
7171
}
7272
}

internal/certificatetransparency/logmetrics.go renamed to internal/metrics/logmetrics.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package certificatetransparency
1+
package metrics
22

33
import (
44
"encoding/json"
@@ -22,9 +22,9 @@ type (
2222
)
2323

2424
var (
25-
processedCerts int64
26-
processedPrecerts int64
27-
metrics = LogMetrics{metrics: make(CTMetrics), index: make(CTCertIndex)}
25+
ProcessedCerts int64
26+
ProcessedPrecerts int64
27+
Metrics = LogMetrics{metrics: make(CTMetrics), index: make(CTCertIndex)}
2828
)
2929

3030
// LogMetrics is a struct that holds a map of metrics for each CT log grouped by operator.
@@ -91,6 +91,9 @@ func (m *LogMetrics) Init(operator, url string) {
9191
if _, ok := m.index[url]; !ok {
9292
m.index[url] = 0
9393
}
94+
95+
// Register the metric for this operator and url with Prometheus
96+
Prometheus.RegisterLog(operator, url)
9497
}
9598

9699
// Get the metric for a given operator and ct url.
@@ -289,18 +292,18 @@ func (m *LogMetrics) SaveCertIndexes(ctIndexFilePath string) {
289292

290293
// GetProcessedCerts returns the total number of processed certificates.
291294
func GetProcessedCerts() int64 {
292-
return processedCerts
295+
return ProcessedCerts
293296
}
294297

295298
// GetProcessedPrecerts returns the total number of processed precertificates.
296299
func GetProcessedPrecerts() int64 {
297-
return processedPrecerts
300+
return ProcessedPrecerts
298301
}
299302

300303
func GetCertMetrics() CTMetrics {
301-
return metrics.GetCTMetrics()
304+
return Metrics.GetCTMetrics()
302305
}
303306

304307
func GetLogOperators() map[string][]string {
305-
return metrics.OperatorLogMapping()
308+
return Metrics.OperatorLogMapping()
306309
}

internal/certificatetransparency/logmetrics_test.go renamed to internal/metrics/logmetrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package certificatetransparency
1+
package metrics
22

33
import (
44
"path/filepath"

internal/metrics/prometheus.go

Lines changed: 76 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -3,120 +3,108 @@ package metrics
33
import (
44
"fmt"
55
"io"
6-
"strings"
6+
"log"
77
"sync"
88
"time"
99

10-
"github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency"
11-
"github.com/d-Rickyy-b/certstream-server-go/internal/web"
12-
1310
"github.com/VictoriaMetrics/metrics"
1411
)
1512

16-
var (
17-
ctLogMetricsInitialized = false
18-
ctLogMetricsInitMutex = &sync.Mutex{}
19-
20-
tempCertMetricsLastRefreshed = time.Time{}
21-
tempCertMetrics = certificatetransparency.CTMetrics{}
22-
tempCertMetricsMutex = &sync.RWMutex{}
13+
var Prometheus = NewPrometheusExporter()
2314

15+
type PrometheusExporter struct {
2416
// Number of currently connected clients.
25-
fullClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"full\"}", func() float64 {
26-
return float64(web.ClientHandler.ClientFullCount())
27-
})
28-
liteClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"lite\"}", func() float64 {
29-
return float64(web.ClientHandler.ClientLiteCount())
30-
})
31-
domainClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"domain\"}", func() float64 {
32-
return float64(web.ClientHandler.ClientDomainsCount())
33-
})
17+
fullClientCount metrics.Gauge
18+
liteClientCount metrics.Gauge
19+
domainClientCount metrics.Gauge
20+
21+
tempCertMetricsLastRefreshed time.Time
22+
tempCertMetrics CTMetrics
23+
tempCertMetricsMutex sync.RWMutex
24+
25+
skippedCertsCallback func() map[string]int64
26+
}
3427

35-
// Number of certificates processed by the CT watcher.
36-
processedCertificates = metrics.NewGauge("certstreamservergo_certificates_total{type=\"regular\"}", func() float64 {
37-
return float64(certificatetransparency.GetProcessedCerts())
28+
// NewPrometheusExporter creates a new PrometheusExporter and registers the default metrics for the number of processed certificates.
29+
func NewPrometheusExporter() *PrometheusExporter {
30+
e := &PrometheusExporter{}
31+
// Register metrics for the total number of certificates processed by the CT watcher.
32+
metrics.GetOrCreateGauge("certstreamservergo_certificates_total{type=\"regular\"}", func() float64 {
33+
return float64(GetProcessedCerts())
3834
})
39-
processedPreCertificates = metrics.NewGauge("certstreamservergo_certificates_total{type=\"precert\"}", func() float64 {
40-
return float64(certificatetransparency.GetProcessedPrecerts())
35+
metrics.GetOrCreateGauge("certstreamservergo_certificates_total{type=\"precert\"}", func() float64 {
36+
return float64(GetProcessedPrecerts())
4137
})
42-
)
43-
44-
// WritePrometheus provides an easy way to write metrics to a writer.
45-
func WritePrometheus(w io.Writer, exposeProcessMetrics bool) {
46-
ctLogMetricsInitMutex.Lock()
47-
if !ctLogMetricsInitialized {
48-
initCtLogMetrics()
49-
}
50-
ctLogMetricsInitMutex.Unlock()
38+
return e
39+
}
5140

52-
getSkippedCertMetrics()
41+
// Write is a callback function that is called by a webserver in order to write metrics data to the http response.
42+
func (pm *PrometheusExporter) Write(w io.Writer, exposeProcessMetrics bool) {
43+
// getSkippedCertMetrics()
5344

5445
metrics.WritePrometheus(w, exposeProcessMetrics)
5546
}
5647

57-
// For having metrics regarding each individual CT log, we need to register them manually.
58-
// initCtLogMetrics fetches all the CT Logs and registers one metric per log.
59-
func initCtLogMetrics() {
60-
logs := certificatetransparency.GetLogOperators()
61-
62-
for operator, urls := range logs {
63-
operator := operator // Copy variable to new scope
64-
65-
for i := range urls {
66-
url := urls[i]
67-
name := fmt.Sprintf("certstreamservergo_certs_by_log_total{url=\"%s\",operator=\"%s\"}", url, operator)
68-
metrics.NewGauge(name, func() float64 {
69-
return float64(getCertCountForLog(operator, url))
70-
})
71-
}
72-
}
48+
// RegisterGaugeMetric is a helper function that registers a new gauge metric with a float64 callback function.
49+
func (pm *PrometheusExporter) RegisterGaugeMetric(label string, callback func() float64) {
50+
metrics.GetOrCreateGauge(label, callback)
51+
}
7352

74-
if len(logs) > 0 {
75-
ctLogMetricsInitialized = true
76-
}
53+
// RegisterGaugeMetricInt is a helper function that registers a new gauge metric with an int64 callback function.
54+
func (pm *PrometheusExporter) RegisterGaugeMetricInt(label string, callback func() int64) {
55+
metrics.GetOrCreateGauge(label, func() float64 { return float64(callback()) })
56+
}
57+
58+
// RegisterClient registers a new gauge metric for the client with the given name.
59+
func (pm *PrometheusExporter) RegisterClient(name string, callback func() float64) {
60+
label := fmt.Sprintf("certstreamservergo_skipped_certs{client=\"%s\"}", name)
61+
metrics.GetOrCreateGauge(label, callback)
62+
}
63+
64+
// UnregisterClient unregisters the metric for the client with the given name.
65+
func (pm *PrometheusExporter) UnregisterClient(name string) {
66+
label := fmt.Sprintf("certstreamservergo_skipped_certs{client=\"%s\"}", name)
67+
metrics.UnregisterMetric(label)
68+
}
69+
70+
// RegisterLog registers a new gauge metric for the given CT log.
71+
// The metric will be named "certstreamservergo_certs_by_log_total{url=\"<url>\",operator=\"<operatorName>\"}" and
72+
// will call the given callback function to get the current value of the metric.
73+
func (pm *PrometheusExporter) RegisterLog(operatorName, url string) {
74+
label := fmt.Sprintf("certstreamservergo_certs_by_log_total{url=\"%s\",operator=\"%s\"}", url, operatorName)
75+
metrics.GetOrCreateGauge(label, func() float64 {
76+
return float64(pm.getCertCountForLog(operatorName, url))
77+
})
78+
}
79+
80+
// UnregisterMetric unregisters a metric with a given label.
81+
func (pm *PrometheusExporter) UnregisterMetric(label string) {
82+
metrics.UnregisterMetric(label)
7783
}
7884

7985
// getCertCountForLog returns the number of certificates processed from a specific CT log.
8086
// It caches the result for 5 seconds. Subsequent calls to this method will return the cached result.
81-
func getCertCountForLog(operatorName, logname string) int64 {
82-
tempCertMetricsMutex.Lock()
83-
defer tempCertMetricsMutex.Unlock()
87+
func (pm *PrometheusExporter) getCertCountForLog(operatorName, logname string) int64 {
88+
pm.tempCertMetricsMutex.Lock()
89+
defer pm.tempCertMetricsMutex.Unlock()
8490

8591
// Add some caching to avoid having to lock the mutex every time
86-
if time.Since(tempCertMetricsLastRefreshed) > time.Second*5 {
87-
tempCertMetricsLastRefreshed = time.Now()
88-
tempCertMetrics = certificatetransparency.GetCertMetrics()
92+
if time.Since(pm.tempCertMetricsLastRefreshed) > time.Second*5 {
93+
pm.tempCertMetricsLastRefreshed = time.Now()
94+
pm.tempCertMetrics = GetCertMetrics()
8995
}
9096

91-
return tempCertMetrics[operatorName][logname]
92-
}
93-
94-
// getSkippedCertMetrics gets the number of skipped certificates for each client and creates metrics for it.
95-
// It also removes metrics for clients that are not connected anymore.
96-
func getSkippedCertMetrics() {
97-
skippedCerts := web.ClientHandler.GetSkippedCerts()
98-
for clientName := range skippedCerts {
99-
// Get or register a new counter for each client
100-
metricName := fmt.Sprintf("certstreamservergo_skipped_certs{client=\"%s\"}", clientName)
101-
c := metrics.GetOrCreateCounter(metricName)
102-
c.Set(skippedCerts[clientName])
97+
operatorMetrics, ok := pm.tempCertMetrics[operatorName]
98+
if !ok {
99+
log.Printf("No metrics for operator \"%s\"", operatorName)
100+
return 0
103101
}
104102

105-
// Remove all metrics that are not in the list of current client skipped cert metrics
106-
// Get a list of current client skipped cert metrics
107-
for _, metricName := range metrics.ListMetricNames() {
108-
if !strings.HasPrefix(metricName, "certstreamservergo_skipped_certs") {
109-
continue
110-
}
111-
112-
clientName := strings.TrimPrefix(metricName, "certstreamservergo_skipped_certs{client=\"")
113-
clientName = strings.TrimSuffix(clientName, "\"}")
114-
115-
// Check if the registered metric is in the list of current client skipped cert metrics
116-
// If not, unregister the metric
117-
_, exists := skippedCerts[clientName]
118-
if !exists {
119-
metrics.UnregisterMetric(metricName)
120-
}
103+
count, ok := operatorMetrics[logname]
104+
if !ok {
105+
log.Printf("No metrics for log \"%s\" of operator \"%s\"", logname, operatorName)
106+
return 0
121107
}
108+
109+
return count
122110
}

0 commit comments

Comments
 (0)