Skip to content

Commit e10b4b3

Browse files
committed
Propagate process context to profiles
1 parent c6a0a64 commit e10b4b3

11 files changed

Lines changed: 809 additions & 45 deletions

File tree

libpf/trace.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf"
55

66
import (
77
"unique"
8+
9+
"go.opentelemetry.io/collector/pdata/pcommon"
810
)
911

1012
// FrameMappingFileData represents a backing file for a memory mapping.
@@ -120,6 +122,7 @@ type EbpfTrace struct {
120122
NumFrames int
121123
EnvVars map[String]String
122124
CustomLabels map[String]String
125+
Resource *pcommon.Resource
123126
KernelFrames Frames
124127
FrameData []uint64
125128
FrameDataBuf [3072]uint64

processcontext/processcontext.go

Lines changed: 200 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,17 @@ import (
77
"encoding/binary"
88
"errors"
99
"fmt"
10+
"net/url"
11+
"strings"
1012
"structs"
1113
"unsafe"
1214

15+
"go.opentelemetry.io/collector/pdata/pcommon"
16+
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
17+
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
1318
"google.golang.org/protobuf/proto"
1419

20+
"go.opentelemetry.io/ebpf-profiler/internal/log"
1521
"go.opentelemetry.io/ebpf-profiler/libpf"
1622
"go.opentelemetry.io/ebpf-profiler/libpf/pfunsafe"
1723
processcontextpb "go.opentelemetry.io/ebpf-profiler/processcontext/v1development"
@@ -47,6 +53,12 @@ const (
4753

4854
// Offset of the MonotonicPublishedAtNs field in the header struct
4955
monotonicPublishedAtNsOffset = libpf.Address(unsafe.Offsetof(header{}.MonotonicPublishedAtNs))
56+
57+
// resourceAttrKey is the environment variable name OpenTelemetry Resource information will be read from.
58+
resourceAttrKey = "OTEL_RESOURCE_ATTRIBUTES"
59+
60+
// svcNameKey is the environment variable name that Service Name information will be read from.
61+
svcNameKey = "OTEL_SERVICE_NAME"
5062
)
5163

5264
var (
@@ -60,9 +72,14 @@ var (
6072
ErrNoUpdate = errors.New("ProcessContext has not been updated")
6173
)
6274

75+
// Info is a snapshot of process context. The pointed-to Resource and
76+
// ExtraAttributes are shared by pointer across goroutines (process-manager
77+
// writer, tracer, reporter) without locking; once an Info is published they
78+
// MUST be treated as read-only by all holders.
6379
type Info struct {
64-
Context *processcontextpb.ProcessContext
65-
PublishedAtNs uint64
80+
Resource *pcommon.Resource
81+
ExtraAttributes *pcommon.Map
82+
PublishedAtNs uint64
6683
}
6784

6885
// header represents the 32-byte memory region header per OTEP #4719.
@@ -200,11 +217,188 @@ func readPayload(rm remotememory.RemoteMemory, hdr header) (Info, error) {
200217
return Info{}, fmt.Errorf("failed to unmarshal ProcessContext: %w", err)
201218
}
202219

203-
return Info{Context: ctx, PublishedAtNs: hdr.MonotonicPublishedAtNs}, nil
220+
var resource *pcommon.Resource
221+
if ctx.Resource != nil {
222+
r := pcommon.NewResource()
223+
for _, attr := range ctx.Resource.Attributes {
224+
if v, ok := convertAnyValue(attr.Value); ok {
225+
v.MoveTo(r.Attributes().PutEmpty(attr.Key))
226+
}
227+
}
228+
resource = &r
229+
}
230+
231+
var extraAttributes *pcommon.Map
232+
if ctx.ExtraAttributes != nil {
233+
m := pcommon.NewMap()
234+
for _, attr := range ctx.ExtraAttributes {
235+
if v, ok := convertAnyValue(attr.Value); ok {
236+
v.MoveTo(m.PutEmpty(attr.Key))
237+
}
238+
}
239+
extraAttributes = &m
240+
}
241+
return Info{Resource: resource, ExtraAttributes: extraAttributes, PublishedAtNs: hdr.MonotonicPublishedAtNs}, nil
242+
}
243+
244+
// convertAnyValue converts a commonpb.AnyValue to a pcommon.Value, handling
245+
// all value types including nested maps and arrays. Returns (_, false) for
246+
// nil inputs and unknown variants so callers can skip them rather than
247+
// emit phantom empty entries.
248+
func convertAnyValue(src *commonpb.AnyValue) (pcommon.Value, bool) {
249+
if src == nil {
250+
return pcommon.Value{}, false
251+
}
252+
switch v := src.Value.(type) {
253+
case *commonpb.AnyValue_StringValue:
254+
return pcommon.NewValueStr(v.StringValue), true
255+
case *commonpb.AnyValue_BoolValue:
256+
return pcommon.NewValueBool(v.BoolValue), true
257+
case *commonpb.AnyValue_IntValue:
258+
return pcommon.NewValueInt(v.IntValue), true
259+
case *commonpb.AnyValue_DoubleValue:
260+
return pcommon.NewValueDouble(v.DoubleValue), true
261+
case *commonpb.AnyValue_BytesValue:
262+
val := pcommon.NewValueBytes()
263+
val.Bytes().FromRaw(v.BytesValue)
264+
return val, true
265+
case *commonpb.AnyValue_ArrayValue:
266+
val := pcommon.NewValueSlice()
267+
if v.ArrayValue != nil {
268+
sl := val.Slice()
269+
sl.EnsureCapacity(len(v.ArrayValue.Values))
270+
for _, item := range v.ArrayValue.Values {
271+
if itemVal, ok := convertAnyValue(item); ok {
272+
itemVal.MoveTo(sl.AppendEmpty())
273+
}
274+
}
275+
}
276+
return val, true
277+
case *commonpb.AnyValue_KvlistValue:
278+
val := pcommon.NewValueMap()
279+
if v.KvlistValue != nil {
280+
m := val.Map()
281+
m.EnsureCapacity(len(v.KvlistValue.Values))
282+
for _, kv := range v.KvlistValue.Values {
283+
if kvVal, ok := convertAnyValue(kv.Value); ok {
284+
kvVal.MoveTo(m.PutEmpty(kv.Key))
285+
}
286+
}
287+
}
288+
return val, true
289+
default:
290+
log.Debugf("convertAnyValue: unknown AnyValue variant %T, skipping", v)
291+
return pcommon.Value{}, false
292+
}
293+
}
294+
295+
// WithMergedEnvVars returns process context with attributes derived from
296+
// OTEL_SERVICE_NAME and OTEL_RESOURCE_ATTRIBUTES merged into its Resource.
297+
func WithMergedEnvVars(info Info, envVars map[libpf.String]libpf.String) Info {
298+
info.Resource = mergeResources(info.Resource, resourceFromEnvVars(envVars))
299+
return info
300+
}
301+
302+
// resourceFromEnvVars builds a Resource from OTEL_SERVICE_NAME and
303+
// OTEL_RESOURCE_ATTRIBUTES, returning nil when neither yields any attribute.
304+
func resourceFromEnvVars(envVars map[libpf.String]libpf.String) *pcommon.Resource {
305+
r := pcommon.NewResource()
306+
if v, ok := envVars[libpf.Intern(resourceAttrKey)]; ok {
307+
pairs, err := parseResourceAttributes(v.String())
308+
if err != nil {
309+
log.Debugf("OTEL_RESOURCE_ATTRIBUTES=%q: discarding invalid value: %v", v.String(), err)
310+
} else {
311+
for _, p := range pairs {
312+
r.Attributes().PutStr(p.key, p.value)
313+
}
314+
}
315+
}
316+
if v, ok := envVars[libpf.Intern(svcNameKey)]; ok {
317+
r.Attributes().PutStr(string(semconv.ServiceNameKey), v.String())
318+
}
319+
if r.Attributes().Len() == 0 {
320+
return nil
321+
}
322+
return &r
323+
}
324+
325+
// mergeResources returns a Resource with primary's attributes plus any keys
326+
// from secondary not already in primary (primary wins on collision). Returns
327+
// nil only when both inputs are nil. Inputs are not modified.
328+
func mergeResources(primary, secondary *pcommon.Resource) *pcommon.Resource {
329+
if primary == nil {
330+
return secondary
331+
}
332+
if secondary == nil {
333+
return primary
334+
}
335+
r := pcommon.NewResource()
336+
primary.Attributes().CopyTo(r.Attributes())
337+
secondary.Attributes().Range(func(k string, v pcommon.Value) bool {
338+
if _, exists := r.Attributes().Get(k); !exists {
339+
v.CopyTo(r.Attributes().PutEmpty(k))
340+
}
341+
return true
342+
})
343+
return &r
344+
}
345+
346+
// resourceAttribute is one parsed entry from OTEL_RESOURCE_ATTRIBUTES.
347+
type resourceAttribute struct {
348+
key, value string
204349
}
205350

206-
func (p *Info) ClearExtraAttributes() {
207-
if p.Context != nil {
208-
p.Context.ExtraAttributes = nil
351+
// parseResourceAttributes parses an OTEL_RESOURCE_ATTRIBUTES value as
352+
// comma-separated key=value pairs where keys and values are percent-encoded.
353+
// Returns the pairs in source order; the caller dedups via last-writer-wins.
354+
// On any decoding error the whole value is discarded per OTel spec and a
355+
// non-nil error is returned.
356+
func parseResourceAttributes(raw string) ([]resourceAttribute, error) {
357+
if raw == "" {
358+
return nil, nil
359+
}
360+
var pairs []resourceAttribute
361+
for pair := range strings.SplitSeq(raw, ",") {
362+
k, v, ok := strings.Cut(pair, "=")
363+
if !ok {
364+
return nil, fmt.Errorf("missing '=' in %q", pair)
365+
}
366+
key, err := url.PathUnescape(strings.TrimSpace(k))
367+
if err != nil {
368+
return nil, fmt.Errorf("invalid key %q: %w", k, err)
369+
}
370+
value, err := url.PathUnescape(strings.TrimSpace(v))
371+
if err != nil {
372+
return nil, fmt.Errorf("invalid value for key %q: %w", key, err)
373+
}
374+
pairs = append(pairs, resourceAttribute{key, value})
375+
}
376+
return pairs, nil
377+
}
378+
379+
// ResourceToContextKey returns a stable key derived from the
380+
// (service.namespace, service.name, service.instance.id) triplet which the
381+
// OTel semantic conventions describe as globally unique for a service
382+
// instance.
383+
// See: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/service.md
384+
//
385+
// Returns libpf.NullString only when resource is nil or none of the three
386+
// attributes is present. When at least one is present, the result joins all
387+
// three with ':' (missing components render as empty strings); callers
388+
// should treat the null sentinel as "unidentifiable" and may choose to
389+
// group such samples by other fields.
390+
func ResourceToContextKey(resource *pcommon.Resource) libpf.String {
391+
if resource == nil {
392+
return libpf.NullString
393+
}
394+
serviceNamespace, namespaceOk := resource.Attributes().Get(string(semconv.ServiceNamespaceKey))
395+
serviceName, nameOk := resource.Attributes().Get(string(semconv.ServiceNameKey))
396+
serviceInstanceID, instanceIdOk := resource.Attributes().Get(string(semconv.ServiceInstanceIDKey))
397+
// If all three attributes are missing, return an empty string instead of ":::" to ensure that nil resource
398+
// and empty resource are treated as the same.
399+
if !namespaceOk && !nameOk && !instanceIdOk {
400+
return libpf.NullString
209401
}
402+
return libpf.Intern(fmt.Sprintf("%s:%s:%s",
403+
serviceNamespace.Str(), serviceName.Str(), serviceInstanceID.Str()))
210404
}

0 commit comments

Comments
 (0)