Skip to content

Commit 0d3d537

Browse files
authored
metricexport: reusable OTLP metrics egress (#3195)
metricexport: add reusable OTLP metrics egress Add a small, generic OTLP metrics egress path: a Producer interface and an Exporter that batches every registered producer's pmetric output into one OTLP MetricsService/Export request and ships it over an existing remote-store gRPC connection (the same one used for profiles), on a jittered interval. Nothing here is GPU-specific; it's intended as shared infrastructure for any in-process subsystem that wants to emit metrics to the remote store. The NVML GPU collector is its first consumer.
1 parent eeaf857 commit 0d3d537

2 files changed

Lines changed: 169 additions & 1 deletion

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/xyproto/ainur v1.3.3
3434
github.com/zcalusic/sysinfo v1.1.3
3535
github.com/zeebo/xxh3 v1.1.0
36+
go.opentelemetry.io/collector/pdata v1.60.0
3637
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0
3738
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.60.0
3839
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0
@@ -159,7 +160,6 @@ require (
159160
go.opentelemetry.io/collector/consumer v1.60.0 // indirect
160161
go.opentelemetry.io/collector/consumer/xconsumer v0.154.0 // indirect
161162
go.opentelemetry.io/collector/featuregate v1.60.0 // indirect
162-
go.opentelemetry.io/collector/pdata v1.60.0 // indirect
163163
go.opentelemetry.io/collector/pdata/pprofile v0.154.0 // indirect
164164
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
165165
go.uber.org/multierr v1.11.0 // indirect

metricexport/exporter.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2026 The Parca Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
// Package metricexport provides a small OTLP metrics egress path for
15+
// parca-agent. It is deliberately generic: a Producer collects gauge/sum
16+
// metrics in the background and renders them into a pmetric.MetricSlice on
17+
// demand, and the Exporter periodically batches every producer's output into a
18+
// single OTLP ExportMetricsServiceRequest and ships it over an existing
19+
// remote-store gRPC connection (the same one used for profiles).
20+
//
21+
// The GPU metrics collector (package gpumetrics) is the first Producer, but
22+
// nothing here is GPU-specific — any subsystem that wants to emit metrics to
23+
// the remote store can register a Producer.
24+
package metricexport
25+
26+
import (
27+
"context"
28+
"errors"
29+
"fmt"
30+
"math/rand/v2"
31+
"time"
32+
33+
log "github.com/sirupsen/logrus"
34+
"go.opentelemetry.io/collector/pdata/pmetric"
35+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
36+
"golang.org/x/sync/errgroup"
37+
"google.golang.org/grpc"
38+
)
39+
40+
// Producer collects metrics in the background (Collect, blocking until ctx is
41+
// cancelled) and renders the metrics accumulated so far into ms (Produce,
42+
// called periodically by the Exporter). Implementations must be safe for
43+
// Produce and Collect to run concurrently.
44+
type Producer interface {
45+
// Produce appends the metrics collected since the last call to ms.
46+
Produce(ms pmetric.MetricSlice) error
47+
// Collect runs the background collection loop until ctx is cancelled.
48+
Collect(ctx context.Context) error
49+
}
50+
51+
// ProducerConfig pairs a Producer with the OTLP instrumentation scope its
52+
// metrics are reported under.
53+
type ProducerConfig struct {
54+
Producer Producer
55+
ScopeName string
56+
}
57+
58+
// Exporter periodically renders all registered producers into a single OTLP
59+
// metrics request and sends it over conn.
60+
type Exporter struct {
61+
client pmetricotlp.GRPCClient
62+
interval time.Duration
63+
producers []ProducerConfig
64+
resourceAttrs map[string]any
65+
}
66+
67+
// NewExporter builds an Exporter that ships metrics over conn (typically the
68+
// agent's existing remote-store connection) every interval. resourceAttrs are
69+
// attached to the OTLP Resource (e.g. {"node": <node name>}).
70+
func NewExporter(conn *grpc.ClientConn, interval time.Duration, resourceAttrs map[string]any) *Exporter {
71+
return &Exporter{
72+
client: pmetricotlp.NewGRPCClient(conn),
73+
interval: interval,
74+
resourceAttrs: resourceAttrs,
75+
}
76+
}
77+
78+
// AddProducer registers a producer. Call before Run/Collect.
79+
func (e *Exporter) AddProducer(p ProducerConfig) {
80+
e.producers = append(e.producers, p)
81+
}
82+
83+
func (e *Exporter) report(ctx context.Context) error {
84+
m := pmetric.NewMetrics()
85+
r := m.ResourceMetrics().AppendEmpty()
86+
if err := r.Resource().Attributes().FromRaw(e.resourceAttrs); err != nil {
87+
return err
88+
}
89+
for _, p := range e.producers {
90+
s := r.ScopeMetrics().AppendEmpty()
91+
s.Scope().SetName(p.ScopeName)
92+
if err := p.Producer.Produce(s.Metrics()); err != nil {
93+
log.WithError(err).WithField("scope", p.ScopeName).Warn("metrics producer failed to produce")
94+
}
95+
}
96+
97+
dpc := m.DataPointCount()
98+
if dpc == 0 {
99+
return nil
100+
}
101+
102+
req := pmetricotlp.NewExportRequestFromMetrics(m)
103+
start := time.Now()
104+
resp, err := e.client.Export(ctx, req)
105+
if err != nil {
106+
return fmt.Errorf("otlp metrics export failed: %w", err)
107+
}
108+
if ps := resp.PartialSuccess(); ps.RejectedDataPoints() > 0 || ps.ErrorMessage() != "" {
109+
log.WithFields(log.Fields{
110+
"rejected": ps.RejectedDataPoints(),
111+
"message": ps.ErrorMessage(),
112+
}).Warn("otlp metrics partial success")
113+
}
114+
log.WithFields(log.Fields{
115+
"data_points": dpc,
116+
"duration": time.Since(start),
117+
}).Debug("gpu metrics export succeeded")
118+
return nil
119+
}
120+
121+
// Run starts every producer's background collection loop and the periodic
122+
// export loop, blocking until ctx is cancelled or a fatal error occurs.
123+
func (e *Exporter) Run(ctx context.Context) error {
124+
if len(e.producers) == 0 {
125+
return errors.New("metricexport: no producers configured")
126+
}
127+
log.WithField("producers", len(e.producers)).Info("starting otlp metrics exporter")
128+
129+
g, ctx := errgroup.WithContext(ctx)
130+
131+
// Background collection loops, one per producer.
132+
for _, p := range e.producers {
133+
g.Go(func() error {
134+
return p.Producer.Collect(ctx)
135+
})
136+
}
137+
138+
// Periodic export loop.
139+
g.Go(func() error {
140+
tick := time.NewTicker(e.interval)
141+
defer tick.Stop()
142+
for {
143+
select {
144+
case <-ctx.Done():
145+
return nil
146+
case <-tick.C:
147+
if err := e.report(ctx); err != nil {
148+
// Don't tear the agent down over a transient export
149+
// failure; log and keep collecting.
150+
log.WithError(err).Warn("failed to send otlp gpu metrics")
151+
}
152+
tick.Reset(addJitter(e.interval, 0.2))
153+
}
154+
}
155+
})
156+
157+
return g.Wait()
158+
}
159+
160+
// addJitter adds +/- jitter (jitter is [0..1]) to baseDuration.
161+
// Originally copied from go.opentelemetry.io/ebpf-profiler.
162+
func addJitter(baseDuration time.Duration, jitter float64) time.Duration {
163+
if jitter < 0.0 || jitter > 1.0 {
164+
return baseDuration
165+
}
166+
//nolint:gosec
167+
return time.Duration((1 + jitter - 2*jitter*rand.Float64()) * float64(baseDuration))
168+
}

0 commit comments

Comments
 (0)