Skip to content

Commit e4efdb1

Browse files
authored
Merge pull request #9 from d-Rickyy-b/prometheus
Implement prometheus interface
2 parents e0dd5c6 + 49298a9 commit e4efdb1

9 files changed

Lines changed: 189 additions & 98 deletions

File tree

cmd/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"flag"
55
"go-certstream-server/internal/certificatetransparency"
66
"go-certstream-server/internal/config"
7+
"go-certstream-server/internal/prometheus"
78
"go-certstream-server/internal/web"
89
"log"
910
)
@@ -22,6 +23,20 @@ func main() {
2223
}
2324

2425
webserver := web.NewWebsocketServer(conf.Webserver.ListenAddr, conf.Webserver.ListenPort, conf.Webserver.CertPath, conf.Webserver.CertKeyPath)
26+
if conf.Prometheus.Enabled {
27+
// If prometheus is enabled, and interface is either unconfigured or same as webserver config, use existing webserver
28+
if (conf.Prometheus.ListenAddr == "" || conf.Prometheus.ListenAddr == conf.Webserver.ListenAddr) &&
29+
(conf.Prometheus.ListenPort == 0 || conf.Prometheus.ListenPort == conf.Webserver.ListenPort) {
30+
log.Println("Starting prometheus server on same interface as webserver")
31+
webserver.RegisterPrometheus(conf.Prometheus.MetricsURL, prometheus.WritePrometheus)
32+
} else {
33+
log.Println("Starting prometheus server on new interface")
34+
metricsServer := web.NewMetricsServer(conf.Prometheus.ListenAddr, conf.Prometheus.ListenPort, conf.Webserver.CertPath, conf.Webserver.CertKeyPath)
35+
metricsServer.RegisterPrometheus(conf.Prometheus.MetricsURL, prometheus.WritePrometheus)
36+
go metricsServer.Start()
37+
}
38+
}
39+
2540
go webserver.Start()
2641

2742
watcher := certificatetransparency.Watcher{}

config.sample.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,9 @@ webserver:
66
domains_only_url: "/domains-only"
77
cert_path: ""
88
cert_key_path: ""
9+
10+
prometheus:
11+
enabled: true
12+
listen_addr: "127.0.0.1"
13+
listen_port: 8080
14+
metrics_url: "/metrics"

go.mod

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module go-certstream-server
33
go 1.18
44

55
require (
6+
github.com/VictoriaMetrics/metrics v1.22.2
67
github.com/go-chi/chi v1.5.4
78
github.com/google/certificate-transparency-go v1.1.3
89
github.com/gorilla/websocket v1.5.0
@@ -78,18 +79,12 @@ require (
7879
go.etcd.io/etcd/server/v3 v3.5.5 // indirect
7980
go.etcd.io/etcd/tests/v3 v3.5.5 // indirect
8081
go.etcd.io/etcd/v3 v3.5.5 // indirect
81-
go.opentelemetry.io/contrib v1.11.0 // indirect
8282
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.3 // indirect
8383
go.opentelemetry.io/otel v1.11.0 // indirect
84-
go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect
8584
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.0 // indirect
8685
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.0 // indirect
8786
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.0 // indirect
88-
go.opentelemetry.io/otel/internal/metric v0.27.0 // indirect
89-
go.opentelemetry.io/otel/metric v0.32.3 // indirect
9087
go.opentelemetry.io/otel/sdk v1.11.0 // indirect
91-
go.opentelemetry.io/otel/sdk/export/metric v0.28.0 // indirect
92-
go.opentelemetry.io/otel/sdk/metric v0.32.3 // indirect
9388
go.opentelemetry.io/otel/trace v1.11.0 // indirect
9489
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
9590
go.uber.org/atomic v1.10.0 // indirect
@@ -103,7 +98,6 @@ require (
10398
golang.org/x/text v0.4.0 // indirect
10499
golang.org/x/time v0.1.0 // indirect
105100
golang.org/x/tools v0.1.12 // indirect
106-
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
107101
google.golang.org/appengine v1.6.7 // indirect
108102
google.golang.org/genproto v0.0.0-20221018160656-63c7b68cfc55 // indirect
109103
google.golang.org/grpc v1.50.1 // indirect

go.sum

Lines changed: 3 additions & 90 deletions
Large diffs are not rendered by default.

internal/certificatetransparency/ct-watcher.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,44 @@ import (
1717
"net/http"
1818
"strings"
1919
"sync"
20+
"sync/atomic"
2021
)
2122

23+
var (
24+
processedCerts int64
25+
processedPrecerts int64
26+
urlMapMutex sync.RWMutex
27+
urlMap = make(map[string]int64)
28+
)
29+
30+
func GetProcessedCerts() int64 {
31+
return processedCerts
32+
}
33+
34+
func GetProcessedPrecerts() int64 {
35+
return processedPrecerts
36+
}
37+
38+
func GetCertCountForLog(logname string) int64 {
39+
urlMapMutex.RLock()
40+
defer urlMapMutex.RUnlock()
41+
return urlMap[logname]
42+
}
43+
44+
func GetLogs() []string {
45+
urlMapMutex.RLock()
46+
defer urlMapMutex.RUnlock()
47+
48+
urls := make([]string, len(urlMap))
49+
50+
counter := 0
51+
for key := range urlMap {
52+
urls[counter] = key
53+
counter++
54+
}
55+
return urls
56+
}
57+
2258
// Watcher describes a component that watches for new certificates in a CT log.
2359
type Watcher struct {
2460
Name string
@@ -132,6 +168,7 @@ func (w *worker) foundCertCallback(rawEntry *ct.RawLogEntry) {
132168
}
133169
entry.Data.UpdateType = "X509LogEntry"
134170
w.entryChan <- entry
171+
atomic.AddInt64(&processedCerts, 1)
135172
}
136173

137174
// foundPrecertCallback is the callback that handles cases where new precerts are found.
@@ -143,9 +180,10 @@ func (w *worker) foundPrecertCallback(rawEntry *ct.RawLogEntry) {
143180
}
144181
entry.Data.UpdateType = "PrecertLogEntry"
145182
w.entryChan <- entry
183+
atomic.AddInt64(&processedPrecerts, 1)
146184
}
147185

148-
// certHandler takes the entries out of the channel and broadcasts them to all clients.
186+
// certHandler takes the entries out of the entryChan channel and broadcasts them to all clients.
149187
func certHandler(entryChan chan certstream.Entry) {
150188
var processed int64
151189
for {
@@ -160,6 +198,12 @@ func certHandler(entryChan chan certstream.Entry) {
160198

161199
// Run json encoding in the background and send the result to the clients.
162200
web.ClientHandler.Broadcast <- entry
201+
202+
url := normalizeCtlogURL(entry.Data.Source.URL)
203+
204+
urlMapMutex.Lock()
205+
urlMap[url] += 1
206+
urlMapMutex.Unlock()
163207
}
164208
}
165209

@@ -199,3 +243,10 @@ func getAllLogs() (loglist3.LogList, error) {
199243

200244
return *allLogs, nil
201245
}
246+
247+
func normalizeCtlogURL(input string) string {
248+
input = strings.TrimPrefix(input, "http://")
249+
input = strings.TrimPrefix(input, "https://")
250+
input = strings.TrimSuffix(input, "/")
251+
return input
252+
}

internal/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ type Config struct {
2020
CertPath string `yaml:"cert_path"`
2121
CertKeyPath string `yaml:"cert_key_path"`
2222
}
23+
Prometheus struct {
24+
Enabled bool `yaml:"enabled"`
25+
MetricsURL string `yaml:"metrics_url"`
26+
ListenAddr string `yaml:"listen_addr"`
27+
ListenPort int `yaml:"listen_port"`
28+
}
2329
}
2430

2531
// ReadConfig reads the config file and returns a filled Config struct.
@@ -100,5 +106,15 @@ func validateConfig(config Config) bool {
100106
config.Webserver.FullURL = "/domains-only"
101107
}
102108

109+
if config.Prometheus.Enabled {
110+
if config.Prometheus.ListenAddr == "" || !IPRegex.MatchString(config.Prometheus.ListenAddr) {
111+
log.Fatalln("Prometheus export IP does not match pattern 'x.x.x.x'")
112+
return false
113+
}
114+
if config.Prometheus.ListenPort == 0 {
115+
log.Fatalln("Prometheus export port is not set")
116+
return false
117+
}
118+
}
103119
return true
104120
}

internal/prometheus/prometheus.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package prometheus
2+
3+
import (
4+
"fmt"
5+
"github.com/VictoriaMetrics/metrics"
6+
"go-certstream-server/internal/certificatetransparency"
7+
"go-certstream-server/internal/web"
8+
"io"
9+
)
10+
11+
var (
12+
ctLogsInitialized = false
13+
fullClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"full\"}", func() float64 {
14+
return float64(web.ClientHandler.ClientFullCount())
15+
})
16+
liteClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"lite\"}", func() float64 {
17+
return float64(web.ClientHandler.ClientLiteCount())
18+
})
19+
domainClientCount = metrics.NewGauge("certstreamservergo_clients_total{type=\"domain\"}", func() float64 {
20+
return float64(web.ClientHandler.ClientDomainsCount())
21+
})
22+
processedCertificates = metrics.NewGauge("certstreamservergo_certificates_total{type=\"regular\"}", func() float64 {
23+
return float64(certificatetransparency.GetProcessedCerts())
24+
})
25+
processedPreCertificates = metrics.NewGauge("certstreamservergo_certificates_total{type=\"precert\"}", func() float64 {
26+
return float64(certificatetransparency.GetProcessedPrecerts())
27+
})
28+
)
29+
30+
// WritePrometheus provides an easy way to write metrics to a writer.
31+
func WritePrometheus(w io.Writer, exposeProcessMetrics bool) {
32+
if !ctLogsInitialized {
33+
logs := certificatetransparency.GetLogs()
34+
for i := 0; i < len(logs); i++ {
35+
url := logs[i]
36+
metrics.NewGauge(fmt.Sprintf("certstreamservergo_certs_by_log_total{url=\"%s\"}", url), func() float64 {
37+
return float64(certificatetransparency.GetCertCountForLog(url))
38+
})
39+
}
40+
if len(logs) > 0 {
41+
ctLogsInitialized = true
42+
}
43+
}
44+
metrics.WritePrometheus(w, exposeProcessMetrics)
45+
}

internal/web/broadcastmanager.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,33 @@ func (bm *BroadcastManager) unregisterClient(c *client) {
3838
bm.clientLock.Unlock()
3939
}
4040

41+
// ClientFullCount returns the current number of clients connected to the service on the `full` endpoint.
42+
func (bm *BroadcastManager) ClientFullCount() (count int64) {
43+
return bm.clientCountByType(SubTypeFull)
44+
}
45+
46+
// ClientLiteCount returns the current number of clients connected to the service on the `lite` endpoint.
47+
func (bm *BroadcastManager) ClientLiteCount() (count int64) {
48+
return bm.clientCountByType(SubTypeLite)
49+
}
50+
51+
// ClientDomainsCount returns the current number of clients connected to the service on the `domains-only` endpoint.
52+
func (bm *BroadcastManager) ClientDomainsCount() (count int64) {
53+
return bm.clientCountByType(SubTypeDomain)
54+
}
55+
56+
// clientCountByType returns the current number of clients connected to the service on the endpoint matching the specified SubscriptionType.
57+
func (bm *BroadcastManager) clientCountByType(subType SubscriptionType) (count int64) {
58+
bm.clientLock.RLock()
59+
defer bm.clientLock.RUnlock()
60+
for _, c := range bm.clients {
61+
if c.subType == subType {
62+
count++
63+
}
64+
}
65+
return count
66+
}
67+
4168
// broadcaster is run in a goroutine and handles the dispatching of entries to clients.
4269
func (bm *BroadcastManager) broadcaster() {
4370
for {

internal/web/server.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/gorilla/websocket"
99
"go-certstream-server/internal/certstream"
1010
"go-certstream-server/internal/config"
11+
"io"
1112
"log"
1213
"net/http"
1314
"time"
@@ -25,6 +26,15 @@ type WebServer struct {
2526
keyPath string
2627
}
2728

29+
// RegisterPrometheus registers a new handler that listens on the given url and calls the given function
30+
// in order to provide metrics for a prometheus server. This function signature was used, because VictoriaMetrics
31+
// offers exactly this function signature.
32+
func (ws *WebServer) RegisterPrometheus(url string, callback func(w io.Writer, exposeProcessMetrics bool)) {
33+
ws.routes.HandleFunc(url, func(w http.ResponseWriter, r *http.Request) {
34+
callback(w, false)
35+
})
36+
}
37+
2838
// initFullWebsocket is called when a client connects to the /full-stream endpoint.
2939
// It upgrades the connection to a websocket and starts a goroutine to listen for messages from the client.
3040
func initFullWebsocket(w http.ResponseWriter, r *http.Request) {
@@ -138,6 +148,20 @@ func (ws *WebServer) initServer() {
138148
}
139149
}
140150

151+
// NewMetricsServer creates a new webserver that listens on the given port and provides metrics for a prometheus server.
152+
func NewMetricsServer(networkIf string, port int, certPath, keyPath string) *WebServer {
153+
server := &WebServer{
154+
networkIf: networkIf,
155+
port: port,
156+
routes: chi.NewRouter(),
157+
certPath: certPath,
158+
keyPath: keyPath,
159+
}
160+
server.initServer()
161+
server.routes.Use(middleware.Recoverer)
162+
return server
163+
}
164+
141165
// NewWebsocketServer starts a new webserver and initialized it with the necessary routes.
142166
// It also starts the broadcaster in ClientHandler as a background job.
143167
func NewWebsocketServer(networkIf string, port int, certPath, keyPath string) *WebServer {

0 commit comments

Comments
 (0)