Skip to content

Commit e2976bf

Browse files
authored
Adds skupper event processing prometheus metrics (skupperproject#2338)
Exposes a new set of prometheus metrics from kubernetes control plane components to inform on the event processing load and capacity of each process (controller and adaptor.) - "skupper_workqueue_adds_total": count of events queued by kind. - "skupper_workqueue_depth": current depth of the work queue by kind. - "skupper_workqueue_delay_seconds": duration in seconds an event is queued before it is pulled for processing by kind. - "skupper_workqueue_retries_total": count of event processing retires by kind. Signed-off-by: Christian Kruse <christian@c-kruse.com>
1 parent 24720f8 commit e2976bf

10 files changed

Lines changed: 243 additions & 24 deletions

File tree

cmd/controller/main.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
internalclient "github.com/skupperproject/skupper/internal/kube/client"
1313
"github.com/skupperproject/skupper/internal/kube/controller"
1414
"github.com/skupperproject/skupper/internal/kube/metrics"
15+
"github.com/skupperproject/skupper/internal/kube/watchers"
1516
"github.com/skupperproject/skupper/internal/version"
1617
)
1718

@@ -70,20 +71,22 @@ func main() {
7071
}
7172
config.Namespace = cli.Namespace
7273

73-
controller, err := controller.NewController(cli, config)
74-
if err != nil {
75-
log.Fatal("Error getting new site controller ", err.Error())
76-
}
77-
74+
var eventProcessorMetrics watchers.MetricsProvider
7875
if !config.MetricsConfig.Disabled {
7976
reg := prometheus.NewRegistry()
8077
metrics.MustRegisterClientGoMetrics(reg)
78+
eventProcessorMetrics = metrics.MustRegisterEventProcessorMetrics(reg)
8179
srv := metrics.NewServer(config.MetricsConfig, reg)
8280
if err := srv.Start(stopCh); err != nil {
8381
log.Fatalf("Error starting metrics server: %s", err)
8482
}
8583
}
8684

85+
controller, err := controller.NewController(cli, config, watchers.WithMetricsProvider(eventProcessorMetrics))
86+
if err != nil {
87+
log.Fatal("Error getting new site controller ", err.Error())
88+
}
89+
8790
if err = controller.Run(stopCh); err != nil {
8891
log.Fatal("Error running site controller: ", err.Error())
8992
}

cmd/kube-adaptor/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/skupperproject/skupper/internal/kube/adaptor"
1717
internalclient "github.com/skupperproject/skupper/internal/kube/client"
1818
"github.com/skupperproject/skupper/internal/kube/metrics"
19+
"github.com/skupperproject/skupper/internal/kube/watchers"
1920
"github.com/skupperproject/skupper/internal/qdr"
2021
"github.com/skupperproject/skupper/internal/version"
2122
)
@@ -84,9 +85,11 @@ func main() {
8485
os.Exit(0)
8586
}
8687

88+
var eventProcessorMetrics watchers.MetricsProvider
8789
if !metricsConfig.Disabled {
8890
reg := prometheus.NewRegistry()
8991
metrics.MustRegisterClientGoMetrics(reg)
92+
eventProcessorMetrics = metrics.MustRegisterEventProcessorMetrics(reg)
9093
srv := metrics.NewServer(metricsConfig, reg)
9194
if err := srv.Start(stopCh); err != nil {
9295
log.Fatalf("Error starting metrics server: %s", err)
@@ -108,7 +111,7 @@ func main() {
108111
})
109112
go http.ListenAndServe(":9191", nil)
110113

111-
configSync := adaptor.NewConfigSync(cli, cli.GetNamespace(), configDir, configMapName)
114+
configSync := adaptor.NewConfigSync(cli, cli.GetNamespace(), configDir, configMapName, eventProcessorMetrics)
112115
log.Println("Starting controller loop...")
113116
configSync.Start(stopCh)
114117

internal/kube/adaptor/config_sync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ func sslSecretsWatcher(namespace string, eventProcessor *watchers.EventProcessor
3434
}
3535
}
3636

37-
func NewConfigSync(cli internalclient.Clients, namespace string, path string, routerConfigMap string) *ConfigSync {
38-
controller := watchers.NewEventProcessor("config-sync", cli)
37+
func NewConfigSync(cli internalclient.Clients, namespace string, path string, routerConfigMap string, metrics watchers.MetricsProvider) *ConfigSync {
38+
controller := watchers.NewEventProcessor("config-sync", cli, watchers.WithMetricsProvider(metrics))
3939
configSync := &ConfigSync{
4040
agentPool: qdr.NewAgentPool("amqp://localhost:5672", nil),
4141
controller: controller,

internal/kube/controller/controller.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,9 @@ func labelling() internalinterfaces.TweakListOptionsFunc {
8383
}
8484
}
8585

86-
var eventProcessorCustomizers []watchers.EventProcessorCustomizer
87-
88-
func NewController(cli internalclient.Clients, config *Config) (*Controller, error) {
86+
func NewController(cli internalclient.Clients, config *Config, options ...watchers.EventProcessorCustomizer) (*Controller, error) {
8987
controller := &Controller{
90-
eventProcessor: watchers.NewEventProcessor("Controller", cli, eventProcessorCustomizers...),
88+
eventProcessor: watchers.NewEventProcessor("Controller", cli, options...),
9189
sites: map[string]*site.Site{},
9290
siteSizing: sizing.NewRegistry(),
9391
labelling: labels.NewLabelsAndAnnotations(config.Namespace),

internal/kube/controller/controller_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -761,9 +761,11 @@ func TestUpdate(t *testing.T) {
761761
},
762762
},
763763
}
764-
defer func() {
765-
eventProcessorCustomizers = nil
766-
}()
764+
eventProcessorResyncShort := []watchers.EventProcessorCustomizer{
765+
func(e *watchers.EventProcessor) {
766+
e.SetResyncShort(time.Second)
767+
},
768+
}
767769
for _, tt := range testTable {
768770
t.Run(tt.name, func(t *testing.T) {
769771
flags := &flag.FlagSet{}
@@ -776,12 +778,7 @@ func TestUpdate(t *testing.T) {
776778
clients, err := fakeclient.NewFakeClient(config.Namespace, tt.k8sObjects, tt.skupperObjects, "")
777779
assert.Assert(t, err)
778780
enableSSA(clients.GetDynamicClient())
779-
eventProcessorCustomizers = []watchers.EventProcessorCustomizer{
780-
func(e *watchers.EventProcessor) {
781-
e.SetResyncShort(time.Second)
782-
},
783-
}
784-
controller, err := NewController(clients, config)
781+
controller, err := NewController(clients, config, eventProcessorResyncShort...)
785782
assert.Assert(t, err)
786783
stopCh := make(chan struct{})
787784
err = controller.init(stopCh)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/skupperproject/skupper/internal/kube/watchers"
6+
)
7+
8+
func MustRegisterEventProcessorMetrics(registry *prometheus.Registry) watchers.MetricsProvider {
9+
provider := eventProcessorMetrics{
10+
adds: prometheus.NewCounterVec(prometheus.CounterOpts{
11+
Namespace: "skupper",
12+
Subsystem: "workqueue",
13+
Name: "adds_total",
14+
Help: "Total number of events queued.",
15+
}, []string{"kind"}),
16+
depth: prometheus.NewGaugeVec(prometheus.GaugeOpts{
17+
Namespace: "skupper",
18+
Subsystem: "workqueue",
19+
Name: "depth",
20+
Help: "Current depth of event queue.",
21+
}, []string{"kind"}),
22+
delayDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
23+
Namespace: "skupper",
24+
Subsystem: "workqueue",
25+
Name: "delay_seconds",
26+
Help: "How long in seconds an event is queued before it is handled.",
27+
Buckets: prometheus.ExponentialBuckets(1e-9, 10, 12),
28+
}, []string{"kind"}),
29+
workDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
30+
Namespace: "skupper",
31+
Subsystem: "workqueue",
32+
Name: "duration_seconds",
33+
Help: "How long in seconds handling a watch event takes.",
34+
Buckets: prometheus.ExponentialBuckets(1e-9, 10, 12),
35+
}, []string{"kind"}),
36+
retries: prometheus.NewCounterVec(prometheus.CounterOpts{
37+
Namespace: "skupper",
38+
Subsystem: "workqueue",
39+
Name: "retries_total",
40+
Help: "Total number of retries queued.",
41+
}, []string{"kind"}),
42+
}
43+
registry.MustRegister(provider.adds, provider.depth, provider.delayDuration, provider.workDuration, provider.retries)
44+
return provider
45+
}
46+
47+
type eventProcessorMetrics struct {
48+
adds *prometheus.CounterVec
49+
depth *prometheus.GaugeVec
50+
delayDuration *prometheus.HistogramVec
51+
workDuration *prometheus.HistogramVec
52+
retries *prometheus.CounterVec
53+
}
54+
55+
func (p eventProcessorMetrics) NewAddedMetric(kind string) watchers.CounterMetric {
56+
return p.adds.WithLabelValues(kind)
57+
}
58+
func (p eventProcessorMetrics) NewDepthMetric(kind string) watchers.GaugeMetric {
59+
return p.depth.WithLabelValues(kind)
60+
}
61+
func (p eventProcessorMetrics) NewDelayedMetric(kind string) watchers.ObservableMetric {
62+
return p.delayDuration.WithLabelValues(kind)
63+
}
64+
func (p eventProcessorMetrics) NewWorkDurationMetric(kind string) watchers.ObservableMetric {
65+
return p.workDuration.WithLabelValues(kind)
66+
}
67+
func (p eventProcessorMetrics) NewRetriesMetric(kind string) watchers.CounterMetric {
68+
return p.retries.WithLabelValues(kind)
69+
}

internal/kube/watchers/metrics.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package watchers
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
type MetricsProvider interface {
9+
NewAddedMetric(kind string) CounterMetric
10+
NewDepthMetric(kind string) GaugeMetric
11+
NewDelayedMetric(kind string) ObservableMetric
12+
NewWorkDurationMetric(kind string) ObservableMetric
13+
NewRetriesMetric(kind string) CounterMetric
14+
}
15+
16+
type CounterMetric interface {
17+
Inc()
18+
}
19+
type GaugeMetric interface {
20+
CounterMetric
21+
Dec()
22+
}
23+
type ObservableMetric interface {
24+
Observe(float64)
25+
}
26+
27+
type noopMetric struct{}
28+
29+
func (noopMetric) Inc() {}
30+
func (noopMetric) Dec() {}
31+
func (noopMetric) Observe(float64) {}
32+
33+
type noopMetricsProvider struct{}
34+
35+
func (noopMetricsProvider) NewAddedMetric(kind string) CounterMetric { return noopMetric{} }
36+
func (noopMetricsProvider) NewDepthMetric(kind string) GaugeMetric { return noopMetric{} }
37+
func (noopMetricsProvider) NewDelayedMetric(kind string) ObservableMetric { return noopMetric{} }
38+
func (noopMetricsProvider) NewWorkDurationMetric(kind string) ObservableMetric { return noopMetric{} }
39+
func (noopMetricsProvider) NewRetriesMetric(kind string) CounterMetric { return noopMetric{} }
40+
41+
type metricsSet struct {
42+
Added CounterMetric
43+
Depth GaugeMetric
44+
Delayed ObservableMetric
45+
WorkDuration ObservableMetric
46+
Retries CounterMetric
47+
}
48+
49+
type metricsQueue struct {
50+
provider MetricsProvider
51+
52+
metricsMu sync.Mutex
53+
metrics map[string]metricsSet
54+
pendingMu sync.Mutex
55+
pending map[ResourceChange]time.Time
56+
}
57+
58+
func (q *metricsQueue) add(evt ResourceChange) {
59+
q.pendingMu.Lock()
60+
defer q.pendingMu.Unlock()
61+
if _, ok := q.pending[evt]; ok {
62+
return
63+
}
64+
q.pending[evt] = time.Now()
65+
ms := q.metricsFor(evt.Handler.Kind())
66+
ms.Added.Inc()
67+
ms.Depth.Inc()
68+
}
69+
70+
type metricsClose func(evt ResourceChange, retry bool)
71+
72+
func (q *metricsQueue) get(evt ResourceChange) metricsClose {
73+
q.pendingMu.Lock()
74+
defer q.pendingMu.Unlock()
75+
queuedAt, ok := q.pending[evt]
76+
if !ok {
77+
return q.done(time.Now())
78+
}
79+
q.metricsFor(evt.Handler.Kind()).Delayed.Observe(time.Since(queuedAt).Seconds())
80+
delete(q.pending, evt)
81+
return q.done(time.Now())
82+
}
83+
84+
func (q *metricsQueue) done(startTime time.Time) metricsClose {
85+
return func(evt ResourceChange, retry bool) {
86+
ms := q.metricsFor(evt.Handler.Kind())
87+
ms.WorkDuration.Observe(time.Since(startTime).Seconds())
88+
ms.Depth.Dec()
89+
if retry {
90+
ms.Retries.Inc()
91+
q.add(evt)
92+
}
93+
}
94+
}
95+
96+
func (q *metricsQueue) metricsFor(kind string) metricsSet {
97+
q.metricsMu.Lock()
98+
defer q.metricsMu.Unlock()
99+
if q.metrics == nil {
100+
q.metrics = map[string]metricsSet{}
101+
}
102+
if m, ok := q.metrics[kind]; ok {
103+
return m
104+
}
105+
m := metricsSet{
106+
Added: q.provider.NewAddedMetric(kind),
107+
Depth: q.provider.NewDepthMetric(kind),
108+
Delayed: q.provider.NewDelayedMetric(kind),
109+
WorkDuration: q.provider.NewWorkDurationMetric(kind),
110+
Retries: q.provider.NewRetriesMetric(kind),
111+
}
112+
q.metrics[kind] = m
113+
return m
114+
}

internal/kube/watchers/resources.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,7 @@ func (w ResourceWatcher[T]) Sync(stopCh <-chan struct{}) bool {
149149
func (w ResourceWatcher[T]) IsStopped() bool {
150150
return w.informer.IsStopped()
151151
}
152+
153+
func (w ResourceWatcher[T]) Kind() string {
154+
return w.gvk.GroupVersion().String() + " " + w.gvk.Kind
155+
}

0 commit comments

Comments
 (0)