Skip to content

Commit 3a7ee12

Browse files
committed
add ttl to metric
1 parent 4e1533a commit 3a7ee12

3 files changed

Lines changed: 71 additions & 27 deletions

File tree

plugin/action/event_to_metrics/event_to_metrics.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@ type Config struct {
4040
// > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
4141
TimeFieldFormat string `json:"time_field_format" default:"rfc3339nano"` // *
4242

43-
Metrics []Metric
43+
Metrics []Metric `json:"metrics" slice:"true" required:"true"` // *
4444
}
4545

4646
type Metric struct {
4747
Name string `json:"name"`
4848
Type string `json:"type"`
4949
Value string `json:"value"`
5050
Labels map[string]string `json:"labels"`
51+
TTL cfg.Duration `json:"ttl" parse:"duration"` // *
52+
TTL_ time.Duration
5153

5254
DoIfCheckerMap map[string]any `json:"do_if"`
5355
DoIfChecker *doif.Checker
@@ -137,6 +139,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
137139

138140
object.AddField("name").MutateToBytes([]byte(metric.Name))
139141
object.AddField("type").MutateToBytes([]byte(metric.Type))
142+
object.AddField("ttl").MutateToInt64(metric.TTL_.Nanoseconds())
140143
object.AddField("timestamp").MutateToInt64(ts.UnixNano())
141144

142145
if len(metric.Value) == 0 {

plugin/output/prometheus/metric_collector.go

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type metricCollector struct {
1515
collector *sync.Map
1616
timeout time.Duration
1717
flushTicker *time.Ticker
18+
repeatTicker *time.Ticker
1819
shutdownChan chan struct{}
1920
sender storageSender
2021
logger *zap.Logger
@@ -24,6 +25,7 @@ type metricValue struct {
2425
value float64
2526
timestamp int64
2627
lastUpdateTime time.Time
28+
expiredAt time.Time
2729
lastValueIsSended bool
2830
}
2931

@@ -35,16 +37,17 @@ func newCollector(timeout time.Duration, sender storageSender, logger *zap.Logge
3537
c := &metricCollector{
3638
collector: new(sync.Map),
3739
timeout: timeout,
38-
flushTicker: time.NewTicker(1 * time.Second), // Check every second
40+
flushTicker: time.NewTicker(1 * time.Second),
41+
repeatTicker: time.NewTicker(30 * time.Second),
3942
shutdownChan: make(chan struct{}),
4043
sender: sender,
4144
logger: logger,
4245
}
43-
go c.flushOldMetrics()
46+
go c.flushAndRepeatOldMetrics()
4447
return c
4548
}
4649

47-
func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string) []promwrite.TimeSeries {
50+
func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string, ttl int64) []promwrite.TimeSeries {
4851
labelsKey := labelsToKey(labels)
4952
now := time.Now()
5053
currentTimestampSec := timestamp / 1_000_000_000
@@ -81,45 +84,78 @@ func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64,
8184
value: currentValue,
8285
timestamp: timestamp,
8386
lastUpdateTime: now,
87+
expiredAt: now.Add(time.Duration(ttl)),
8488
lastValueIsSended: shouldSend,
8589
})
8690

8791
return values
8892
}
8993

90-
func (p *metricCollector) flushOldMetrics() {
94+
func (p *metricCollector) flushAndRepeatOldMetrics() {
9195
for {
9296
select {
9397
case <-p.flushTicker.C:
94-
var toSend []promwrite.TimeSeries
95-
now := time.Now()
96-
97-
p.collector.Range(func(key, value interface{}) bool {
98-
metric := value.(metricValue)
99-
if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended {
100-
labels := keyToLabels(key.(string))
101-
toSend = append(toSend, createTimeSeries(labels, metric))
102-
metric.lastValueIsSended = true
103-
p.collector.Store(key, metric)
104-
}
105-
return true
106-
})
107-
108-
if len(toSend) > 0 {
109-
// Send these metrics to your storage
110-
err := p.sender.sendToStorage(toSend)
111-
if err != nil {
112-
p.logger.Error("can't send data", zap.Error(err))
113-
}
114-
}
115-
98+
p.flushOldMetrics()
99+
case <-p.repeatTicker.C:
100+
p.repeatOldMetrics()
116101
case <-p.shutdownChan:
117102
p.flushTicker.Stop()
118103
return
119104
}
120105
}
121106
}
122107

108+
func (p *metricCollector) flushOldMetrics() {
109+
var toSend []promwrite.TimeSeries
110+
now := time.Now()
111+
112+
p.collector.Range(func(key, value interface{}) bool {
113+
metric := value.(metricValue)
114+
if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended {
115+
labels := keyToLabels(key.(string))
116+
toSend = append(toSend, createTimeSeries(labels, metric))
117+
metric.lastValueIsSended = true
118+
p.collector.Store(key, metric)
119+
}
120+
return true
121+
})
122+
123+
if len(toSend) > 0 {
124+
// Send these metrics to your storage
125+
err := p.sender.sendToStorage(toSend)
126+
if err != nil {
127+
p.logger.Error("can't send data", zap.Error(err))
128+
}
129+
}
130+
}
131+
132+
func (p *metricCollector) repeatOldMetrics() {
133+
var toSend []promwrite.TimeSeries
134+
now := time.Now()
135+
nowUnixtime := now.UnixNano()
136+
137+
p.collector.Range(func(key, value interface{}) bool {
138+
metric := value.(metricValue)
139+
labels := keyToLabels(key.(string))
140+
metric.timestamp = nowUnixtime
141+
toSend = append(toSend, createTimeSeries(labels, metric))
142+
if now.Before(metric.expiredAt) {
143+
metric.lastValueIsSended = true
144+
p.collector.Store(key, metric)
145+
} else {
146+
p.collector.Delete(key)
147+
}
148+
return true
149+
})
150+
151+
if len(toSend) > 0 {
152+
err := p.sender.sendToStorage(toSend)
153+
if err != nil {
154+
p.logger.Error("can't send data", zap.Error(err))
155+
}
156+
}
157+
}
158+
123159
func (p *metricCollector) shutdown() {
124160
close(p.shutdownChan)
125161
// Flush all remaining metrics

plugin/output/prometheus/prometheus.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ func (p *Plugin) send(root *insaneJSON.Root) error {
337337
timestamp := timestampNode.AsInt64()
338338
timestampNode.Suicide()
339339

340+
ttlNode := msg.Dig("ttl")
341+
ttl := ttlNode.AsInt64()
342+
ttlNode.Suicide()
343+
340344
valueNode := msg.Dig("value")
341345
value := valueNode.AsFloat()
342346
valueNode.Suicide()
@@ -362,6 +366,7 @@ func (p *Plugin) send(root *insaneJSON.Root) error {
362366
value,
363367
timestamp,
364368
metricType,
369+
ttl,
365370
)...)
366371
}
367372

0 commit comments

Comments
 (0)