diff --git a/CHANGELOG.md b/CHANGELOG.md index 94247f545fa..cebf526fcf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `go.opentelemetry.io/otel/semconv/v1.41.0` package. The package contains semantic conventions from the `v1.41.0` version of the OpenTelemetry Semantic Conventions. See the [migration documentation](./semconv/v1.41.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.40.0`. (#8324) +- Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`. (#8263) - Add Observable variants of instruments to `go.opentelemetry.io/otel/semconv/v1.41.0` package. (#8350) - Generate explicit histogram bucket boundaries from weaver configuration for HTTP and RPC duration instruments in `go.opentelemetry.io/otel/semconv/v1.41.0`. (#8002) diff --git a/exporters/stdout/stdoutlog/exporter.go b/exporters/stdout/stdoutlog/exporter.go index 3d48d67081e..94e9bfd3609 100644 --- a/exporters/stdout/stdoutlog/exporter.go +++ b/exporters/stdout/stdoutlog/exporter.go @@ -8,6 +8,9 @@ import ( "encoding/json" "sync/atomic" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/counter" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/observ" + "go.opentelemetry.io/otel/sdk/log" ) @@ -18,6 +21,7 @@ var _ log.Exporter = &Exporter{} type Exporter struct { encoder atomic.Pointer[json.Encoder] timestamps bool + inst *observ.Instrumentation } // New creates an [Exporter]. @@ -29,21 +33,31 @@ func New(options ...Option) (*Exporter, error) { enc.SetIndent("", "\t") } - e := Exporter{ + e := &Exporter{ timestamps: cfg.Timestamps, } e.encoder.Store(enc) - return &e, nil + var err error + e.inst, err = observ.NewInstrumentation(counter.NextExporterID()) + return e, err } // Export exports log records to writer. -func (e *Exporter) Export(ctx context.Context, records []log.Record) error { +func (e *Exporter) Export(ctx context.Context, records []log.Record) (err error) { enc := e.encoder.Load() if enc == nil { return nil } + var success int64 + if e.inst != nil { + op := e.inst.ExportLogs(ctx, int64(len(records))) + defer func() { + op.End(success, err) + }() + } + for _, record := range records { // Honor context cancellation. if err := ctx.Err(); err != nil { @@ -55,6 +69,7 @@ func (e *Exporter) Export(ctx context.Context, records []log.Record) error { if err := enc.Encode(recordJSON); err != nil { return err } + success++ } return nil } diff --git a/exporters/stdout/stdoutlog/exporter_test.go b/exporters/stdout/stdoutlog/exporter_test.go index a64f89a6240..242b86200b7 100644 --- a/exporters/stdout/stdoutlog/exporter_test.go +++ b/exporters/stdout/stdoutlog/exporter_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "sync" "testing" "time" @@ -14,12 +15,21 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/counter" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/observ" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/sdk/instrumentation" sdklog "go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/log/logtest" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.41.0" + "go.opentelemetry.io/otel/semconv/v1.41.0/otelconv" "go.opentelemetry.io/otel/trace" ) @@ -458,3 +468,216 @@ func TestValueMarshalJSON(t *testing.T) { }) } } + +func TestObservability(t *testing.T) { + tests := []struct { + name string + enabled bool + test func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) + }{ + { + name: "Disabled", + enabled: false, + test: func(t *testing.T, _ func() metricdata.ScopeMetrics) { + var buf bytes.Buffer + exporter, err := New(WithWriter(&buf)) + require.NoError(t, err) + assert.Nil(t, exporter.inst) + }, + }, + { + name: "upload success", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + ctx := t.Context() + var buf bytes.Buffer + exporter, err := New(WithWriter(&buf)) + require.NoError(t, err) + err = exporter.Export(ctx, []sdklog.Record{getRecord(time.Now())}) + require.NoError(t, err) + assertStdoutLogObservabilityMetrics(t, scopeMetrics(), 1, 1, nil) + }, + }, + { + name: "upload failed", + enabled: true, + test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) { + ctx := t.Context() + writeErr := errors.New("write failed") + exporter, err := New(WithWriter(&failingWriter{err: writeErr})) + require.NoError(t, err) + err = exporter.Export(ctx, []sdklog.Record{getRecord(time.Now())}) + require.ErrorIs(t, err, writeErr) + assertStdoutLogObservabilityMetrics(t, scopeMetrics(), 1, 0, writeErr) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if tc.enabled { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + _ = counter.SetExporterID(0) + } + provider := otel.GetMeterProvider() + t.Cleanup(func() { + otel.SetMeterProvider(provider) + }) + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + scopeMetrics := func() metricdata.ScopeMetrics { + var got metricdata.ResourceMetrics + err := r.Collect(t.Context(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + return got.ScopeMetrics[0] + } + tc.test(t, scopeMetrics) + }) + } +} + +func BenchmarkExporterObservability(b *testing.B) { + ctx := b.Context() + rec := getRecord(time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)) + records := []sdklog.Record{rec} + + b.Run("Disabled", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "false") + var buf bytes.Buffer + exporter, err := New(WithWriter(&buf)) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + buf.Reset() + _ = exporter.Export(ctx, records) + } + }) + + setupObservability := func(b *testing.B) { + b.Helper() + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + _ = counter.SetExporterID(0) + provider := otel.GetMeterProvider() + b.Cleanup(func() { + otel.SetMeterProvider(provider) + }) + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + } + + b.Run("UploadSuccess", func(b *testing.B) { + setupObservability(b) + var buf bytes.Buffer + exporter, err := New(WithWriter(&buf)) + require.NoError(b, err) + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + buf.Reset() + _ = exporter.Export(ctx, records) + } + }) +} + +// failingWriter implements [io.Writer] and always returns an error. +type failingWriter struct { + err error +} + +func (w *failingWriter) Write([]byte) (int, error) { + return 0, w.err +} + +func stdoutObservAttrSet(err error) attribute.Set { + attrs := []attribute.KeyValue{ + semconv.OTelComponentName(observ.GetComponentName(0)), + semconv.OTelComponentTypeKey.String(observ.ComponentType), + } + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + return attribute.NewSet(attrs...) +} + +func stdoutLogInflightMetric() metricdata.Metrics { + inflight := otelconv.SDKExporterLogInflight{} + return metricdata.Metrics{ + Name: inflight.Name(), + Description: inflight.Description(), + Unit: inflight.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: stdoutObservAttrSet(nil), Value: 0}, + }, + }, + } +} + +func stdoutLogExportedMetric(success, total int64, err error) metricdata.Metrics { + dp := []metricdata.DataPoint[int64]{ + {Attributes: stdoutObservAttrSet(nil), Value: success}, + } + if err != nil { + dp = append(dp, metricdata.DataPoint[int64]{ + Attributes: stdoutObservAttrSet(err), + Value: total - success, + }) + } + exported := otelconv.SDKExporterLogExported{} + return metricdata.Metrics{ + Name: exported.Name(), + Description: exported.Description(), + Unit: exported.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dp, + }, + } +} + +func stdoutLogDurationMetric(err error) metricdata.Metrics { + duration := otelconv.SDKExporterOperationDuration{} + return metricdata.Metrics{ + Name: duration.Name(), + Description: duration.Description(), + Unit: duration.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: stdoutObservAttrSet(err)}, + }, + }, + } +} + +func assertStdoutLogObservabilityMetrics( + t *testing.T, + got metricdata.ScopeMetrics, + logs int64, + success int64, + err error, +) { + t.Helper() + wantScope := instrumentation.Scope{ + Name: observ.ScopeName, + Version: internal.Version, + SchemaURL: semconv.SchemaURL, + } + assert.Equal(t, wantScope, got.Scope) + + m := got.Metrics + require.Len(t, m, 3) + + o := metricdatatest.IgnoreTimestamp() + metricdatatest.AssertEqual(t, stdoutLogInflightMetric(), m[0], o) + metricdatatest.AssertEqual(t, stdoutLogExportedMetric(success, logs, err), m[1], o) + metricdatatest.AssertEqual(t, stdoutLogDurationMetric(err), m[2], metricdatatest.IgnoreValue(), o) +} diff --git a/exporters/stdout/stdoutlog/internal/counter/counter.go b/exporters/stdout/stdoutlog/internal/counter/counter.go new file mode 100644 index 00000000000..bbb4cd7ddc9 --- /dev/null +++ b/exporters/stdout/stdoutlog/internal/counter/counter.go @@ -0,0 +1,31 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package counter provides a simple counter for generating unique IDs. +// +// This package is used to generate unique IDs while allowing testing packages +// to reset the counter. +package counter // import "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/counter" + +import "sync/atomic" + +// exporterN is a global 0-based count of the number of exporters created. +var exporterN atomic.Int64 + +// NextExporterID returns the next unique ID for an exporter. +func NextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc +} + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} diff --git a/exporters/stdout/stdoutlog/internal/counter/counter_test.go b/exporters/stdout/stdoutlog/internal/counter/counter_test.go new file mode 100644 index 00000000000..f3e380d3325 --- /dev/null +++ b/exporters/stdout/stdoutlog/internal/counter/counter_test.go @@ -0,0 +1,65 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package counter + +import ( + "sync" + "testing" +) + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := NextExporterID() + if id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := NextExporterID() + if id != 42 { + t.Errorf("NextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := NextExporterID(); id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } +} \ No newline at end of file diff --git a/exporters/stdout/stdoutlog/internal/gen.go b/exporters/stdout/stdoutlog/internal/gen.go index 0dda966df6c..79085225bf9 100644 --- a/exporters/stdout/stdoutlog/internal/gen.go +++ b/exporters/stdout/stdoutlog/internal/gen.go @@ -7,3 +7,5 @@ package internal // import "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/ //go:generate gotmpl --body=../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/stdout/stdoutlog\" }" --out=x/x.go //go:generate gotmpl --body=../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go +//go:generate gotmpl --body=../../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/counter\" }" --out=counter/counter.go +//go:generate gotmpl --body=../../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go diff --git a/exporters/stdout/stdoutlog/internal/observ/instrumentation.go b/exporters/stdout/stdoutlog/internal/observ/instrumentation.go index 3911fbc2317..ae8d1e98fbb 100644 --- a/exporters/stdout/stdoutlog/internal/observ/instrumentation.go +++ b/exporters/stdout/stdoutlog/internal/observ/instrumentation.go @@ -24,7 +24,7 @@ import ( const ( // ScopeName is the unique name of the meter used for instrumentation. - ScopeName = "go.opentelemetry.io/otel/exporters/stdoutlog/internal/observ" + ScopeName = "go.opentelemetry.io/otel/exporters/stdout/stdoutlog/internal/observ" // ComponentType uniquely identifies the OpenTelemetry Exporter component // being instrumented. @@ -83,8 +83,7 @@ type Instrumentation struct { duration metric.Float64Histogram attrs []attribute.KeyValue - addOpt metric.AddOption - recOpt metric.RecordOption + setOpt metric.MeasurementOption } // GetComponentName returns the constant name for the exporter with the @@ -97,7 +96,7 @@ func getAttrs(id int64) []attribute.KeyValue { attrs := make([]attribute.KeyValue, 0, 2) attrs = append(attrs, semconv.OTelComponentName(GetComponentName(id)), - semconv.OTelComponentNameKey.String(ComponentType)) + semconv.OTelComponentTypeKey.String(ComponentType)) return attrs } @@ -144,8 +143,7 @@ func NewInstrumentation(id int64) (*Instrumentation, error) { return nil, err } inst.attrs = getAttrs(id) - inst.addOpt = metric.WithAttributeSet(attribute.NewSet(inst.attrs...)) - inst.recOpt = metric.WithAttributeSet(attribute.NewSet(inst.attrs...)) + inst.setOpt = metric.WithAttributeSet(attribute.NewSet(inst.attrs...)) return inst, nil } @@ -155,11 +153,12 @@ func NewInstrumentation(id int64) (*Instrumentation, error) { func (i *Instrumentation) ExportLogs(ctx context.Context, count int64) ExportOp { start := time.Now() - addOpt := get[metric.AddOption](addOptPool) - defer put(addOptPool, addOpt) - *addOpt = append(*addOpt, i.addOpt) - - i.inflight.Add(ctx, count, *addOpt...) + if i.inflight.Enabled(ctx) { + addOpt := get[metric.AddOption](addOptPool) + defer put(addOptPool, addOpt) + *addOpt = append(*addOpt, i.setOpt) + i.inflight.Add(ctx, count, *addOpt...) + } return ExportOp{ count: count, @@ -185,42 +184,47 @@ type ExportOp struct { // If err is not nil, End records failed log exports as count-success with the // error.type attribute set from err. func (e ExportOp) End(success int64, err error) { + inflightLogsEnable := e.inst.inflight.Enabled(e.ctx) + exportedLogsEnable := e.inst.exported.Enabled(e.ctx) + opDurationEnable := e.inst.duration.Enabled(e.ctx) + + if !inflightLogsEnable && !exportedLogsEnable && !opDurationEnable { + return + } + addOpt := get[metric.AddOption](addOptPool) defer put(addOptPool, addOpt) - *addOpt = append(*addOpt, e.inst.addOpt) + *addOpt = append(*addOpt, e.inst.setOpt) - e.inst.inflight.Add(e.ctx, -e.count, *addOpt...) + if inflightLogsEnable { + e.inst.inflight.Add(e.ctx, -e.count, *addOpt...) + } - e.inst.exported.Add(e.ctx, success, *addOpt...) + if exportedLogsEnable { + e.inst.exported.Add(e.ctx, success, *addOpt...) + } - if err != nil { + mOpt := e.inst.setOpt + if err != nil && (exportedLogsEnable || opDurationEnable) { // Add the error.type attribute to the attribute set. attrs := get[attribute.KeyValue](attrsPool) defer put(attrsPool, attrs) *attrs = append(*attrs, e.inst.attrs...) *attrs = append(*attrs, semconv.ErrorType(err)) - o := metric.WithAttributeSet(attribute.NewSet(*attrs...)) + mOpt = metric.WithAttributeSet(attribute.NewSet(*attrs...)) - *addOpt = append((*addOpt)[:0], o) - e.inst.exported.Add(e.ctx, e.count-success, *addOpt...) + if exportedLogsEnable { + *addOpt = append((*addOpt)[:0], mOpt) + e.inst.exported.Add(e.ctx, e.count-success, *addOpt...) + } } - recordOpt := get[metric.RecordOption](recordOptPool) - defer put(recordOptPool, recordOpt) + if opDurationEnable { + recordOpt := get[metric.RecordOption](recordOptPool) + defer put(recordOptPool, recordOpt) - *recordOpt = append(*recordOpt, e.inst.recordOption(err)) - e.inst.duration.Record(e.ctx, time.Since(e.start).Seconds(), *recordOpt...) -} - -func (i *Instrumentation) recordOption(err error) metric.RecordOption { - if err == nil { - return i.recOpt + *recordOpt = append(*recordOpt, mOpt) + e.inst.duration.Record(e.ctx, time.Since(e.start).Seconds(), *recordOpt...) } - attrs := get[attribute.KeyValue](attrsPool) - defer put(attrsPool, attrs) - - *attrs = append(*attrs, i.attrs...) - *attrs = append(*attrs, semconv.ErrorType(err)) - return metric.WithAttributeSet(attribute.NewSet(*attrs...)) } diff --git a/exporters/stdout/stdoutlog/internal/observ/instrumentation_test.go b/exporters/stdout/stdoutlog/internal/observ/instrumentation_test.go index 66a4f09dc1d..e37b8ba3887 100644 --- a/exporters/stdout/stdoutlog/internal/observ/instrumentation_test.go +++ b/exporters/stdout/stdoutlog/internal/observ/instrumentation_test.go @@ -81,7 +81,7 @@ func TestNewInstrumentation(t *testing.T) { func set(err error) attribute.Set { attrs := []attribute.KeyValue{ semconv.OTelComponentName(GetComponentName(ID)), - semconv.OTelComponentNameKey.String(ComponentType), + semconv.OTelComponentTypeKey.String(ComponentType), } if err != nil { attrs = append(attrs, semconv.ErrorType(err))