Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
403 changes: 374 additions & 29 deletions cmd/thv-operator/controllers/virtualmcpserver_controller.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions cmd/thv-operator/pkg/virtualmcpserverstatus/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ func (s *StatusCollector) SetDiscoveredBackends(backends []mcpv1alpha1.Discovere
s.hasChanges = true
}

// GetDiscoveredBackends returns the current discovered backends value.
// If SetDiscoveredBackends has been called, returns the new value.
// Otherwise, returns the existing value from the VirtualMCPServer status.
func (s *StatusCollector) GetDiscoveredBackends() []mcpv1alpha1.DiscoveredBackend {
	if s.discoveredBackends == nil {
		// Nothing collected yet; fall back to the value already on the resource status.
		return s.vmcp.Status.DiscoveredBackends
	}
	return s.discoveredBackends
}

// UpdateStatus applies all collected status changes in a single batch update.
// Expects vmcpStatus to be freshly fetched from the cluster to ensure the update operates on the latest resource version.
func (s *StatusCollector) UpdateStatus(ctx context.Context, vmcpStatus *mcpv1alpha1.VirtualMCPServerStatus) bool {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cmd/thv-operator/pkg/virtualmcpserverstatus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type StatusManager interface {
// SetDiscoveredBackends sets the discovered backends list
SetDiscoveredBackends(backends []mcpv1alpha1.DiscoveredBackend)

// GetDiscoveredBackends returns the current discovered backends value.
// If SetDiscoveredBackends has been called, returns the new value.
// Otherwise, returns the existing value from the VirtualMCPServer status.
GetDiscoveredBackends() []mcpv1alpha1.DiscoveredBackend

// UpdateStatus applies all collected status changes in a single batch update.
// Returns true if updates were applied, false if no changes were collected.
UpdateStatus(ctx context.Context, vmcpStatus *mcpv1alpha1.VirtualMCPServerStatus) bool
Expand Down
10 changes: 7 additions & 3 deletions cmd/thv-operator/pkg/vmcpconfig/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (c *Converter) Convert(
}

config.Telemetry = spectoconfig.ConvertTelemetryConfig(ctx, vmcp.Spec.Telemetry, vmcp.Name)
config.Audit = spectoconfig.ConvertAuditConfig(ctx, vmcp.Spec.Audit, vmcp.Name)

// Apply operational defaults (fills missing values)
config.EnsureOperationalDefaults()
Expand Down Expand Up @@ -863,7 +862,7 @@ func convertOutputProperty(

// convertOperational converts OperationalConfig from CRD to vmcp config
func (*Converter) convertOperational(
_ context.Context,
ctx context.Context,
vmcp *mcpv1alpha1.VirtualMCPServer,
) *vmcpconfig.OperationalConfig {
operational := &vmcpconfig.OperationalConfig{}
Expand Down Expand Up @@ -896,7 +895,12 @@ func (*Converter) convertOperational(

// Parse health check interval
if vmcp.Spec.Operational.FailureHandling.HealthCheckInterval != "" {
if duration, err := time.ParseDuration(vmcp.Spec.Operational.FailureHandling.HealthCheckInterval); err == nil {
duration, err := time.ParseDuration(vmcp.Spec.Operational.FailureHandling.HealthCheckInterval)
if err != nil {
ctxLogger := log.FromContext(ctx)
ctxLogger.Error(err, "Failed to parse HealthCheckInterval, health monitoring will be disabled",
"value", vmcp.Spec.Operational.FailureHandling.HealthCheckInterval)
} else {
operational.FailureHandling.HealthCheckInterval = vmcpconfig.Duration(duration)
}
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/vmcp/app/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,14 @@ func runServe(cmd *cobra.Command, _ []string) error {
// Configure health monitoring if enabled
var healthMonitorConfig *health.MonitorConfig
if cfg.Operational != nil && cfg.Operational.FailureHandling != nil && cfg.Operational.FailureHandling.HealthCheckInterval > 0 {
// Validate configuration values before creating MonitorConfig
// This provides defense in depth and clearer error messages
// Note: HealthCheckInterval is config.Duration (alias for time.Duration), already in nanoseconds
// from YAML/JSON parsing via time.ParseDuration. This is a simple type cast, not unit conversion.
checkInterval := time.Duration(cfg.Operational.FailureHandling.HealthCheckInterval)
if checkInterval <= 0 {
return fmt.Errorf("invalid health check configuration: check interval must be > 0, got %v", checkInterval)
}
if cfg.Operational.FailureHandling.UnhealthyThreshold < 1 {
return fmt.Errorf("invalid health check configuration: unhealthy threshold must be >= 1, got %d",
cfg.Operational.FailureHandling.UnhealthyThreshold)
Expand Down
173 changes: 173 additions & 0 deletions examples/operator/virtual-mcps/vmcp_health_monitoring.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Example: VirtualMCPServer with Health Monitoring
#
# This example demonstrates health monitoring configuration for VirtualMCPServer.
# Health monitoring enables:
# - Periodic health checks on backend MCPServers using ListCapabilities calls
# - Automatic detection of unhealthy backends
# - Backend status tracking in VirtualMCPServer status
# - Graceful handling of partial failures (best_effort mode)
#
# Health Status Values:
# - ready: Backend is healthy and responding
# - unavailable: Backend is not responding or workload is in Pending/Failed/Terminating phase
# - degraded: Backend is responding but degraded (e.g., slow response times)
# - unknown: Health status not yet determined
#
# This example creates:
# 1. Two MCPServer backends (demonstrating different health scenarios)
# 2. An MCPGroup to organize them
# 3. A VirtualMCPServer with health monitoring enabled
#
# Usage:
# kubectl apply -f vmcp_health_monitoring.yaml
#
# Monitor health status:
# kubectl get virtualmcpserver health-monitoring-vmcp -o jsonpath='{.status.discoveredBackends[*].status}'
# kubectl get virtualmcpserver health-monitoring-vmcp -o jsonpath='{.status.discoveredBackends[*].lastHealthCheck}'

---
# Step 1: Create MCPGroup
apiVersion: toolhive.stacklok.dev/v1alpha1
kind: MCPGroup
metadata:
name: health-monitoring-group
namespace: default
spec:
description: Group demonstrating health monitoring for VirtualMCPServer

---
# Step 2: Create healthy backend MCPServers
apiVersion: toolhive.stacklok.dev/v1alpha1
kind: MCPServer
metadata:
name: backend-1-healthy
namespace: default
spec:
groupRef: health-monitoring-group
image: ghcr.io/stackloklabs/yardstick/yardstick-server:0.0.2
transport: streamable-http
proxyPort: 8080
mcpPort: 8080
env:
- name: TRANSPORT
value: streamable-http
- name: TOOL_PREFIX
value: backend1
resources:
limits:
cpu: "100m"
memory: "128Mi"
requests:
cpu: "50m"
memory: "64Mi"

---
apiVersion: toolhive.stacklok.dev/v1alpha1
kind: MCPServer
metadata:
name: backend-2-healthy
namespace: default
spec:
groupRef: health-monitoring-group
image: ghcr.io/stackloklabs/yardstick/yardstick-server:0.0.2
transport: streamable-http
proxyPort: 8080
mcpPort: 8080
env:
- name: TRANSPORT
value: streamable-http
- name: TOOL_PREFIX
value: backend2
resources:
limits:
cpu: "100m"
memory: "128Mi"
requests:
cpu: "50m"
memory: "64Mi"

---
# Step 3: Create VirtualMCPServer with health monitoring enabled
apiVersion: toolhive.stacklok.dev/v1alpha1
kind: VirtualMCPServer
metadata:
name: health-monitoring-vmcp
namespace: default
spec:
# Reference to the MCPGroup containing backend MCPServers
groupRef:
name: health-monitoring-group

# Incoming authentication (client -> vMCP)
incomingAuth:
type: anonymous
authzConfig:
type: inline
inline:
policies:
- 'permit(principal, action, resource);'

# Outgoing authentication (vMCP -> backends)
outgoingAuth:
source: discovered

# Aggregation configuration
aggregation:
conflictResolution: prefix
conflictResolutionConfig:
prefixFormat: "{workload}_"

# Service type - NodePort for external access
serviceType: NodePort

# Operational settings with health monitoring
operational:
# Enable debug logging to see health check details
logLevel: debug

# Timeout configuration
timeouts:
default: 30s

# Failure handling with health monitoring
failureHandling:
# Health check interval - how often to check backend health
# Shorter intervals provide faster detection but more overhead
# Recommended: 30s-60s for production, 5s-10s for testing
healthCheckInterval: 30s

# Unhealthy threshold - consecutive failures before marking unhealthy
# Higher values reduce false positives from transient failures
# Recommended: 2-3 for production
unhealthyThreshold: 3

# Partial failure mode - behavior when some backends are unavailable
# - fail: Fail entire request if any backend is unavailable (strict)
# - best_effort: Continue with available backends (resilient)
# Recommended: best_effort for production to maintain availability
partialFailureMode: best_effort

---
# Example: To test health monitoring behavior, you can:
#
# 1. Check initial status (all backends should be healthy):
# kubectl get virtualmcpserver health-monitoring-vmcp -o yaml
#
# 2. View backend health status:
# kubectl get virtualmcpserver health-monitoring-vmcp \
# -o jsonpath='{range .status.discoveredBackends[*]}{.name}{"\t"}{.status}{"\t"}{.lastHealthCheck}{"\n"}{end}'
#
# 3. Simulate an unhealthy backend by deleting one:
# kubectl delete mcpserver backend-2-healthy
#
# 4. Wait for health checks to detect the failure (up to healthCheckInterval * unhealthyThreshold):
# watch kubectl get virtualmcpserver health-monitoring-vmcp \
# -o jsonpath='{range .status.discoveredBackends[*]}{.name}{"\t"}{.status}{"\n"}{end}'
#
# 5. Observe the VirtualMCPServer phase changes to "Degraded" but continues serving:
# kubectl get virtualmcpserver health-monitoring-vmcp
#
# 6. Recreate the backend to see recovery:
# kubectl apply -f vmcp_health_monitoring.yaml
#
# 7. Health monitoring will detect recovery and update status to "ready"
11 changes: 11 additions & 0 deletions test/e2e/thv-operator/virtualmcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ ginkgo -vv

- `suite_test.go` - Ginkgo test suite setup with kubeconfig loading
- `virtualmcp_discovered_mode_test.go` - Tests VirtualMCPServer with discovered mode aggregation
- `virtualmcp_health_monitoring_test.go` - Tests VirtualMCPServer health monitoring functionality
- `helpers.go` - Common helper functions for interacting with Kubernetes resources
- `README.md` - This file

Expand All @@ -77,6 +78,16 @@ Comprehensive E2E tests for VirtualMCPServer in discovered mode, which automatic
- Validates discovered mode configuration and backend discovery
- Uses prefix conflict resolution strategy to namespace tools from different backends

#### Health Monitoring Tests (`virtualmcp_health_monitoring_test.go`)
End-to-end tests for VirtualMCPServer health monitoring of backend MCP servers:
- Creates VirtualMCPServer with configured health check interval and unhealthy threshold
- Creates multiple backend MCPServers (2 healthy, 1 initially unhealthy)
- Verifies health monitoring correctly identifies healthy and unhealthy backends
- Tests that health check timestamps are updated periodically
- Validates backend recovery detection (unhealthy → healthy transitions)
- Ensures health status is accurately reflected in VirtualMCPServer status
- Uses fast health check intervals (5s) for quicker test execution

## Environment Variables

| Variable | Description | Default |
Expand Down
38 changes: 38 additions & 0 deletions test/e2e/thv-operator/virtualmcp/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,44 @@ func WaitForVirtualMCPServerReady(
}, timeout, pollingInterval).Should(gomega.Succeed())
}

// WaitForVirtualMCPServerDeployed waits for a VirtualMCPServer to have pods running and a URL assigned,
// without requiring the Ready condition to be True. This is useful for health monitoring tests
// where some backends may intentionally be unhealthy, causing Ready condition to be False.
func WaitForVirtualMCPServerDeployed(
	ctx context.Context,
	c client.Client,
	name, namespace string,
	timeout time.Duration,
	pollingInterval time.Duration,
) {
	server := &mcpv1alpha1.VirtualMCPServer{}
	key := types.NamespacedName{
		Name:      name,
		Namespace: namespace,
	}

	gomega.Eventually(func() error {
		if err := c.Get(ctx, key, server); err != nil {
			return err
		}

		// A populated URL indicates the server has been deployed.
		if server.Status.URL == "" {
			return fmt.Errorf("VirtualMCPServer URL not set yet")
		}

		// Require the vMCP pods themselves to be ready; backend health is deliberately not checked here.
		selector := map[string]string{
			"app.kubernetes.io/name":     "virtualmcpserver",
			"app.kubernetes.io/instance": name,
		}
		if err := checkPodsReady(ctx, c, namespace, selector); err != nil {
			return fmt.Errorf("VirtualMCPServer pods not ready: %w", err)
		}

		return nil
	}, timeout, pollingInterval).Should(gomega.Succeed())
}

// checkPodsReady checks if all pods matching the given labels are ready
func checkPodsReady(ctx context.Context, c client.Client, namespace string, labels map[string]string) error {
podList := &corev1.PodList{}
Expand Down
Loading
Loading