Skip to content

Commit 0ca1842

Browse files
committed
HYPERFLEET-469: Add Prometheus metrics with hyperfleet_broker_ prefix
Add MetricsRecorder that instruments publisher and subscriber with Prometheus counters and histograms following the HyperFleet Metrics Standard:

- hyperfleet_broker_messages_consumed_total
- hyperfleet_broker_messages_published_total
- hyperfleet_broker_errors_total (conversion, handler, publish)
- hyperfleet_broker_message_duration_seconds

MetricsRecorder is a required parameter in NewPublisher and NewSubscriber constructors. Passing nil returns an error.
1 parent 024ec6b commit 0ca1842

17 files changed

Lines changed: 498 additions & 180 deletions

File tree

README.md

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The current implementation uses [Watermill](https://github.com/ThreeDotsLabs/wat
1414
- **Worker Pools**: Configurable parallel message processing for subscribers
1515
- **Subscription Management**: Flexible subscription IDs for load balancing (shared subscriptions) or fanout (separate subscriptions)
1616
- **Health Checks**: Built-in `Health()` method on `Publisher` for readiness probes (per [HyperFleet Health Endpoints standard](https://github.com/openshift-hyperfleet/architecture/blob/main/hyperfleet/standards/health-endpoints.md))
17+
- **Prometheus Metrics**: Built-in instrumentation with `MetricsRecorder` (per [HyperFleet Metrics Standard](https://github.com/openshift-hyperfleet/architecture/blob/main/hyperfleet/standards/metrics.md))
1718
- **Simple API**: Clean, easy-to-use interface that hides Watermill complexity
1819

1920
## Installation
@@ -48,17 +49,20 @@ This interface matches the HyperFleet adapter logger interface, ensuring consist
4849
### Usage Examples
4950

5051
```go
52+
// Create metrics recorder (required)
53+
metrics := broker.NewMetricsRecorder("my-component", "v1.0.0", prometheus.DefaultRegisterer)
54+
5155
// Use the default logger
5256
appLogger := logger.NewTestLogger()
53-
publisher, err := broker.NewPublisher(appLogger)
57+
publisher, err := broker.NewPublisher(appLogger, metrics)
5458

5559
// Use JSON format for the default logger
5660
appLogger := logger.NewTestLogger(logger.WithFormat(logger.FormatJSON))
57-
publisher, err := broker.NewPublisher(appLogger)
61+
publisher, err := broker.NewPublisher(appLogger, metrics)
5862

5963
// Use your own logger implementation with config
6064
myLogger := createMyApplicationLogger()
61-
publisher, err := broker.NewPublisher(myLogger, config)
65+
publisher, err := broker.NewPublisher(myLogger, metrics, config)
6266
```
6367

6468
## Quick Start
@@ -78,12 +82,14 @@ import (
7882
"github.com/cloudevents/sdk-go/v2/event"
7983
"github.com/openshift-hyperfleet/hyperfleet-broker/broker"
8084
"github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger"
85+
"github.com/prometheus/client_golang/prometheus"
8186
)
8287

8388
func main() {
84-
// Create logger and publisher
89+
// Create logger, metrics, and publisher
8590
appLogger := logger.NewTestLogger()
86-
publisher, err := broker.NewPublisher(appLogger)
91+
metrics := broker.NewMetricsRecorder("example-publisher", "v1.0.0", prometheus.DefaultRegisterer)
92+
publisher, err := broker.NewPublisher(appLogger, metrics)
8793
if err != nil {
8894
log.Fatalf("Failed to create publisher: %v", err)
8995
}
@@ -148,15 +154,17 @@ import (
148154
"github.com/cloudevents/sdk-go/v2/event"
149155
"github.com/openshift-hyperfleet/hyperfleet-broker/broker"
150156
"github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger"
157+
"github.com/prometheus/client_golang/prometheus"
151158
)
152159

153160
func main() {
154-
// Create logger and subscriber with subscription ID
161+
// Create logger, metrics, and subscriber with subscription ID
155162
// Subscribers with the same subscription ID share messages (load balancing)
156163
// Subscribers with different IDs receive all messages (fanout)
157164
appLogger := logger.NewTestLogger()
165+
metrics := broker.NewMetricsRecorder("example-subscriber", "v1.0.0", prometheus.DefaultRegisterer)
158166
subscriptionID := "shared-subscription"
159-
subscriber, err := broker.NewSubscriber(appLogger, subscriptionID)
167+
subscriber, err := broker.NewSubscriber(appLogger, subscriptionID, metrics)
160168
if err != nil {
161169
log.Fatalf("Failed to create subscriber: %v", err)
162170
}
@@ -437,18 +445,55 @@ You can also provide configuration programmatically using a map:
437445

438446
```go
439447
appLogger := logger.NewTestLogger()
448+
metrics := broker.NewMetricsRecorder("my-component", "v1.0.0", prometheus.DefaultRegisterer)
440449
configMap := map[string]string{
441450
"broker.type": "rabbitmq",
442451
"broker.rabbitmq.url": "amqp://user:pass@localhost:5672/",
443452
"subscriber.parallelism": "5",
444453
}
445454
446-
publisher, err := broker.NewPublisher(appLogger, configMap)
447-
subscriber, err := broker.NewSubscriber(appLogger, "my-subscription", configMap)
455+
publisher, err := broker.NewPublisher(appLogger, metrics, configMap)
456+
subscriber, err := broker.NewSubscriber(appLogger, "my-subscription", metrics, configMap)
448457
```
449458

450459
</details>
451460

461+
## Prometheus Metrics
462+
463+
The library provides Prometheus metrics via `MetricsRecorder`. All metrics follow the [HyperFleet Metrics Standard](https://github.com/openshift-hyperfleet/architecture/blob/main/hyperfleet/standards/metrics.md) and use the `hyperfleet_broker_` prefix.
464+
465+
A `MetricsRecorder` is required when creating publishers and subscribers:
466+
467+
```go
468+
metrics := broker.NewMetricsRecorder("my-component", "v1.0.0", prometheus.DefaultRegisterer)
469+
470+
publisher, err := broker.NewPublisher(appLogger, metrics)
471+
subscriber, err := broker.NewSubscriber(appLogger, "my-subscription", metrics)
472+
```
473+
474+
### Exported Metrics
475+
476+
| Metric | Type | Labels | Description |
477+
|--------|------|--------|-------------|
478+
| `hyperfleet_broker_messages_consumed_total` | Counter | `topic`, `component`, `version` | Total messages consumed from the broker |
479+
| `hyperfleet_broker_messages_published_total` | Counter | `topic`, `component`, `version` | Total messages published to the broker |
480+
| `hyperfleet_broker_errors_total` | Counter | `topic`, `error_type`, `component`, `version` | Total message processing errors |
481+
| `hyperfleet_broker_message_duration_seconds` | Histogram | `topic`, `component`, `version` | Duration of message handler execution |
482+
483+
### Error Types
484+
485+
The `error_type` label on `hyperfleet_broker_errors_total` can have the following values:
486+
487+
| Value | Description |
488+
|-------|-------------|
489+
| `conversion` | Failed to convert between CloudEvent and Watermill message format |
490+
| `handler` | The user-provided handler returned an error |
491+
| `publish` | The underlying broker rejected the publish operation |
492+
493+
### Duration Buckets
494+
495+
The `hyperfleet_broker_message_duration_seconds` histogram uses the following buckets (in seconds): `0.1, 0.5, 1, 2, 5, 10, 30, 60, 120`.
496+
452497
## Main Architectural Decisions
453498

454499
### 1. Watermill Abstraction

broker/broker.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ const (
2121
DefaultShutdownTimeout = 30 * time.Second
2222
)
2323

24-
// NewPublisher creates a new publisher with a required logger and optional configuration.
24+
// NewPublisher creates a new publisher with the provided logger, metrics recorder, and optional configuration.
2525
// Usage:
26-
// - NewPublisher(logger) - uses provided logger and loads config from file
27-
// - NewPublisher(logger, configMap) - uses provided logger with config map
28-
func NewPublisher(log logger.Logger, configMap ...map[string]string) (Publisher, error) {
26+
// - NewPublisher(logger, metrics) - uses provided logger and loads config from file
27+
// - NewPublisher(logger, metrics, configMap) - uses provided logger with config map
28+
func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[string]string) (Publisher, error) {
2929
if log == nil {
3030
return nil, fmt.Errorf("logger is required")
3131
}
32+
if metrics == nil {
33+
return nil, fmt.Errorf("metrics is required")
34+
}
3235

3336
var cfg *config
3437
var err error
@@ -96,20 +99,24 @@ func NewPublisher(log logger.Logger, configMap ...map[string]string) (Publisher,
9699
logger: log,
97100
healthCheck: hc,
98101
healthCloser: healthCloser,
102+
metrics: metrics,
99103
}, nil
100104
}
101105

102-
// NewSubscriber creates a new subscriber with a required logger and optional configuration.
106+
// NewSubscriber creates a new subscriber with the provided logger, subscription ID, metrics recorder, and optional configuration.
103107
// Usage:
104-
// - NewSubscriber(logger, "id") - uses provided logger and loads config from file
105-
// - NewSubscriber(logger, "id", configMap) - uses provided logger with config map
106-
func NewSubscriber(log logger.Logger, subscriptionID string, configMap ...map[string]string) (Subscriber, error) {
108+
// - NewSubscriber(logger, "id", metrics) - uses provided logger and loads config from file
109+
// - NewSubscriber(logger, "id", metrics, configMap) - uses provided logger with config map
110+
func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRecorder, configMap ...map[string]string) (Subscriber, error) {
107111
if subscriptionID == "" {
108112
return nil, fmt.Errorf("subscriptionID is required")
109113
}
110114
if log == nil {
111115
return nil, fmt.Errorf("logger is required")
112116
}
117+
if metrics == nil {
118+
return nil, fmt.Errorf("metrics is required")
119+
}
113120

114121
var cfg *config
115122
var err error
@@ -165,6 +172,7 @@ func NewSubscriber(log logger.Logger, subscriptionID string, configMap ...map[st
165172
subscriptionID: subscriptionID,
166173
logger: log,
167174
errorChan: make(chan *SubscriberError, ErrorChannelBufferSize),
175+
metrics: metrics,
168176
}, nil
169177
}
170178

broker/error_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,15 @@ import (
44
"testing"
55

66
"github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger"
7+
"github.com/prometheus/client_golang/prometheus"
78
"github.com/stretchr/testify/assert"
89
)
910

11+
func newErrorTestMetrics(t *testing.T) *MetricsRecorder {
12+
t.Helper()
13+
return NewMetricsRecorder("test", "v0.0.0", prometheus.NewRegistry())
14+
}
15+
1016
func TestNewPublisherErrorHandling(t *testing.T) {
1117
tests := []struct {
1218
name string
@@ -65,10 +71,11 @@ func TestNewPublisherErrorHandling(t *testing.T) {
6571
var err error
6672

6773
mockLogger := logger.NewMockLogger()
74+
metrics := newErrorTestMetrics(t)
6875
if tt.configMap == nil {
69-
pub, err = NewPublisher(mockLogger)
76+
pub, err = NewPublisher(mockLogger, metrics)
7077
} else {
71-
pub, err = NewPublisher(mockLogger, tt.configMap)
78+
pub, err = NewPublisher(mockLogger, metrics, tt.configMap)
7279
}
7380

7481
if tt.expectError {
@@ -154,10 +161,11 @@ func TestNewSubscriberErrorHandling(t *testing.T) {
154161
var err error
155162

156163
mockLogger := logger.NewMockLogger()
164+
metrics := newErrorTestMetrics(t)
157165
if tt.configMap == nil {
158-
sub, err = NewSubscriber(mockLogger, tt.subscriptionID)
166+
sub, err = NewSubscriber(mockLogger, tt.subscriptionID, metrics)
159167
} else {
160-
sub, err = NewSubscriber(mockLogger, tt.subscriptionID, tt.configMap)
168+
sub, err = NewSubscriber(mockLogger, tt.subscriptionID, metrics, tt.configMap)
161169
}
162170

163171
if tt.expectError {
@@ -192,7 +200,8 @@ func TestPublisherPublishErrorHandling(t *testing.T) {
192200
}
193201

194202
mockLogger := logger.NewMockLogger()
195-
_, err := NewPublisher(mockLogger, configMap)
203+
metrics := newErrorTestMetrics(t)
204+
_, err := NewPublisher(mockLogger, metrics, configMap)
196205
assert.Error(t, err)
197206
}
198207

broker/metrics.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package broker
2+
3+
import (
4+
"time"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
// MetricsRecorder holds Prometheus metrics for the broker library.
10+
type MetricsRecorder struct {
11+
component string
12+
version string
13+
14+
messagesConsumed *prometheus.CounterVec
15+
messagesPublished *prometheus.CounterVec
16+
errors *prometheus.CounterVec
17+
messageDuration *prometheus.HistogramVec
18+
}
19+
20+
// NewMetricsRecorder creates a new MetricsRecorder and registers all metrics
21+
// with the provided prometheus.Registerer. If registerer is nil, the default
22+
// prometheus.DefaultRegisterer is used.
23+
//
24+
// The component and version parameters are used as constant label values
25+
// on all metrics, per the HyperFleet Metrics Standard.
26+
func NewMetricsRecorder(component, version string, registerer prometheus.Registerer) *MetricsRecorder {
27+
if registerer == nil {
28+
registerer = prometheus.DefaultRegisterer
29+
}
30+
31+
m := &MetricsRecorder{
32+
component: component,
33+
version: version,
34+
35+
messagesConsumed: prometheus.NewCounterVec(prometheus.CounterOpts{
36+
Name: "hyperfleet_broker_messages_consumed_total",
37+
Help: "Total number of messages consumed from the broker.",
38+
}, []string{"topic", "component", "version"}),
39+
40+
messagesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
41+
Name: "hyperfleet_broker_messages_published_total",
42+
Help: "Total number of messages published to the broker.",
43+
}, []string{"topic", "component", "version"}),
44+
45+
errors: prometheus.NewCounterVec(prometheus.CounterOpts{
46+
Name: "hyperfleet_broker_errors_total",
47+
Help: "Total number of message processing errors.",
48+
}, []string{"topic", "error_type", "component", "version"}),
49+
50+
messageDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
51+
Name: "hyperfleet_broker_message_duration_seconds",
52+
Help: "Duration of message processing in seconds.",
53+
// Event processing buckets per HyperFleet Metrics Standard
54+
Buckets: []float64{0.1, 0.5, 1, 2, 5, 10, 30, 60, 120},
55+
}, []string{"topic", "component", "version"}),
56+
}
57+
58+
registerer.MustRegister(
59+
m.messagesConsumed,
60+
m.messagesPublished,
61+
m.errors,
62+
m.messageDuration,
63+
)
64+
65+
return m
66+
}
67+
68+
// RecordConsumed increments the messages consumed counter for the given topic.
69+
func (m *MetricsRecorder) RecordConsumed(topic string) {
70+
m.messagesConsumed.WithLabelValues(topic, m.component, m.version).Inc()
71+
}
72+
73+
// RecordPublished increments the messages published counter for the given topic.
74+
func (m *MetricsRecorder) RecordPublished(topic string) {
75+
m.messagesPublished.WithLabelValues(topic, m.component, m.version).Inc()
76+
}
77+
78+
// RecordError increments the errors counter for the given topic and error type.
79+
func (m *MetricsRecorder) RecordError(topic, errorType string) {
80+
m.errors.WithLabelValues(topic, errorType, m.component, m.version).Inc()
81+
}
82+
83+
// RecordDuration observes the message processing duration for the given topic.
84+
func (m *MetricsRecorder) RecordDuration(topic string, duration time.Duration) {
85+
m.messageDuration.WithLabelValues(topic, m.component, m.version).Observe(duration.Seconds())
86+
}

0 commit comments

Comments
 (0)