Skip to content

Commit 50deab9

Browse files
committed
reporter: --otlp-logging flag forwards agent logs over OTLP
When set, install a logrus hook on the global logger that converts each entry to a LogEvent and ships it through the existing OTLP log streamer (arrowReporter.ReportLogEvents) along with probe events and any other log producers. The hook captures every level logrus emits — actual filtering is left to the logger's own configuration. LogEvent gains Severity/SeverityText so log records carry proper OTLP severity. Probe-fire events leave these zero/empty (SeverityUnspecified) and the streamer skips both setters in that case, so the field is present on the wire only when the producer populated it. To avoid a feedback loop on export failures (the streamer logs warnings on Export errors and on partial-success rejections), those internal warnings tag themselves with otlp_skip=true; the hook drops any entry carrying that field. In offline mode (no remote-store), the flag logs a one-shot warning at startup and otherwise behaves as off, since the streamer is itself nil in that case.
1 parent 267a6b1 commit 50deab9

5 files changed

Lines changed: 280 additions & 5 deletions

File tree

flags/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ type Flags struct {
162162
EnableOOMProfAllocs bool `default:"false" help:"Enable OOMProf alloc counts."`
163163

164164
MergeGpuProfiles bool `default:"false" help:"Report GPU kernel timing and GPU PC sampling under a single gpu_time/nanoseconds sample_type, differentiated by a gpu_view label (pc_sample|kernel_time). When false (the default), they are reported as separate sample_types (gpu_kernel_time/nanoseconds and gpu_pcsample/count) with no per-sample labels."`
165+
166+
OTLPLogging bool `default:"false" help:"Forward parca-agent's own logrus output to the remote-store as OTLP log records (in addition to local stderr). Requires a remote-store; ignored in offline mode."`
165167
}
166168

167169
type ExitCode int

main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,15 @@ func mainWithExitCode() flags.ExitCode {
434434
}
435435
parcaReporter.Start(mainCtx)
436436

437+
if f.OTLPLogging {
438+
if grpcConn == nil {
439+
log.Warn("--otlp-logging is set but no remote-store is configured; agent logs will only go to stderr")
440+
} else {
441+
log.AddHook(reporter.NewOTLPLogrusHook(parcaReporter))
442+
log.Info("forwarding parca-agent logs to remote-store via OTLP")
443+
}
444+
}
445+
437446
includeEnvVars := libpf.Set[string]{}
438447
if len(f.IncludeEnvVar) > 0 {
439448
for _, env := range f.IncludeEnvVar {

reporter/log_streamer.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ import (
3131
// streamer batches a slice of these and ships them as one OTLP/gRPC
3232
// ExportLogsServiceRequest.
3333
type LogEvent struct {
34-
TimestampNs int64 // wall-clock ns (unix epoch) of the event itself
35-
ObservedTimestampNs int64 // wall-clock ns at the moment the producer enqueued the event
36-
Body string // LogRecord.Body (set as a string body)
34+
TimestampNs int64 // wall-clock ns (unix epoch) of the event itself
35+
ObservedTimestampNs int64 // wall-clock ns at the moment the producer enqueued the event
36+
Body string // LogRecord.Body (set as a string body)
37+
Severity plog.SeverityNumber // LogRecord.SeverityNumber; zero = unspecified
38+
SeverityText string // LogRecord.SeverityText; empty = unset
3739
Attributes map[string]LogAttr
3840
}
3941

@@ -121,7 +123,8 @@ func (s *logStreamer) run(ctx context.Context) {
121123
return
122124
}
123125
s.exportErrs.Add(1)
124-
log.Warnf("log streamer: export errored (dropping %d events, backing off %s): %v",
126+
log.WithField(otlpSkipField, true).Warnf(
127+
"log streamer: export errored (dropping %d events, backing off %s): %v",
125128
len(batch), logStreamerErrorBackoff, err)
126129
// Backoff to avoid spinning on a persistently-broken endpoint.
127130
// Events accumulating during the sleep are queued in s.in and may
@@ -175,7 +178,8 @@ func (s *logStreamer) export(ctx context.Context, batch []LogEvent) error {
175178
}
176179
if ps := resp.PartialSuccess(); ps.RejectedLogRecords() > 0 {
177180
s.rejected.Add(uint64(ps.RejectedLogRecords()))
178-
log.Warnf("log streamer: server rejected %d/%d records: %s",
181+
log.WithField(otlpSkipField, true).Warnf(
182+
"log streamer: server rejected %d/%d records: %s",
179183
ps.RejectedLogRecords(), len(batch), ps.ErrorMessage())
180184
}
181185
return nil
@@ -203,6 +207,12 @@ func (s *logStreamer) buildLogs(batch []LogEvent) plog.Logs {
203207
lr.SetTimestamp(pcommon.Timestamp(ev.TimestampNs))
204208
lr.SetObservedTimestamp(pcommon.Timestamp(ev.ObservedTimestampNs))
205209
lr.Body().SetStr(ev.Body)
210+
if ev.Severity != plog.SeverityNumberUnspecified {
211+
lr.SetSeverityNumber(ev.Severity)
212+
}
213+
if ev.SeverityText != "" {
214+
lr.SetSeverityText(ev.SeverityText)
215+
}
206216
a := lr.Attributes()
207217
for k, v := range ev.Attributes {
208218
if v.IsInt {

reporter/logrus_hook.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2026 The Parca Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package reporter
15+
16+
import (
17+
"fmt"
18+
"strings"
19+
"time"
20+
21+
"github.com/sirupsen/logrus"
22+
"go.opentelemetry.io/collector/pdata/plog"
23+
)
24+
25+
// otlpSkipField is the name of a logrus entry field. When an entry carries it
26+
// with the bool value true, OTLPLogrusHook drops the entry instead of
27+
// forwarding it through ReportLogEvents. The log streamer tags its own export
28+
// error and partial-success warnings with this field to break the feedback
29+
// loop that would otherwise grow the streamer's queue on every failed export.
30+
const otlpSkipField = "otlp_skip"
31+
32+
// OTLPLogrusHook is a logrus.Hook that ships every captured entry through the
33+
// supplied ParcaReporter as an OTLP log record. Install it with
34+
// logrus.AddHook(reporter.NewOTLPLogrusHook(rep)) after the reporter is
35+
// constructed; entries logged before installation are not captured.
36+
//
37+
// Fire is non-blocking in the steady state: ReportLogEvents performs a single
38+
// non-blocking channel send into the streamer's queue. Entries are silently
39+
// dropped if the queue is saturated (accounted in the streamer's queueDrops
40+
// counter); Fire never returns a non-nil error, so it cannot interfere with
41+
// the logrus call site.
42+
type OTLPLogrusHook struct {
43+
rep ParcaReporter // destination for converted entries; Fire calls rep.ReportLogEvents
44+
}
45+
46+
// NewOTLPLogrusHook returns a hook bound to rep. The hook fires on every
47+
// logrus level by default; logrus's own level filter is what actually
48+
// gates which entries reach Fire.
49+
func NewOTLPLogrusHook(rep ParcaReporter) *OTLPLogrusHook {
50+
return &OTLPLogrusHook{rep: rep}
51+
}
52+
53+
// Levels returns logrus.AllLevels so the hook fires for every level the
54+
// underlying logger emits. Filtering by level should be done by configuring
55+
// the logger itself (e.g. --log-level).
56+
func (h *OTLPLogrusHook) Levels() []logrus.Level {
57+
return logrus.AllLevels
58+
}
59+
60+
// Fire converts the logrus entry to a LogEvent and enqueues it via the
61+
// reporter. Entries carrying the otlp_skip=true field are dropped to break
62+
// the streamer→logrus→streamer feedback loop.
63+
func (h *OTLPLogrusHook) Fire(e *logrus.Entry) error {
64+
if v, ok := e.Data[otlpSkipField]; ok {
65+
if b, ok := v.(bool); ok && b {
66+
return nil
67+
}
68+
}
69+
70+
// Always emit at least `level` as a per-record attribute.
71+
attrs := make(map[string]LogAttr, len(e.Data)+1)
72+
attrs["level"] = LogAttr{Str: strings.ToUpper(e.Level.String())}
73+
for k, v := range e.Data {
74+
if k == otlpSkipField {
75+
continue
76+
}
77+
attrs[k] = toLogAttr(v)
78+
}
79+
80+
ev := LogEvent{
81+
TimestampNs: e.Time.UnixNano(),
82+
ObservedTimestampNs: time.Now().UnixNano(),
83+
Body: e.Message,
84+
Severity: logrusLevelToSeverity(e.Level),
85+
SeverityText: strings.ToUpper(e.Level.String()),
86+
Attributes: attrs,
87+
}
88+
89+
// ReportLogEvents itself never returns an error in the current
90+
// implementation; even if it did we'd swallow it here to avoid
91+
// propagating into the logrus call site.
92+
_ = h.rep.ReportLogEvents([]LogEvent{ev})
93+
return nil
94+
}
95+
96+
// toLogAttr maps a logrus field value to the tagged LogAttr union. Integer
97+
// types are preserved; everything else is stringified via fmt.Sprint so we
98+
// don't lose information for types that the OTLP attribute set could in
99+
// principle represent (floats, bools, errors, structs) but our minimal
100+
// LogAttr does not yet model.
101+
func toLogAttr(v any) LogAttr {
102+
switch x := v.(type) {
103+
case string:
104+
return LogAttr{Str: x}
105+
case int:
106+
return LogAttr{Int: int64(x), IsInt: true}
107+
case int32:
108+
return LogAttr{Int: int64(x), IsInt: true}
109+
case int64:
110+
return LogAttr{Int: x, IsInt: true}
111+
case uint:
112+
return LogAttr{Int: int64(x), IsInt: true}
113+
case uint32:
114+
return LogAttr{Int: int64(x), IsInt: true}
115+
case uint64:
116+
return LogAttr{Int: int64(x), IsInt: true}
117+
case error:
118+
return LogAttr{Str: x.Error()}
119+
default:
120+
return LogAttr{Str: fmt.Sprint(v)}
121+
}
122+
}
123+
124+
// logrusLevelToSeverity maps a logrus.Level to the closest OTLP severity
125+
// number. Panic level shares Fatal since OTLP has no distinct panic bucket.
126+
func logrusLevelToSeverity(l logrus.Level) plog.SeverityNumber {
127+
switch l {
128+
case logrus.TraceLevel:
129+
return plog.SeverityNumberTrace
130+
case logrus.DebugLevel:
131+
return plog.SeverityNumberDebug
132+
case logrus.InfoLevel:
133+
return plog.SeverityNumberInfo
134+
case logrus.WarnLevel:
135+
return plog.SeverityNumberWarn
136+
case logrus.ErrorLevel:
137+
return plog.SeverityNumberError
138+
case logrus.FatalLevel, logrus.PanicLevel:
139+
return plog.SeverityNumberFatal
140+
default:
141+
return plog.SeverityNumberUnspecified
142+
}
143+
}

reporter/logrus_hook_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright 2026 The Parca Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package reporter
15+
16+
import (
17+
"errors"
18+
"testing"
19+
"time"
20+
21+
"github.com/sirupsen/logrus"
22+
"github.com/stretchr/testify/require"
23+
"go.opentelemetry.io/collector/pdata/plog"
24+
"go.opentelemetry.io/ebpf-profiler/libpf"
25+
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
26+
)
27+
28+
// fakeReporter satisfies ParcaReporter; it captures every LogEvent batch sent
29+
// through ReportLogEvents and ignores ReportTraceEvent. Tests inspect captured.
30+
type fakeReporter struct {
31+
captured []LogEvent
32+
}
33+
34+
func (f *fakeReporter) ReportTraceEvent(_ *libpf.Trace, _ *samples.TraceEventMeta) error {
35+
return nil
36+
}
37+
38+
func (f *fakeReporter) ReportLogEvents(events []LogEvent) error {
39+
f.captured = append(f.captured, events...)
40+
return nil
41+
}
42+
43+
func TestOTLPLogrusHook_FiresWithSeverityAndAttributes(t *testing.T) {
44+
rep := &fakeReporter{}
45+
hook := NewOTLPLogrusHook(rep)
46+
47+
ts := time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC)
48+
entry := &logrus.Entry{
49+
Time: ts,
50+
Level: logrus.WarnLevel,
51+
Message: "something happened",
52+
Data: logrus.Fields{
53+
"pid": int(4321),
54+
"comm": "myapp",
55+
"err": errors.New("boom"),
56+
"unk": 3.14, // unsupported type → stringified
57+
"u64": uint64(7),
58+
},
59+
}
60+
require.NoError(t, hook.Fire(entry))
61+
62+
require.Len(t, rep.captured, 1)
63+
ev := rep.captured[0]
64+
require.Equal(t, ts.UnixNano(), ev.TimestampNs)
65+
require.Equal(t, "something happened", ev.Body)
66+
require.Equal(t, plog.SeverityNumberWarn, ev.Severity)
67+
require.Equal(t, "WARNING", ev.SeverityText)
68+
69+
require.Equal(t, LogAttr{Str: "myapp"}, ev.Attributes["comm"])
70+
require.Equal(t, LogAttr{Int: 4321, IsInt: true}, ev.Attributes["pid"])
71+
require.Equal(t, LogAttr{Str: "boom"}, ev.Attributes["err"])
72+
require.Equal(t, LogAttr{Str: "3.14"}, ev.Attributes["unk"])
73+
require.Equal(t, LogAttr{Int: 7, IsInt: true}, ev.Attributes["u64"])
74+
}
75+
76+
func TestOTLPLogrusHook_SkipsSelfTaggedEntries(t *testing.T) {
77+
rep := &fakeReporter{}
78+
hook := NewOTLPLogrusHook(rep)
79+
80+
entry := &logrus.Entry{
81+
Time: time.Now(),
82+
Level: logrus.WarnLevel,
83+
Message: "streamer error — must not loop",
84+
Data: logrus.Fields{otlpSkipField: true, "extra": "x"},
85+
}
86+
require.NoError(t, hook.Fire(entry))
87+
require.Empty(t, rep.captured, "entries tagged otlp_skip=true must be dropped")
88+
}
89+
90+
func TestOTLPLogrusHook_LevelMapping(t *testing.T) {
91+
cases := []struct {
92+
level logrus.Level
93+
want plog.SeverityNumber
94+
}{
95+
{logrus.TraceLevel, plog.SeverityNumberTrace},
96+
{logrus.DebugLevel, plog.SeverityNumberDebug},
97+
{logrus.InfoLevel, plog.SeverityNumberInfo},
98+
{logrus.WarnLevel, plog.SeverityNumberWarn},
99+
{logrus.ErrorLevel, plog.SeverityNumberError},
100+
{logrus.FatalLevel, plog.SeverityNumberFatal},
101+
{logrus.PanicLevel, plog.SeverityNumberFatal},
102+
}
103+
for _, c := range cases {
104+
require.Equal(t, c.want, logrusLevelToSeverity(c.level), "level=%s", c.level)
105+
}
106+
}
107+
108+
func TestOTLPLogrusHook_LevelsCoversAll(t *testing.T) {
109+
hook := NewOTLPLogrusHook(&fakeReporter{})
110+
require.ElementsMatch(t, logrus.AllLevels, hook.Levels())
111+
}

0 commit comments

Comments
 (0)