@@ -16,218 +16,63 @@ package reporter
1616import (
1717 "context"
1818 "fmt"
19- "sync/atomic"
2019 "time"
2120
22- log "github.com/sirupsen/logrus "
23- "go.opentelemetry.io/collector/pdata/pcommon "
24- "go.opentelemetry.io/collector/pdata/plog "
25- "go.opentelemetry.io/collector/pdata/plog/plogotlp "
21+ "go.opentelemetry.io/otel/attribute "
22+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc "
23+ sdklog "go.opentelemetry.io/otel/sdk/log "
24+ "go.opentelemetry.io/otel/sdk/resource "
2625 "google.golang.org/grpc"
2726)
2827
29- // LogEvent is the in-process representation of a single OTLP log record
30- // produced by any source that uses the ParcaReporter log-event API. The
31- // streamer batches a slice of these and ships them as one OTLP/gRPC
32- // ExportLogsServiceRequest.
33- type LogEvent struct {
34- TimestampNs int64 // wall-clock ns (unix epoch) of the event itself
35- ObservedTimestampNs int64 // wall-clock ns at the moment the producer enqueued the event
36- Body string // LogRecord.Body (set as a string body)
37- Attributes map [string ]LogAttr
38- }
39-
40- // LogAttr is a tagged union covering the OTLP attribute value types we use.
41- // Producers populate one of Str / Int and leave the other zero. The streamer
42- // picks the right setter based on which is set.
43- type LogAttr struct {
44- Str string
45- Int int64
46- IsInt bool
47- }
48-
49- const (
50- logStreamerBatchSize = 512
51- logStreamerBatchAge = 250 * time .Millisecond
52- logStreamerQueueSize = 4096
53- logStreamerErrorBackoff = 5 * time .Second
54- logStreamerScopeName = "parca-agent"
55- )
56-
57- // logStreamerOptions is the resource-attribute payload attached to every batch.
58- type logStreamerOptions struct {
28+ // logProviderOptions is the resource-attribute payload attached to every batch
29+ // shipped by the OTLP log exporter.
30+ type logProviderOptions struct {
5931 ServiceName string // service.name = "parca-agent"
6032 ServiceVersion string // service.version = build VCS revision
6133 HostName string // host.name = agent --node
6234}
6335
64- // logStreamer batches LogEvents and ships them as OTLP/gRPC
65- // ExportLogsServiceRequest messages via plogotlp.GRPCClient. Owned by
66- // arrowReporter; constructed once per New() and run in the Start() goroutine
67- // when grpcConn is non-nil.
68- type logStreamer struct {
69- conn * grpc.ClientConn
70- client plogotlp.GRPCClient
71- opts logStreamerOptions
72-
73- in chan LogEvent
74-
75- // Counters surfaced via prometheus from arrowReporter; the streamer itself
76- // only owns the atomics. arrowReporter wires them into a registry.
77- batchesSent atomic.Uint64
78- eventsSent atomic.Uint64
79- exportErrs atomic.Uint64
80- queueDrops atomic.Uint64
81- rejected atomic.Uint64
82- }
83-
84- func newLogStreamer (conn * grpc.ClientConn , opts logStreamerOptions ) * logStreamer {
85- return & logStreamer {
86- conn : conn ,
87- client : plogotlp .NewGRPCClient (conn ),
88- opts : opts ,
89- in : make (chan LogEvent , logStreamerQueueSize ),
90- }
91- }
92-
93- // enqueue tries to publish a single event. Returns false if the queue is full;
94- // the caller (ReportLogEvents) increments queueDrops and moves on.
95- func (s * logStreamer ) enqueue (ev LogEvent ) bool {
96- select {
97- case s .in <- ev :
98- return true
99- default :
100- s .queueDrops .Add (1 )
101- return false
102- }
103- }
104-
105- // run batches LogEvents and ships them as OTLP ExportLogsServiceRequest
106- // messages. Each batch is one unary RPC; transient backend errors trigger a
107- // brief sleep to avoid hot-looping on persistent failures. Returns when ctx is
108- // cancelled.
109- func (s * logStreamer ) run (ctx context.Context ) {
110- batch := make ([]LogEvent , 0 , logStreamerBatchSize )
111- flushTimer := time .NewTimer (logStreamerBatchAge )
112- defer flushTimer .Stop ()
113- stopLogFlushTimer (flushTimer )
114-
115- flush := func () {
116- if len (batch ) == 0 {
117- return
118- }
119- if err := s .export (ctx , batch ); err != nil {
120- if ctx .Err () != nil {
121- return
122- }
123- s .exportErrs .Add (1 )
124- log .Warnf ("log streamer: export errored (dropping %d events, backing off %s): %v" ,
125- len (batch ), logStreamerErrorBackoff , err )
126- // Backoff to avoid spinning on a persistently-broken endpoint.
127- // Events accumulating during the sleep are queued in s.in and may
128- // also be dropped by enqueue's non-blocking send (queueDrops).
129- select {
130- case <- ctx .Done ():
131- case <- time .After (logStreamerErrorBackoff ):
132- }
133- } else {
134- s .batchesSent .Add (1 )
135- s .eventsSent .Add (uint64 (len (batch )))
136- }
137- batch = batch [:0 ]
138- }
139-
140- for {
141- select {
142- case <- ctx .Done ():
143- flush ()
144- return
145-
146- case ev , ok := <- s .in :
147- if ! ok {
148- flush ()
149- return
150- }
151- if len (batch ) == 0 {
152- resetLogFlushTimer (flushTimer , logStreamerBatchAge )
153- }
154- batch = append (batch , ev )
155- if len (batch ) >= logStreamerBatchSize {
156- flush ()
157- stopLogFlushTimer (flushTimer )
158- }
159-
160- case <- flushTimer .C :
161- flush ()
162- }
163- }
164- }
36+ // Batching policy. The defaults the OTel SDK ships with are tuned for general
37+ // telemetry workloads (1 s flush interval); for probe-fire-style events we
38+ // want a tighter age cap so a single slow callback shows up in the UI within
39+ // a few hundred ms of the dtor firing.
40+ const (
41+ logExportMaxBatchSize = 512
42+ logExportInterval = 250 * time .Millisecond
43+ logMaxQueueSize = 4096
44+ )
16545
166- // export ships one batch as a single OTLP/gRPC ExportLogsServiceRequest. The
167- // returned error means the RPC itself failed; a successful RPC with
168- // PartialSuccess.RejectedLogRecords > 0 is logged but not returned (the rest of
169- // the batch was accepted).
170- func ( s * logStreamer ) export ( ctx context. Context , batch [] LogEvent ) error {
171- req := plogotlp . NewExportRequestFromLogs ( s . buildLogs ( batch ))
172- resp , err := s . client . Export (ctx , req )
46+ // newLogProvider constructs an OTel logs LoggerProvider that ships records as
47+ // OTLP/gRPC ExportLogsServiceRequest messages over the supplied connection.
48+ // The connection is shared with the profile-data path (caller owns it; we
49+ // only borrow); the SDK's BatchProcessor runs its own goroutines for batching
50+ // + retry and is torn down by the returned provider's Shutdown.
51+ func newLogProvider ( ctx context. Context , conn * grpc. ClientConn , opts logProviderOptions ) ( * sdklog. LoggerProvider , error ) {
52+ exp , err := otlploggrpc . New (ctx , otlploggrpc . WithGRPCConn ( conn ) )
17353 if err != nil {
174- return fmt .Errorf ("plogotlp export: %w" , err )
175- }
176- if ps := resp .PartialSuccess (); ps .RejectedLogRecords () > 0 {
177- s .rejected .Add (uint64 (ps .RejectedLogRecords ()))
178- log .Warnf ("log streamer: server rejected %d/%d records: %s" ,
179- ps .RejectedLogRecords (), len (batch ), ps .ErrorMessage ())
54+ return nil , fmt .Errorf ("create otlploggrpc exporter: %w" , err )
18055 }
181- return nil
182- }
18356
184- func (s * logStreamer ) buildLogs (batch []LogEvent ) plog.Logs {
185- logs := plog .NewLogs ()
186- rl := logs .ResourceLogs ().AppendEmpty ()
187- resAttr := rl .Resource ().Attributes ()
188- resAttr .PutStr ("service.name" , s .opts .ServiceName )
189- if s .opts .ServiceVersion != "" {
190- resAttr .PutStr ("service.version" , s .opts .ServiceVersion )
57+ attrs := []attribute.KeyValue {
58+ attribute .String ("service.name" , opts .ServiceName ),
19159 }
192- if s . opts .HostName != "" {
193- resAttr . PutStr ( "host.name " , s . opts .HostName )
60+ if opts .ServiceVersion != "" {
61+ attrs = append ( attrs , attribute . String ( "service.version " , opts .ServiceVersion ) )
19462 }
195-
196- sl := rl .ScopeLogs ().AppendEmpty ()
197- sl .Scope ().SetName (logStreamerScopeName )
198-
199- records := sl .LogRecords ()
200- records .EnsureCapacity (len (batch ))
201- for _ , ev := range batch {
202- lr := records .AppendEmpty ()
203- lr .SetTimestamp (pcommon .Timestamp (ev .TimestampNs ))
204- lr .SetObservedTimestamp (pcommon .Timestamp (ev .ObservedTimestampNs ))
205- lr .Body ().SetStr (ev .Body )
206- a := lr .Attributes ()
207- for k , v := range ev .Attributes {
208- if v .IsInt {
209- a .PutInt (k , v .Int )
210- } else {
211- a .PutStr (k , v .Str )
212- }
213- }
63+ if opts .HostName != "" {
64+ attrs = append (attrs , attribute .String ("host.name" , opts .HostName ))
21465 }
215-
216- return logs
217- }
218-
219- // stopLogFlushTimer drains the timer channel after Stop so the next Reset
220- // starts cleanly.
221- func stopLogFlushTimer (t * time.Timer ) {
222- if ! t .Stop () {
223- select {
224- case <- t .C :
225- default :
226- }
227- }
228- }
229-
230- func resetLogFlushTimer (t * time.Timer , d time.Duration ) {
231- stopLogFlushTimer (t )
232- t .Reset (d )
66+ res := resource .NewSchemaless (attrs ... )
67+
68+ bp := sdklog .NewBatchProcessor (exp ,
69+ sdklog .WithMaxQueueSize (logMaxQueueSize ),
70+ sdklog .WithExportMaxBatchSize (logExportMaxBatchSize ),
71+ sdklog .WithExportInterval (logExportInterval ),
72+ )
73+
74+ return sdklog .NewLoggerProvider (
75+ sdklog .WithResource (res ),
76+ sdklog .WithProcessor (bp ),
77+ ), nil
23378}
0 commit comments