Skip to content

Commit f6bae06

Browse files
authored
Rgw ops log latency metrics fix (#29)
* latency metrics logic fixed On-behalf-of: SAP <filipp.akinfiev@clyso.com> Signed-off-by: Filipp Akinfiev <filipp.akinfiev@clyso.com> * readme updated On-behalf-of: SAP <filipp.akinfiev@clyso.com> Signed-off-by: Filipp Akinfiev <filipp.akinfiev@clyso.com> --------- Signed-off-by: Filipp Akinfiev <filipp.akinfiev@clyso.com>
1 parent d8c6dde commit f6bae06

5 files changed

Lines changed: 98 additions & 129 deletions

File tree

pkg/producers/opslog/metrics.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@ type Metrics struct {
1717
BytesReceived atomic.Uint64
1818
Errors atomic.Uint64
1919

20-
LatencyByMethod sync.Map // map["user|bucket|method"]
20+
// LatencyByMethod sync.Map // map["user|bucket|method"]
21+
22+
// LatencyObs records a single request‐latency observation into the
23+
// `requestsDurationHistogram`, which is registered once at startup.
24+
// Because the histogram lives for the entire process life and is never
25+
// re‐initialized or cleared, its buckets continuously accumulate across
26+
// scrape intervals—ensuring a true cumulative histogram that Prometheus
27+
// can derive accurate rates and quantiles from.
28+
LatencyObs func(user string, tenant string, bucket string, method string, seconds float64)
2129

2230
RequestsByMethod sync.Map // map[string]*atomic.Uint64
2331
RequestsByOperation sync.Map // map[string]*atomic.Uint64
@@ -39,8 +47,17 @@ type Metrics struct {
3947

4048
}
4149

42-
func NewMetrics() *Metrics {
43-
return &Metrics{}
50+
func NewMetrics(obs ...func(user string, tenant string, bucket string, method string, seconds float64)) *Metrics {
51+
var cb func(user, tenant, bucket, method string, seconds float64)
52+
if len(obs) > 0 {
53+
cb = obs[0]
54+
} else {
55+
// default no‐op so nobody ever has a nil-pointer
56+
cb = func(_, _, _, _ string, _ float64) {}
57+
}
58+
return &Metrics{
59+
LatencyObs: cb,
60+
}
4461
}
4562

4663
// Convert metrics to a JSON-friendly struct
@@ -115,11 +132,6 @@ func (m *Metrics) ToJSON(metricsConfig *MetricsConfig) ([]byte, error) {
115132
data["errors_by_ip_and_bucket"] = loadSyncMap(&m.ErrorsByIPAndBucket)
116133
}
117134

118-
// Latency Tracking
119-
if metricsConfig.TrackLatencyByMethod {
120-
data["latency_by_method"] = loadSyncMap(&m.LatencyByMethod)
121-
}
122-
123135
return json.Marshal(data)
124136
}
125137

@@ -223,16 +235,15 @@ func (m *Metrics) Update(logEntry S3OperationLog, metricsConfig *MetricsConfig)
223235

224236
// Latency Tracking
225237
if logEntry.TotalTime > 0 {
226-
latencyMs := uint64(logEntry.TotalTime)
227-
228238
if metricsConfig.TrackLatencyByMethod ||
229239
metricsConfig.TrackLatencyByBucket ||
230240
metricsConfig.TrackLatencyByTenant ||
231241
metricsConfig.TrackLatencyByUser ||
232242
metricsConfig.TrackLatencyByBucketAndMethod {
233-
// Key format: "user|bucket|method"
234-
latencyKey := logEntry.User + "|" + logEntry.Bucket + "|" + method
235-
incrementSyncMapValue(&m.LatencyByMethod, latencyKey, latencyMs)
243+
244+
latencySec := float64(logEntry.TotalTime) / 1000.0
245+
userStr, tenantStr := extractUserAndTenant(logEntry.User)
246+
m.LatencyObs(userStr, tenantStr, logEntry.Bucket, method, latencySec)
236247
}
237248
}
238249
}
@@ -244,7 +255,6 @@ func (m *Metrics) Reset() {
244255
m.BytesReceived.Store(0)
245256
m.Errors.Store(0)
246257

247-
resetSyncMap(&m.LatencyByMethod)
248258
resetSyncMap(&m.RequestsByMethod)
249259
resetSyncMap(&m.RequestsByOperation)
250260
resetSyncMap(&m.RequestsByStatusCode)
@@ -263,10 +273,6 @@ func (m *Metrics) Reset() {
263273
resetSyncMap(&m.ErrorsByIPAndBucket)
264274
}
265275

266-
func (m *Metrics) ResetPerWindowMetrics() {
267-
resetSyncMap(&m.LatencyByMethod)
268-
}
269-
270276
// Helper function: Update max atomic value
271277
func updateMaxAtomic(target *atomic.Uint64, value uint64) {
272278
for {
@@ -398,14 +404,13 @@ func ExtractHTTPMethod(uri string) string {
398404

399405
// Clone creates a deep copy of the Metrics
400406
func (m *Metrics) Clone() *Metrics {
401-
clone := NewMetrics()
407+
clone := NewMetrics(m.LatencyObs)
402408

403409
clone.TotalRequests.Store(m.TotalRequests.Load())
404410
clone.BytesSent.Store(m.BytesSent.Load())
405411
clone.BytesReceived.Store(m.BytesReceived.Load())
406412
clone.Errors.Store(m.Errors.Load())
407413

408-
copySyncMap(&m.LatencyByMethod, &clone.LatencyByMethod)
409414
copySyncMap(&m.RequestsByMethod, &clone.RequestsByMethod)
410415
copySyncMap(&m.RequestsByOperation, &clone.RequestsByOperation)
411416
copySyncMap(&m.RequestsByStatusCode, &clone.RequestsByStatusCode)
@@ -440,7 +445,7 @@ func copySyncMap(src, dst *sync.Map) {
440445

441446
// SubtractMetrics calculates the delta between two metrics objects: total - previous
442447
func SubtractMetrics(total, previous *Metrics) *Metrics {
443-
delta := NewMetrics()
448+
delta := NewMetrics(total.LatencyObs)
444449

445450
// Handle top-level counters
446451
delta.TotalRequests.Store(diff(total.TotalRequests.Load(), previous.TotalRequests.Load()))
@@ -465,7 +470,6 @@ func SubtractMetrics(total, previous *Metrics) *Metrics {
465470
subtractSyncMap(&total.BytesReceivedByIP, &previous.BytesReceivedByIP, &delta.BytesReceivedByIP)
466471
subtractSyncMap(&total.ErrorsByUserAndBucket, &previous.ErrorsByUserAndBucket, &delta.ErrorsByUserAndBucket)
467472
subtractSyncMap(&total.ErrorsByIPAndBucket, &previous.ErrorsByIPAndBucket, &delta.ErrorsByIPAndBucket)
468-
subtractSyncMap(&total.LatencyByMethod, &previous.LatencyByMethod, &delta.LatencyByMethod)
469473

470474
return delta
471475
}

pkg/producers/opslog/metrics_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,37 @@ func TestSubtractMetrics_MissingInPrev(t *testing.T) {
9292
assert.Equal(t, uint64(7), v.(*atomic.Uint64).Load())
9393
}
9494

95+
func TestLatencyObsPropagation(t *testing.T) {
96+
called := false
97+
cb := func(u, tnt, bucket, method string, sec float64) {
98+
assert.Equal(t, "u1", u)
99+
assert.Equal(t, "t1", tnt)
100+
assert.Equal(t, "b1", bucket)
101+
assert.Equal(t, "M", method)
102+
assert.InDelta(t, 0.123, sec, 1e-6)
103+
called = true
104+
}
105+
106+
m := NewMetrics(cb)
107+
// direct call
108+
m.LatencyObs("u1", "t1", "b1", "M", 0.123)
109+
assert.True(t, called)
110+
111+
// clone carries it forward
112+
clone := m.Clone()
113+
called = false
114+
clone.LatencyObs("u1", "t1", "b1", "M", 0.123)
115+
assert.True(t, called)
116+
117+
// subtract carries it forward
118+
total := NewMetrics(cb)
119+
prev := NewMetrics(cb)
120+
delta := SubtractMetrics(total, prev)
121+
called = false
122+
delta.LatencyObs("u1", "t1", "b1", "M", 0.123)
123+
assert.True(t, called)
124+
}
125+
95126
func newUint64(val uint64) *atomic.Uint64 {
96127
var u atomic.Uint64
97128
u.Store(val)

pkg/producers/opslog/opslog.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,42 @@ func StartFileOpsLogger(cfg OpsLogConfig) {
7474

7575
if cfg.Prometheus {
7676
StartPrometheusServer(cfg.PrometheusPort, &cfg.MetricsConfig)
77+
78+
if cfg.MetricsConfig.TrackLatencyByMethod || cfg.MetricsConfig.TrackLatencyByUser ||
79+
cfg.MetricsConfig.TrackLatencyByBucket || cfg.MetricsConfig.TrackLatencyByTenant ||
80+
cfg.MetricsConfig.TrackLatencyByBucketAndMethod {
81+
latencyObs = func(user, tenant, bucket, method string, seconds float64) {
82+
// raw per-method
83+
if cfg.MetricsConfig.TrackLatencyByBucketAndMethod {
84+
requestsDurationHistogram.WithLabelValues(user, tenant, bucket, method).
85+
Observe(seconds)
86+
}
87+
// per-method aggregated across users
88+
if cfg.MetricsConfig.TrackLatencyByMethod {
89+
requestsDurationHistogram.WithLabelValues("all", "all", bucket, method).
90+
Observe(seconds)
91+
}
92+
// per-bucket
93+
if cfg.MetricsConfig.TrackLatencyByBucket {
94+
requestsDurationHistogram.WithLabelValues("all", "all", bucket, "all").
95+
Observe(seconds)
96+
}
97+
// per-tenant
98+
if cfg.MetricsConfig.TrackLatencyByTenant {
99+
requestsDurationHistogram.WithLabelValues("all", tenant, "all", "all").
100+
Observe(seconds)
101+
}
102+
// per-user
103+
if cfg.MetricsConfig.TrackLatencyByUser {
104+
requestsDurationHistogram.WithLabelValues(user, tenant, "all", "all").
105+
Observe(seconds)
106+
}
107+
}
108+
}
77109
}
78110

79111
// Initialize metrics
80-
metrics := NewMetrics()
112+
metrics := NewMetrics(latencyObs)
81113
interval := time.Duration(cfg.PrometheusIntervalSeconds) * time.Second
82114
ticker := time.NewTicker(interval)
83115
defer ticker.Stop()
@@ -106,8 +138,6 @@ func StartFileOpsLogger(cfg OpsLogConfig) {
106138
if cfg.UseNats {
107139
publishMetricsToNATS(cfg, nc, metrics)
108140
}
109-
110-
metrics.ResetPerWindowMetrics()
111141
}
112142

113143
// Keep the program running
@@ -297,7 +327,7 @@ func StartSocketOpsLogger(cfg OpsLogConfig) {
297327
log.Info().Str("nats_url", cfg.NatsURL).Msg("Connected to NATS server")
298328
}
299329

300-
metrics := NewMetrics()
330+
metrics := NewMetrics(latencyObs)
301331
ticker := time.NewTicker(1 * time.Minute) // Set up a ticker to trigger every 1 minute
302332
defer ticker.Stop()
303333

@@ -348,7 +378,7 @@ func StartSocketOpsLogger(cfg OpsLogConfig) {
348378
}
349379

350380
// Reset metrics for the next interval
351-
metrics = NewMetrics()
381+
metrics = NewMetrics(latencyObs)
352382
}
353383
}
354384

pkg/producers/opslog/prometheus.go

Lines changed: 2 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ var (
140140
[]string{"user", "tenant", "bucket", "method"},
141141
// []string{"pod", "user", "tenant", "bucket", "method"},
142142
)
143+
144+
latencyObs func(user, tenant, bucket, method string, seconds float64)
143145
)
144146

145147
func initPrometheusSettings(metricsConfig *MetricsConfig) {
@@ -696,100 +698,6 @@ func PublishToPrometheus(totalMetrics *Metrics, cfg OpsLogConfig) {
696698
})
697699
}
698700

699-
// Update request duration histogram (latency metrics)
700-
if metricsConfig.TrackLatencyByMethod || metricsConfig.TrackLatencyByUser ||
701-
metricsConfig.TrackLatencyByBucket || metricsConfig.TrackLatencyByTenant ||
702-
metricsConfig.TrackLatencyByBucketAndMethod {
703-
704-
diffMetrics.LatencyByMethod.Range(func(key, totalLatency any) bool {
705-
parts := strings.Split(key.(string), "|")
706-
if len(parts) != 3 {
707-
log.Warn().Msgf("Invalid key format in LatencyByMethod: %v", key)
708-
return true
709-
}
710-
user, bucket, method := parts[0], parts[1], parts[2]
711-
userStr, tenantStr := extractUserAndTenant(user)
712-
713-
// Fetch request count for this method
714-
countVal, exists := diffMetrics.RequestsByMethod.Load(key)
715-
if !exists {
716-
countVal, exists = currentMetrics.RequestsByMethod.Load(key)
717-
if !exists {
718-
log.Warn().Msgf("Missing request count for latency key: %v", key)
719-
return true
720-
}
721-
}
722-
count := float64(countVal.(*atomic.Uint64).Load())
723-
if count == 0 {
724-
log.Warn().Msgf("Zero request count for latency key: %v", key)
725-
return true
726-
}
727-
728-
// Compute avg latency (convert ms → sec)
729-
avgLatencySec := float64(totalLatency.(*atomic.Uint64).Load()) / count / 1000.0
730-
731-
if metricsConfig.TrackLatencyByBucketAndMethod {
732-
// Fine-grained latency tracking per bucket & method
733-
requestsDurationHistogram.With(prometheus.Labels{
734-
// "pod": cfg.PodName,
735-
"user": userStr,
736-
"tenant": tenantStr,
737-
"bucket": bucket,
738-
"method": method,
739-
}).Observe(avgLatencySec)
740-
}
741-
742-
if metricsConfig.TrackLatencyByMethod {
743-
// Aggregated latency for all users (reduces cardinality)
744-
requestsDurationHistogram.With(prometheus.Labels{
745-
// "pod": cfg.PodName,
746-
"user": "all",
747-
"tenant": "all",
748-
"bucket": bucket,
749-
"method": method,
750-
}).Observe(avgLatencySec)
751-
}
752-
753-
if metricsConfig.TrackLatencyByBucket {
754-
requestsDurationHistogram.With(prometheus.Labels{
755-
"user": "all",
756-
"tenant": "all",
757-
"bucket": bucket,
758-
"method": "all",
759-
}).Observe(avgLatencySec)
760-
}
761-
762-
if metricsConfig.TrackLatencyByTenant {
763-
requestsDurationHistogram.With(prometheus.Labels{
764-
"user": "all",
765-
"tenant": tenantStr,
766-
"bucket": "all",
767-
"method": "all",
768-
}).Observe(avgLatencySec)
769-
}
770-
771-
if metricsConfig.TrackLatencyByUser {
772-
requestsDurationHistogram.With(prometheus.Labels{
773-
"user": userStr,
774-
"tenant": tenantStr,
775-
"bucket": "all",
776-
"method": "all",
777-
}).Observe(avgLatencySec)
778-
}
779-
780-
// // Aggregate latency for all methods
781-
// requestsDurationHistogram.With(prometheus.Labels{
782-
// "pod": cfg.PodName,
783-
// "user": userStr,
784-
// "tenant": tenantStr,
785-
// "bucket": bucket,
786-
// "method": "all",
787-
// }).Observe(avgLatencySec)
788-
789-
return true
790-
})
791-
}
792-
793701
log.Info().Msg("Updated Prometheus metrics for users and buckets")
794702
}
795703

pkg/producers/radosgwusage/README.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,16 @@ Configuration can also be set through environment variables:
6060
The RadosGW Usage Exporter collects and exposes the following metrics:
6161

6262

63-
### Bucket Usage Metrics
63+
### Bucket / User Usage Metrics
6464

65-
- `radosgw_usage_bucket_bytes`: Bucket used bytes.
66-
- `radosgw_usage_bucket_utilized_bytes`: Bucket utilized bytes.
67-
- `radosgw_usage_bucket_objects`: Number of objects in the bucket.
65+
- `radosgw_user_buckets_total`: Total number of buckets for each user.
66+
- `radosgw_user_objects_total`: Total number of objects for each user.
67+
- `radosgw_user_data_size_bytes`: Total size of data for each user in bytes
6868

6969
### Quota Metrics
7070

7171
- `radosgw_usage_bucket_quota_enabled`: Indicates if quota is enabled for the bucket.
7272
- `radosgw_usage_bucket_quota_size`: Maximum allowed bucket size.
73-
- `radosgw_usage_bucket_quota_size_bytes`: Maximum allowed bucket size in bytes.
7473
- `radosgw_usage_bucket_quota_size_objects`: Maximum allowed number of objects in the bucket.
7574
- `radosgw_usage_user_quota_enabled`: Indicates if user quota is enabled.
7675
- `radosgw_usage_user_quota_size`: Maximum allowed size for the user.
@@ -81,9 +80,6 @@ The RadosGW Usage Exporter collects and exposes the following metrics:
8180
- `radosgw_usage_bucket_shards`: Number of shards in the bucket.
8281
- `radosgw_user_metadata`: User metadata (e.g., display name, email, storage class).
8382

84-
### Miscellaneous Metrics
85-
86-
- `radosgw_usage_scrape_duration_seconds`: Amount of time each scrape takes.
8783

8884
## Example Workflow
8985

0 commit comments

Comments
 (0)