diff --git a/cmd/thv-operator/api/v1alpha1/virtualmcpserver_types.go b/cmd/thv-operator/api/v1alpha1/virtualmcpserver_types.go index 801d18f62a..70d9aa9e4f 100644 --- a/cmd/thv-operator/api/v1alpha1/virtualmcpserver_types.go +++ b/cmd/thv-operator/api/v1alpha1/virtualmcpserver_types.go @@ -54,6 +54,11 @@ type VirtualMCPServerSpec struct { // +kubebuilder:pruning:PreserveUnknownFields // +kubebuilder:validation:Type=object PodTemplateSpec *runtime.RawExtension `json:"podTemplateSpec,omitempty"` + + // Telemetry configures OpenTelemetry-based observability for the Virtual MCP server + // including distributed tracing, OTLP metrics export, and Prometheus metrics endpoint + // +optional + Telemetry *TelemetryConfig `json:"telemetry,omitempty"` } // GroupRef references an MCPGroup resource diff --git a/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go b/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go index 23ced678d5..a0adc2cf92 100644 --- a/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -2255,6 +2255,11 @@ func (in *VirtualMCPServerSpec) DeepCopyInto(out *VirtualMCPServerSpec) { *out = new(runtime.RawExtension) (*in).DeepCopyInto(*out) } + if in.Telemetry != nil { + in, out := &in.Telemetry, &out.Telemetry + *out = new(TelemetryConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VirtualMCPServerSpec. diff --git a/cmd/thv-operator/pkg/vmcpconfig/converter.go b/cmd/thv-operator/pkg/vmcpconfig/converter.go index 438b59b4f7..792631b268 100644 --- a/cmd/thv-operator/pkg/vmcpconfig/converter.go +++ b/cmd/thv-operator/pkg/vmcpconfig/converter.go @@ -15,6 +15,7 @@ import ( mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" "github.com/stacklok/toolhive/cmd/thv-operator/pkg/oidc" + "github.com/stacklok/toolhive/cmd/thv-operator/pkg/spectoconfig" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" vmcpconfig "github.com/stacklok/toolhive/pkg/vmcp/config" ) @@ -110,6 +111,8 @@ func (c *Converter) Convert( config.Operational = c.convertOperational(ctx, vmcp) } + config.Telemetry = spectoconfig.ConvertTelemetryConfig(ctx, vmcp.Spec.Telemetry, vmcp.Name) + // Apply operational defaults (fills missing values) config.EnsureOperationalDefaults() diff --git a/cmd/vmcp/README.md b/cmd/vmcp/README.md index 0868e426ac..2e8d077ea3 100644 --- a/cmd/vmcp/README.md +++ b/cmd/vmcp/README.md @@ -14,6 +14,7 @@ The Virtual MCP Server (vmcp) is a standalone binary that aggregates multiple MC - ✅ **Session Management**: MCP protocol session tracking with TTL-based cleanup - ✅ **Health Endpoints**: `/health` and `/ping` for service monitoring - ✅ **Configuration Validation**: `vmcp validate` command for config verification +- ✅ **Observability**: OpenTelemetry metrics and traces for backend operations and workflow executions ### In Progress - 🚧 **Incoming Authentication** (Issue #165): OIDC, local, anonymous authentication @@ -121,6 +122,7 @@ vmcp uses a YAML configuration file to define: 3. **Outgoing Authentication**: Virtual MCP → Backend API token exchange 4. **Tool Aggregation**: Conflict resolution and filtering strategies 5. **Operational Settings**: Timeouts, health checks, circuit breakers +6. **Telemetry**: OpenTelemetry metrics/tracing and Prometheus endpoint See [examples/vmcp-config.yaml](../../examples/vmcp-config.yaml) for a complete example. diff --git a/cmd/vmcp/app/commands.go b/cmd/vmcp/app/commands.go index eb87d87185..9fc2fc9242 100644 --- a/cmd/vmcp/app/commands.go +++ b/cmd/vmcp/app/commands.go @@ -12,6 +12,7 @@ import ( "github.com/stacklok/toolhive/pkg/env" "github.com/stacklok/toolhive/pkg/groups" "github.com/stacklok/toolhive/pkg/logger" + "github.com/stacklok/toolhive/pkg/telemetry" "github.com/stacklok/toolhive/pkg/vmcp" "github.com/stacklok/toolhive/pkg/vmcp/aggregator" "github.com/stacklok/toolhive/pkg/vmcp/auth/factory" @@ -288,7 +289,6 @@ func runServe(cmd *cobra.Command, _ []string) error { // Create router rtr := vmcprouter.NewDefaultRouter() - // Setup authentication middleware logger.Infof("Setting up incoming authentication (type: %s)", cfg.IncomingAuth.Type) authMiddleware, authInfoHandler, err := factory.NewIncomingAuthMiddleware(ctx, cfg.IncomingAuth) @@ -303,13 +303,30 @@ func runServe(cmd *cobra.Command, _ []string) error { host, _ := cmd.Flags().GetString("host") port, _ := cmd.Flags().GetInt("port") + // If telemetry is configured, create the provider. + var telemetryProvider *telemetry.Provider + if cfg.Telemetry != nil { + var err error + telemetryProvider, err = telemetry.NewProvider(ctx, *cfg.Telemetry) + if err != nil { + return fmt.Errorf("failed to create telemetry provider: %w", err) + } + defer func() { + err := telemetryProvider.Shutdown(ctx) + if err != nil { + logger.Errorf("failed to shutdown telemetry provider: %v", err) + } + }() + } + serverCfg := &vmcpserver.Config{ - Name: cfg.Name, - Version: getVersion(), - Host: host, - Port: port, - AuthMiddleware: authMiddleware, - AuthInfoHandler: authInfoHandler, + Name: cfg.Name, + Version: getVersion(), + Host: host, + Port: port, + AuthMiddleware: authMiddleware, + AuthInfoHandler: authInfoHandler, + TelemetryProvider: telemetryProvider, } // Convert composite tool configurations to workflow definitions @@ -322,7 +339,7 @@ func runServe(cmd *cobra.Command, _ []string) error { } // Create server with discovery manager, backends, and workflow definitions - srv, err := vmcpserver.New(serverCfg, rtr, backendClient, discoveryMgr, backends, workflowDefs) + srv, err := vmcpserver.New(ctx, serverCfg, rtr, backendClient, discoveryMgr, backends, workflowDefs) if err != nil { return fmt.Errorf("failed to create Virtual MCP Server: %w", err) } diff --git a/deploy/charts/operator-crds/Chart.yaml b/deploy/charts/operator-crds/Chart.yaml index c5f8fd6a55..87e93efd0e 100644 --- a/deploy/charts/operator-crds/Chart.yaml +++ b/deploy/charts/operator-crds/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: toolhive-operator-crds description: A Helm chart for installing the ToolHive Operator CRDs into Kubernetes. type: application -version: 0.0.75 +version: 0.0.76 appVersion: "0.0.1" diff --git a/deploy/charts/operator-crds/README.md b/deploy/charts/operator-crds/README.md index 4fd2a51f48..8d77792f66 100644 --- a/deploy/charts/operator-crds/README.md +++ b/deploy/charts/operator-crds/README.md @@ -1,6 +1,6 @@ # ToolHive Operator CRDs Helm Chart -![Version: 0.0.75](https://img.shields.io/badge/Version-0.0.75-informational?style=flat-square) +![Version: 0.0.76](https://img.shields.io/badge/Version-0.0.76-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) A Helm chart for installing the ToolHive Operator CRDs into Kubernetes. diff --git a/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpservers.yaml b/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpservers.yaml index 0d22818536..d8ff946527 100644 --- a/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpservers.yaml +++ b/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_virtualmcpservers.yaml @@ -733,6 +733,73 @@ spec: - NodePort - LoadBalancer type: string + telemetry: + description: |- + Telemetry configures OpenTelemetry-based observability for the Virtual MCP server + including distributed tracing, OTLP metrics export, and Prometheus metrics endpoint + properties: + openTelemetry: + description: OpenTelemetry defines OpenTelemetry configuration + properties: + enabled: + default: false + description: Enabled controls whether OpenTelemetry is enabled + type: boolean + endpoint: + description: Endpoint is the OTLP endpoint URL for tracing + and metrics + type: string + headers: + description: |- + Headers contains authentication headers for the OTLP endpoint + Specified as key=value pairs + items: + type: string + type: array + insecure: + default: false + description: Insecure indicates whether to use HTTP instead + of HTTPS for the OTLP endpoint + type: boolean + metrics: + description: Metrics defines OpenTelemetry metrics-specific + configuration + properties: + enabled: + default: false + description: Enabled controls whether OTLP metrics are + sent + type: boolean + type: object + serviceName: + description: |- + ServiceName is the service name for telemetry + If not specified, defaults to the MCPServer name + type: string + tracing: + description: Tracing defines OpenTelemetry tracing configuration + properties: + enabled: + default: false + description: Enabled controls whether OTLP tracing is + sent + type: boolean + samplingRate: + default: "0.05" + description: SamplingRate is the trace sampling rate (0.0-1.0) + type: string + type: object + type: object + prometheus: + description: Prometheus defines Prometheus-specific configuration + properties: + enabled: + default: false + description: Enabled controls whether Prometheus metrics endpoint + is exposed + type: boolean + type: object + type: object required: - groupRef - incomingAuth diff --git a/docs/observability.md b/docs/observability.md index 83b632516d..c765dccc44 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -84,3 +84,10 @@ The telemetry middleware: This provides end-to-end visibility across the entire request lifecycle while maintaining the modular architecture of ToolHive's middleware system. + +## Virtual MCP Server Telemetry + +For observability in the Virtual MCP Server (vMCP), including backend request +metrics, workflow execution telemetry, and distributed tracing, see the +dedicated [Virtual MCP Server Observability](./operator/virtualmcpserver-observability.md) +documentation. diff --git a/docs/operator/crd-api.md b/docs/operator/crd-api.md index 25323a7054..e9d5af6b8f 100644 --- a/docs/operator/crd-api.md +++ b/docs/operator/crd-api.md @@ -1697,6 +1697,7 @@ TelemetryConfig defines observability configuration for the MCP server _Appears in:_ - [MCPRemoteProxySpec](#mcpremoteproxyspec) - [MCPServerSpec](#mcpserverspec) +- [VirtualMCPServerSpec](#virtualmcpserverspec) | Field | Description | Default | Validation | | --- | --- | --- | --- | @@ -1975,6 +1976,7 @@ _Appears in:_ | `operational` _[OperationalConfig](#operationalconfig)_ | Operational defines operational settings like timeouts and health checks | | | | `serviceType` _string_ | ServiceType specifies the Kubernetes service type for the Virtual MCP server | ClusterIP | Enum: [ClusterIP NodePort LoadBalancer]
| | `podTemplateSpec` _[RawExtension](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#rawextension-runtime-pkg)_ | PodTemplateSpec defines the pod template to use for the Virtual MCP server
This allows for customizing the pod configuration beyond what is provided by the other fields.
Note that to modify the specific container the Virtual MCP server runs in, you must specify
the 'vmcp' container name in the PodTemplateSpec.
This field accepts a PodTemplateSpec object as JSON/YAML. | | Type: object
| +| `telemetry` _[TelemetryConfig](#telemetryconfig)_ | Telemetry configures OpenTelemetry-based observability for the Virtual MCP server
including distributed tracing, OTLP metrics export, and Prometheus metrics endpoint | | | #### VirtualMCPServerStatus diff --git a/docs/operator/virtualmcpserver-api.md b/docs/operator/virtualmcpserver-api.md index 59582d9d05..14a70f2397 100644 --- a/docs/operator/virtualmcpserver-api.md +++ b/docs/operator/virtualmcpserver-api.md @@ -290,6 +290,47 @@ spec: cpu: "1000m" ``` +### `.spec.telemetry` (optional) + +Configures OpenTelemetry-based observability for the Virtual MCP server, including distributed tracing, OTLP metrics export, and Prometheus metrics endpoint. Uses the same configuration structure as `MCPServer.spec.telemetry`. + +**Type**: `TelemetryConfig` + +**Fields**: +- `openTelemetry` (OpenTelemetryConfig, optional): OpenTelemetry configuration + - `enabled` (boolean): Controls whether OpenTelemetry is enabled + - `endpoint` (string): OTLP endpoint URL for tracing and metrics + - `serviceName` (string): Service name for telemetry (defaults to VirtualMCPServer name) + - `headers` ([]string): Authentication headers for OTLP endpoint (key=value format) + - `insecure` (boolean): Use HTTP instead of HTTPS for the OTLP endpoint + - `metrics` (OpenTelemetryMetricsConfig, optional): Metrics-specific configuration + - `enabled` (boolean): Controls whether OTLP metrics are sent + - `tracing` (OpenTelemetryTracingConfig, optional): Tracing-specific configuration + - `enabled` (boolean): Controls whether OTLP tracing is sent + - `samplingRate` (string): Trace sampling rate (0.0-1.0, default: "0.05") +- `prometheus` (PrometheusConfig, optional): Prometheus-specific configuration + - `enabled` (boolean): Controls whether Prometheus metrics endpoint is exposed at /metrics + +**Example**: +```yaml +spec: + telemetry: + openTelemetry: + enabled: true + endpoint: "otel-collector:4317" + serviceName: "my-vmcp" + insecure: true + tracing: + enabled: true + samplingRate: "0.1" + metrics: + enabled: true + prometheus: + enabled: true +``` + +For details on what metrics and traces are emitted, see the [Virtual MCP Server Observability](./virtualmcpserver-observability.md) documentation. + ## Status Fields ### `.status.conditions` @@ -451,6 +492,19 @@ spec: failureThreshold: 5 timeout: 60s + # Observability + telemetry: + openTelemetry: + enabled: true + endpoint: "otel-collector:4317" + tracing: + enabled: true + samplingRate: "0.1" + metrics: + enabled: true + prometheus: + enabled: true + status: phase: Ready message: "Virtual MCP serving 3 backends with 15 tools" @@ -518,4 +572,5 @@ The VirtualMCPServer CRD includes comprehensive validation: - [MCPServer](./mcpserver-api.md): Individual MCP server instances - [MCPExternalAuthConfig](./mcpexternalauthconfig-api.md): External authentication configuration - [MCPToolConfig](./toolconfig-api.md): Tool filtering and renaming configuration +- [Virtual MCP Server Observability](./virtualmcpserver-observability.md): Telemetry and metrics documentation - [Virtual MCP Proposal](../proposals/THV-2106-virtual-mcp-server.md): Complete design proposal diff --git a/docs/operator/virtualmcpserver-observability.md b/docs/operator/virtualmcpserver-observability.md new file mode 100644 index 0000000000..048c1a7d6d --- /dev/null +++ b/docs/operator/virtualmcpserver-observability.md @@ -0,0 +1,82 @@ +# Virtual MCP Server Observability + +This document describes the observability for the Virtual MCP +Server (vMCP), which aggregates multiple backend MCP servers into a unified +interface. The vMCP provides OpenTelemetry-based instrumentation for monitoring +backend operations and composite tool workflow executions. + +For general ToolHive observability concepts and proxy runner telemetry, see the +main [Observability and Telemetry](../observability.md) documentation. + +## Overview + +The vMCP telemetry provides visibility into: + +1. **Backend operations**: Track requests to individual backend MCP servers + including tool calls, resource reads, prompt retrieval, and capability listing +2. **Workflow executions**: Monitor composite tool workflow performance and errors +3. **Distributed tracing**: Correlate requests across the vMCP and its backends + +The vMCP uses a decorator pattern to wrap backend clients and workflow executors +with telemetry instrumentation. This approach provides consistent metrics and +tracing without modifying the core business logic. + +The implementation of both metrics and traces can be found in `pkg/vmcp/server/telemetry.go`. + +## Metrics + +The vMCP emits metrics for backend operations and workflow executions. All +metrics use the `toolhive_vmcp_` prefix. + +**Backend metrics** track requests to individual backend MCP servers, including +request counts, error counts, and request duration histograms. These metrics +include attributes identifying the target backend (workload ID, name, URL, +transport type) and the action being performed (tool call, resource read, etc.). + +**Workflow metrics** track composite tool workflow executions, including +execution counts, error counts, and duration histograms. These metrics include +the workflow name as an attribute. + +## Distributed Tracing + +The vMCP creates spans for each individual backend operation as well as workflow executions, enabling the attribution of workflow execution errors or latency to specific tool calls. + + +## Configuration + +Configure telemetry in the `VirtualMCPServer` resource using the `spec.telemetry` +field. The telemetry configuration uses the same `TelemetryConfig` type as +`MCPServer`, providing a consistent configuration experience across resources. + +```yaml +apiVersion: toolhive.stacklok.dev/v1alpha1 +kind: VirtualMCPServer +metadata: + name: my-vmcp +spec: + groupRef: + name: my-group + incomingAuth: + type: anonymous + telemetry: + openTelemetry: + enabled: true + endpoint: "otel-collector:4317" + serviceName: "my-vmcp" + insecure: true + tracing: + enabled: true + samplingRate: "0.1" + metrics: + enabled: true + prometheus: + enabled: true +``` + +See the [VirtualMCPServer API reference](./virtualmcpserver-api.md) for complete +CRD documentation. + +## Related Documentation + +- [Observability and Telemetry](../observability.md) - Main ToolHive observability documentation +- [VirtualMCPServer API Reference](./virtualmcpserver-api.md) - Complete CRD specification diff --git a/examples/vmcp-config.yaml b/examples/vmcp-config.yaml index a450aacfb5..43a332a193 100644 --- a/examples/vmcp-config.yaml +++ b/examples/vmcp-config.yaml @@ -187,3 +187,14 @@ operational: # environment: "{{.steps.confirm_deploy.content.environment}}" # depends_on: ["confirm_deploy"] # condition: "{{.steps.confirm_deploy.action == 'accept'}}" + +# ===== OBSERVABILITY ===== +# OpenTelemetry-based metrics and tracing for backend operations and workflows +telemetry: + endpoint: "localhost:4317" # OTLP collector endpoint + servicename: "engineering-vmcp" + tracingenabled: true + metricsenabled: true + samplingrate: 0.1 # 10% sampling + insecure: true # Use HTTP instead of HTTPS + enableprometheusmetricspath: true # Expose /metrics endpoint diff --git a/pkg/telemetry/config.go b/pkg/telemetry/config.go index 8b6c7d276a..1220d4b049 100644 --- a/pkg/telemetry/config.go +++ b/pkg/telemetry/config.go @@ -200,7 +200,7 @@ func setGlobalProvidersAndReturn(telemetryProviders *providers.CompositeProvider // Middleware returns an HTTP middleware that instruments requests with OpenTelemetry. // serverName is the name of the MCP server (e.g., "github", "fetch") -// transport is the backend transport type ("stdio" or "sse") +// transport is the backend transport type ("stdio", "sse", or "streamable-http"). func (p *Provider) Middleware(serverName, transport string) types.MiddlewareFunction { return NewHTTPMiddleware(p.config, p.tracerProvider, p.meterProvider, serverName, transport) } diff --git a/pkg/telemetry/middleware.go b/pkg/telemetry/middleware.go index 1dcdc61cf5..f59c095167 100644 --- a/pkg/telemetry/middleware.go +++ b/pkg/telemetry/middleware.go @@ -46,7 +46,7 @@ type HTTPMiddleware struct { // NewHTTPMiddleware creates a new HTTP middleware for OpenTelemetry instrumentation. // serverName is the name of the MCP server (e.g., "github", "fetch") -// transport is the backend transport type ("stdio" or "sse") +// transport is the backend transport type ("stdio", "sse", or "streamable-http"). func NewHTTPMiddleware( config Config, tracerProvider trace.TracerProvider, diff --git a/pkg/vmcp/config/config.go b/pkg/vmcp/config/config.go index 8af95847e2..a1c3013bff 100644 --- a/pkg/vmcp/config/config.go +++ b/pkg/vmcp/config/config.go @@ -10,6 +10,7 @@ import ( "fmt" "time" + "github.com/stacklok/toolhive/pkg/telemetry" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" ) @@ -87,6 +88,9 @@ type Config struct { // Metadata stores additional configuration metadata. Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata,omitempty"` + + // Telemetry configures telemetry settings. + Telemetry *telemetry.Config `json:"telemetry,omitempty" yaml:"telemetry,omitempty"` } // IncomingAuthConfig configures client authentication to the virtual MCP server. diff --git a/pkg/vmcp/config/yaml_loader.go b/pkg/vmcp/config/yaml_loader.go index ee6830b4cc..ee51a5efc5 100644 --- a/pkg/vmcp/config/yaml_loader.go +++ b/pkg/vmcp/config/yaml_loader.go @@ -8,6 +8,7 @@ import ( "gopkg.in/yaml.v3" "github.com/stacklok/toolhive/pkg/env" + "github.com/stacklok/toolhive/pkg/telemetry" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" ) @@ -58,6 +59,8 @@ type rawConfig struct { Operational *OperationalConfig `yaml:"operational"` CompositeTools []*rawCompositeTool `yaml:"composite_tools"` + + Telemetry *telemetry.Config `yaml:"telemetry"` } type rawIncomingAuth struct { @@ -214,6 +217,8 @@ func (l *YAMLLoader) transformToConfig(raw *rawConfig) (*Config, error) { cfg.CompositeTools = compositeTools } + cfg.Telemetry = raw.Telemetry + // Apply operational defaults (fills missing values) cfg.EnsureOperationalDefaults() diff --git a/pkg/vmcp/config/yaml_loader_transform_test.go b/pkg/vmcp/config/yaml_loader_transform_test.go index ba401fc517..8b1a579c04 100644 --- a/pkg/vmcp/config/yaml_loader_transform_test.go +++ b/pkg/vmcp/config/yaml_loader_transform_test.go @@ -1,6 +1,7 @@ package config import ( + "os" "testing" "time" @@ -9,6 +10,7 @@ import ( "go.uber.org/mock/gomock" "github.com/stacklok/toolhive/pkg/env/mocks" + "github.com/stacklok/toolhive/pkg/telemetry" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" ) @@ -768,3 +770,65 @@ func TestYAMLLoader_transformCompositeTools_WithOutputConfig(t *testing.T) { }) } } + +// TestYAMLLoader_transformTelemetryConfig tests that telemetry configuration is preserved +// when transforming from raw YAML to the final Config struct. +func TestYAMLLoader_transformTelemetryConfig(t *testing.T) { + t.Parallel() + + // Note: yaml.v3 uses lowercase field names by default (no yaml tags on telemetry.Config) + yamlContent := ` +name: telemetry-test +telemetry: + endpoint: "localhost:4318" + servicename: "test-service" + serviceversion: "1.2.3" + tracingenabled: true + metricsenabled: true + samplingrate: 0.75 + insecure: true + enableprometheusmetricspath: true + headers: + Authorization: "Bearer token123" + X-Custom-Header: "custom-value" + environmentvariables: + - "NODE_ENV" + - "DEPLOYMENT_ENV" +` + + // Write temp file + tmpFile, err := os.CreateTemp("", "telemetry-test-*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(yamlContent) + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + // Load config + ctrl := gomock.NewController(t) + mockEnv := mocks.NewMockReader(ctrl) + mockEnv.EXPECT().Getenv(gomock.Any()).Return("").AnyTimes() + + loader := NewYAMLLoader(tmpFile.Name(), mockEnv) + cfg, err := loader.Load() + require.NoError(t, err) + + // Verify telemetry config is fully preserved + require.NotNil(t, cfg.Telemetry, "Telemetry config should not be nil") + + require.Equal(t, telemetry.Config{ + Endpoint: "localhost:4318", + ServiceName: "test-service", + ServiceVersion: "1.2.3", + TracingEnabled: true, + MetricsEnabled: true, + SamplingRate: 0.75, + Insecure: true, + EnablePrometheusMetricsPath: true, + Headers: map[string]string{"Authorization": "Bearer token123", "X-Custom-Header": "custom-value"}, + EnvironmentVariables: []string{"NODE_ENV", "DEPLOYMENT_ENV"}, + CustomAttributes: nil, + }, *cfg.Telemetry) + +} diff --git a/pkg/vmcp/server/health_test.go b/pkg/vmcp/server/health_test.go index 812b11b038..50ec721bf7 100644 --- a/pkg/vmcp/server/health_test.go +++ b/pkg/vmcp/server/health_test.go @@ -59,7 +59,10 @@ func createTestServer(t *testing.T) *server.Server { // Mock Stop to be called during server shutdown mockDiscoveryMgr.EXPECT().Stop().AnyTimes() - srv, err := server.New(&server.Config{ + // Create context for server + ctx, cancel := context.WithCancel(t.Context()) + + srv, err := server.New(ctx, &server.Config{ Name: "test-vmcp", Version: "1.0.0", Host: "127.0.0.1", @@ -68,9 +71,7 @@ func createTestServer(t *testing.T) *server.Server { require.NoError(t, err) // Start server in background - ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) - errCh := make(chan error, 1) go func() { if err := srv.Start(ctx); err != nil { @@ -175,7 +176,7 @@ func TestServer_SessionManager(t *testing.T) { mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) rt := router.NewDefaultRouter() - srv, err := server.New(&server.Config{ + srv, err := server.New(context.Background(), &server.Config{ Name: "test-vmcp", Version: "1.0.0", SessionTTL: 10 * time.Minute, @@ -198,7 +199,7 @@ func TestServer_SessionManager(t *testing.T) { rt := router.NewDefaultRouter() customTTL := 15 * time.Minute - srv, err := server.New(&server.Config{ + srv, err := server.New(context.Background(), &server.Config{ Name: "test-vmcp", Version: "1.0.0", SessionTTL: customTTL, diff --git a/pkg/vmcp/server/integration_test.go b/pkg/vmcp/server/integration_test.go index f96f185a97..7d8af3e989 100644 --- a/pkg/vmcp/server/integration_test.go +++ b/pkg/vmcp/server/integration_test.go @@ -192,7 +192,7 @@ func TestIntegration_AggregatorToRouterToServer(t *testing.T) { // Mock Stop to be called during server shutdown mockDiscoveryMgr.EXPECT().Stop().Times(1) - srv, err := server.New(&server.Config{ + srv, err := server.New(ctx, &server.Config{ Name: "test-vmcp", Version: "1.0.0", Host: "127.0.0.1", @@ -322,7 +322,7 @@ func TestIntegration_HTTPRequestFlowWithRoutingTable(t *testing.T) { } // Create and start server - srv, err := server.New(&server.Config{ + srv, err := server.New(ctx, &server.Config{ Name: "test-vmcp", Version: "1.0.0", Host: "127.0.0.1", diff --git a/pkg/vmcp/server/server.go b/pkg/vmcp/server/server.go index 63d1bbafd9..6ce684cc87 100644 --- a/pkg/vmcp/server/server.go +++ b/pkg/vmcp/server/server.go @@ -19,6 +19,7 @@ import ( "github.com/stacklok/toolhive/pkg/auth" "github.com/stacklok/toolhive/pkg/logger" + "github.com/stacklok/toolhive/pkg/telemetry" transportsession "github.com/stacklok/toolhive/pkg/transport/session" "github.com/stacklok/toolhive/pkg/vmcp" "github.com/stacklok/toolhive/pkg/vmcp/aggregator" @@ -82,6 +83,10 @@ type Config struct { // AuthInfoHandler is the optional handler for /.well-known/oauth-protected-resource endpoint. // Exposes OIDC discovery information about the protected resource. AuthInfoHandler http.Handler + + // TelemetryProvider is the optional telemetry provider. + // If nil, no telemetry is recorded. + TelemetryProvider *telemetry.Provider } // Server is the Virtual MCP Server that aggregates multiple backends. @@ -147,6 +152,7 @@ type Server struct { // //nolint:gocyclo // Complexity from hook logic is acceptable func New( + ctx context.Context, cfg *Config, rt router.Router, backendClient vmcp.BackendClient, @@ -194,6 +200,23 @@ func New( // This provides SDK-agnostic elicitation with security validation elicitationHandler := composer.NewDefaultElicitationHandler(sdkElicitationRequester) + // Decorate backend client with telemetry if provider is configured + // This must happen BEFORE creating the workflow engine so that workflow + // backend calls are instrumented when they occur during workflow execution. + if cfg.TelemetryProvider != nil { + var err error + backendClient, err = monitorBackends( + ctx, + cfg.TelemetryProvider.MeterProvider(), + cfg.TelemetryProvider.TracerProvider(), + backends, + backendClient, + ) + if err != nil { + return nil, fmt.Errorf("failed to monitor backends: %w", err) + } + } + // Create workflow engine (composer) for executing composite tools // The composer orchestrates multi-step workflows across backends // Use in-memory state store with 5-minute cleanup interval and 1-hour max age for completed workflows @@ -206,6 +229,18 @@ func New( return nil, fmt.Errorf("workflow validation failed: %w", err) } + // Decorate workflow executors with telemetry if provider is configured + if cfg.TelemetryProvider != nil { + workflowExecutors, err = monitorWorkflowExecutors( + cfg.TelemetryProvider.MeterProvider(), + cfg.TelemetryProvider.TracerProvider(), + workflowExecutors, + ) + if err != nil { + return nil, fmt.Errorf("failed to monitor workflow executors: %w", err) + } + } + // Create session manager with VMCPSession factory // This enables type-safe access to routing tables while maintaining session lifecycle management sessionManager := transportsession.NewManager(cfg.SessionTTL, vmcpsession.VMCPSessionFactory()) @@ -334,6 +369,16 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/health", s.handleHealth) mux.HandleFunc("/ping", s.handleHealth) + // Optional Prometheus metrics endpoint (unauthenticated) + if s.config.TelemetryProvider != nil { + if prometheusHandler := s.config.TelemetryProvider.PrometheusHandler(); prometheusHandler != nil { + mux.Handle("/metrics", prometheusHandler) + logger.Info("Prometheus metrics endpoint enabled at /metrics") + } else { + logger.Warn("Prometheus metrics endpoint is not enabled, but telemetry provider is configured") + } + } + // Optional .well-known discovery endpoints (unauthenticated, RFC 9728 compliant) // Handles /.well-known/oauth-protected-resource and subpaths (e.g., /mcp) if wellKnownHandler := auth.NewWellKnownHandler(s.config.AuthInfoHandler); wellKnownHandler != nil { @@ -341,9 +386,14 @@ func (s *Server) Start(ctx context.Context) error { logger.Info("RFC 9728 OAuth discovery endpoints enabled at /.well-known/") } - // MCP endpoint - apply middleware chain: auth → discovery + // MCP endpoint - apply middleware chain: auth → discovery → telemetry var mcpHandler http.Handler = streamableServer + if s.config.TelemetryProvider != nil { + mcpHandler = s.config.TelemetryProvider.Middleware(s.config.Name, "streamable-http")(mcpHandler) + logger.Info("Telemetry middleware enabled for MCP endpoints") + } + // Apply discovery middleware (runs after auth middleware) // Discovery middleware performs per-request capability aggregation with user context // Pass sessionManager to enable session-based capability retrieval for subsequent requests diff --git a/pkg/vmcp/server/server_test.go b/pkg/vmcp/server/server_test.go index 0a0a74422e..e17ee43450 100644 --- a/pkg/vmcp/server/server_test.go +++ b/pkg/vmcp/server/server_test.go @@ -76,7 +76,7 @@ func TestNew(t *testing.T) { mockBackendClient := mocks.NewMockBackendClient(ctrl) mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) - s, err := server.New(tt.config, mockRouter, mockBackendClient, mockDiscoveryMgr, []vmcp.Backend{}, nil) + s, err := server.New(context.Background(), tt.config, mockRouter, mockBackendClient, mockDiscoveryMgr, []vmcp.Backend{}, nil) require.NoError(t, err) require.NotNil(t, s) @@ -133,7 +133,7 @@ func TestServer_Address(t *testing.T) { mockBackendClient := mocks.NewMockBackendClient(ctrl) mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) - s, err := server.New(tt.config, mockRouter, mockBackendClient, mockDiscoveryMgr, []vmcp.Backend{}, nil) + s, err := server.New(context.Background(), tt.config, mockRouter, mockBackendClient, mockDiscoveryMgr, []vmcp.Backend{}, nil) require.NoError(t, err) addr := s.Address() assert.Equal(t, tt.expected, addr) @@ -155,7 +155,7 @@ func TestServer_Stop(t *testing.T) { mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) mockDiscoveryMgr.EXPECT().Stop().Times(1) - s, err := server.New(&server.Config{}, mockRouter, mockBackendClient, mockDiscoveryMgr, []vmcp.Backend{}, nil) + s, err := server.New(context.Background(), &server.Config{}, mockRouter, mockBackendClient, mockDiscoveryMgr, []vmcp.Backend{}, nil) require.NoError(t, err) err = s.Stop(context.Background()) require.NoError(t, err) diff --git a/pkg/vmcp/server/telemetry.go b/pkg/vmcp/server/telemetry.go new file mode 100644 index 0000000000..64dbe218b4 --- /dev/null +++ b/pkg/vmcp/server/telemetry.go @@ -0,0 +1,244 @@ +package server + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "github.com/stacklok/toolhive/pkg/vmcp" + "github.com/stacklok/toolhive/pkg/vmcp/server/adapter" +) + +const ( + instrumentationName = "github.com/stacklok/toolhive/pkg/vmcp" +) + +// monitorBackends decorates the backend client so it records telemetry on each method call. +// It also emits a gauge for the number of backends discovered once, since the number of backends is static. +func monitorBackends( + ctx context.Context, + meterProvider metric.MeterProvider, + tracerProvider trace.TracerProvider, + backends []vmcp.Backend, + backendClient vmcp.BackendClient, +) (vmcp.BackendClient, error) { + meter := meterProvider.Meter(instrumentationName) + + backendCount, err := meter.Int64Gauge( + "toolhive_vmcp_backends_discovered", + metric.WithDescription("Number of backends discovered"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create backend count gauge: %w", err) + } + backendCount.Record(ctx, int64(len(backends))) + + requestsTotal, err := meter.Int64Counter( + "toolhive_vmcp_backend_requests", + metric.WithDescription("Total number of requests per backend")) + if err != nil { + return nil, fmt.Errorf("failed to create requests total counter: %w", err) + } + errorsTotal, err := meter.Int64Counter( + "toolhive_vmcp_backend_errors", + metric.WithDescription("Total number of errors per backend")) + if err != nil { + return nil, fmt.Errorf("failed to create errors total counter: %w", err) + } + requestsDuration, err := meter.Float64Histogram( + "toolhive_vmcp_backend_requests_duration", + metric.WithDescription("Duration of requests in seconds per backend"), + metric.WithUnit("s"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create requests duration histogram: %w", err) + } + + return telemetryBackendClient{ + backendClient: backendClient, + tracer: tracerProvider.Tracer(instrumentationName), + requestsTotal: requestsTotal, + errorsTotal: errorsTotal, + requestsDuration: requestsDuration, + }, nil +} + +type telemetryBackendClient struct { + backendClient vmcp.BackendClient + tracer trace.Tracer + + requestsTotal metric.Int64Counter + errorsTotal metric.Int64Counter + requestsDuration metric.Float64Histogram +} + +var _ vmcp.BackendClient = telemetryBackendClient{} + +// record updates the metrics and creates a span for each method on the BackendClient interface. +// It returns a function that should be deferred to record the duration, error, and end the span. +func (t telemetryBackendClient) record( + ctx context.Context, target *vmcp.BackendTarget, action string, err *error, +) (context.Context, func()) { + // Create span attributes + commonAttrs := []attribute.KeyValue{ + attribute.String("target.workload_id", target.WorkloadID), + attribute.String("target.workload_name", target.WorkloadName), + attribute.String("target.base_url", target.BaseURL), + attribute.String("target.transport_type", target.TransportType), + attribute.String("action", action), + } + + ctx, span := t.tracer.Start(ctx, "telemetryBackendClient."+action, + // TODO: Add params and results to the span once we have reusable sanitization functions. + trace.WithAttributes(commonAttrs...), + ) + + metricAttrs := metric.WithAttributes(commonAttrs...) + start := time.Now() + t.requestsTotal.Add(ctx, 1, metricAttrs) + + return ctx, func() { + duration := time.Since(start) + t.requestsDuration.Record(ctx, duration.Seconds(), metricAttrs) + if err != nil && *err != nil { + t.errorsTotal.Add(ctx, 1, metricAttrs) + span.RecordError(*err) + span.SetStatus(codes.Error, (*err).Error()) + } + span.End() + } +} + +func (t telemetryBackendClient) CallTool( + ctx context.Context, target *vmcp.BackendTarget, toolName string, arguments map[string]any, +) (_ map[string]any, retErr error) { + ctx, done := t.record(ctx, target, "call_tool", &retErr) + defer done() + return t.backendClient.CallTool(ctx, target, toolName, arguments) +} + +func (t telemetryBackendClient) ReadResource( + ctx context.Context, target *vmcp.BackendTarget, uri string, +) (_ []byte, retErr error) { + ctx, done := t.record(ctx, target, "read_resource", &retErr) + defer done() + return t.backendClient.ReadResource(ctx, target, uri) +} + +func (t telemetryBackendClient) GetPrompt( + ctx context.Context, target *vmcp.BackendTarget, name string, arguments map[string]any, +) (_ string, retErr error) { + ctx, done := t.record(ctx, target, "get_prompt", &retErr) + defer done() + return t.backendClient.GetPrompt(ctx, target, name, arguments) +} + +func (t telemetryBackendClient) ListCapabilities( + ctx context.Context, target *vmcp.BackendTarget, +) (_ *vmcp.CapabilityList, retErr error) { + ctx, done := t.record(ctx, target, "list_capabilities", &retErr) + defer done() + return t.backendClient.ListCapabilities(ctx, target) +} + +// monitorWorkflowExecutors decorates workflow executors with telemetry recording. +// It wraps each executor to emit metrics and traces for execution count, duration, and errors. +func monitorWorkflowExecutors( + meterProvider metric.MeterProvider, + tracerProvider trace.TracerProvider, + executors map[string]adapter.WorkflowExecutor, +) (map[string]adapter.WorkflowExecutor, error) { + if len(executors) == 0 { + return executors, nil + } + + meter := meterProvider.Meter(instrumentationName) + + executionsTotal, err := meter.Int64Counter( + "toolhive_vmcp_workflow_executions", + metric.WithDescription("Total number of workflow executions"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create workflow executions counter: %w", err) + } + + errorsTotal, err := meter.Int64Counter( + "toolhive_vmcp_workflow_errors", + metric.WithDescription("Total number of workflow execution errors"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create workflow errors counter: %w", err) + } + + executionDuration, err := meter.Float64Histogram( + "toolhive_vmcp_workflow_duration", + metric.WithDescription("Duration of workflow executions in seconds"), + metric.WithUnit("s"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create workflow duration histogram: %w", err) + } + + tracer := tracerProvider.Tracer(instrumentationName) + + monitored := make(map[string]adapter.WorkflowExecutor, len(executors)) + for name, executor := range executors { + monitored[name] = &telemetryWorkflowExecutor{ + name: name, + executor: executor, + tracer: tracer, + executionsTotal: executionsTotal, + errorsTotal: errorsTotal, + executionDuration: executionDuration, + } + } + + return monitored, nil +} + +// telemetryWorkflowExecutor wraps a WorkflowExecutor with telemetry recording. +type telemetryWorkflowExecutor struct { + name string + executor adapter.WorkflowExecutor + tracer trace.Tracer + executionsTotal metric.Int64Counter + errorsTotal metric.Int64Counter + executionDuration metric.Float64Histogram +} + +var _ adapter.WorkflowExecutor = (*telemetryWorkflowExecutor)(nil) + +// ExecuteWorkflow executes the workflow and records telemetry metrics and traces. +func (t *telemetryWorkflowExecutor) ExecuteWorkflow(ctx context.Context, params map[string]any) (*adapter.WorkflowResult, error) { + commonAttrs := []attribute.KeyValue{ + attribute.String("workflow.name", t.name), + } + + ctx, span := t.tracer.Start(ctx, "telemetryWorkflowExecutor.ExecuteWorkflow", + // TODO: Add params and results to the span once we have reusable sanitization functions. + trace.WithAttributes(commonAttrs...), + ) + defer span.End() + + metricAttrs := metric.WithAttributes(commonAttrs...) + start := time.Now() + t.executionsTotal.Add(ctx, 1, metricAttrs) + + result, err := t.executor.ExecuteWorkflow(ctx, params) + + duration := time.Since(start) + t.executionDuration.Record(ctx, duration.Seconds(), metricAttrs) + + if err != nil { + t.errorsTotal.Add(ctx, 1, metricAttrs) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return result, err +} diff --git a/test/integration/vmcp/helpers/vmcp_server.go b/test/integration/vmcp/helpers/vmcp_server.go index f001a575ce..8e9d559f0f 100644 --- a/test/integration/vmcp/helpers/vmcp_server.go +++ b/test/integration/vmcp/helpers/vmcp_server.go @@ -10,11 +10,13 @@ import ( "github.com/stacklok/toolhive/pkg/auth" "github.com/stacklok/toolhive/pkg/env" + "github.com/stacklok/toolhive/pkg/telemetry" vmcptypes "github.com/stacklok/toolhive/pkg/vmcp" "github.com/stacklok/toolhive/pkg/vmcp/aggregator" "github.com/stacklok/toolhive/pkg/vmcp/auth/factory" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" vmcpclient "github.com/stacklok/toolhive/pkg/vmcp/client" + "github.com/stacklok/toolhive/pkg/vmcp/composer" "github.com/stacklok/toolhive/pkg/vmcp/discovery" "github.com/stacklok/toolhive/pkg/vmcp/router" vmcpserver "github.com/stacklok/toolhive/pkg/vmcp/server" @@ -63,8 +65,10 @@ type VMCPServerOption func(*vmcpServerConfig) // vmcpServerConfig holds configuration for creating a test vMCP server. type vmcpServerConfig struct { - conflictStrategy string - prefixFormat string + conflictStrategy string + prefixFormat string + workflowDefs map[string]*composer.WorkflowDefinition + telemetryProvider *telemetry.Provider } // WithPrefixConflictResolution configures prefix-based conflict resolution. @@ -75,6 +79,20 @@ func WithPrefixConflictResolution(format string) VMCPServerOption { } } +// WithWorkflowDefinitions configures composite tool workflow definitions. +func WithWorkflowDefinitions(defs map[string]*composer.WorkflowDefinition) VMCPServerOption { + return func(c *vmcpServerConfig) { + c.workflowDefs = defs + } +} + +// WithTelemetryProvider configures the telemetry provider. +func WithTelemetryProvider(provider *telemetry.Provider) VMCPServerOption { + return func(c *vmcpServerConfig) { + c.telemetryProvider = provider + } +} + // getFreePort returns an available TCP port on localhost. // This is used for parallel test execution to avoid port conflicts. func getFreePort(tb testing.TB) int { @@ -147,13 +165,14 @@ func NewVMCPServer( rtr := router.NewDefaultRouter() // Create vMCP server with test-specific defaults - vmcpServer, err := vmcpserver.New(&vmcpserver.Config{ - Name: "test-vmcp", - Version: "1.0.0", - Host: "127.0.0.1", - Port: getFreePort(tb), // Get a random available port for parallel test execution - AuthMiddleware: auth.AnonymousMiddleware, - }, rtr, backendClient, discoveryMgr, backends, nil) // nil for workflowDefs in tests + vmcpServer, err := vmcpserver.New(ctx, &vmcpserver.Config{ + Name: "test-vmcp", + Version: "1.0.0", + Host: "127.0.0.1", + Port: getFreePort(tb), // Get a random available port for parallel test execution + AuthMiddleware: auth.AnonymousMiddleware, + TelemetryProvider: config.telemetryProvider, + }, rtr, backendClient, discoveryMgr, backends, config.workflowDefs) require.NoError(tb, err, "failed to create vMCP server") // Start server automatically diff --git a/test/integration/vmcp/vmcp_integration_test.go b/test/integration/vmcp/vmcp_integration_test.go index 57dd164e5e..b60e9fba34 100644 --- a/test/integration/vmcp/vmcp_integration_test.go +++ b/test/integration/vmcp/vmcp_integration_test.go @@ -2,12 +2,19 @@ package vmcp_test import ( "context" + "io" + "net/http" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stacklok/toolhive/pkg/telemetry" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" + "github.com/stacklok/toolhive/pkg/vmcp/composer" "github.com/stacklok/toolhive/test/integration/vmcp/helpers" ) @@ -225,3 +232,126 @@ func TestVMCPServer_TwoBoundaryAuth_HeaderInjection(t *testing.T) { helpers.AssertTextNotContains(t, text, "error", "failed", "leakage") }) } + +// TestVMCPServer_Telemetry_CompositeToolMetrics verifies that vMCP exposes +// Prometheus metrics for composite tool workflow executions and backend requests on /metrics. +// This test creates a composite tool, executes it, and verifies the metrics +// for both the workflow and the backend subtool calls are correctly exposed. +// +//nolint:paralleltest // safe to run in parallel with other tests +func TestVMCPServer_Telemetry_CompositeToolMetrics(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup: Create a synthetic MCP backend server with a simple tool + echoServer := helpers.CreateBackendServer(t, []helpers.BackendTool{ + helpers.NewBackendTool("echo", "Echo the input message", + func(_ context.Context, args map[string]any) string { + msg, _ := args["message"].(string) + return `{"echoed": "` + msg + `"}` + }), + }, helpers.WithBackendName("echo-mcp")) + defer echoServer.Close() + + // Configure backend pointing to test server + backends := []vmcp.Backend{ + helpers.NewBackend("echo", + helpers.WithURL(echoServer.URL+"/mcp"), + helpers.WithMetadata("group", "test-group"), + ), + } + + // Create composite tool workflow definition that calls the echo tool + workflowDefs := map[string]*composer.WorkflowDefinition{ + "echo_workflow": { + Name: "echo_workflow", + Description: "A composite tool that echoes a message", + Parameters: map[string]any{ + "type": "object", + "properties": map[string]any{ + "message": map[string]any{ + "type": "string", + "description": "The message to echo", + }, + }, + "required": []string{"message"}, + }, + Steps: []composer.WorkflowStep{ + { + ID: "echo_step", + Type: "tool", + Tool: "echo_echo", // prefixed with backend name + Arguments: map[string]any{ + "message": "{{.params.message}}", + }, + }, + }, + Timeout: 30 * time.Second, + }, + } + + // Create telemetry provider with Prometheus enabled + telemetryConfig := telemetry.Config{ + ServiceName: "vmcp-telemetry-test", + ServiceVersion: "1.0.0", + EnablePrometheusMetricsPath: true, + } + telemetryProvider, err := telemetry.NewProvider(ctx, telemetryConfig) + require.NoError(t, err, "failed to create telemetry provider") + defer telemetryProvider.Shutdown(ctx) + + // Create vMCP server with composite tool and telemetry + vmcpServer := helpers.NewVMCPServer(ctx, t, backends, + helpers.WithPrefixConflictResolution("{workload}_"), + helpers.WithWorkflowDefinitions(workflowDefs), + helpers.WithTelemetryProvider(telemetryProvider), + ) + + // Create and initialize MCP client + vmcpURL := "http://" + vmcpServer.Address() + "/mcp" + client := helpers.NewMCPClient(ctx, t, vmcpURL) + defer client.Close() + + // Call the composite tool + resp := client.CallTool(ctx, "echo_workflow", map[string]any{"message": "hello world"}) + text := helpers.AssertToolCallSuccess(t, resp) + helpers.AssertTextContains(t, text, "echoed", "hello world") + + // Fetch metrics from /metrics endpoint + metricsURL := "http://" + vmcpServer.Address() + "/metrics" + httpClient := &http.Client{Timeout: 5 * time.Second} + metricsResp, err := httpClient.Get(metricsURL) + require.NoError(t, err, "failed to fetch metrics") + defer metricsResp.Body.Close() + + require.Equal(t, http.StatusOK, metricsResp.StatusCode, "metrics endpoint should return 200") + + body, err := io.ReadAll(metricsResp.Body) + require.NoError(t, err, "failed to read metrics body") + metricsContent := string(body) + + // Log metrics for debugging + t.Logf("Metrics content:\n%s", metricsContent) + + // Verify workflow execution metrics are present (composite tool). + assert.True(t, strings.Contains(metricsContent, "toolhive_vmcp_workflow_executions_total"), + "Should contain workflow executions total metric") + assert.True(t, strings.Contains(metricsContent, "toolhive_vmcp_workflow_duration_seconds"), + "Should contain workflow duration metric") + assert.True(t, strings.Contains(metricsContent, `workflow_name="echo_workflow"`), + "Should contain workflow name label") + + // Verify backend metrics are present. + assert.True(t, strings.Contains(metricsContent, "toolhive_vmcp_backend_requests_total"), + "Should contain backend requests total metric") + assert.True(t, strings.Contains(metricsContent, "toolhive_vmcp_backend_requests_duration"), + "Should contain backend requests duration metric") + + // Verify HTTP middleware metrics are present (incoming MCP requests). + assert.True(t, strings.Contains(metricsContent, "toolhive_mcp_requests_total"), + "Should contain HTTP middleware requests total metric") + assert.True(t, strings.Contains(metricsContent, "toolhive_mcp_request_duration_seconds"), + "Should contain HTTP middleware request duration metric") +}