diff --git a/README.md b/README.md
index 0c64661e5..46b90b886 100755
--- a/README.md
+++ b/README.md
@@ -44,9 +44,9 @@ TBD: throughput on production servers.
**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md), [socket](plugin/input/socket/README.md)
-**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [cardinality](plugin/action/cardinality/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [decode](plugin/action/decode/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [hash](plugin/action/hash/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [json_extract](plugin/action/json_extract/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
+**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [cardinality](plugin/action/cardinality/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [decode](plugin/action/decode/README.md), [discard](plugin/action/discard/README.md), [event_to_metrics](plugin/action/event_to_metrics/README.md), [flatten](plugin/action/flatten/README.md), [hash](plugin/action/hash/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [json_extract](plugin/action/json_extract/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
-**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [http](plugin/output/http/README.md), [kafka](plugin/output/kafka/README.md), [loki](plugin/output/loki/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)
+**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [http](plugin/output/http/README.md), [kafka](plugin/output/kafka/README.md), [loki](plugin/output/loki/README.md), [postgres](plugin/output/postgres/README.md), [prometheus](plugin/output/prometheus/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)
## Logging system
diff --git a/_sidebar.md b/_sidebar.md
index 9705315eb..2b53c2c70 100644
--- a/_sidebar.md
+++ b/_sidebar.md
@@ -32,6 +32,7 @@
- [debug](plugin/action/debug/README.md)
- [decode](plugin/action/decode/README.md)
- [discard](plugin/action/discard/README.md)
+ - [event_to_metrics](plugin/action/event_to_metrics/README.md)
- [flatten](plugin/action/flatten/README.md)
- [hash](plugin/action/hash/README.md)
- [join](plugin/action/join/README.md)
@@ -61,6 +62,7 @@
- [kafka](plugin/output/kafka/README.md)
- [loki](plugin/output/loki/README.md)
- [postgres](plugin/output/postgres/README.md)
+ - [prometheus](plugin/output/prometheus/README.md)
- [s3](plugin/output/s3/README.md)
- [splunk](plugin/output/splunk/README.md)
- [stdout](plugin/output/stdout/README.md)
diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go
index 6ad73c2f7..c1ce56153 100644
--- a/cmd/file.d/file.d.go
+++ b/cmd/file.d/file.d.go
@@ -26,6 +26,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/debug"
_ "github.com/ozontech/file.d/plugin/action/decode"
_ "github.com/ozontech/file.d/plugin/action/discard"
+ _ "github.com/ozontech/file.d/plugin/action/event_to_metrics"
_ "github.com/ozontech/file.d/plugin/action/flatten"
_ "github.com/ozontech/file.d/plugin/action/hash"
_ "github.com/ozontech/file.d/plugin/action/join"
@@ -61,6 +62,7 @@ import (
_ "github.com/ozontech/file.d/plugin/output/kafka"
_ "github.com/ozontech/file.d/plugin/output/loki"
_ "github.com/ozontech/file.d/plugin/output/postgres"
+ _ "github.com/ozontech/file.d/plugin/output/prometheus"
_ "github.com/ozontech/file.d/plugin/output/s3"
_ "github.com/ozontech/file.d/plugin/output/splunk"
_ "github.com/ozontech/file.d/plugin/output/stdout"
diff --git a/go.mod b/go.mod
index b182dbc18..8b4aa40b9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,15 +6,16 @@ toolchain go1.25.5
require (
github.com/ClickHouse/ch-go v0.65.1
- github.com/KimMachineGun/automemlimit v0.2.6
+ github.com/KimMachineGun/automemlimit v0.7.1
github.com/Masterminds/squirrel v1.5.4
github.com/alecthomas/kingpin v2.2.6+incompatible
- github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
+ github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b
github.com/alicebob/miniredis/v2 v2.35.0
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310
github.com/bitly/go-simplejson v0.5.1
github.com/bmatcuk/doublestar/v4 v4.8.1
github.com/bufbuild/protocompile v0.13.0
+ github.com/castai/promwrite v0.6.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/dominikbraun/graph v0.23.0
@@ -32,10 +33,10 @@ require (
github.com/minio/minio-go v6.0.14+incompatible
github.com/ozontech/insane-json v0.1.9
github.com/pierrec/lz4/v4 v4.1.25
- github.com/prometheus/client_golang v1.16.0
- github.com/prometheus/client_model v0.3.0
- github.com/prometheus/common v0.42.0
- github.com/prometheus/procfs v0.10.1
+ github.com/prometheus/client_golang v1.22.0
+ github.com/prometheus/client_model v0.6.2
+ github.com/prometheus/common v0.63.0
+ github.com/prometheus/procfs v0.15.1
github.com/redis/go-redis/v9 v9.8.0
github.com/rjeczalik/notify v0.9.3
github.com/satori/go.uuid v1.2.0
@@ -49,10 +50,10 @@ require (
github.com/valyala/fasthttp v1.48.0
github.com/xdg-go/scram v1.1.2
go.uber.org/atomic v1.11.0
- go.uber.org/automaxprocs v1.5.3
+ go.uber.org/automaxprocs v1.6.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.49.0
- google.golang.org/protobuf v1.36.5
+ google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.34.2
@@ -65,13 +66,9 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
- github.com/cilium/ebpf v0.9.1 // indirect
- github.com/containerd/cgroups/v3 v3.0.1 // indirect
- github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dmarkham/enumer v1.5.10 // indirect
- github.com/docker/go-units v0.4.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
@@ -81,13 +78,13 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
- github.com/go-openapi/jsonreference v0.20.2 // indirect
+ github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
- github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
- github.com/golang/protobuf v1.5.4 // indirect
+ github.com/golang/snappy v1.0.0 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
+ github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -107,23 +104,23 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
+ github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
- github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
- github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pascaldekloe/name v1.0.1 // indirect
+ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
+ github.com/prometheus/prometheus v0.304.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
- github.com/sirupsen/logrus v1.8.1 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/tidwall/match v1.1.1 // indirect
@@ -136,15 +133,15 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
- go.opentelemetry.io/otel v1.34.0 // indirect
- go.opentelemetry.io/otel/metric v1.34.0 // indirect
- go.opentelemetry.io/otel/trace v1.34.0 // indirect
+ go.opentelemetry.io/otel v1.35.0 // indirect
+ go.opentelemetry.io/otel/metric v1.35.0 // indirect
+ go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/mod v0.32.0 // indirect
- golang.org/x/oauth2 v0.27.0 // indirect
+ golang.org/x/oauth2 v0.29.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.40.0 // indirect
@@ -153,7 +150,6 @@ require (
golang.org/x/tools v0.41.0 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
- gopkg.in/ini.v1 v1.62.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
diff --git a/go.sum b/go.sum
index 8c125d74a..f8dd80cb2 100644
--- a/go.sum
+++ b/go.sum
@@ -1,8 +1,8 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ClickHouse/ch-go v0.65.1 h1:SLuxmLl5Mjj44/XbINsK2HFvzqup0s6rwKLFH347ZhU=
github.com/ClickHouse/ch-go v0.65.1/go.mod h1:bsodgURwmrkvkBe5jw1qnGDgyITsYErfONKAHn05nv4=
-github.com/KimMachineGun/automemlimit v0.2.6 h1:tQFriVTcIteUkV5EgU9iz03eDY36T8JU5RAjP2r6Kt0=
-github.com/KimMachineGun/automemlimit v0.2.6/go.mod h1:pJhTW/nWJMj6SnWSU2TEKSlCaM+1N5Mej+IfS/5/Ol0=
+github.com/KimMachineGun/automemlimit v0.7.1 h1:QcG/0iCOLChjfUweIMC3YL5Xy9C3VBeNmCZHrZfJMBw=
+github.com/KimMachineGun/automemlimit v0.7.1/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
@@ -10,8 +10,8 @@ github.com/alecthomas/kingpin v2.2.6+incompatible h1:5svnBTFgJjZvGKyYBtMB0+m5wvr
github.com/alecthomas/kingpin v2.2.6+incompatible/go.mod h1:59OFYbFVLKQKq+mqrL6Rw5bR0c3ACQaawgXx0QYndlE=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
-github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
-github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
+github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=
+github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs=
github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI=
github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
@@ -30,22 +30,17 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M=
github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk=
+github.com/castai/promwrite v0.6.0 h1:QTalDPDAE07fjcPe6HpOU8oQIKI8lfBRibtNr7PpcrU=
+github.com/castai/promwrite v0.6.0/go.mod h1:33Jn4Bx9Q39/c6wKd3zxysxeDr7jcUDeOmlQSnYUohQ=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cilium/ebpf v0.9.1 h1:64sn2K3UKw8NbP/blsixRpF3nXuyhz/VjRlRzvlBRu4=
-github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
-github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5ToenJRaE=
-github.com/containerd/cgroups/v3 v3.0.1/go.mod h1:/vtwk1VXrtoa5AaZLkypuOJgA/6DyPMZHJPGQNtlHnw=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
-github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
-github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
-github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -54,8 +49,6 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dmarkham/enumer v1.5.10 h1:ygL0L6quiTiH1jpp68DyvsWaea6MaZLZrTTkIS++R0M=
github.com/dmarkham/enumer v1.5.10/go.mod h1:e4VILe2b1nYK3JKJpRmNdl5xbDQvELc6tQ8b+GsGk6E=
-github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
-github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw=
@@ -66,8 +59,6 @@ github.com/euank/go-kmsg-parser/v3 v3.0.0 h1:1iSX0qRGn8DGNKHCg6T2140f1yKNlezielQ
github.com/euank/go-kmsg-parser/v3 v3.0.0/go.mod h1:xpm9FWjxX5iwyl+f0E23jDbqXJx7jkBvw+21U2iI7hc=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
-github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
-github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM=
github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ=
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
@@ -87,12 +78,10 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
-github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
-github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
-github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
-github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
+github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ=
+github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4=
github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE=
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -100,30 +89,28 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U=
github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
-github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
-github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
-github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
-github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
-github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
+github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
-github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo=
-github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
+github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
+github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
+github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -218,7 +205,6 @@ github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxh
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
-github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -226,6 +212,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
+github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
@@ -247,8 +235,6 @@ github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
-github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
-github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
@@ -267,12 +253,12 @@ github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM
github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4=
github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
-github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0=
-github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/ozontech/insane-json v0.1.9 h1:JG5cEsmuSDwmU7KTJTHfTJ40XMgvtPdsUQbXdbPv+bY=
github.com/ozontech/insane-json v0.1.9/go.mod h1:xZLf3tVLOqaT13rn1sv4fYaZfupAXNL9naLz4QRoMfY=
github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0=
github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM=
+github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
+github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0=
github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -283,14 +269,16 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
-github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
-github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
-github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
-github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
-github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
-github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
-github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
-github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
+github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
+github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
+github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
+github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
+github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
+github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
+github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
+github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
+github.com/prometheus/prometheus v0.304.1 h1:e4kpJMb2Vh/PcR6LInake+ofcvFYHT+bCfmBvOkaZbY=
+github.com/prometheus/prometheus v0.304.1/go.mod h1:ioGx2SGKTY+fLnJSQCdTHqARVldGNS8OlIe3kvp98so=
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY=
@@ -312,8 +300,6 @@ github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXY
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
-github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
-github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
@@ -335,6 +321,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
@@ -380,22 +368,22 @@ github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
-go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
-go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
-go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
-go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
-go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
-go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
-go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
-go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
+go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
+go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
+go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
+go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
+go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
+go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
+go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
+go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
-go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
-go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
+go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
+go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -427,8 +415,8 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
-golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0=
-golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
+golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA=
+golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
@@ -453,9 +441,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
-golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
-golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
-golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/oauth2 v0.29.0 h1:WdYw2tdTK1S8olAzWHdgeqfy+Mtm9XNhv/xJsY65d98=
+golang.org/x/oauth2 v0.29.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -532,8 +519,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
-google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
+google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
+google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
@@ -544,8 +531,8 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWM
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
-gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
-gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
+gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
diff --git a/playground/playground.go b/playground/playground.go
index ee6cbddf7..7e3fd054d 100644
--- a/playground/playground.go
+++ b/playground/playground.go
@@ -270,7 +270,7 @@ func preparePipelineLogger(buf *bytes.Buffer, onFatal zapcore.CheckWriteHook) *z
func formatMetricFamily(families []*dto.MetricFamily) string {
b := new(bytes.Buffer)
for _, f := range families {
- _ = expfmt.NewEncoder(b, expfmt.FmtOpenMetrics).Encode(f)
+ _ = expfmt.NewEncoder(b, expfmt.NewFormat(expfmt.TypeOpenMetrics)).Encode(f)
b.WriteString("\n")
}
return b.String()
diff --git a/plugin/README.md b/plugin/README.md
index af7da2409..76d7a8c9f 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -331,6 +331,12 @@ pipelines:
```
[More details...](plugin/action/discard/README.md)
+## event_to_metrics
+Get metric from event
+
+This plugin transforms incoming events into metric data. Each event can generate one or more metrics with configurable labels and values. Using the Prometheus output plugin, you can send the generated metrics to Prometheus.
+
+[More details...](plugin/action/event_to_metrics/README.md)
## flatten
It extracts the object keys and adds them into the root with some prefix. If the provided field isn't an object, an event will be skipped.
@@ -832,6 +838,10 @@ It sends the event batches to postgres db using pgx.
Supports [dead queue](/plugin/output/README.md#dead-queue).
[More details...](plugin/output/postgres/README.md)
+## prometheus
+It sends metrics to Prometheus using the remote write API. The plugin receives metric events from the pipeline (e.g., from the event_to_metrics action plugin) and forwards them to a Prometheus-compatible endpoint.
+
+[More details...](plugin/output/prometheus/README.md)
## s3
Sends events to s3 output of one or multiple buckets.
`bucket` is default bucket for events. Addition buckets can be described in `multi_buckets` section, example down here.
diff --git a/plugin/action/README.md b/plugin/action/README.md
index 632fa8fe3..1a33a4b91 100755
--- a/plugin/action/README.md
+++ b/plugin/action/README.md
@@ -152,6 +152,12 @@ pipelines:
```
[More details...](plugin/action/discard/README.md)
+## event_to_metrics
+Get metric from event
+
+This plugin transforms incoming events into metric data. Each event can generate one or more metrics with configurable labels and values. Using the Prometheus output plugin, you can send the generated metrics to Prometheus.
+
+[More details...](plugin/action/event_to_metrics/README.md)
## flatten
It extracts the object keys and adds them into the root with some prefix. If the provided field isn't an object, an event will be skipped.
diff --git a/plugin/action/event_to_metrics/README.idoc.md b/plugin/action/event_to_metrics/README.idoc.md
new file mode 100644
index 000000000..e6ee7f249
--- /dev/null
+++ b/plugin/action/event_to_metrics/README.idoc.md
@@ -0,0 +1,8 @@
+# Event to metrics plugin
+@introduction
+
+## Examples
+@examples
+
+## Config params
+@config-params|description
diff --git a/plugin/action/event_to_metrics/README.md b/plugin/action/event_to_metrics/README.md
new file mode 100755
index 000000000..9b1df2e48
--- /dev/null
+++ b/plugin/action/event_to_metrics/README.md
@@ -0,0 +1,246 @@
+# Event to metrics plugin
+Get metric from event
+
+This plugin transforms incoming events into metric data. Each event can generate one or more metrics with configurable labels and values. Using the Prometheus output plugin, you can send the generated metrics to Prometheus.
+
+## Examples
+**Example 1: Simple counter**
+
+```yaml
+pipelines:
+ example:
+ actions:
+ - type: event_to_metrics
+ config:
+ metrics:
+ - name: events_total
+ type: counter
+ value: []
+ ttl: 60s
+ labels:
+ service: service
+ environment: environment
+ output:
+ plugin: prometheus
+```
+
+Input event:
+```json
+{"time": "2024-01-15T10:30:00Z", "message": "request processed", "service": "app", "environment": "production"}
+```
+
+Generated metric:
+```json
+{
+ "name": "events_total",
+ "type": "counter",
+ "value": 1,
+ "timestamp": 1705315800000,
+ "labels": {
+ "service": "app",
+ "environment": "production"
+ },
+ "ttl": 60000
+}
+```
+
+**Example 2: Gauge with dynamic labels and value extraction**
+
+```yaml
+pipelines:
+ example:
+ actions:
+ - type: event_to_metrics
+ config:
+ time_field: timestamp
+ time_field_format: rfc3339
+ metrics:
+ - name: response_time_ms
+ type: gauge
+ value:
+ - response.duration_ms
+ ttl: 5m
+ labels:
+ method: request.method
+ endpoint: request.path
+ status: response.status_code
+ - name: request_size_bytes
+ type: gauge
+ value:
+ - request.size
+ ttl: 5m
+ labels:
+ method: request.method
+ output:
+ plugin: prometheus
+```
+
+Input event:
+```json
+{
+ "timestamp": "2024-01-15T10:30:00Z",
+ "request": {
+ "method": "GET",
+ "path": "/api/users",
+ "size": 256
+ },
+ "response": {
+ "status_code": "200",
+ "duration_ms": 42.5
+ }
+}
+```
+
+Generated metrics:
+```json
+{
+ "name": "response_time_ms",
+ "type": "gauge",
+ "value": 42.5,
+ "timestamp": 1705315800000,
+ "labels": {
+ "method": "GET",
+ "endpoint": "/api/users",
+ "status": "200"
+ },
+ "ttl": 300000
+}
+{
+ "name": "request_size_bytes",
+ "type": "gauge",
+ "value": 256,
+ "timestamp": 1705315800000,
+ "labels": {
+ "method": "GET"
+ },
+ "ttl": 300000
+}
+```
+
+**Example 3: Using with Prometheus output plugin**
+
+This sample demonstrates how to use `event_to_metrics` with the Prometheus output plugin to send metrics to a Prometheus server via remote write.
+
+```yaml
+pipelines:
+ metrics:
+ actions:
+ - type: event_to_metrics
+ config:
+ time_field: timestamp
+ time_field_format: rfc3339
+ metrics:
+ # Counter for total requests
+ - name: http_requests_total
+ type: counter
+ value: []
+ ttl: 60s
+ labels:
+ service: api
+ method: request.method
+ path: request.path
+ status: response.status_code
+
+ # Gauge for response time
+ - name: http_response_time_ms
+ type: gauge
+ value:
+ - response.duration_ms
+ ttl: 5m
+ labels:
+ service: api
+ method: request.method
+ path: request.path
+
+ output:
+ plugin: prometheus
+ config:
+ # Prometheus remote write endpoint
+ endpoint: http://localhost:9090/api/v1/write
+```
+
+Input event:
+```json
+{
+ "timestamp": "2024-01-15T10:30:00Z",
+ "request": {
+ "method": "GET",
+ "path": "/api/users/123"
+ },
+ "response": {
+ "status_code": "200",
+ "duration_ms": 45.3
+ }
+}
+```
+
+The plugin will generate the following metrics that get sent to Prometheus:
+
+1. **http_requests_total** - Counter incremented by 1
+ - Labels: `service="api"`, `method="GET"`, `path="/api/users/123"`, `status="200"`
+
+2. **http_response_time_ms** - Gauge set to 45.3
+ - Labels: `service="api"`, `method="GET"`, `path="/api/users/123"`
+
+These metrics are then sent to Prometheus via the remote write API, where they can be queried and visualized in Grafana or other tools.
+
+## Config params
+**`time_field`** *`cfg.FieldSelector`* *`default=time`*
+
+The event field which defines the time when event was fired.
+It is used to detect the event throughput in a particular time range.
+If not set, the current time will be taken.
+
+
+
+**`time_field_format`** *`string`* *`default=rfc3339nano`*
+
+It defines how to parse the time field format. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias.
+List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
+
+
+
+**`metrics`** *`[]Metric`* *`required`*
+
+List of metrics.
+
+
+
+**`name`** *`string`* *`required`*
+
+Name of metric
+
+
+
+**`type`** *`string`* *`required`* *`options=counter|gauge`*
+
+The type of metric. Supported types: `counter`, `gauge`.
+
+
+
+**`value`** *`[]cfg.FieldSelector`*
+
+Field selector(s) to extract the metric value from the event. If not specified or empty, the value defaults to 1 (useful for counters).
+
+
+
+**`labels`** *`map[string]string`*
+
+Labels are key-value pairs that provide context for the metric.
+
+
+
+**`ttl`** *`cfg.Duration`*
+
+Time-to-live for the metric. Defines how long the metric value should be kept in the Prometheus collector before being expired. This determines the retention period for the metric in Prometheus.
+
+
+
+**`do_if`** *`map[string]any`*
+
+Condition to check before emitting the metric.
+
+
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go
new file mode 100644
index 000000000..13a986116
--- /dev/null
+++ b/plugin/action/event_to_metrics/event_to_metrics.go
@@ -0,0 +1,410 @@
+package event_to_metrics
+
+import (
+ "maps"
+ "sync"
+ "time"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/fd"
+ "github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/pipeline/doif"
+ "github.com/ozontech/file.d/xtime"
+ insaneJSON "github.com/ozontech/insane-json"
+ "go.uber.org/zap"
+)
+
+/*{ introduction
+Get metric from event
+
+This plugin transforms incoming events into metric data. Each event can generate one or more metrics with configurable labels and values. Using the Prometheus output plugin, you can send the generated metrics to Prometheus.
+}*/
+
+/*{ examples
+**Example 1: Simple counter**
+
+```yaml
+pipelines:
+ example:
+ actions:
+ - type: event_to_metrics
+ config:
+ metrics:
+ - name: events_total
+ type: counter
+ value: []
+ ttl: 60s
+ labels:
+ service: service
+ environment: environment
+ output:
+ plugin: prometheus
+```
+
+Input event:
+```json
+{"time": "2024-01-15T10:30:00Z", "message": "request processed", "service": "app", "environment": "production"}
+```
+
+Generated metric:
+```json
+{
+ "name": "events_total",
+ "type": "counter",
+ "value": 1,
+ "timestamp": 1705315800000,
+ "labels": {
+ "service": "app",
+ "environment": "production"
+ },
+ "ttl": 60000
+}
+```
+
+**Example 2: Gauge with dynamic labels and value extraction**
+
+```yaml
+pipelines:
+ example:
+ actions:
+ - type: event_to_metrics
+ config:
+ time_field: timestamp
+ time_field_format: rfc3339
+ metrics:
+ - name: response_time_ms
+ type: gauge
+ value:
+ - response.duration_ms
+ ttl: 5m
+ labels:
+ method: request.method
+ endpoint: request.path
+ status: response.status_code
+ - name: request_size_bytes
+ type: gauge
+ value:
+ - request.size
+ ttl: 5m
+ labels:
+ method: request.method
+ output:
+ plugin: prometheus
+```
+
+Input event:
+```json
+{
+ "timestamp": "2024-01-15T10:30:00Z",
+ "request": {
+ "method": "GET",
+ "path": "/api/users",
+ "size": 256
+ },
+ "response": {
+ "status_code": "200",
+ "duration_ms": 42.5
+ }
+}
+```
+
+Generated metrics:
+```json
+{
+ "name": "response_time_ms",
+ "type": "gauge",
+ "value": 42.5,
+ "timestamp": 1705315800000,
+ "labels": {
+ "method": "GET",
+ "endpoint": "/api/users",
+ "status": "200"
+ },
+ "ttl": 300000
+}
+{
+ "name": "request_size_bytes",
+ "type": "gauge",
+ "value": 256,
+ "timestamp": 1705315800000,
+ "labels": {
+ "method": "GET"
+ },
+ "ttl": 300000
+}
+```
+
+**Example 3: Using with Prometheus output plugin**
+
+This sample demonstrates how to use `event_to_metrics` with the Prometheus output plugin to send metrics to a Prometheus server via remote write.
+
+```yaml
+pipelines:
+ metrics:
+ actions:
+ - type: event_to_metrics
+ config:
+ time_field: timestamp
+ time_field_format: rfc3339
+ metrics:
+ # Counter for total requests
+ - name: http_requests_total
+ type: counter
+ value: []
+ ttl: 60s
+ labels:
+ service: api
+ method: request.method
+ path: request.path
+ status: response.status_code
+
+ # Gauge for response time
+ - name: http_response_time_ms
+ type: gauge
+ value:
+ - response.duration_ms
+ ttl: 5m
+ labels:
+ service: api
+ method: request.method
+ path: request.path
+
+ output:
+ plugin: prometheus
+ config:
+ # Prometheus remote write endpoint
+ endpoint: http://localhost:9090/api/v1/write
+```
+
+Input event:
+```json
+{
+ "timestamp": "2024-01-15T10:30:00Z",
+ "request": {
+ "method": "GET",
+ "path": "/api/users/123"
+ },
+ "response": {
+ "status_code": "200",
+ "duration_ms": 45.3
+ }
+}
+```
+
+The plugin will generate the following metrics that get sent to Prometheus:
+
+1. **http_requests_total** - Counter incremented by 1
+ - Labels: `service="api"`, `method="GET"`, `path="/api/users/123"`, `status="200"`
+
+2. **http_response_time_ms** - Gauge set to 45.3
+ - Labels: `service="api"`, `method="GET"`, `path="/api/users/123"`
+
+These metrics are then sent to Prometheus via the remote write API, where they can be queried and visualized in Grafana or other tools.
+}*/
+
+type Plugin struct {
+ config *Config
+ logger *zap.Logger
+ pluginController pipeline.ActionPluginController
+ format string
+
+ Metrics []Metric
+ mu *sync.Mutex
+}
+
+// ! config-params
+// ^ config-params
+type Config struct {
+ // > @3@4@5@6
+ // >
+ // > The event field which defines the time when event was fired.
+ // > It is used to detect the event throughput in a particular time range.
+ // > If not set, the current time will be taken.
+ TimeField cfg.FieldSelector `json:"time_field" default:"time" parse:"selector"` // *
+ TimeField_ []string
+
+ // > @3@4@5@6
+ // >
+ // > It defines how to parse the time field format. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias.
+ // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
+ TimeFieldFormat string `json:"time_field_format" default:"rfc3339nano"` // *
+
+ // > @3@4@5@6
+ // >
+ // > List of metrics.
+ Metrics []Metric `json:"metrics" slice:"true" required:"true"` // *
+}
+
+type Metric struct {
+ // > @3@4@5@6
+ // >
+ // > Name of metric
+ Name string `json:"name" required:"true"` // *
+
+ // > @3@4@5@6
+ // >
+ // > The type of metric. Supported types: `counter`, `gauge`.
+ Type string `json:"type" options:"counter|gauge" required:"true"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Field selector(s) to extract the metric value from the event. If not specified or empty, the value defaults to 1 (useful for counters).
+ Value []cfg.FieldSelector `json:"value"` // *
+ valueFields []string
+
+ // > @3@4@5@6
+ // >
+ // > Labels are key-value pairs that provide context for the metric.
+ Labels map[string]string `json:"labels"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Time-to-live for the metric. Defines how long the metric value should be kept in the Prometheus collector before being expired. This determines the retention period for the metric in Prometheus.
+ TTL cfg.Duration `json:"ttl" parse:"duration"` // *
+ TTL_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Condition to check before emitting the metric.
+ DoIfCheckerMap map[string]any `json:"do_if"` // *
+
+ DoIfChecker *doif.Checker
+
+ use bool
+}
+
+func init() {
+ fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
+ Type: "event_to_metrics",
+ Factory: factory,
+ })
+}
+
+func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
+ return &Plugin{}, &Config{}
+}
+
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
+ p.mu = &sync.Mutex{}
+ p.config = config.(*Config)
+ p.logger = params.Logger.Desugar()
+ p.pluginController = params.Controller
+ p.Metrics = prepareCheckersForMetrics(p.config.Metrics, p.logger)
+
+ format, err := xtime.ParseFormatName(p.config.TimeFieldFormat)
+ if err != nil {
+ format = p.config.TimeFieldFormat
+ }
+ p.format = format
+}
+
+func prepareCheckersForMetrics(metrics []Metric, logger *zap.Logger) []Metric {
+ for i := range metrics {
+ m := &metrics[i]
+ if m.DoIfCheckerMap != nil {
+ var err error
+ m.DoIfChecker, err = doif.NewFromMap(m.DoIfCheckerMap)
+ if err != nil {
+				logger.Fatal("can't init do_if for metric", zap.Error(err))
+ }
+ } else {
+ m.use = true
+ }
+
+ fields := make([]string, 0, len(m.Value))
+ for _, fs := range m.Value {
+ if fs == "" {
+ continue
+ }
+ parsed := cfg.ParseFieldSelector(string(fs))
+ fields = append(fields, parsed...)
+ }
+ m.valueFields = fields
+ }
+
+ return metrics
+}
+
+func (p *Plugin) Stop() {
+}
+
+func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
+ p.mu.Lock()
+ metricIndices := make([]int, 0, len(p.Metrics))
+ for i := range p.Metrics {
+		if p.Metrics[i].DoIfChecker == nil || p.Metrics[i].DoIfChecker.Check(event.Root) {
+ metricIndices = append(metricIndices, i)
+ }
+ }
+ p.mu.Unlock()
+
+ copyMetrics := make([]Metric, 0, len(metricIndices))
+ for _, idx := range metricIndices {
+ metric := p.Metrics[idx]
+ if metric.Labels != nil {
+ labels := make(map[string]string, len(metric.Labels))
+ maps.Copy(labels, metric.Labels)
+ metric.Labels = labels
+ }
+ copyMetrics = append(copyMetrics, metric)
+ }
+
+ var ts time.Time
+
+ if len(p.config.TimeField_) != 0 {
+ tsValue := event.Root.Dig(p.config.TimeField_...).AsString()
+ t, err := xtime.ParseTime(p.format, tsValue)
+ if err != nil || t.IsZero() {
+ p.logger.Warn(
+ "can't parse field with timestamp using format",
+ zap.Any("time_field", p.config.TimeField),
+ zap.String("TimeFieldFormat", p.config.TimeFieldFormat),
+ zap.String("value", tsValue),
+ )
+ ts = time.Now()
+ } else {
+ ts = t
+ }
+ } else {
+ ts = time.Now()
+ }
+
+ children := make([]*insaneJSON.Node, 0, len(copyMetrics))
+ for i := range copyMetrics {
+		metric := &copyMetrics[i]
+ elem := new(insaneJSON.Node)
+ object := elem.MutateToObject()
+
+ object.AddField("name").MutateToBytes([]byte(metric.Name))
+ object.AddField("type").MutateToBytes([]byte(metric.Type))
+ object.AddField("ttl").MutateToInt64(metric.TTL_.Milliseconds())
+ object.AddField("timestamp").MutateToInt64(ts.UnixMilli())
+
+ if len(metric.Value) == 0 {
+ object.AddField("value").MutateToInt(1)
+ } else {
+ valueNode := event.Root.Dig(metric.valueFields...).AsFloat()
+ object.AddField("value").MutateToFloat(valueNode)
+ }
+
+ if len(metric.Labels) > 0 {
+ labelsObject := object.AddField("labels").MutateToObject()
+
+ for labelName, labelValue := range metric.Labels {
+ node := event.Root.Dig(labelValue)
+ value := node.AsString()
+ labelsObject.AddField(labelName).MutateToBytes([]byte(value))
+ }
+ }
+
+ children = append(children, elem)
+ }
+
+ if len(children) == 0 {
+		// no metric passed its do_if check, so there is nothing to spawn
+ return pipeline.ActionDiscard
+ }
+
+ p.pluginController.Spawn(event, children)
+ return pipeline.ActionBreak
+}
diff --git a/plugin/action/event_to_metrics/event_to_metrics_test.go b/plugin/action/event_to_metrics/event_to_metrics_test.go
new file mode 100644
index 000000000..a9f255e2f
--- /dev/null
+++ b/plugin/action/event_to_metrics/event_to_metrics_test.go
@@ -0,0 +1,88 @@
+package event_to_metrics
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/test"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestEventToMetrics(t *testing.T) {
+ config := &Config{
+ TimeField_: []string{"time"},
+ TimeFieldFormat: time.RFC3339Nano,
+ Metrics: []Metric{
+ Metric{
+ Name: "status",
+ Type: "counter",
+ TTL_: 1 * time.Hour,
+ Labels: map[string]string{
+ "status": "status",
+ "host": "host",
+ },
+ },
+ Metric{
+ Name: "checkout_response_time",
+ Value: []cfg.FieldSelector{"info", "response_time"},
+ Type: "gauge",
+ Labels: map[string]string{
+ "zone": "info.zone",
+ },
+ DoIfCheckerMap: map[string]any{
+ "op": "equal",
+ "field": "info.zone",
+ "values": []any{"checkout"},
+ },
+ },
+ },
+ }
+
+ p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
+ now := time.Now().Add(-10 * time.Minute)
+
+ wrongEventsCnt := 0
+ outWg := sync.WaitGroup{}
+ outWg.Add(len(config.Metrics))
+
+ output.SetOutFn(func(e *pipeline.Event) {
+ message := e.Root
+
+ metricName := message.Dig("name").AsString()
+ if metricName == "" {
+ return
+ }
+
+ metricType := message.Dig("type").AsString()
+ timestamp := e.Root.Dig("timestamp").AsInt64()
+ ttl := e.Root.Dig("ttl").AsInt64()
+ value := e.Root.Dig("value").AsFloat()
+
+ switch metricName {
+ case "status":
+ assert.Equal(t, "counter", metricType)
+ assert.Equal(t, (1 * time.Hour).Milliseconds(), ttl)
+ assert.Equal(t, float64(1), value)
+ case "checkout_response_time":
+ assert.Equal(t, "gauge", metricType)
+ assert.Equal(t, int64(0), ttl)
+ assert.Equal(t, float64(0.1), value)
+ default:
+ assert.Fail(t, "unknown metric name", metricName)
+ }
+
+ assert.Equal(t, now.UnixMilli(), timestamp)
+ defer outWg.Done()
+ })
+
+ json := fmt.Sprintf(`{"host":"localhost","status":"200","info":{"zone":"checkout","response_time": 0.1},"time":"%s"}`, now.Format(time.RFC3339Nano))
+ input.In(10, "test", test.NewOffset(0), []byte(json))
+ outWg.Wait()
+
+ p.Stop()
+ assert.Equal(t, 0, wrongEventsCnt)
+}
diff --git a/plugin/output/README.md b/plugin/output/README.md
index 4295844a7..a24b2270f 100755
--- a/plugin/output/README.md
+++ b/plugin/output/README.md
@@ -71,6 +71,10 @@ It sends the event batches to postgres db using pgx.
Supports [dead queue](/plugin/output/README.md#dead-queue).
[More details...](plugin/output/postgres/README.md)
+## prometheus
+It sends metrics to Prometheus using the remote write API. The plugin receives metric events from the pipeline (e.g., from the event_to_metrics action plugin) and forwards them to a Prometheus-compatible endpoint.
+
+[More details...](plugin/output/prometheus/README.md)
## s3
Sends events to s3 output of one or multiple buckets.
`bucket` is default bucket for events. Addition buckets can be described in `multi_buckets` section, example down here.
diff --git a/plugin/output/prometheus/README.idoc.md b/plugin/output/prometheus/README.idoc.md
new file mode 100644
index 000000000..0870216c8
--- /dev/null
+++ b/plugin/output/prometheus/README.idoc.md
@@ -0,0 +1,5 @@
+# Prometheus output
+@introduction
+
+### Config params
+@config-params|description
diff --git a/plugin/output/prometheus/README.md b/plugin/output/prometheus/README.md
new file mode 100755
index 000000000..ae91be1df
--- /dev/null
+++ b/plugin/output/prometheus/README.md
@@ -0,0 +1,97 @@
+# Prometheus output
+It sends metrics to Prometheus using the remote write API. The plugin receives metric events from the pipeline (e.g., from the event_to_metrics action plugin) and forwards them to a Prometheus-compatible endpoint.
+
+### Config params
+**`endpoint`** *`string`* *`default=http://localhost:9090/api/v1/write`* *`required`*
+
+Prometheus remote write endpoint URL.
+
+
+
+**`auth`** *`AuthConfig`*
+
+Auth config.
+
+`AuthConfig` params:
+* `strategy` describes strategy to use; options:"disabled|tenant|basic|bearer"
+By default strategy is `disabled`.
+* `tenant_id` should be provided if strategy is `tenant`.
+* `username` should be provided if strategy is `basic`.
+Username is used for HTTP Basic Authentication.
+* `password` should be provided if strategy is `basic`.
+Password is used for HTTP Basic Authentication.
+* `bearer_token` should be provided if strategy is `bearer`.
+Token is used for HTTP Bearer Authentication.
+
+
+
+**`tls_enabled`** *`bool`* *`default=false`*
+
+If set true, the plugin will use SSL/TLS connections method.
+
+
+
+**`tls_skip_verify`** *`bool`* *`default=false`*
+
+If set, the plugin will skip SSL/TLS verification.
+
+
+
+**`request_timeout`** *`cfg.Duration`* *`default=1s`*
+
+Client timeout when sends requests to Prometheus HTTP API.
+
+
+
+**`connection_timeout`** *`cfg.Duration`* *`default=5s`*
+
+It defines how much time to wait for the connection.
+
+
+
+**`keep_alive`** *`KeepAliveConfig`*
+
+Keep-alive config.
+
+`KeepAliveConfig` params:
+* `max_idle_conn_duration` - idle keep-alive connections are closed after this duration.
+By default idle connections are closed after `10s`.
+* `max_conn_duration` - keep-alive connections are closed after this duration.
+If set to `0` - connection duration is unlimited.
+By default connection duration is `5m`.
+
+
+
+**`retry`** *`int`* *`default=10`*
+
+Retries of upload. If File.d cannot upload for this number of attempts,
+File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, fall with a non-zero exit code or not
+
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Retention milliseconds for retry to upload.
+
+
+
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
+**`attempt_num`** *`int`* *`default=3`*
+
+Number of retry attempts.
+
+
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/prometheus/metric_collector.go b/plugin/output/prometheus/metric_collector.go
new file mode 100644
index 000000000..14059217d
--- /dev/null
+++ b/plugin/output/prometheus/metric_collector.go
@@ -0,0 +1,177 @@
+package prometheus
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/castai/promwrite"
+ "go.uber.org/zap"
+)
+
+// metricCollector accumulates metric samples keyed by their serialized label
+// set and periodically pushes them to a storageSender. It also re-emits the
+// last value of stale series until their TTL expires.
+type metricCollector struct {
+	sender       storageSender            // destination for flushed time series
+	metrics      map[string]*metricValue  // label-set key -> latest state
+	mutex        sync.RWMutex             // guards metrics
+	flushTicker  *time.Ticker             // drives periodic flushMetrics
+	shutdownChan chan struct{}            // closed by shutdown() to stop the flush loop
+	flushTimeout time.Duration            // flush period; also used to round sample timestamps
+
+	logger *zap.Logger
+}
+
+// metricValue is the last observed state of a single time series.
+type metricValue struct {
+	value             float64   // current (possibly accumulated) value
+	timestamp         int64     // sample timestamp, unix milliseconds
+	lastValueIsSended bool      // true once the value has been flushed at least once
+	lastUpdateTime    time.Time // when handleMetric last touched this series
+	sendedTimestamp   time.Time // sample time of the last flushed point
+	expiredAt         time.Time // after this instant the series is dropped
+}
+
+// storageSender delivers a batch of time series to the backing storage.
+type storageSender interface {
+	sendToStorage(values []promwrite.TimeSeries) error
+}
+
+// newCollector builds a metricCollector and starts its background flush loop.
+// The loop runs until shutdown() is called.
+func newCollector(sender storageSender, flushTimeout time.Duration, logger *zap.Logger) *metricCollector {
+	collector := &metricCollector{
+		sender:       sender,
+		metrics:      map[string]*metricValue{},
+		flushTicker:  time.NewTicker(flushTimeout),
+		flushTimeout: flushTimeout,
+		shutdownChan: make(chan struct{}),
+		logger:       logger,
+	}
+	go collector.flushAndRepeatOldMetrics()
+	return collector
+}
+
+// handleMetric records a single observation for the series identified by
+// labels. Counters are accumulated on top of the stored value; any other
+// type (gauge) simply overwrites it. The sample timestamp is clamped so it
+// never moves backwards past an already-flushed point and never lies in the
+// future. ttl is the series lifetime in milliseconds, counted from now.
+func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string, ttl int64) {
+	key := labelsToKey(labels)
+	now := time.Now()
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	if existing, exists := p.metrics[key]; exists {
+		if metricType == metricTypeCounter {
+			// counters accumulate across observations
+			value += existing.value
+		}
+		// never go backwards past the last flushed sample time
+		timestamp = max(timestamp, existing.sendedTimestamp.UnixMilli())
+	}
+
+	// clamp to "now" so we never emit samples from the future
+	nowUnixTime := now.UnixMilli()
+	timestamp = min(timestamp, nowUnixTime)
+
+	metric := &metricValue{
+		value:             value,
+		timestamp:         timestamp,
+		lastUpdateTime:    now,
+		lastValueIsSended: false,
+		expiredAt:         now.Add(time.Duration(ttl) * time.Millisecond),
+	}
+	p.metrics[key] = metric
+}
+
+// flushAndRepeatOldMetrics is the collector's background loop: it flushes on
+// every tick and exits (stopping the ticker) once shutdownChan is closed.
+func (p *metricCollector) flushAndRepeatOldMetrics() {
+	defer p.flushTicker.Stop()
+	for {
+		select {
+		case <-p.shutdownChan:
+			return
+		case <-p.flushTicker.C:
+			p.flushMetrics()
+		}
+	}
+}
+
+// flushMetrics sends every series whose sample would differ from the last
+// flushed point and prunes series whose TTL has expired.
+//
+// A series that has already been flushed and has not been updated for at
+// least flushTimeout gets its timestamp bumped to "now", so the last known
+// value keeps being re-emitted until the series expires.
+func (p *metricCollector) flushMetrics() {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	var toSend []promwrite.TimeSeries
+	now := time.Now()
+
+	toDelete := []string{}
+
+	for key, metric := range p.metrics {
+		labels := keyToLabels(key)
+
+		if metric.lastValueIsSended && now.Sub(metric.lastUpdateTime) >= p.flushTimeout && now.Before(metric.expiredAt) {
+			// repeat the last value with a fresh timestamp
+			metric.timestamp = now.UnixMilli()
+		}
+
+		timeSeries := createTimeSeries(labels, metric, p.flushTimeout)
+		if metric.sendedTimestamp != timeSeries.Sample.Time {
+			toSend = append(toSend, timeSeries)
+			// metric is a pointer into the map, so mutating it in place is
+			// enough; no re-assignment into p.metrics is needed.
+			metric.sendedTimestamp = timeSeries.Sample.Time
+			metric.lastValueIsSended = true
+		}
+		// Prune expired series unconditionally. Previously deletion happened
+		// only on the send path, so an expired series whose value never
+		// changed again stayed in the map forever (memory leak).
+		if now.After(metric.expiredAt) {
+			toDelete = append(toDelete, key)
+		}
+	}
+
+	for _, key := range toDelete {
+		delete(p.metrics, key)
+	}
+
+	if len(toSend) > 0 {
+		if err := p.sender.sendToStorage(toSend); err != nil {
+			p.logger.Error("can't send data", zap.Error(err))
+		}
+	}
+}
+
+// shutdown stops the background flush loop and performs one final,
+// synchronous flush so buffered samples are not lost.
+// NOTE(review): must be called at most once — a second call would close an
+// already-closed channel and panic.
+func (p *metricCollector) shutdown() {
+	close(p.shutdownChan)
+	p.flushMetrics()
+}
+
+// createTimeSeries converts a stored metricValue into a promwrite.TimeSeries,
+// rounding the sample time down to a multiple of roundPeriod.
+func createTimeSeries(labels []promwrite.Label, metric *metricValue, roundPeriod time.Duration) promwrite.TimeSeries {
+	sampleTime := time.UnixMilli(metric.timestamp).Truncate(roundPeriod)
+	return promwrite.TimeSeries{
+		Labels: labels,
+		Sample: promwrite.Sample{
+			Time:  sampleTime,
+			Value: metric.value,
+		},
+	}
+}
+
+func keyToLabels(key string) []promwrite.Label {
+ if key == "" {
+ return nil
+ }
+ key = key[:len(key)-1] // Remove trailing comma
+ labels := make([]promwrite.Label, 0, strings.Count(key, ",")+1)
+
+ for key != "" {
+ pair, rest, _ := strings.Cut(key, ",")
+ name, value, _ := strings.Cut(pair, "=")
+ labels = append(labels, promwrite.Label{Name: name, Value: value})
+ key = rest
+ }
+ return labels
+}
+
+// labelsToKey serializes labels into a canonical map key: labels are sorted
+// by name and joined as "name=value," pairs (trailing comma included).
+// keyToLabels is the inverse operation.
+func labelsToKey(labels []promwrite.Label) string {
+	sorted := append([]promwrite.Label(nil), labels...)
+	sort.Slice(sorted, func(i, j int) bool {
+		return sorted[i].Name < sorted[j].Name
+	})
+
+	var builder strings.Builder
+	for _, label := range sorted {
+		fmt.Fprintf(&builder, "%s=%s,", label.Name, label.Value)
+	}
+	return builder.String()
+}
diff --git a/plugin/output/prometheus/metric_collector_test.go b/plugin/output/prometheus/metric_collector_test.go
new file mode 100644
index 000000000..6e51cf7ed
--- /dev/null
+++ b/plugin/output/prometheus/metric_collector_test.go
@@ -0,0 +1,196 @@
+package prometheus
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/castai/promwrite"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/zap/zaptest"
+)
+
+// TestStorageSender implements storageSender for testing: it records every
+// batch it receives and can be configured to fail.
+type TestStorageSender struct {
+	sentMetrics []promwrite.TimeSeries
+	mu          sync.Mutex
+	returnError error // returned by sendToStorage; nil means success
+}
+
+// sendToStorage appends the batch to sentMetrics and returns returnError.
+func (t *TestStorageSender) sendToStorage(values []promwrite.TimeSeries) error {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.sentMetrics = append(t.sentMetrics, values...)
+	return t.returnError
+}
+
+// getSentMetrics returns the time series recorded so far.
+// NOTE(review): it returns the internal slice without copying; callers must
+// not mutate the result.
+func (t *TestStorageSender) getSentMetrics() []promwrite.TimeSeries {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return t.sentMetrics
+}
+
+// reset clears recorded batches and the configured error.
+func (t *TestStorageSender) reset() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.sentMetrics = nil
+	t.returnError = nil
+}
+
+// TestMetricCollector covers key round-tripping, counter accumulation,
+// TTL-driven re-emission and concurrent writes.
+func TestMetricCollector(t *testing.T) {
+	t.Run("labelsToKey and keyToLabels roundtrip", func(t *testing.T) {
+		labels := []promwrite.Label{
+			{Name: "job", Value: "test"},
+			{Name: "instance", Value: "localhost"},
+			{Name: "__name__", Value: "test_metric"},
+		}
+
+		key := labelsToKey(labels)
+		convertedLabels := keyToLabels(key)
+
+		assert.Len(t, convertedLabels, 3)
+		// Since labels are sorted in key generation, we need to check values
+		labelMap := make(map[string]string)
+		for _, l := range convertedLabels {
+			labelMap[l.Name] = l.Value
+		}
+		assert.Equal(t, "test", labelMap["job"])
+		assert.Equal(t, "localhost", labelMap["instance"])
+		assert.Equal(t, "test_metric", labelMap["__name__"])
+	})
+
+	t.Run("handleMetric counter accumulation", func(t *testing.T) {
+		logger := zaptest.NewLogger(t)
+		testSender := &TestStorageSender{}
+		testSender.reset()
+		collector := newCollector(testSender, 1*time.Second, logger)
+
+		labels := []promwrite.Label{
+			{Name: "__name__", Value: "test_counter"},
+			{Name: "job", Value: "test"},
+		}
+
+		// First value - should not be sent
+		now := time.Now()
+		collector.handleMetric(labels, 10.0, now.UnixMilli(), metricTypeCounter, 0)
+		assert.Empty(t, testSender.getSentMetrics())
+
+		// Second value in same time window - should accumulate but not send
+		collector.handleMetric(labels, 5.0, now.UnixMilli(), metricTypeCounter, 0)
+		assert.Empty(t, testSender.getSentMetrics())
+
+		// Third value in next time window - should send accumulated value
+		nextTime := now.Add(10 * time.Second)
+		collector.handleMetric(labels, 3.0, nextTime.UnixMilli(), metricTypeCounter, 0)
+		time.Sleep(2 * time.Second)
+		sendedMetrics := testSender.getSentMetrics()
+
+		assert.Equal(t, 1, len(sendedMetrics))
+		assert.Equal(t, 18.0, sendedMetrics[0].Sample.Value) // 10 + 5 + 3
+		assert.Equal(t, now.Truncate(time.Second), sendedMetrics[0].Sample.Time.Truncate(time.Second))
+	})
+
+	t.Run("handleMetric counter accumulation with ttl", func(t *testing.T) {
+		logger := zaptest.NewLogger(t)
+		testSender := &TestStorageSender{}
+		testSender.reset()
+		collector := newCollector(testSender, 1*time.Second, logger)
+
+		labels := []promwrite.Label{
+			{Name: "__name__", Value: "test_counter"},
+			{Name: "job", Value: "test"},
+		}
+
+		// Single observation with a 5s TTL; the collector re-emits it on each
+		// flush until the TTL expires.
+		now := time.Now()
+		collector.handleMetric(labels, 10.0, now.UnixMilli(), metricTypeCounter, 5000)
+
+		time.Sleep(3 * time.Second)
+		sendedMetrics := testSender.getSentMetrics()
+
+		assert.GreaterOrEqual(t, 3, len(sendedMetrics))
+		assert.Equal(t, 10.0, sendedMetrics[0].Sample.Value) // only one observation was recorded
+		assert.Equal(t, now.Truncate(time.Second), sendedMetrics[0].Sample.Time)
+	})
+
+	t.Run("concurrent access", func(t *testing.T) {
+		logger := zaptest.NewLogger(t)
+		testSender := &TestStorageSender{}
+		collector := newCollector(testSender, 1*time.Second, logger)
+
+		var wg sync.WaitGroup
+		numGoroutines := 10
+		numMetrics := 100
+
+		for i := 0; i < numGoroutines; i++ {
+			wg.Add(1)
+			go func(workerID int) {
+				defer wg.Done()
+				for j := 0; j < numMetrics; j++ {
+					labels := []promwrite.Label{
+						{Name: "__name__", Value: "concurrent_metric"},
+						{Name: "worker", Value: string(rune(workerID))},
+						{Name: "index", Value: string(rune(j))},
+					}
+					collector.handleMetric(labels, float64(j), time.Now().UnixMilli(), metricTypeCounter, 0)
+				}
+			}(i)
+		}
+
+		wg.Wait()
+
+		// Verify all metrics are stored. Take the collector's lock: the
+		// background flush goroutine mutates the map concurrently, so an
+		// unguarded iteration is a data race under -race.
+		collector.mutex.RLock()
+		count := len(collector.metrics)
+		collector.mutex.RUnlock()
+
+		assert.Equal(t, numGoroutines*numMetrics, count)
+	})
+}
+
+func TestCreateTimeSeries(t *testing.T) {
+ t.Run("createTimeSeries with valid metricValue", func(t *testing.T) {
+ now := time.Now()
+ mv := &metricValue{
+ value: 123.45,
+ timestamp: now.UnixMilli(),
+ }
+
+ labels := []promwrite.Label{
+ {Name: "__name__", Value: "test_metric"},
+ {Name: "instance", Value: "localhost"},
+ }
+
+ roundPeriod := 30 * time.Second
+ ts := createTimeSeries(labels, mv, roundPeriod)
+
+ assert.Equal(t, labels, ts.Labels)
+ assert.Equal(t, 123.45, ts.Sample.Value)
+ assert.Equal(t, now.Truncate(roundPeriod), ts.Sample.Time.Truncate(time.Millisecond))
+ })
+}
+
+// Benchmark tests
+func BenchmarkLabelsToKey(b *testing.B) {
+ labels := []promwrite.Label{
+ {Name: "__name__", Value: "benchmark_metric"},
+ {Name: "job", Value: "benchmark"},
+ {Name: "instance", Value: "localhost:9090"},
+ {Name: "environment", Value: "production"},
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ labelsToKey(labels)
+ }
+}
+
+func BenchmarkKeyToLabels(b *testing.B) {
+ key := "__name__=test_metric,environment=production,instance=localhost:9090,job=test,"
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ keyToLabels(key)
+ }
+}
diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go
new file mode 100644
index 000000000..ef537c3c1
--- /dev/null
+++ b/plugin/output/prometheus/prometheus.go
@@ -0,0 +1,349 @@
+package prometheus
+
+import (
+ "context"
+ "net"
+ "net/http"
+ "time"
+
+ "github.com/castai/promwrite"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/fd"
+ "github.com/ozontech/file.d/metric"
+ "github.com/ozontech/file.d/pipeline"
+
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+)
+
+/*{ introduction
+It sends metrics to Prometheus using the remote write API. The plugin receives metric events from the pipeline (e.g., from the event_to_metrics action plugin) and forwards them to a Prometheus-compatible endpoint.
+}*/
+
+const (
+	outPluginType = "prometheus"
+
+	// supported values of the incoming event's "type" field
+	metricTypeGauge   = "gauge"
+	metricTypeCounter = "counter"
+)
+
+// Label is a single metric label pair as it appears in the plugin config.
+type Label struct {
+	Label string `json:"label" required:"true"`
+	Value string `json:"value" required:"true"`
+}
+
+// ! config-params
+// ^ config-params
+// Config is the prometheus output plugin configuration.
+type Config struct {
+	// > @3@4@5@6
+	// >
+	// > Prometheus remote write endpoint URL.
+	Endpoint string `json:"endpoint" required:"true" default:"http://localhost:9090/api/v1/write"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Auth config.
+	// >
+	// > `AuthConfig` params:
+	// > * `strategy` describes strategy to use; options:"disabled|tenant|basic|bearer"
+	// > By default strategy is `disabled`.
+	// > * `tenant_id` should be provided if strategy is `tenant`.
+	// > * `username` should be provided if strategy is `basic`.
+	// > Username is used for HTTP Basic Authentication.
+	// > * `password` should be provided if strategy is `basic`.
+	// > Password is used for HTTP Basic Authentication.
+	// > * `bearer_token` should be provided if strategy is `bearer`.
+	// > Token is used for HTTP Bearer Authentication.
+	Auth AuthConfig `json:"auth" child:"true"` // *
+
+	// > @3@4@5@6
+	// >
+	// > If set true, the plugin will use SSL/TLS connections method.
+	TLSEnabled bool `json:"tls_enabled" default:"false"` // *
+
+	// > @3@4@5@6
+	// >
+	// > If set, the plugin will skip SSL/TLS verification.
+	TLSSkipVerify bool `json:"tls_skip_verify" default:"false"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Client timeout when sends requests to Prometheus HTTP API.
+	RequestTimeout  cfg.Duration `json:"request_timeout" default:"1s" parse:"duration"` // *
+	RequestTimeout_ time.Duration
+
+	// > @3@4@5@6
+	// >
+	// > It defines how much time to wait for the connection.
+	ConnectionTimeout  cfg.Duration `json:"connection_timeout" default:"5s" parse:"duration"` // *
+	ConnectionTimeout_ time.Duration
+
+	// > @3@4@5@6
+	// >
+	// > Keep-alive config.
+	// >
+	// > `KeepAliveConfig` params:
+	// > * `max_idle_conn_duration` - idle keep-alive connections are closed after this duration.
+	// > By default idle connections are closed after `10s`.
+	// > * `max_conn_duration` - keep-alive connections are closed after this duration.
+	// > If set to `0` - connection duration is unlimited.
+	// > By default connection duration is `5m`.
+	KeepAlive KeepAliveConfig `json:"keep_alive" child:"true"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Retries of upload. If File.d cannot upload for this number of attempts,
+	// > File.d will fail with a non-zero exit code or skip the message (see fatal_on_failed_insert).
+	Retry int `json:"retry" default:"10"` // *
+
+	// > @3@4@5@6
+	// >
+	// > After an insert error, fail with a non-zero exit code or not
+	FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Delay before retrying a failed upload.
+	Retention  cfg.Duration `json:"retention" default:"1s" parse:"duration"` // *
+	Retention_ time.Duration
+
+	// > @3@4@5@6
+	// >
+	// > Multiplier for exponential increase of retention between retries
+	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
+	// > @3@4@5@6
+	// >
+	// > Number of retry attempts.
+	AttemptNum int `json:"attempt_num" default:"3"` // *
+}
+
+// AuthStrategy enumerates the supported authentication modes.
+type AuthStrategy byte
+
+const (
+	StrategyDisabled AuthStrategy = iota
+	StrategyTenant
+	StrategyBasic
+	StrategyBearer
+)
+
+// ! config-params
+// ^ config-params
+// AuthConfig configures authentication against the remote write endpoint.
+type AuthConfig struct {
+	// > AuthStrategy.Strategy describes strategy to use.
+	Strategy  string `json:"strategy" default:"disabled" options:"disabled|tenant|basic|bearer"`
+	Strategy_ AuthStrategy
+
+	// > TenantID for Tenant Authentication.
+	TenantID string `json:"tenant_id"`
+
+	// > Username for HTTP Basic Authentication.
+	Username string `json:"username"`
+
+	// > Password for HTTP Basic Authentication.
+	Password string `json:"password"`
+
+	// > Token for HTTP Bearer Authentication.
+	BearerToken string `json:"bearer_token"`
+}
+
+// KeepAliveConfig configures connection reuse for the HTTP transport.
+type KeepAliveConfig struct {
+	// Idle keep-alive connections are closed after this duration.
+	MaxIdleConnDuration  cfg.Duration `json:"max_idle_conn_duration" parse:"duration" default:"10s"`
+	MaxIdleConnDuration_ time.Duration
+
+	// Keep-alive connections are closed after this duration.
+	MaxConnDuration  cfg.Duration `json:"max_conn_duration" parse:"duration" default:"5m"`
+	MaxConnDuration_ time.Duration
+}
+
+// Plugin is the prometheus output plugin: it converts metric events into
+// remote-write time series and ships them through a PrometheusClient.
+type Plugin struct {
+	controller pipeline.OutputPluginController
+	logger     *zap.Logger
+
+	config *Config
+	client PrometheusClient
+
+	// plugin metrics
+	sendErrorMetric *metric.Counter
+
+	collector *metricCollector
+
+	// isAvailable tracks if Prometheus is currently available.
+	// NOTE(review): read in Out and written in sendToStorage without any
+	// synchronization — looks like a data race, confirm with -race.
+	isAvailable bool
+
+	// retryChan is used to signal waiting events when Prometheus becomes available
+	retryChan chan struct{}
+}
+
+// PrometheusClient abstracts the promwrite client so tests can mock it.
+type PrometheusClient interface {
+	Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error)
+}
+
+// init registers the plugin in the default registry (import side effect).
+func init() {
+	fd.DefaultPluginRegistry.RegisterOutput(&pipeline.PluginStaticInfo{
+		Type:    outPluginType,
+		Factory: Factory,
+	})
+}
+
+// Factory creates an empty plugin instance and its default config.
+func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
+	return &Plugin{}, &Config{}
+}
+
+// Start initializes the plugin: it registers telemetry, builds the HTTP
+// client, starts the metric collector and marks the output as available.
+// NOTE(review): the collector flush period is hard-coded to 15s — confirm it
+// is not supposed to be configurable.
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
+	p.controller = params.Controller
+	p.config = config.(*Config)
+	p.logger = params.Logger.Desugar()
+	p.registerMetrics(params.MetricCtl)
+	p.collector = newCollector(p, 15*time.Second, p.logger)
+
+	p.prepareClient()
+	p.isAvailable = true
+	p.retryChan = make(chan struct{})
+}
+
+// Stop flushes buffered metrics and terminates the collector goroutine.
+func (p *Plugin) Stop() {
+	p.collector.shutdown()
+}
+
+// Out consumes one metric event produced by the pipeline (e.g. by the
+// event_to_metrics action) and registers it in the collector. Events without
+// a "type" field, or with an unsupported type, are committed and skipped.
+//
+// Expected event layout:
+//
+//	{"name": "...", "type": "gauge|counter", "value": 1.0,
+//	 "timestamp": <unix ms>, "ttl": <ms>, "labels": {...}}
+//
+// NOTE(review): the event is not committed on the success path — confirm the
+// pipeline commits it elsewhere.
+func (p *Plugin) Out(event *pipeline.Event) {
+	msg := event.Root
+	typeNode := msg.Dig("type")
+	if typeNode == nil {
+		if !p.isAvailable {
+			<-p.retryChan // Block until Prometheus is available
+		}
+		p.controller.Commit(event)
+		return
+	}
+	metricType := typeNode.AsString()
+	typeNode.Suicide()
+
+	if metricType != metricTypeCounter && metricType != metricTypeGauge {
+		p.logger.Warn(
+			"unsupported metric type, skipping",
+			zap.String("type", metricType),
+		)
+		p.controller.Commit(event)
+		return
+	}
+
+	nameNode := msg.Dig("name")
+	name := nameNode.AsString()
+	nameNode.Suicide()
+
+	timestampNode := msg.Dig("timestamp")
+	timestamp := timestampNode.AsInt64()
+	timestampNode.Suicide()
+
+	ttlNode := msg.Dig("ttl")
+	ttl := ttlNode.AsInt64()
+	ttlNode.Suicide()
+
+	valueNode := msg.Dig("value")
+	value := valueNode.AsFloat()
+	valueNode.Suicide()
+
+	labelsNode := msg.Dig("labels")
+	labelValues := labelsNode.AsFields()
+
+	labels := make([]promwrite.Label, 0, len(labelValues)+1)
+	labels = append(labels, promwrite.Label{
+		Name:  "__name__",
+		Value: name,
+	})
+	for _, l := range labelValues {
+		labels = append(labels, promwrite.Label{
+			Name:  l.AsString(),
+			Value: labelsNode.Dig(l.AsString()).AsString(),
+		})
+	}
+	// Release the labels subtree only after we are done reading it. The
+	// previous code called Suicide() before this loop and then kept digging
+	// into the released node — a use-after-release on the node pool.
+	labelsNode.Suicide()
+
+	p.collector.handleMetric(
+		labels,
+		value,
+		timestamp,
+		metricType,
+		ttl,
+	)
+}
+
+// sendToStorage pushes a batch of time series to the remote write endpoint,
+// retrying with exponential backoff up to AttemptNum attempts. On final
+// failure it logs at fatal or error level (fatal_on_failed_insert) and marks
+// the output unavailable; on success it marks it available and wakes events
+// blocked in Out.
+func (p *Plugin) sendToStorage(values []promwrite.TimeSeries) error {
+	for _, value := range values {
+		p.logger.Debug(
+			"send metric",
+			zap.Any("labels", value.Labels),
+			zap.Time("time", value.Sample.Time),
+			zap.Float64("value", value.Sample.Value),
+		)
+	}
+
+	expBackoff := pipeline.GetBackoff(
+		p.config.Retention_,
+		float64(p.config.RetentionExponentMultiplier),
+		uint64(p.config.AttemptNum),
+	)
+	expBackoff.Reset()
+
+	err := backoff.Retry(func() error {
+		// fresh timeout per attempt
+		ctx, cancel := context.WithTimeout(context.Background(), p.config.RequestTimeout_)
+		defer cancel()
+		resp, err := p.client.Write(ctx, &promwrite.WriteRequest{TimeSeries: values})
+		if err != nil {
+			p.sendErrorMetric.Inc()
+			p.logger.Error("can't send data to Prometheus", zap.String("address", p.config.Endpoint), zap.Error(err))
+			return err
+		}
+		p.logger.Debug("successfully sent", zap.Any("response", resp))
+		return nil
+	}, expBackoff)
+
+	if err != nil {
+		var level zapcore.Level
+		if p.config.FatalOnFailedInsert {
+			level = zapcore.FatalLevel
+		} else {
+			level = zapcore.ErrorLevel
+		}
+
+		p.logger.Log(level, "max retries reached")
+		// NOTE(review): isAvailable/retryChan are also accessed from Out()
+		// without synchronization — confirm this close/replace pattern is
+		// race-free under -race.
+		p.isAvailable = false
+		newRetryChan := make(chan struct{})
+		if p.retryChan != nil {
+			close(p.retryChan)
+		}
+		p.retryChan = newRetryChan
+
+		p.logger.Info("prometheus unavailable, events will wait")
+	} else {
+		p.isAvailable = true
+		if p.retryChan != nil {
+			// wake any events blocked in Out(), then install a fresh channel
+			// for the next unavailability window
+			close(p.retryChan)
+			p.logger.Info("prometheus available")
+		}
+		p.retryChan = make(chan struct{})
+	}
+
+	return err
+}
+
+// registerMetrics registers the plugin's own telemetry counters.
+func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
+	p.sendErrorMetric = ctl.RegisterCounter("output_prometheus_send_error", "Total Prometheus send errors")
+}
+
+// prepareClient builds the remote-write client with the configured dial,
+// keep-alive and idle-connection timeouts.
+func (p *Plugin) prepareClient() {
+	dialer := &net.Dialer{
+		Timeout:   p.config.ConnectionTimeout_,
+		KeepAlive: p.config.KeepAlive.MaxConnDuration_,
+	}
+	transport := &http.Transport{
+		DialContext:     dialer.DialContext,
+		IdleConnTimeout: p.config.KeepAlive.MaxIdleConnDuration_,
+	}
+	httpClient := &http.Client{Transport: transport}
+
+	p.client = promwrite.NewClient(p.config.Endpoint, promwrite.HttpClient(httpClient))
+}
diff --git a/plugin/output/prometheus/prometheus_test.go b/plugin/output/prometheus/prometheus_test.go
new file mode 100644
index 000000000..a27c8d695
--- /dev/null
+++ b/plugin/output/prometheus/prometheus_test.go
@@ -0,0 +1,252 @@
+package prometheus
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/castai/promwrite"
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/test"
+ insaneJSON "github.com/ozontech/insane-json"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap/zaptest"
+)
+
+// mockPrometheus implements PrometheusClient for testing: it records write
+// requests and can be configured to fail.
+type mockPrometheus struct {
+	t           *testing.T
+	mu          sync.Mutex
+	writeCalls  []*promwrite.WriteRequest
+	writeError  error // returned by Write; nil means success
+	writeOption promwrite.WriteOption
+}
+
+// Write records the request (and first option, if any) and returns the
+// configured error or an empty response.
+func (m *mockPrometheus) Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if len(options) > 0 {
+		m.writeOption = options[0]
+	}
+
+	m.writeCalls = append(m.writeCalls, req)
+
+	if m.writeError != nil {
+		return nil, m.writeError
+	}
+
+	return &promwrite.WriteResponse{}, nil
+}
+
+// getWriteCalls returns a copy of the recorded write requests.
+func (m *mockPrometheus) getWriteCalls() []*promwrite.WriteRequest {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	calls := make([]*promwrite.WriteRequest, len(m.writeCalls))
+	copy(calls, m.writeCalls)
+	return calls
+}
+
+// setError makes subsequent Write calls fail with err.
+func (m *mockPrometheus) setError(err error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.writeError = err
+}
+
+// reset clears recorded calls and the configured error.
+func (m *mockPrometheus) reset() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.writeCalls = nil
+	m.writeError = nil
+	m.writeOption = nil
+}
+
+// mockController implements pipeline.OutputPluginController for testing;
+// Commit and Error are no-ops.
+type mockController struct {
+}
+
+func (m *mockController) Commit(_ *pipeline.Event) {}
+func (m *mockController) Error(_ string) {}
+
+// TestPluginOut runs one gauge event through the full Start/Out/Stop cycle
+// and verifies the event reaches the (mocked) remote-write client.
+func TestPluginOut(t *testing.T) {
+	t.Run("add event", func(t *testing.T) {
+		logger := zaptest.NewLogger(t)
+		mockClient := &mockPrometheus{t: t}
+		mockClient.reset()
+
+		// The *_ fields mirror what config parsing would normally produce.
+		config := &Config{
+			Endpoint: "http://localhost:9090/api/v1/write",
+			KeepAlive: KeepAliveConfig{
+				MaxIdleConnDuration_: 10 * time.Second,
+				MaxConnDuration_:     5 * time.Minute,
+			},
+			ConnectionTimeout_: 5 * time.Second,
+			RequestTimeout_:    time.Second,
+		}
+		err := cfg.SetDefaultValues(config)
+		require.NoError(t, err)
+
+		plugin := &Plugin{
+			client: mockClient,
+		}
+
+		mockController := &mockController{}
+
+		params := &pipeline.OutputPluginParams{
+			PluginDefaultParams: test.NewEmptyOutputPluginParams().PluginDefaultParams,
+			Logger:              logger.Sugar(),
+			Controller:          mockController,
+			Router:              pipeline.NewRouter(),
+		}
+
+		plugin.Start(config, params)
+		// Start()'s prepareClient() overwrites the injected client with a
+		// real one, so the mock must be re-installed after Start.
+		plugin.client = mockClient
+		time.Sleep(50 * time.Millisecond)
+
+		// Create a test event using insaneJSON
+		root, err := insaneJSON.DecodeBytes([]byte(`{
+			"name": "test_metric",
+			"type": "gauge",
+			"value": 1.0,
+			"timestamp": 1234567890,
+			"labels": {"job": "test"}
+		}`))
+		require.NoError(t, err)
+
+		event := &pipeline.Event{
+			Root: root,
+		}
+
+		// Add event to the plugin
+		plugin.Out(event)
+
+		// Wait for batch to be processed
+		time.Sleep(200 * time.Millisecond)
+
+		plugin.Stop()
+		time.Sleep(100 * time.Millisecond)
+
+		// Stop() forces a final flush, so the mock must have seen the write.
+		assert.GreaterOrEqual(t, len(mockClient.getWriteCalls()), 1)
+	})
+}
+
+// TestPluginSend pushes several gauge events through the plugin and verifies
+// they are written through the mocked client on shutdown.
+func TestPluginSend(t *testing.T) {
+	t.Run("send metrics through plugin", func(t *testing.T) {
+		logger := zaptest.NewLogger(t)
+		mockClient := &mockPrometheus{t: t}
+		mockClient.reset()
+
+		config := &Config{
+			Endpoint: "http://localhost:9090/api/v1/write",
+			KeepAlive: KeepAliveConfig{
+				MaxIdleConnDuration_: 10 * time.Second,
+				MaxConnDuration_:     5 * time.Minute,
+			},
+			ConnectionTimeout_: 5 * time.Second,
+			RequestTimeout_:    time.Second,
+		}
+		err := cfg.SetDefaultValues(config)
+		require.NoError(t, err)
+
+		plugin := &Plugin{
+			client: mockClient,
+		}
+
+		mockController := &mockController{}
+
+		params := &pipeline.OutputPluginParams{
+			PluginDefaultParams: test.NewEmptyOutputPluginParams().PluginDefaultParams,
+			Logger:              logger.Sugar(),
+			Controller:          mockController,
+			Router:              pipeline.NewRouter(),
+		}
+
+		plugin.Start(config, params)
+		// Start()'s prepareClient() overwrites the injected client with a
+		// real one, so the mock must be re-installed after Start.
+		plugin.client = mockClient
+		time.Sleep(50 * time.Millisecond)
+
+		// Test sending multiple metrics
+		for i := 0; i < 5; i++ {
+			root, err := insaneJSON.DecodeBytes([]byte(`{
+				"name": "metric_` + string(rune('0'+i)) + `",
+				"type": "gauge",
+				"value": 1.5,
+				"timestamp": 1234567890,
+				"labels": {"job": "test", "instance": "localhost"}
+			}`))
+			require.NoError(t, err)
+
+			event := &pipeline.Event{
+				Root: root,
+			}
+			plugin.Out(event)
+		}
+
+		// Wait for batch to be processed
+		time.Sleep(200 * time.Millisecond)
+
+		plugin.Stop()
+		time.Sleep(100 * time.Millisecond)
+
+		// Stop() forces a final flush: the mock must have been called.
+		// (The previous `>= 0` assertion was vacuously true.)
+		calls := mockClient.getWriteCalls()
+		assert.GreaterOrEqual(t, len(calls), 1)
+	})
+}
+
+// TestPluginWithErrorClient verifies that a failing client is retried and
+// handled gracefully (no panic) during the plugin lifecycle.
+func TestPluginWithErrorClient(t *testing.T) {
+	t.Run("handles client write error", func(t *testing.T) {
+		logger := zaptest.NewLogger(t)
+		mockClient := &mockPrometheus{t: t}
+		mockClient.setError(assert.AnError)
+
+		config := &Config{
+			Endpoint: "http://localhost:9090/api/v1/write",
+			KeepAlive: KeepAliveConfig{
+				MaxIdleConnDuration_: 10 * time.Second,
+				MaxConnDuration_:     5 * time.Minute,
+			},
+			ConnectionTimeout_: 5 * time.Second,
+			RequestTimeout_:    time.Second,
+		}
+		err := cfg.SetDefaultValues(config)
+		require.NoError(t, err)
+
+		plugin := &Plugin{
+			client: mockClient,
+		}
+
+		params := &pipeline.OutputPluginParams{
+			PluginDefaultParams: test.NewEmptyOutputPluginParams().PluginDefaultParams,
+			Logger:              logger.Sugar(),
+			Controller:          &mockController{},
+			Router:              pipeline.NewRouter(),
+		}
+
+		plugin.Start(config, params)
+		// Start()'s prepareClient() overwrites the injected client with a
+		// real one, so the failing mock must be re-installed after Start.
+		plugin.client = mockClient
+		time.Sleep(50 * time.Millisecond)
+
+		// Create and send an event that will fail
+		root, err := insaneJSON.DecodeBytes([]byte(`{
+			"name": "error_metric",
+			"type": "gauge",
+			"value": 1.0,
+			"timestamp": 1234567890,
+			"labels": {"job": "test"}
+		}`))
+		require.NoError(t, err)
+
+		event := &pipeline.Event{
+			Root: root,
+		}
+		plugin.Out(event)
+
+		// Wait for error handling
+		time.Sleep(200 * time.Millisecond)
+
+		plugin.Stop()
+		time.Sleep(100 * time.Millisecond)
+
+		// Stop() forces a final flush, so the failing client must have been
+		// called at least once. (The previous `assert.True(t, true)` asserted
+		// nothing.)
+		assert.GreaterOrEqual(t, len(mockClient.getWriteCalls()), 1)
+	})
+}