Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/common/observability/tracing/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,40 @@ import (

"github.com/go-logr/logr"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/trace"

"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
"github.com/llm-d/llm-d-inference-payload-processor/version"
)

// instrumentationName is the default OTel instrumentation scope used when no
// explicit scope is supplied to Tracer.
const instrumentationName = "llm-d-inference-payload-processor"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend shortening the name. How about:

Suggested change
const instrumentationName = "llm-d-inference-payload-processor"
const instrumentationName = "llm-d-ipp"


// Tracer returns a tracer for the given instrumentation scope, defaulting to
// "llm-d-inference-payload-processor". The build version and commit SHA are

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// "llm-d-inference-payload-processor". The build version and commit SHA are
// "llm-d-ipp". The build version and commit SHA are

// attached so every span in a trace carries consistent scope metadata.
func Tracer(scope ...string) trace.Tracer {
	// Start from the package default and override it only when the caller
	// supplied a non-empty first scope; any further variadic values are ignored.
	scopeName := instrumentationName
	if len(scope) != 0 {
		if requested := scope[0]; requested != "" {
			scopeName = requested
		}
	}

	// The build ref becomes the instrumentation version and the commit SHA is
	// attached as a scope attribute, so both travel with the tracer itself.
	opts := []trace.TracerOption{
		trace.WithInstrumentationVersion(version.BuildRef),
		trace.WithInstrumentationAttributes(attribute.String("commit-sha", version.CommitSHA)),
	}
	return otel.Tracer(scopeName, opts...)
}

// errorHandler wraps a logr.Logger.
// NOTE(review): presumably this is the handler registered to receive
// OpenTelemetry SDK errors and log them — its methods are outside this view;
// confirm against the rest of the file.
type errorHandler struct {
// logger is the logr.Logger held by the handler.
logger logr.Logger
}
Expand Down
88 changes: 88 additions & 0 deletions pkg/common/observability/tracing/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2026 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"context"
"testing"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

"github.com/llm-d/llm-d-inference-payload-processor/version"
)

// TestTracer checks that Tracer resolves the instrumentation scope name for
// the default, custom, and empty-string cases, and that the scope of each
// recorded span carries the build ref as its version and the commit SHA as a
// "commit-sha" attribute.
func TestTracer(t *testing.T) {
	const (
		fakeBuildRef  = "test-build-ref"
		fakeCommitSHA = "test-commit-sha"
	)

	// Pin the version metadata to known values and restore it afterwards.
	savedBuildRef, savedCommitSHA := version.BuildRef, version.CommitSHA
	version.BuildRef = fakeBuildRef
	version.CommitSHA = fakeCommitSHA
	t.Cleanup(func() {
		version.BuildRef = savedBuildRef
		version.CommitSHA = savedCommitSHA
	})

	// Point the global tracer provider at an in-memory span recorder.
	spanRecorder := tracetest.NewSpanRecorder()
	savedProvider := otel.GetTracerProvider()
	otel.SetTracerProvider(sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder)))
	t.Cleanup(func() { otel.SetTracerProvider(savedProvider) })

	type testCase struct {
		name      string
		scope     []string
		wantScope string
	}
	cases := []testCase{
		{
			name:      "default scope",
			scope:     nil,
			wantScope: instrumentationName,
		},
		{
			name:      "custom scope",
			scope:     []string{"llm-d-inference-payload-processor/pkg/handlers"},
			wantScope: "llm-d-inference-payload-processor/pkg/handlers",
		},
		{
			name:      "empty scope falls back to default",
			scope:     []string{""},
			wantScope: instrumentationName,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Drop spans left over from the previous sub-test.
			spanRecorder.Reset()

			_, span := Tracer(tc.scope...).Start(context.Background(), "test-span")
			span.End()

			recorded := spanRecorder.Ended()
			if got := len(recorded); got != 1 {
				t.Fatalf("expected 1 recorded span, got %d", got)
			}

			gotScope := recorded[0].InstrumentationScope()
			if gotScope.Name != tc.wantScope {
				t.Errorf("scope name = %q, want %q", gotScope.Name, tc.wantScope)
			}
			if gotScope.Version != fakeBuildRef {
				t.Errorf("scope version = %q, want %q", gotScope.Version, fakeBuildRef)
			}

			shaValue, found := gotScope.Attributes.Value(attribute.Key("commit-sha"))
			if !found {
				t.Fatal("commit-sha scope attribute not set")
			}
			if got := shaValue.AsString(); got != fakeCommitSHA {
				t.Errorf("commit-sha = %q, want %q", got, fakeCommitSHA)
			}
		})
	}
}
3 changes: 3 additions & 0 deletions pkg/framework/interface/modelselector/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,8 @@ type Scorer interface {
// Picker picks the final model(s) to send the request to. It embeds
// plugin.Plugin, so every Picker is also a Plugin.
type Picker interface {
// Plugin is the embedded base plugin interface.
plugin.Plugin
// Pick selects a target model from the scored candidates in scoredModels.
// Implementations must return a non-nil *PipelineRunResult with a non-nil
// TargetModel; callers rely on this guarantee and do not nil-check the
// result.
Pick(ctx context.Context, cycleState *plugin.CycleState, scoredModels []*ScoredModel) *PipelineRunResult
}
29 changes: 25 additions & 4 deletions pkg/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ import (
"time"

eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/controller-runtime/pkg/log"

envoy "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/envoy"
errcommon "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/error"
logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/tracing"
datasource "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer/datasource"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling"
Expand Down Expand Up @@ -131,17 +135,34 @@ func (s *Server) runRequestPlugins(ctx context.Context, cycleState *plugin.Cycle
verboseLogger := logger.V(logutil.VERBOSE)
verboseEnabled := verboseLogger.Enabled()

// Stage span grouping the per-plugin spans under gateway.request.
tracer := tracing.Tracer(handlersTracerScope)
ctx, stageSpan := tracer.Start(ctx, "request_plugins", trace.WithSpanKind(trace.SpanKindInternal))
defer stageSpan.End()

for _, reqPlugin := range reqPlugins {
typedName := reqPlugin.TypedName()
if verboseEnabled {
verboseLogger.Info("Executing request plugin", "plugin", reqPlugin.TypedName())
verboseLogger.Info("Executing request plugin", "plugin", typedName)
}
pluginCtx, span := tracer.Start(ctx, "plugin."+typedName.Type,
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
attribute.String("llm_d.plugin.extension_point", requestPluginExtensionPoint),
attribute.String("llm_d.plugin.type", typedName.Type),
attribute.String("llm_d.plugin.name", typedName.Name),
))
before := time.Now()
err := reqPlugin.ProcessRequest(ctx, cycleState, request)
metrics.RecordPluginProcessingLatency(requestPluginExtensionPoint, reqPlugin.TypedName().Type, reqPlugin.TypedName().Name, time.Since(before))
err := reqPlugin.ProcessRequest(pluginCtx, cycleState, request)
metrics.RecordPluginProcessingLatency(requestPluginExtensionPoint, typedName.Type, typedName.Name, time.Since(before))
if err != nil {
logger.Error(err, "Failed to execute request plugin", "plugin", reqPlugin.TypedName())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
logger.Error(err, "Failed to execute request plugin", "plugin", typedName)
return err
}
span.End()
}

return nil

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that we may want to add more test coverage for these changes:

We need a test that does this:

  1. Set up a tracetest.SpanRecorder (as done in telemetry_test.go).
  2. Run runRequestPlugins with one or more fake plugins.
  3. Assert that a request_plugins stage span is created with child plugin.* spans.
  4. Assert that a failing plugin produces a span with codes.Error status.

If I'm not mistaken, we don't currently have such a test, which leaves an opportunity for regressions.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a runRequestPlugins test using a tracetest.SpanRecorder (mirroring telemetry_test.go) with fake plugins that asserts the request_plugins stage span, nested plugin.* child spans, and a codes.Error status when a plugin fails.

Expand Down
81 changes: 81 additions & 0 deletions pkg/handlers/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package handlers
import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"testing"

basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"google.golang.org/protobuf/testing/protocmp"
metricsutils "k8s.io/component-base/metrics/testutil"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -589,6 +594,82 @@ func (p *fakeRequestPlugin) ProcessRequest(ctx context.Context, _ *plugin.CycleS

var _ requesthandling.RequestProcessor = &fakeRequestPlugin{}

// TestRunRequestPlugins_Spans runs runRequestPlugins with one succeeding and
// one failing fake plugin and checks the recorded spans: a single
// "request_plugins" stage span, two "plugin.fake" spans parented to it, and
// exactly one of those marked with an error status describing the failure.
func TestRunRequestPlugins_Spans(t *testing.T) {
	// Capture spans in memory via the global tracer provider.
	spanRecorder := tracetest.NewSpanRecorder()
	previousTP := otel.GetTracerProvider()
	otel.SetTracerProvider(sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder)))
	t.Cleanup(func() { otel.SetTracerProvider(previousTP) })

	errBoom := errors.New("boom")
	succeed := func(context.Context, *requesthandling.InferenceRequest) error { return nil }
	fail := func(context.Context, *requesthandling.InferenceRequest) error { return errBoom }
	chain := []requesthandling.RequestProcessor{
		&fakeRequestPlugin{name: "ok", mutateFn: succeed},
		&fakeRequestPlugin{name: "boom", mutateFn: fail},
	}

	srv := &Server{}
	gotErr := srv.runRequestPlugins(context.Background(), plugin.NewCycleState(), requesthandling.NewInferenceRequest(), chain)
	if !errors.Is(gotErr, errBoom) {
		t.Fatalf("runRequestPlugins error = %v, want %v", gotErr, errBoom)
	}

	recorded := spanRecorder.Ended()

	// The stage span is the parent that groups the per-plugin spans.
	stageSpan := findSpan(t, recorded, "request_plugins")

	// The first plugin succeeds and the second fails (ending the loop), so
	// both must have produced a span under the stage span.
	children := findSpans(recorded, "plugin.fake")
	if len(children) != 2 {
		t.Fatalf("expected 2 plugin.fake spans, got %d", len(children))
	}
	for _, child := range children {
		if child.Parent().SpanID() != stageSpan.SpanContext().SpanID() {
			t.Errorf("plugin span parent = %v, want stage span %v", child.Parent().SpanID(), stageSpan.SpanContext().SpanID())
		}
	}

	// Only the failing plugin's span may carry an error status.
	failed := 0
	for _, child := range children {
		if child.Status().Code != otelcodes.Error {
			continue
		}
		failed++
		if child.Status().Description != errBoom.Error() {
			t.Errorf("error span description = %q, want %q", child.Status().Description, errBoom.Error())
		}
	}
	if failed != 1 {
		t.Errorf("expected exactly 1 plugin span with error status, got %d", failed)
	}
}

// findSpan looks up the one ended span called name. The test is aborted via
// t.Fatalf when zero or several spans carry that name.
func findSpan(t *testing.T, spans []sdktrace.ReadOnlySpan, name string) sdktrace.ReadOnlySpan {
	t.Helper()
	found := findSpans(spans, name)
	if count := len(found); count != 1 {
		t.Fatalf("expected exactly 1 span named %q, got %d", name, count)
	}
	return found[0]
}

// findSpans filters spans down to those whose name equals name, preserving
// their order. The result is nil when nothing matches.
func findSpans(spans []sdktrace.ReadOnlySpan, name string) []sdktrace.ReadOnlySpan {
	var selected []sdktrace.ReadOnlySpan
	for i := range spans {
		if spans[i].Name() != name {
			continue
		}
		selected = append(selected, spans[i])
	}
	return selected
}

// TestHandleRequestBody_MultiPluginHeaderMutations tests the end-to-end behavior of
// HandleRequestBody when multiple request plugins set and/or remove headers.
// Each sub-test verifies the HeaderMutation in the resulting ProcessingResponse.
Expand Down
30 changes: 25 additions & 5 deletions pkg/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import (
"time"

eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/controller-runtime/pkg/log"

envoy "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/envoy"
logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/tracing"
datasource "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer/datasource"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling"
Expand Down Expand Up @@ -151,18 +155,34 @@ func (s *Server) runResponsePlugins(ctx context.Context, cycleState *plugin.Cycl
verboseLogger := logger.V(logutil.VERBOSE)
verboseEnabled := verboseLogger.Enabled()

var err error
// Stage span grouping the per-plugin spans under gateway.request.
tracer := tracing.Tracer(handlersTracerScope)
ctx, stageSpan := tracer.Start(ctx, "response_plugins", trace.WithSpanKind(trace.SpanKindInternal))
defer stageSpan.End()

for _, respPlugin := range respPlugins {
typedName := respPlugin.TypedName()
if verboseEnabled {
verboseLogger.Info("Executing response plugin", "plugin", respPlugin.TypedName())
verboseLogger.Info("Executing response plugin", "plugin", typedName)
}
pluginCtx, span := tracer.Start(ctx, "plugin."+typedName.Type,
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
attribute.String("llm_d.plugin.extension_point", responsePluginExtensionPoint),
attribute.String("llm_d.plugin.type", typedName.Type),
attribute.String("llm_d.plugin.name", typedName.Name),
))
before := time.Now()
err = respPlugin.ProcessResponse(ctx, cycleState, response)
metrics.RecordPluginProcessingLatency(responsePluginExtensionPoint, respPlugin.TypedName().Type, respPlugin.TypedName().Name, time.Since(before))
err := respPlugin.ProcessResponse(pluginCtx, cycleState, response)
metrics.RecordPluginProcessingLatency(responsePluginExtensionPoint, typedName.Type, typedName.Name, time.Since(before))
if err != nil {
logger.Error(err, "Failed to execute response plugin", "plugin", respPlugin.TypedName())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
logger.Error(err, "Failed to execute response plugin", "plugin", typedName)
return err
}
span.End()
}

return nil
Expand Down
17 changes: 6 additions & 11 deletions pkg/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/go-logr/logr"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -34,11 +32,11 @@ import (
envoy "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/envoy"
errcommon "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/error"
logutil "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/logging"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/common/observability/tracing"
datasource "github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/datalayer/datasource"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/plugin"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/framework/interface/requesthandling"
"github.com/llm-d/llm-d-inference-payload-processor/pkg/metrics"
"github.com/llm-d/llm-d-inference-payload-processor/version"
)

const (
Expand All @@ -47,6 +45,10 @@ const (

requestPluginExtensionPoint = "request"
responsePluginExtensionPoint = "response"

// handlersTracerScope is the OTel instrumentation scope for spans emitted by
// the request/response handlers, following the package-path naming convention.
handlersTracerScope = "llm-d-inference-payload-processor/pkg/handlers"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shmuelk with your above comment, do we need to update here as well to

Suggested change
handlersTracerScope = "llm-d-inference-payload-processor/pkg/handlers"
handlersTracerScope = "llm-d-ipp/pkg/handlers"

/cc @nirrozenbaum @shaneutt

)

func NewServer(profilePicker requesthandling.ProfilePicker, profiles map[string]*requesthandling.Profile) *Server {
Expand Down Expand Up @@ -99,14 +101,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
ctx := srv.Context()

// Start tracing span for the request
tracer := otel.Tracer(
"llm-d-inference-payload-processor/pkg/handlers",
trace.WithInstrumentationVersion(version.BuildRef),
trace.WithInstrumentationAttributes(
attribute.String("commit-sha", version.CommitSHA),
),
)
ctx, span := tracer.Start(ctx, "gateway.request", trace.WithSpanKind(trace.SpanKindServer))
ctx, span := tracing.Tracer(handlersTracerScope).Start(ctx, "gateway.request", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logger := log.FromContext(ctx)
Expand Down
Loading
Loading