Skip to content

Commit ed3813a

Browse files
committed
fix(observability): extract upstream traceparent, re-parent server span, and inject traceparent on egress
Signed-off-by: Guangya Liu <gyliu513@gmail.com>
1 parent 11891e1 commit ed3813a

4 files changed

Lines changed: 206 additions & 4 deletions

File tree

pkg/handlers/request.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"time"
2525

2626
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
27+
"go.opentelemetry.io/otel"
28+
"go.opentelemetry.io/otel/propagation"
2729
"sigs.k8s.io/controller-runtime/pkg/log"
2830

2931
envoy "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/envoy"
@@ -46,15 +48,32 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, reqCtx *RequestContex
4648
}
4749
}
4850

51+
// Inject the active trace context into the egress headers so the next
52+
// filter in the chain is parented to this span rather than the upstream one.
53+
traceCarrier := propagation.MapCarrier{}
54+
otel.GetTextMapPropagator().Inject(ctx, traceCarrier)
55+
for key, value := range traceCarrier {
56+
reqCtx.Request.SetHeader(key, value)
57+
}
58+
4959
if !headers.GetEndOfStream() {
5060
log.FromContext(ctx).V(logutil.VERBOSE).Info("captured request headers, deferring response until body arrives...")
5161
return nil
5262
}
5363
// EndOfStream means no body is expected, return HeadersResponse immediately
64+
headersResponse := &eppb.HeadersResponse{}
65+
if len(reqCtx.Request.MutatedHeaders()) > 0 || len(reqCtx.Request.RemovedHeaders()) > 0 {
66+
headersResponse.Response = &eppb.CommonResponse{
67+
HeaderMutation: &eppb.HeaderMutation{
68+
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
69+
RemoveHeaders: reqCtx.Request.RemovedHeaders(),
70+
},
71+
}
72+
}
5473
return []*eppb.ProcessingResponse{
5574
{
5675
Response: &eppb.ProcessingResponse_RequestHeaders{
57-
RequestHeaders: &eppb.HeadersResponse{},
76+
RequestHeaders: headersResponse,
5877
},
5978
},
6079
}

pkg/handlers/request_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2727
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2828
"github.com/google/go-cmp/cmp"
29+
"go.opentelemetry.io/otel/trace"
2930
"google.golang.org/protobuf/testing/protocmp"
3031
metricsutils "k8s.io/component-base/metrics/testutil"
3132
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
@@ -170,6 +171,85 @@ func TestHandleRequestHeaders(t *testing.T) {
170171
}
171172
}
172173

174+
func TestHandleRequestHeaders_InjectsTraceContext(t *testing.T) {
175+
withTraceContextPropagator(t)
176+
177+
traceID, err := trace.TraceIDFromHex(testTraceID)
178+
if err != nil {
179+
t.Fatalf("failed to parse trace ID: %v", err)
180+
}
181+
spanID, err := trace.SpanIDFromHex(testSpanID)
182+
if err != nil {
183+
t.Fatalf("failed to parse span ID: %v", err)
184+
}
185+
ctx := trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{
186+
TraceID: traceID,
187+
SpanID: spanID,
188+
TraceFlags: trace.FlagsSampled,
189+
}))
190+
191+
wantTraceparent := "00-" + testTraceID + "-" + testSpanID + "-01"
192+
193+
t.Run("records traceparent as a header mutation", func(t *testing.T) {
194+
server := newServerForTest(newTestProfiles())
195+
reqCtx := &RequestContext{Request: requesthandling.NewInferenceRequest()}
196+
197+
resp := server.HandleRequestHeaders(ctx, reqCtx, &extProcPb.HttpHeaders{
198+
Headers: &basepb.HeaderMap{Headers: []*basepb.HeaderValue{}},
199+
})
200+
if resp != nil {
201+
t.Fatalf("expected deferred response, got %v", resp)
202+
}
203+
if got := reqCtx.Request.MutatedHeaders()["traceparent"]; got != wantTraceparent {
204+
t.Errorf("mutated traceparent = %q, want %q", got, wantTraceparent)
205+
}
206+
})
207+
208+
t.Run("end of stream response carries the traceparent mutation", func(t *testing.T) {
209+
server := newServerForTest(newTestProfiles())
210+
reqCtx := &RequestContext{Request: requesthandling.NewInferenceRequest()}
211+
212+
resp := server.HandleRequestHeaders(ctx, reqCtx, &extProcPb.HttpHeaders{
213+
Headers: &basepb.HeaderMap{Headers: []*basepb.HeaderValue{}},
214+
EndOfStream: true,
215+
})
216+
if len(resp) != 1 {
217+
t.Fatalf("expected a single response, got %d", len(resp))
218+
}
219+
mutation := resp[0].GetRequestHeaders().GetResponse().GetHeaderMutation()
220+
if mutation == nil {
221+
t.Fatal("expected a header mutation in the end-of-stream response")
222+
}
223+
var got string
224+
for _, h := range mutation.GetSetHeaders() {
225+
if h.GetHeader().GetKey() == "traceparent" {
226+
got = string(h.GetHeader().GetRawValue())
227+
}
228+
}
229+
if got != wantTraceparent {
230+
t.Errorf("traceparent in header mutation = %q, want %q", got, wantTraceparent)
231+
}
232+
})
233+
234+
t.Run("upstream traceparent matching current context is not re-set", func(t *testing.T) {
235+
server := newServerForTest(newTestProfiles())
236+
reqCtx := &RequestContext{Request: requesthandling.NewInferenceRequest()}
237+
238+
resp := server.HandleRequestHeaders(ctx, reqCtx, &extProcPb.HttpHeaders{
239+
Headers: &basepb.HeaderMap{Headers: []*basepb.HeaderValue{
240+
{Key: "traceparent", RawValue: []byte(wantTraceparent)},
241+
}},
242+
EndOfStream: true,
243+
})
244+
if len(resp) != 1 {
245+
t.Fatalf("expected a single response, got %d", len(resp))
246+
}
247+
if mutation := resp[0].GetRequestHeaders().GetResponse().GetHeaderMutation(); mutation != nil {
248+
t.Errorf("expected no header mutation, got %v", mutation)
249+
}
250+
})
251+
}
252+
173253
// === Request Body Tests (built-in plugins) ===
174254

175255
func TestHandleRequestBody_BuiltInPlugins(t *testing.T) {

pkg/handlers/server.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"errors"
2222
"io"
23+
"strings"
2324
"time"
2425

2526
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2627
"go.opentelemetry.io/otel"
2728
"go.opentelemetry.io/otel/attribute"
29+
"go.opentelemetry.io/otel/propagation"
2830
"go.opentelemetry.io/otel/trace"
2931
"google.golang.org/grpc/codes"
3032
"google.golang.org/grpc/status"
@@ -81,19 +83,37 @@ type RequestContext struct {
8183
Response *requesthandling.InferenceResponse
8284
}
8385

86+
// extractTraceContext returns ctx augmented with the upstream trace context
87+
// (traceparent, tracestate, baggage) propagated via the Envoy request headers.
88+
func extractTraceContext(ctx context.Context, headers *extProcPb.HttpHeaders) context.Context {
89+
carrier := propagation.MapCarrier{}
90+
if headers != nil && headers.Headers != nil {
91+
for _, header := range headers.Headers.Headers {
92+
carrier[strings.ToLower(header.Key)] = envoy.GetHeaderValue(header)
93+
}
94+
}
95+
return otel.GetTextMapPropagator().Extract(ctx, carrier)
96+
}
97+
8498
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
8599
ctx := srv.Context()
86100

87-
// Start tracing span for the request
101+
// The server span is started when the request headers arrive, so it can be
102+
// parented to the upstream trace context they carry instead of starting an
103+
// orphan root trace.
88104
tracer := otel.Tracer(
89105
"llm-d/inference-payload-processor/extproc",
90106
trace.WithInstrumentationVersion(version.BuildRef),
91107
trace.WithInstrumentationAttributes(
92108
attribute.String("commit-sha", version.CommitSHA),
93109
),
94110
)
95-
ctx, span := tracer.Start(ctx, "gateway.request", trace.WithSpanKind(trace.SpanKindServer))
96-
defer span.End()
111+
var span trace.Span
112+
defer func() {
113+
if span != nil {
114+
span.End()
115+
}
116+
}()
97117

98118
logger := log.FromContext(ctx)
99119
loggerVerbose := logger.V(logutil.VERBOSE)
@@ -129,6 +149,10 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
129149
var err error
130150
switch v := req.Request.(type) {
131151
case *extProcPb.ProcessingRequest_RequestHeaders:
152+
if span == nil {
153+
ctx, span = tracer.Start(extractTraceContext(ctx, v.RequestHeaders),
154+
"gateway.request", trace.WithSpanKind(trace.SpanKindServer))
155+
}
132156
if requestId := envoy.ExtractHeaderValue(v, requestIdHeaderKey); len(requestId) > 0 {
133157
logger = logger.WithValues(requestIdHeaderKey, requestId)
134158
loggerVerbose = logger.V(logutil.VERBOSE)

pkg/handlers/server_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ import (
2525
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2626
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2727
"github.com/google/go-cmp/cmp"
28+
"go.opentelemetry.io/otel"
29+
"go.opentelemetry.io/otel/propagation"
30+
"go.opentelemetry.io/otel/trace"
2831
"google.golang.org/protobuf/testing/protocmp"
2932

3033
envoytest "github.com/llm-d/llm-d-inference-payload-processor/pkg/common/envoy/test"
@@ -40,6 +43,82 @@ import (
4043

4144
const testProfileName = "default"
4245

46+
// Fixed trace-context identifiers shared by the tracing tests. Both are
// lowercase hex strings: 32 characters for the trace ID and 16 for the span ID.
const (
	testTraceID = "0af7651916cd43dd8448eb211c80319c"
	testSpanID  = "b7ad6b7169203331"
)
50+
51+
// withTraceContextPropagator installs the W3C trace context propagator for the
52+
// duration of the test and restores the previous global propagator afterwards.
53+
func withTraceContextPropagator(t *testing.T) {
54+
t.Helper()
55+
prev := otel.GetTextMapPropagator()
56+
otel.SetTextMapPropagator(propagation.TraceContext{})
57+
t.Cleanup(func() { otel.SetTextMapPropagator(prev) })
58+
}
59+
60+
func TestExtractTraceContext(t *testing.T) {
61+
withTraceContextPropagator(t)
62+
63+
tests := []struct {
64+
name string
65+
headers *extProcPb.HttpHeaders
66+
wantValid bool
67+
}{
68+
{
69+
name: "extracts upstream traceparent",
70+
headers: &extProcPb.HttpHeaders{
71+
Headers: &basepb.HeaderMap{
72+
Headers: []*basepb.HeaderValue{
73+
{Key: "traceparent", RawValue: []byte("00-" + testTraceID + "-" + testSpanID + "-01")},
74+
},
75+
},
76+
},
77+
wantValid: true,
78+
},
79+
{
80+
name: "header key lookup is case-insensitive",
81+
headers: &extProcPb.HttpHeaders{
82+
Headers: &basepb.HeaderMap{
83+
Headers: []*basepb.HeaderValue{
84+
{Key: "Traceparent", RawValue: []byte("00-" + testTraceID + "-" + testSpanID + "-01")},
85+
},
86+
},
87+
},
88+
wantValid: true,
89+
},
90+
{
91+
name: "no traceparent leaves context untouched",
92+
headers: &extProcPb.HttpHeaders{Headers: &basepb.HeaderMap{}},
93+
wantValid: false,
94+
},
95+
{
96+
name: "nil headers leaves context untouched",
97+
headers: nil,
98+
wantValid: false,
99+
},
100+
}
101+
102+
for _, tc := range tests {
103+
t.Run(tc.name, func(t *testing.T) {
104+
ctx := extractTraceContext(context.Background(), tc.headers)
105+
sc := trace.SpanContextFromContext(ctx)
106+
if sc.IsValid() != tc.wantValid {
107+
t.Fatalf("extracted span context validity = %v, want %v", sc.IsValid(), tc.wantValid)
108+
}
109+
if !tc.wantValid {
110+
return
111+
}
112+
if got := sc.TraceID().String(); got != testTraceID {
113+
t.Errorf("trace ID = %s, want %s", got, testTraceID)
114+
}
115+
if got := sc.SpanID().String(); got != testSpanID {
116+
t.Errorf("span ID = %s, want %s", got, testSpanID)
117+
}
118+
})
119+
}
120+
}
121+
43122
func TestHandleRequestBody(t *testing.T) {
44123
ctx := logutil.NewTestLoggerIntoContext(context.Background())
45124

0 commit comments

Comments
 (0)