Skip to content

Commit 0edd34f

Browse files
committed
refactor metric collector
1 parent 8a6c751 commit 0edd34f

3 files changed

Lines changed: 97 additions & 119 deletions

File tree

plugin/output/prometheus/metric_collector.go

Lines changed: 64 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
)
1313

1414
type metricCollector struct {
15-
collector *sync.Map
15+
mu sync.RWMutex
16+
metrics map[string]metricValue
17+
1618
timeout time.Duration
1719
flushTicker *time.Ticker
1820
repeatTicker *time.Ticker
@@ -22,23 +24,23 @@ type metricCollector struct {
2224
}
2325

2426
type metricValue struct {
25-
value float64
26-
timestamp int64
27-
lastUpdateTime time.Time
28-
expiredAt time.Time
29-
lastValueIsSended bool
27+
value float64
28+
timestamp int64
29+
sendedTimestamp time.Time
30+
lastUpdateTime time.Time
31+
expiredAt time.Time
3032
}
3133

3234
type storageSender interface {
3335
sendToStorage(values []promwrite.TimeSeries) error
3436
}
3537

36-
func newCollector(timeout time.Duration, sender storageSender, logger *zap.Logger) *metricCollector {
38+
func newCollector(sender storageSender, logger *zap.Logger) *metricCollector {
3739
c := &metricCollector{
38-
collector: new(sync.Map),
39-
timeout: timeout,
40-
flushTicker: time.NewTicker(1 * time.Second),
41-
repeatTicker: time.NewTicker(30 * time.Second),
40+
mu: sync.RWMutex{},
41+
metrics: make(map[string]metricValue),
42+
flushTicker: time.NewTicker(10 * time.Second),
43+
repeatTicker: time.NewTicker(1 * time.Minute),
4244
shutdownChan: make(chan struct{}),
4345
sender: sender,
4446
logger: logger,
@@ -47,108 +49,83 @@ func newCollector(timeout time.Duration, sender storageSender, logger *zap.Logge
4749
return c
4850
}
4951

50-
func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string, ttl int64) []promwrite.TimeSeries {
52+
func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string, ttl int64) {
53+
p.mu.Lock()
54+
defer p.mu.Unlock()
55+
5156
labelsKey := labelsToKey(labels)
5257
now := time.Now()
53-
currentTimestampSec := timestamp / 1_000
5458

55-
var values []promwrite.TimeSeries
56-
var shouldSend bool
5759
var currentValue float64
58-
var prevMetric metricValue
5960

60-
if prev, ok := p.collector.Load(labelsKey); ok {
61-
prevMetric = prev.(metricValue)
62-
prevTimestampSec := prevMetric.timestamp / 1_000
63-
64-
// For counters, accumulate values
61+
if prevMetric, ok := p.metrics[labelsKey]; ok {
6562
currentValue = value
6663
if metricType == "counter" {
6764
currentValue += prevMetric.value
6865
}
69-
70-
// Check if time window advanced
71-
shouldSend = prevTimestampSec < currentTimestampSec
72-
73-
if shouldSend {
74-
prevMetric.timestamp = max(prevMetric.timestamp, timestamp-1_000)
75-
values = append(values, createTimeSeries(labels, prevMetric))
76-
}
7766
} else {
7867
currentValue = value
79-
shouldSend = false // First value, don't send yet
8068
}
8169

82-
// Always store the current value
83-
p.collector.Store(labelsKey, metricValue{
84-
value: currentValue,
85-
timestamp: timestamp,
86-
lastUpdateTime: now,
87-
expiredAt: now.Add(time.Duration(ttl) * time.Millisecond),
88-
lastValueIsSended: shouldSend,
89-
})
90-
91-
return values
70+
p.metrics[labelsKey] = metricValue{
71+
value: currentValue,
72+
timestamp: timestamp,
73+
lastUpdateTime: now,
74+
expiredAt: now.Add(time.Duration(ttl) * time.Millisecond),
75+
}
9276
}
9377

9478
func (p *metricCollector) flushAndRepeatOldMetrics() {
9579
for {
9680
select {
9781
case <-p.flushTicker.C:
98-
p.flushOldMetrics()
99-
case <-p.repeatTicker.C:
100-
p.repeatOldMetrics()
82+
p.flushMetrics()
10183
case <-p.shutdownChan:
10284
p.flushTicker.Stop()
10385
return
10486
}
10587
}
10688
}
10789

108-
func (p *metricCollector) flushOldMetrics() {
109-
var toSend []promwrite.TimeSeries
110-
now := time.Now()
111-
112-
p.collector.Range(func(key, value interface{}) bool {
113-
metric := value.(metricValue)
114-
if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended {
115-
labels := keyToLabels(key.(string))
116-
toSend = append(toSend, createTimeSeries(labels, metric))
117-
metric.lastValueIsSended = true
118-
p.collector.Store(key, metric)
119-
}
120-
return true
121-
})
90+
func (p *metricCollector) flushMetrics() {
91+
p.mu.Lock()
92+
defer p.mu.Unlock()
12293

123-
if len(toSend) > 0 {
124-
// Send these metrics to your storage
125-
err := p.sender.sendToStorage(toSend)
126-
if err != nil {
127-
p.logger.Error("can't send data", zap.Error(err))
128-
}
129-
}
130-
}
131-
132-
func (p *metricCollector) repeatOldMetrics() {
13394
var toSend []promwrite.TimeSeries
13495
now := time.Now()
13596
nowUnixtime := now.UnixMilli()
13697

137-
p.collector.Range(func(key, value interface{}) bool {
138-
metric := value.(metricValue)
139-
labels := keyToLabels(key.(string))
140-
metric.timestamp = nowUnixtime
141-
toSend = append(toSend, createTimeSeries(labels, metric))
142-
if now.Before(metric.expiredAt) {
143-
metric.lastValueIsSended = true
144-
p.collector.Store(key, metric)
145-
} else {
146-
p.collector.Delete(key)
98+
sendEvery := 1 * time.Minute
99+
freshDuration := 3 * time.Minute
100+
101+
for key, metric := range p.metrics {
102+
labels := keyToLabels(key)
103+
104+
lastUpdateSeconds := now.Sub(metric.lastUpdateTime)
105+
isFresh := lastUpdateSeconds < freshDuration
106+
107+
if isFresh {
108+
timeSeries := createTimeSeries(labels, metric)
109+
if metric.sendedTimestamp != timeSeries.Sample.Time {
110+
toSend = append(toSend, timeSeries)
111+
metric.sendedTimestamp = timeSeries.Sample.Time
112+
p.metrics[key] = metric
113+
}
114+
} else if lastUpdateSeconds > sendEvery {
115+
metric.timestamp = nowUnixtime
116+
if now.Before(metric.expiredAt) {
117+
p.metrics[key] = metric
118+
timeSeries := createTimeSeries(labels, metric)
119+
toSend = append(toSend, timeSeries)
120+
metric.sendedTimestamp = timeSeries.Sample.Time
121+
} else {
122+
delete(p.metrics, key)
123+
}
147124
}
148-
return true
149-
})
125+
}
150126

151127
if len(toSend) > 0 {
128+
// Send these metrics to your storage
152129
err := p.sender.sendToStorage(toSend)
153130
if err != nil {
154131
p.logger.Error("can't send data", zap.Error(err))
@@ -158,14 +135,16 @@ func (p *metricCollector) repeatOldMetrics() {
158135

159136
func (p *metricCollector) shutdown() {
160137
close(p.shutdownChan)
138+
p.mu.Lock()
139+
defer p.mu.Unlock()
140+
161141
// Flush all remaining metrics
162142
var toSend []promwrite.TimeSeries
163-
p.collector.Range(func(key, value interface{}) bool {
164-
metric := value.(metricValue)
165-
labels := keyToLabels(key.(string))
143+
144+
for key, metric := range p.metrics {
145+
labels := keyToLabels(key)
166146
toSend = append(toSend, createTimeSeries(labels, metric))
167-
return true
168-
})
147+
}
169148
p.sender.sendToStorage(toSend)
170149
}
171150

@@ -174,7 +153,7 @@ func createTimeSeries(labels []promwrite.Label, metric metricValue) promwrite.Ti
174153
return promwrite.TimeSeries{
175154
Labels: labels,
176155
Sample: promwrite.Sample{
177-
Time: time.Unix(0, metric.timestamp*int64(time.Millisecond)),
156+
Time: time.Unix(0, metric.timestamp*int64(time.Millisecond)).Round(30 * time.Second),
178157
Value: metric.value,
179158
},
180159
}

plugin/output/prometheus/metric_collector_test.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,18 @@ func TestMetricCollector(t *testing.T) {
8080

8181
// First value - should not be sent
8282
now := time.Now()
83-
values := collector.handleMetric(labels, 10.0, now.UnixNano(), "counter")
83+
values := collector.handleMetric(labels, 10.0, now.UnixMilli(), "counter", 0)
8484
assert.Empty(t, values)
8585
assert.Empty(t, testSender.getSentMetrics())
8686

8787
// Second value in same time window - should accumulate but not send
88-
values = collector.handleMetric(labels, 5.0, now.UnixNano(), "counter")
88+
values = collector.handleMetric(labels, 5.0, now.UnixMilli(), "counter", 0)
8989
assert.Empty(t, values)
9090
assert.Empty(t, testSender.getSentMetrics())
9191

9292
// Third value in next time window - should send accumulated value
9393
nextTime := now.Add(time.Second)
94-
values = collector.handleMetric(labels, 3.0, nextTime.UnixNano(), "counter")
94+
values = collector.handleMetric(labels, 3.0, nextTime.UnixMilli(), "counter", 0)
9595
assert.Len(t, values, 1)
9696
assert.Equal(t, 15.0, values[0].Sample.Value) // 10 + 5
9797
})
@@ -109,16 +109,16 @@ func TestMetricCollector(t *testing.T) {
109109
now := time.Now()
110110

111111
// First value - should not be sent
112-
values := collector.handleMetric(labels, 100.0, now.UnixNano(), "gauge")
112+
values := collector.handleMetric(labels, 100.0, now.UnixMilli(), "gauge", 0)
113113
assert.Empty(t, values)
114114

115115
// Second value in same time window - should replace but not send
116-
values = collector.handleMetric(labels, 200.0, now.UnixNano(), "gauge")
116+
values = collector.handleMetric(labels, 200.0, now.UnixMilli(), "gauge", 0)
117117
assert.Empty(t, values)
118118

119119
// Third value in next time window - should send latest value
120120
nextTime := now.Add(time.Second)
121-
values = collector.handleMetric(labels, 300.0, nextTime.UnixNano(), "gauge")
121+
values = collector.handleMetric(labels, 300.0, nextTime.UnixMilli(), "gauge", 0)
122122
assert.Len(t, values, 1)
123123
assert.Equal(t, 200.0, values[0].Sample.Value) // Latest value before time window change
124124
})
@@ -135,7 +135,7 @@ func TestMetricCollector(t *testing.T) {
135135
}
136136

137137
// Add a metric that will timeout
138-
collector.handleMetric(labels, 42.0, time.Now().UnixNano(), "gauge")
138+
collector.handleMetric(labels, 42.0, time.Now().UnixMilli(), "gauge", 0)
139139

140140
// Wait for timeout + a bit more
141141
time.Sleep(shortTimeout + 50*time.Millisecond)
@@ -168,7 +168,7 @@ func TestMetricCollector(t *testing.T) {
168168
}
169169

170170
// Add a metric that will timeout
171-
sendedValues := collector.handleMetric(labels, 42.0, time.Now().UnixNano(), "gauge")
171+
sendedValues := collector.handleMetric(labels, 42.0, time.Now().UnixMilli(), "gauge", 0)
172172
assert.Len(t, sendedValues, 0)
173173

174174
// Wait for timeout + a bit more
@@ -203,8 +203,8 @@ func TestMetricCollector(t *testing.T) {
203203
}
204204

205205
// Add multiple metrics
206-
collector.handleMetric(labels1, 10.0, time.Now().UnixNano(), "gauge")
207-
collector.handleMetric(labels2, 20.0, time.Now().UnixNano(), "gauge")
206+
collector.handleMetric(labels1, 10.0, time.Now().UnixMilli(), "gauge", 0)
207+
collector.handleMetric(labels2, 20.0, time.Now().UnixMilli(), "gauge", 0)
208208

209209
collector.shutdown()
210210

@@ -232,7 +232,7 @@ func TestMetricCollector(t *testing.T) {
232232
{Name: "worker", Value: string(rune(workerID))},
233233
{Name: "index", Value: string(rune(j))},
234234
}
235-
collector.handleMetric(labels, float64(j), time.Now().UnixNano(), "counter")
235+
collector.handleMetric(labels, float64(j), time.Now().UnixMilli(), "counter", 0)
236236
}
237237
}(i)
238238
}
@@ -260,7 +260,7 @@ func TestMetricCollector(t *testing.T) {
260260
{Name: "job", Value: "test"},
261261
}
262262

263-
collector.handleMetric(labels, 99.0, time.Now().UnixNano(), "gauge")
263+
collector.handleMetric(labels, 99.0, time.Now().UnixMilli(), "gauge", 0)
264264

265265
// Wait for flush
266266
time.Sleep(150 * time.Millisecond)
@@ -274,7 +274,7 @@ func TestMetricCollector(t *testing.T) {
274274
now := time.Now()
275275
mv := metricValue{
276276
value: 42.0,
277-
timestamp: now.UnixNano(),
277+
timestamp: now.UnixMilli(),
278278
lastUpdateTime: now,
279279
lastValueIsSended: false,
280280
}
@@ -285,7 +285,7 @@ func TestMetricCollector(t *testing.T) {
285285

286286
ts := createTimeSeries(labels, mv)
287287
assert.Equal(t, 42.0, ts.Sample.Value)
288-
assert.Equal(t, now.Truncate(time.Nanosecond), ts.Sample.Time.Truncate(time.Nanosecond))
288+
assert.Equal(t, now.Truncate(time.Millisecond), ts.Sample.Time.Truncate(time.Millisecond))
289289
assert.Equal(t, labels, ts.Labels)
290290
})
291291

@@ -317,11 +317,11 @@ func TestEdgeCases(t *testing.T) {
317317

318318
// Very old timestamp
319319
oldTime := time.Now().Add(-1 * time.Hour)
320-
collector.handleMetric(labels, 1.0, oldTime.UnixNano(), "gauge")
320+
collector.handleMetric(labels, 1.0, oldTime.UnixMilli(), "gauge", 0)
321321

322322
now := time.Now()
323323
// New timestamp to trigger send
324-
values := collector.handleMetric(labels, 2.0, now.UnixNano(), "gauge")
324+
values := collector.handleMetric(labels, 2.0, now.UnixMilli(), "gauge", 0)
325325

326326
// Should have sent the old value despite being very old (timestamp: 1 second before now)
327327
assert.Len(t, values, 1)
@@ -341,8 +341,8 @@ func TestEdgeCases(t *testing.T) {
341341
now := time.Now()
342342

343343
// Test unknown metric type (should behave like gauge)
344-
collector.handleMetric(labels, 10.0, now.UnixNano(), "unknown")
345-
values := collector.handleMetric(labels, 20.0, now.Add(time.Second).UnixNano(), "unknown")
344+
collector.handleMetric(labels, 10.0, now.UnixMilli(), "unknown", 0)
345+
values := collector.handleMetric(labels, 20.0, now.Add(time.Second).UnixMilli(), "unknown", 0)
346346

347347
// Should send the last value (10.0) since time window advanced
348348
if len(values) > 0 {
@@ -356,7 +356,7 @@ func TestCreateTimeSeries(t *testing.T) {
356356
now := time.Now()
357357
mv := metricValue{
358358
value: 123.45,
359-
timestamp: now.UnixNano(),
359+
timestamp: now.UnixMilli(),
360360
}
361361

362362
labels := []promwrite.Label{
@@ -368,7 +368,7 @@ func TestCreateTimeSeries(t *testing.T) {
368368

369369
assert.Equal(t, labels, ts.Labels)
370370
assert.Equal(t, 123.45, ts.Sample.Value)
371-
assert.Equal(t, now.Truncate(time.Nanosecond), ts.Sample.Time.Truncate(time.Nanosecond))
371+
assert.Equal(t, now.Truncate(time.Millisecond), ts.Sample.Time.Truncate(time.Millisecond))
372372
})
373373
}
374374

0 commit comments

Comments
 (0)