Skip to content

Commit 898551d

Browse files
committed
refactor metric collector again
1 parent 743e081 commit 898551d

4 files changed

Lines changed: 211 additions & 321 deletions

File tree

plugin/action/event_to_metrics/event_to_metrics.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package event_to_metrics
22

33
import (
4+
"maps"
5+
"sync"
46
"time"
57

68
"github.com/ozontech/file.d/cfg"
@@ -21,6 +23,9 @@ type Plugin struct {
2123
logger *zap.Logger
2224
pluginController pipeline.ActionPluginController
2325
format string
26+
27+
Metrics []Metric
28+
mu *sync.Mutex
2429
}
2530

2631
// ! config-params
@@ -69,10 +74,11 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
6974
}
7075

7176
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
77+
p.mu = &sync.Mutex{}
7278
p.config = config.(*Config)
7379
p.logger = params.Logger.Desugar()
7480
p.pluginController = params.Controller
75-
p.config.Metrics = prepareCheckersForMetrics(p.config.Metrics, p.logger)
81+
p.Metrics = prepareCheckersForMetrics(p.config.Metrics, p.logger)
7682

7783
format, err := xtime.ParseFormatName(p.config.TimeFieldFormat)
7884
if err != nil {
@@ -102,11 +108,23 @@ func (p *Plugin) Stop() {
102108
}
103109

104110
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
105-
for i := range p.config.Metrics {
106-
if p.config.Metrics[i].DoIfChecker != nil {
107-
p.config.Metrics[i].use = p.config.Metrics[i].DoIfChecker.Check(event.Root)
111+
p.mu.Lock()
112+
copyMetrics := make([]Metric, 0, len(p.Metrics))
113+
for i := range p.Metrics {
114+
if p.Metrics[i].DoIfChecker == nil {
115+
copyMetrics = append(copyMetrics, p.Metrics[i])
116+
} else {
117+
if !p.config.Metrics[i].DoIfChecker.Check(event.Root) {
118+
continue
119+
}
120+
copyMetrics = append(copyMetrics, p.Metrics[i])
121+
}
122+
if p.Metrics[i].Labels != nil {
123+
copyMetrics[len(copyMetrics)-1].Labels = make(map[string]string, len(p.Metrics[i].Labels))
124+
maps.Copy(copyMetrics[len(copyMetrics)-1].Labels, p.Metrics[i].Labels)
108125
}
109126
}
127+
p.mu.Unlock()
110128

111129
var ts time.Time
112130

@@ -128,12 +146,8 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
128146
ts = time.Now()
129147
}
130148

131-
children := make([]*insaneJSON.Node, 0, len(p.config.Metrics))
132-
for _, metric := range p.config.Metrics {
133-
if !metric.use {
134-
continue
135-
}
136-
149+
children := make([]*insaneJSON.Node, 0, len(copyMetrics))
150+
for _, metric := range copyMetrics {
137151
elem := new(insaneJSON.Node)
138152
object := elem.MutateToObject()
139153

plugin/output/prometheus/metric_collector.go

Lines changed: 56 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -12,36 +12,37 @@ import (
1212
)
1313

1414
type metricCollector struct {
15-
collector *sync.Map
16-
17-
timeout time.Duration
15+
sender storageSender
16+
metrics map[string]*metricValue
17+
mutex sync.RWMutex
1818
flushTicker *time.Ticker
19-
repeatTicker *time.Ticker
2019
shutdownChan chan struct{}
21-
sender storageSender
22-
logger *zap.Logger
20+
flushTimeout time.Duration
21+
22+
logger *zap.Logger
2323
}
2424

2525
type metricValue struct {
26-
value float64
27-
timestamp int64
28-
sendedTimestamp time.Time
29-
lastUpdateTime time.Time
30-
expiredAt time.Time
26+
value float64
27+
timestamp int64
28+
lastValueIsSended bool
29+
lastUpdateTime time.Time
30+
sendedTimestamp time.Time
31+
expiredAt time.Time
3132
}
3233

3334
type storageSender interface {
3435
sendToStorage(values []promwrite.TimeSeries) error
3536
}
3637

37-
func newCollector(sender storageSender, logger *zap.Logger) *metricCollector {
38+
func newCollector(sender storageSender, flushTimeout time.Duration, logger *zap.Logger) *metricCollector {
3839
c := &metricCollector{
39-
collector: new(sync.Map),
40-
flushTicker: time.NewTicker(10 * time.Second),
41-
repeatTicker: time.NewTicker(1 * time.Minute),
42-
shutdownChan: make(chan struct{}),
4340
sender: sender,
4441
logger: logger,
42+
metrics: make(map[string]*metricValue),
43+
flushTicker: time.NewTicker(flushTimeout),
44+
flushTimeout: flushTimeout,
45+
shutdownChan: make(chan struct{}),
4546
}
4647
go c.flushAndRepeatOldMetrics()
4748
return c
@@ -50,38 +51,29 @@ func newCollector(sender storageSender, logger *zap.Logger) *metricCollector {
5051
var i int
5152

5253
func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string, ttl int64) {
53-
labelsKey := labelsToKey(labels)
54+
key := labelsToKey(labels)
5455
now := time.Now()
56+
p.mutex.Lock()
57+
defer p.mutex.Unlock()
5558

56-
newMetric := func() metricValue {
57-
if prev, ok := p.collector.Load(labelsKey); ok {
58-
prevMetric := prev.(metricValue)
59-
currentValue := value
60-
61-
if metricType == "counter" {
62-
// For counters, incoming values should be increments
63-
currentValue += prevMetric.value
64-
}
65-
66-
return metricValue{
67-
value: currentValue,
68-
timestamp: max(timestamp, prevMetric.timestamp),
69-
lastUpdateTime: now,
70-
expiredAt: now.Add(time.Duration(ttl) * time.Millisecond),
71-
sendedTimestamp: prevMetric.sendedTimestamp,
72-
}
73-
}
74-
75-
return metricValue{
76-
value: value,
77-
timestamp: timestamp,
78-
lastUpdateTime: now,
79-
expiredAt: now.Add(time.Duration(ttl) * time.Millisecond),
80-
sendedTimestamp: time.Time{}, // Zero time for new metrics
59+
if existing, exists := p.metrics[key]; exists {
60+
if metricType == "counter" {
61+
value += existing.value
8162
}
63+
timestamp = max(timestamp, existing.sendedTimestamp.UnixMilli())
8264
}
8365

84-
p.collector.Store(labelsKey, newMetric())
66+
nowUnixTime := now.UnixMilli()
67+
timestamp = min(timestamp, nowUnixTime)
68+
69+
metric := &metricValue{
70+
value: value,
71+
timestamp: timestamp,
72+
lastUpdateTime: now,
73+
lastValueIsSended: false,
74+
expiredAt: now.Add(time.Duration(ttl) * time.Millisecond),
75+
}
76+
p.metrics[key] = metric
8577
}
8678

8779
func (p *metricCollector) flushAndRepeatOldMetrics() {
@@ -97,48 +89,37 @@ func (p *metricCollector) flushAndRepeatOldMetrics() {
9789
}
9890

9991
func (p *metricCollector) flushMetrics() {
92+
p.mutex.Lock()
93+
defer p.mutex.Unlock()
94+
10095
var toSend []promwrite.TimeSeries
10196
now := time.Now()
102-
nowUnixtime := now.UnixMilli()
103-
104-
sendEvery := 1 * time.Minute
105-
freshDuration := 3 * time.Minute
10697

10798
toDelete := []string{}
10899

109-
p.collector.Range(func(key, value interface{}) bool {
110-
metric := value.(metricValue)
111-
labels := keyToLabels(key.(string))
100+
for key, metric := range p.metrics {
101+
labels := keyToLabels(key)
112102

113-
lastUpdateSeconds := now.Sub(metric.lastUpdateTime)
114-
lastSendedSeconds := now.Sub(metric.sendedTimestamp)
115-
isFresh := lastUpdateSeconds < freshDuration
103+
if metric.lastValueIsSended && now.Sub(metric.lastUpdateTime) >= p.flushTimeout && now.Before(metric.expiredAt) {
104+
// repeat value
105+
metric.timestamp = now.UnixMilli()
106+
}
116107

117-
if isFresh {
118-
timeSeries := createTimeSeries(labels, metric)
119-
if timeSeries.Sample.Time.After(metric.sendedTimestamp) {
120-
toSend = append(toSend, timeSeries)
121-
metric.sendedTimestamp = timeSeries.Sample.Time
122-
p.collector.Store(key, metric)
123-
}
124-
} else if lastSendedSeconds > sendEvery {
108+
timeSeries := createTimeSeries(labels, metric, p.flushTimeout)
109+
if metric.sendedTimestamp != timeSeries.Sample.Time {
110+
toSend = append(toSend, timeSeries)
111+
metric.sendedTimestamp = timeSeries.Sample.Time
112+
metric.lastValueIsSended = true
113+
p.metrics[key] = metric
125114
if now.After(metric.expiredAt) {
126-
toDelete = append(toDelete, key.(string))
127-
}
128-
metric.timestamp = nowUnixtime
129-
timeSeries := createTimeSeries(labels, metric)
130-
if timeSeries.Sample.Time.After(metric.sendedTimestamp) {
131-
toSend = append(toSend, timeSeries)
132-
metric.sendedTimestamp = timeSeries.Sample.Time
133-
p.collector.Store(key, metric)
115+
toDelete = append(toDelete, key)
134116
}
135117
}
136-
return true
137-
})
118+
}
138119

139120
if len(toDelete) > 0 {
140121
for _, key := range toDelete {
141-
p.collector.Delete(key)
122+
delete(p.metrics, key)
142123
}
143124
}
144125

@@ -153,25 +134,15 @@ func (p *metricCollector) flushMetrics() {
153134

154135
func (p *metricCollector) shutdown() {
155136
close(p.shutdownChan)
156-
157-
// Flush all remaining metrics
158-
var toSend []promwrite.TimeSeries
159-
160-
p.collector.Range(func(key, value interface{}) bool {
161-
metric := value.(metricValue)
162-
labels := keyToLabels(key.(string))
163-
toSend = append(toSend, createTimeSeries(labels, metric))
164-
return true
165-
})
166-
p.sender.sendToStorage(toSend)
137+
p.flushMetrics()
167138
}
168139

169140
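// createTimeSeries converts a stored metric value into a promwrite.TimeSeries sample whose timestamp is truncated to a rounding period.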
170-
func createTimeSeries(labels []promwrite.Label, metric metricValue) promwrite.TimeSeries {
141+
func createTimeSeries(labels []promwrite.Label, metric *metricValue, roundPeriod time.Duration) promwrite.TimeSeries {
171142
return promwrite.TimeSeries{
172143
Labels: labels,
173144
Sample: promwrite.Sample{
174-
Time: time.Unix(0, metric.timestamp*int64(time.Millisecond)).Truncate(30 * time.Second),
145+
Time: time.Unix(0, metric.timestamp*int64(time.Millisecond)).Truncate(roundPeriod),
175146
Value: metric.value,
176147
},
177148
}

0 commit comments

Comments
 (0)