Skip to content

Commit 51bd506

Browse files
committed
Trace BackendClient and Workflow calls
Signed-off-by: Jeremy Drouillard <jeremy@stacklok.com>
1 parent eaa4660 commit 51bd506

3 files changed

Lines changed: 63 additions & 25 deletions

File tree

cmd/thv-operator/pkg/vmcpconfig/converter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ func (c *Converter) Convert(
9696
config.Operational = c.convertOperational(ctx, vmcp)
9797
}
9898

99-
// Convert Telemetry - pass through directly as it's the same type
100-
99+
// Convert Telemetry - pass through directly as it's the same type.
100+
// Eventually, we may want to decouple the two, but for now this is fine.
101101
config.Telemetry = vmcp.Spec.Telemetry
102102

103103
// Apply operational defaults (fills missing values)

pkg/vmcp/server/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func New(
204204
// backend calls are instrumented when they occur during workflow execution.
205205
if cfg.TelemetryProvider != nil {
206206
var err error
207-
backendClient, err = monitorBackends(context.Background(), cfg.TelemetryProvider.MeterProvider(), backends, backendClient)
207+
backendClient, err = monitorBackends(context.Background(), cfg.TelemetryProvider.MeterProvider(), cfg.TelemetryProvider.TracerProvider(), backends, backendClient)
208208
if err != nil {
209209
return nil, fmt.Errorf("failed to monitor backends: %w", err)
210210
}
@@ -224,7 +224,7 @@ func New(
224224

225225
// Decorate workflow executors with telemetry if provider is configured
226226
if cfg.TelemetryProvider != nil {
227-
workflowExecutors, err = monitorWorkflowExecutors(cfg.TelemetryProvider.MeterProvider(), workflowExecutors)
227+
workflowExecutors, err = monitorWorkflowExecutors(cfg.TelemetryProvider.MeterProvider(), cfg.TelemetryProvider.TracerProvider(), workflowExecutors)
228228
if err != nil {
229229
return nil, fmt.Errorf("failed to monitor workflow executors: %w", err)
230230
}

pkg/vmcp/server/telemetry.go

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"time"
77

88
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/codes"
910
"go.opentelemetry.io/otel/metric"
11+
"go.opentelemetry.io/otel/trace"
1012

1113
"github.com/stacklok/toolhive/pkg/vmcp"
1214
"github.com/stacklok/toolhive/pkg/vmcp/server/adapter"
@@ -18,7 +20,13 @@ const (
1820

1921
// monitorBackends decorates the backend client so it records telemetry on each method call.
2022
// It also emits a gauge for the number of backends discovered once, since the number of backends is static.
21-
func monitorBackends(ctx context.Context, meterProvider metric.MeterProvider, backends []vmcp.Backend, backendClient vmcp.BackendClient) (vmcp.BackendClient, error) {
23+
func monitorBackends(
24+
ctx context.Context,
25+
meterProvider metric.MeterProvider,
26+
tracerProvider trace.TracerProvider,
27+
backends []vmcp.Backend,
28+
backendClient vmcp.BackendClient,
29+
) (vmcp.BackendClient, error) {
2230
meter := meterProvider.Meter(instrumentationName)
2331

2432
backendCount, err := meter.Int64Gauge(
@@ -45,15 +53,16 @@ func monitorBackends(ctx context.Context, meterProvider metric.MeterProvider, ba
4553

4654
return telemetryBackendClient{
4755
backendClient: backendClient,
56+
tracer: tracerProvider.Tracer(instrumentationName),
4857
requestsTotal: requestsTotal,
4958
errorsTotal: errorsTotal,
5059
requestsDuration: requestsDuration,
5160
}, nil
52-
5361
}
5462

5563
type telemetryBackendClient struct {
5664
backendClient vmcp.BackendClient
65+
tracer trace.Tracer
5766

5867
requestsTotal metric.Int64Counter
5968
errorsTotal metric.Int64Counter
@@ -62,52 +71,68 @@ type telemetryBackendClient struct {
6271

6372
var _ vmcp.BackendClient = telemetryBackendClient{}
6473

65-
// record updates the telemetry metrics for each method on the BackendClient interface.
66-
// It returns a function that should be deferred to record the duration and error.
67-
func (t telemetryBackendClient) record(ctx context.Context, target *vmcp.BackendTarget, action string, err *error) func() {
68-
attrs := metric.WithAttributes(
74+
// record updates the metrics and creates a span for each method on the BackendClient interface.
75+
// It returns a function that should be deferred to record the duration, error, and end the span.
76+
func (t telemetryBackendClient) record(ctx context.Context, target *vmcp.BackendTarget, action string, err *error) (context.Context, func()) {
77+
// Create span attributes
78+
commonAttrs := []attribute.KeyValue{
6979
attribute.String("target.workload_id", target.WorkloadID),
7080
attribute.String("target.workload_name", target.WorkloadName),
7181
attribute.String("target.base_url", target.BaseURL),
7282
attribute.String("target.transport_type", target.TransportType),
7383
attribute.String("action", action),
84+
}
85+
86+
ctx, span := t.tracer.Start(ctx, "telemetryBackendClient."+action,
87+
// TODO: Add params and results to the span once we have reusable sanitization functions.
88+
trace.WithAttributes(commonAttrs...),
7489
)
90+
91+
metricAttrs := metric.WithAttributes(commonAttrs...)
7592
start := time.Now()
76-
t.requestsTotal.Add(ctx, 1, attrs)
93+
t.requestsTotal.Add(ctx, 1, metricAttrs)
7794

78-
return func() {
95+
return ctx, func() {
7996
duration := time.Since(start)
80-
t.requestsDuration.Record(ctx, duration.Seconds(), attrs)
81-
if err != nil {
82-
t.errorsTotal.Add(ctx, 1, attrs)
97+
t.requestsDuration.Record(ctx, duration.Seconds(), metricAttrs)
98+
if err != nil && *err != nil {
99+
t.errorsTotal.Add(ctx, 1, metricAttrs)
100+
span.RecordError(*err)
101+
span.SetStatus(codes.Error, (*err).Error())
83102
}
103+
span.End()
84104
}
85105
}
86106

87107
func (t telemetryBackendClient) CallTool(ctx context.Context, target *vmcp.BackendTarget, toolName string, arguments map[string]any) (_ map[string]any, retErr error) {
88-
defer t.record(ctx, target, "call_tool", &retErr)()
108+
ctx, done := t.record(ctx, target, "call_tool", &retErr)
109+
defer done()
89110
return t.backendClient.CallTool(ctx, target, toolName, arguments)
90111
}
91112

92113
func (t telemetryBackendClient) ReadResource(ctx context.Context, target *vmcp.BackendTarget, uri string) (_ []byte, retErr error) {
93-
defer t.record(ctx, target, "read_resource", &retErr)()
114+
ctx, done := t.record(ctx, target, "read_resource", &retErr)
115+
defer done()
94116
return t.backendClient.ReadResource(ctx, target, uri)
95117
}
96118

97119
func (t telemetryBackendClient) GetPrompt(ctx context.Context, target *vmcp.BackendTarget, name string, arguments map[string]any) (_ string, retErr error) {
98-
defer t.record(ctx, target, "get_prompt", &retErr)()
120+
ctx, done := t.record(ctx, target, "get_prompt", &retErr)
121+
defer done()
99122
return t.backendClient.GetPrompt(ctx, target, name, arguments)
100123
}
101124

102125
func (t telemetryBackendClient) ListCapabilities(ctx context.Context, target *vmcp.BackendTarget) (_ *vmcp.CapabilityList, retErr error) {
103-
defer t.record(ctx, target, "list_capabilities", &retErr)()
126+
ctx, done := t.record(ctx, target, "list_capabilities", &retErr)
127+
defer done()
104128
return t.backendClient.ListCapabilities(ctx, target)
105129
}
106130

107131
// monitorWorkflowExecutors decorates workflow executors with telemetry recording.
108-
// It wraps each executor to emit metrics for execution count, duration, and errors.
132+
// It wraps each executor to emit metrics and traces for execution count, duration, and errors.
109133
func monitorWorkflowExecutors(
110134
meterProvider metric.MeterProvider,
135+
tracerProvider trace.TracerProvider,
111136
executors map[string]adapter.WorkflowExecutor,
112137
) (map[string]adapter.WorkflowExecutor, error) {
113138
if len(executors) == 0 {
@@ -140,11 +165,14 @@ func monitorWorkflowExecutors(
140165
return nil, fmt.Errorf("failed to create workflow duration histogram: %w", err)
141166
}
142167

168+
tracer := tracerProvider.Tracer(instrumentationName)
169+
143170
monitored := make(map[string]adapter.WorkflowExecutor, len(executors))
144171
for name, executor := range executors {
145172
monitored[name] = &telemetryWorkflowExecutor{
146173
name: name,
147174
executor: executor,
175+
tracer: tracer,
148176
executionsTotal: executionsTotal,
149177
errorsTotal: errorsTotal,
150178
executionDuration: executionDuration,
@@ -158,29 +186,39 @@ func monitorWorkflowExecutors(
158186
type telemetryWorkflowExecutor struct {
159187
name string
160188
executor adapter.WorkflowExecutor
189+
tracer trace.Tracer
161190
executionsTotal metric.Int64Counter
162191
errorsTotal metric.Int64Counter
163192
executionDuration metric.Float64Histogram
164193
}
165194

166195
var _ adapter.WorkflowExecutor = (*telemetryWorkflowExecutor)(nil)
167196

168-
// ExecuteWorkflow executes the workflow and records telemetry metrics.
197+
// ExecuteWorkflow executes the workflow and records telemetry metrics and traces.
169198
func (t *telemetryWorkflowExecutor) ExecuteWorkflow(ctx context.Context, params map[string]any) (*adapter.WorkflowResult, error) {
170-
attrs := metric.WithAttributes(
199+
commonAttrs := []attribute.KeyValue{
171200
attribute.String("workflow.name", t.name),
201+
}
202+
203+
ctx, span := t.tracer.Start(ctx, "telemetryWorkflowExecutor.ExecuteWorkflow",
204+
// TODO: Add params and results to the span once we have reusable sanitization functions.
205+
trace.WithAttributes(commonAttrs...),
172206
)
207+
defer span.End()
173208

209+
metricAttrs := metric.WithAttributes(commonAttrs...)
174210
start := time.Now()
175-
t.executionsTotal.Add(ctx, 1, attrs)
211+
t.executionsTotal.Add(ctx, 1, metricAttrs)
176212

177213
result, err := t.executor.ExecuteWorkflow(ctx, params)
178214

179215
duration := time.Since(start)
180-
t.executionDuration.Record(ctx, duration.Seconds(), attrs)
216+
t.executionDuration.Record(ctx, duration.Seconds(), metricAttrs)
181217

182218
if err != nil {
183-
t.errorsTotal.Add(ctx, 1, attrs)
219+
t.errorsTotal.Add(ctx, 1, metricAttrs)
220+
span.RecordError(err)
221+
span.SetStatus(codes.Error, err.Error())
184222
}
185223

186224
return result, err

0 commit comments

Comments
 (0)