142 changes: 97 additions & 45 deletions cmd/thv-operator/pkg/validation/image_validation.go
@@ -9,13 +9,15 @@
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"time"

corev1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
v0 "github.com/modelcontextprotocol/registry/pkg/api/v0"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

regtypes "github.com/stacklok/toolhive-core/registry/types"
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)

@@ -68,17 +70,21 @@
func NewImageValidator(k8sClient client.Client, namespace string, validation ImageValidation) ImageValidator {
if validation == ImageValidationRegistryEnforcing {
return &RegistryEnforcingValidator{
client: k8sClient,
namespace: namespace,
client: k8sClient,
namespace: namespace,
httpClient: &http.Client{Timeout: 10 * time.Second},
}
}
return &AlwaysAllowValidator{}
}

// RegistryEnforcingValidator provides validation against MCPRegistry resources
// RegistryEnforcingValidator provides validation against MCPRegistry resources.
// It queries the registry API service (via HTTP) to check whether an image
// exists in a registry's OCI packages.
type RegistryEnforcingValidator struct {
client client.Client
namespace string
client client.Client
namespace string
httpClient *http.Client
}

// ValidateImage checks if an image should be validated and if it exists in registries
@@ -191,7 +197,8 @@
return enforcingRegistries
}

// checkImageInRegistry checks if an image exists in a specific MCPRegistry
// checkImageInRegistry checks if an image exists in a specific MCPRegistry by
// querying the registry API service at the URL stored in the MCPRegistry status.
func (v *RegistryEnforcingValidator) checkImageInRegistry(
ctx context.Context,
mcpRegistry *mcpv1alpha1.MCPRegistry,
@@ -202,56 +209,101 @@
return false, nil
}

// Get the ConfigMap containing the registry data
configMapName := mcpRegistry.GetStorageName()
configMap := &corev1.ConfigMap{}
if err := v.client.Get(ctx, client.ObjectKey{
Name: configMapName,
Namespace: v.namespace,
}, configMap); err != nil {
if k8serr.IsNotFound(err) {
// ConfigMap not found, registry data not available
return false, nil
}
return false, fmt.Errorf("failed to get ConfigMap %s: %w", configMapName, err)
// Get the registry API URL from status
registryURL := mcpRegistry.Status.URL
if registryURL == "" {
return false, nil
}

// Get the registry data from the ConfigMap
registryData, exists := configMap.Data["registry.json"]
if !exists {
// No registry data in ConfigMap
return false, nil
// Query the registry API for all servers
servers, err := v.listRegistryServers(ctx, registryURL)
if err != nil {
return false, fmt.Errorf("failed to query registry API at %s: %w", registryURL, err)
}

// Parse the registry data
var reg regtypes.Registry
if err := json.Unmarshal([]byte(registryData), &reg); err != nil {
// Invalid registry data
return false, fmt.Errorf("failed to parse registry data: %w", err)
return findImageInServers(servers, image), nil
}

// listRegistryServers queries the registry API to fetch all servers, handling pagination.
func (v *RegistryEnforcingValidator) listRegistryServers(
ctx context.Context,
registryURL string,
) ([]v0.ServerResponse, error) {
var allServers []v0.ServerResponse
cursor := ""

for {
servers, nextCursor, err := v.fetchRegistryPage(ctx, registryURL, cursor)
if err != nil {
return nil, err
}

allServers = append(allServers, servers...)

if nextCursor == "" {
break
}
cursor = nextCursor

// Safety limit to prevent infinite loops
if len(allServers) > 10000 {
return nil, fmt.Errorf("exceeded maximum server limit (10000)")

Check warning on line 250 in cmd/thv-operator/pkg/validation/image_validation.go

Claude / Claude Code Review

Safety limit check unreachable for single-page responses

The safety limit check in `listRegistryServers()` is unreachable for single-page responses: `if len(allServers) > 10000` is placed after the `if nextCursor == "" { break }` statement, so a registry returning >10,000 items in a single page (ignoring the `limit=100` request parameter) would bypass the cap entirely. The fix is to move the safety check before the break.
Comment on lines +233 to +250

Extended reasoning...

What the bug is and how it manifests

In listRegistryServers() (image_validation.go, lines 233–250), after appending a page of results the code checks whether there is a next cursor and breaks immediately if there is none. The 10,000-server safety check lives below that break, so it is never evaluated for the final (or only) page of a response:

allServers = append(allServers, servers...)

if nextCursor == "" {
    break                   // ← exits the loop here for single-page responses
}
cursor = nextCursor

// Safety limit to prevent infinite loops
if len(allServers) > 10000 { // ← never reached when nextCursor is empty
    return nil, fmt.Errorf("exceeded maximum server limit (10000)")
}

The specific code path that triggers it

A registry API that ignores the limit=100 query parameter and returns all servers (say, 50,000 of them) in a single JSON response with an empty nextCursor will cause the loop to: (1) append all 50,000 entries to allServers, then (2) see nextCursor == "" and break — without ever checking the length.

Why existing code doesn't prevent it

The limit=100 request parameter is advisory; a non-compliant or compromised registry is free to ignore it. The 10-second HTTP client timeout covers reading the response body, so it bounds total transfer time, but on a fast cluster network a very large single-page payload can still be transferred and decoded by json.NewDecoder(resp.Body).Decode() within that window, and nothing in the decode path limits how much memory the result occupies.

What the impact would be

A misconfigured or malicious registry served at MCPRegistry.Status.URL could force the operator to allocate a very large amount of memory in a single validation call, potentially causing OOM conditions in the operator pod. The comment says 'Safety limit to prevent infinite loops' but the error message says 'exceeded maximum server limit (10000)', which indicates the intent was a hard total-count cap, not just an infinite-loop guard.

How to fix it

Move the safety check before the break:

allServers = append(allServers, servers...)

if len(allServers) > 10000 {
    return nil, fmt.Errorf("exceeded maximum server limit (10000)")
}

if nextCursor == "" {
    break
}
cursor = nextCursor
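
The reordering can be tried in isolation with a small, self-contained sketch. Everything here is hypothetical scaffolding rather than code from the PR: `pageFunc` stands in for `fetchRegistryPage`, plain strings stand in for `v0.ServerResponse`, and `singlePage` / `twoPages` are fake registries used only to exercise the loop.

```go
package main

import (
	"errors"
	"fmt"
)

// maxServers mirrors the 10000 cap used in the PR.
const maxServers = 10000

// pageFunc is a hypothetical stand-in for fetchRegistryPage: it returns one
// page of items plus the cursor for the next page ("" when there is none).
type pageFunc func(cursor string) (items []string, next string, err error)

var errTooManyServers = errors.New("exceeded maximum server limit")

// collectAll gathers every page. The cap is checked before the exit check,
// so the final (or only) page is counted as well.
func collectAll(fetch pageFunc, limit int) ([]string, error) {
	var all []string
	cursor := ""
	for {
		items, next, err := fetch(cursor)
		if err != nil {
			return nil, err
		}
		all = append(all, items...)

		if len(all) > limit {
			return nil, fmt.Errorf("%w (%d)", errTooManyServers, limit)
		}
		if next == "" {
			return all, nil
		}
		cursor = next
	}
}

// singlePage simulates a registry that ignores limit=100 and returns n items
// in one response with an empty cursor.
func singlePage(n int) pageFunc {
	return func(string) ([]string, string, error) {
		return make([]string, n), "", nil
	}
}

// twoPages simulates a well-behaved registry that returns two pages of n items.
func twoPages(n int) pageFunc {
	return func(cursor string) ([]string, string, error) {
		if cursor == "" {
			return make([]string, n), "page2", nil
		}
		return make([]string, n), "", nil
	}
}

func main() {
	_, err := collectAll(singlePage(20000), maxServers)
	fmt.Println("oversized single page:", err)

	all, err := collectAll(twoPages(100), maxServers)
	fmt.Println("two pages:", len(all), err)
}
```

With the check placed after the break, as in the current diff, the oversized single page would be returned without an error.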

Step-by-step proof

  1. Registry API is called with limit=100 but returns 20,000 items and nextCursor: "".
  2. fetchRegistryPage decodes the full body and returns 20,000 ServerResponse objects plus an empty cursor.
  3. Back in listRegistryServers, allServers = append(allServers, servers...) grows to 20,000 entries — all in memory.
  4. if nextCursor == "" { break } is true, so the loop exits.
  5. The if len(allServers) > 10000 check is never evaluated.
  6. listRegistryServers returns all 20,000 entries, and they are passed to findImageInServers for scanning.

Mitigating factors that keep severity low: the registry URL is set by a cluster operator (trusted source), the limit=100 parameter discourages large responses from well-behaved servers, and the 10-second timeout bounds wall-clock exposure. Nevertheless, the placement is a logical error relative to the stated invariant.
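
One caveat on the proposed fix: the reordered check runs only after a page has already been decoded, so it caps the server count but not the size of any single response. If bounding memory is also a goal, a complementary guard could cap the bytes read before decoding. The following is a minimal sketch under stated assumptions: `maxBodyBytes` is an arbitrary budget, `serverList` is a trimmed-down hypothetical stand-in for `v0.ServerListResponse` (including its `servers` JSON key), and `decodeBounded` / `decodeString` are illustrative helpers, not functions from the PR.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// maxBodyBytes is an assumed budget (1 MiB), not a value taken from the PR.
const maxBodyBytes = 1 << 20

var errBodyTooLarge = errors.New("response body exceeds size limit")

// serverList is a hypothetical, trimmed-down stand-in for v0.ServerListResponse.
type serverList struct {
	Servers []json.RawMessage `json:"servers"`
}

// decodeBounded reads at most limit bytes from body before decoding, so an
// oversized response is rejected instead of being fully allocated.
func decodeBounded(body io.Reader, limit int64) (*serverList, error) {
	// Read one byte past the limit so an over-limit body can be detected.
	data, err := io.ReadAll(io.LimitReader(body, limit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(data)) > limit {
		return nil, errBodyTooLarge
	}
	var out serverList
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

// decodeString is a convenience wrapper for trying decodeBounded on a literal.
func decodeString(s string, limit int64) (int, error) {
	list, err := decodeBounded(strings.NewReader(s), limit)
	if err != nil {
		return 0, err
	}
	return len(list.Servers), nil
}

func main() {
	n, err := decodeString(`{"servers":[{},{}]}`, maxBodyBytes)
	fmt.Println("within limit:", n, err)

	_, err = decodeString(`{"servers":[{},{}]}`, 8)
	fmt.Println("over limit:", err)
}
```

In fetchRegistryPage the same idea would wrap resp.Body; whether that extra guard is worth it here depends on how much the registry URL is trusted.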

}
}

// Search for the image in this registry
return findImageInRegistry(&reg, image), nil
return allServers, nil
}

// findImageInRegistry searches for an image in a registry
func findImageInRegistry(reg *regtypes.Registry, image string) bool {
// Check top-level servers
for _, server := range reg.Servers {
if server.Image == image {
return true
// fetchRegistryPage fetches a single page of servers from the registry API.
func (v *RegistryEnforcingValidator) fetchRegistryPage(
ctx context.Context,
registryURL string,
cursor string,
) ([]v0.ServerResponse, string, error) {
params := url.Values{}
params.Set("limit", "100")
if cursor != "" {
params.Set("cursor", cursor)
}

endpoint := fmt.Sprintf("%s/v0.1/servers?%s", registryURL, params.Encode())

req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return nil, "", fmt.Errorf("failed to create request: %w", err)
}

resp, err := v.httpClient.Do(req)
if err != nil {
return nil, "", fmt.Errorf("failed to query registry: %w", err)
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
log.FromContext(ctx).V(1).Info("Failed to close response body", "error", closeErr)
}
}()

if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("registry API returned status %d", resp.StatusCode)
}

var listResp v0.ServerListResponse
if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil {
return nil, "", fmt.Errorf("failed to decode registry API response: %w", err)
}

// Check servers in groups
// TODO: check with Rado or Ria, is this needed?
for _, group := range reg.Groups {
for _, server := range group.Servers {
if server.Image == image {
return listResp.Servers, listResp.Metadata.NextCursor, nil

Check warning on line 295 in cmd/thv-operator/pkg/validation/image_validation.go

Claude / Claude Code Review

HTTP response body not drained before close prevents connection reuse

In fetchRegistryPage(), when resp.StatusCode != http.StatusOK, the function returns early without draining the response body. The deferred resp.Body.Close() closes it unread, preventing Go's HTTP transport from returning the TCP connection to the pool. Add io.Copy(io.Discard, resp.Body) before the early return to allow connection reuse on error paths.
Comment on lines +287 to +295

Extended reasoning...

What the bug is and how it manifests

In fetchRegistryPage() (image_validation.go lines 287-295), when resp.StatusCode != http.StatusOK, the code returns an error immediately. A defer closes the body, but the body was never read. Per Go's net/http documentation, the default client's Transport may not reuse an HTTP/1.x keep-alive TCP connection unless the response body is read to completion and closed. Without draining, the connection is typically closed rather than returned to the pool.

The specific code path that triggers it

The sequence in fetchRegistryPage is: (1) v.httpClient.Do(req) makes the HTTP call, (2) defer resp.Body.Close() is registered, (3) if resp.StatusCode != http.StatusOK returns an error immediately, (4) the deferred Close() runs on an unread body. On the success path the body is normally consumed by json.NewDecoder(resp.Body).Decode, which reads through the end of the JSON value (usually the whole body), so the error path is the one clearly affected.

Why existing code does not prevent it

The defer resp.Body.Close() pattern is correct for the success path but insufficient for error paths. Go's transport checks whether the body was fully read when Close() is called to decide if the connection can be returned to the pool. An unread body means the transport cannot safely reuse the connection and must close the TCP socket instead.

What the impact would be

Each non-200 response from the registry API wastes one TCP connection. Under normal operation this path is never hit. It only matters during registry errors (5xx, 4xx). For a cluster-internal operator admission webhook this is low-impact: non-200 responses should be rare, the connection pool is bounded, and the 10-second client timeout provides a natural backstop. All four verifiers agreed this is a nit-severity issue.

How to fix it

Add io.Copy(io.Discard, resp.Body) before the early return on non-200 status:

if resp.StatusCode != http.StatusOK {
    _, _ = io.Copy(io.Discard, resp.Body)
    return nil, "", fmt.Errorf("registry API returned status %d", resp.StatusCode)
}
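
The drain can also be bounded, so that a very large error body is not read in full just to save a connection. The sketch below is illustrative only: `checkStatus`, `maxDrainBytes`, `trackedBody`, and `drainedOn` are hypothetical names, and the 64 KiB cap is an assumed value. It uses a fake body instead of a real server, so it shows that the body is read to EOF on the error path, not the transport's reuse decision itself.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// maxDrainBytes caps how much of an error body is discarded (assumed value).
const maxDrainBytes = 64 << 10

// checkStatus is a hypothetical helper: it returns nil for 200 OK; otherwise
// it discards up to maxDrainBytes of the body and returns an error. The
// caller remains responsible for closing resp.Body.
func checkStatus(resp *http.Response) error {
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	// The copy error is ignored on purpose: a short body ends with io.EOF here.
	_, _ = io.CopyN(io.Discard, resp.Body, maxDrainBytes)
	return fmt.Errorf("registry API returned status %d", resp.StatusCode)
}

// trackedBody is a test double that records whether it was read to EOF.
type trackedBody struct {
	r      *strings.Reader
	sawEOF bool
}

func (b *trackedBody) Read(p []byte) (int, error) {
	n, err := b.r.Read(p)
	if err == io.EOF {
		b.sawEOF = true
	}
	return n, err
}

func (b *trackedBody) Close() error { return nil }

// drainedOn reports whether checkStatus read the body to EOF for a given status.
func drainedOn(status int, payload string) (bool, error) {
	body := &trackedBody{r: strings.NewReader(payload)}
	resp := &http.Response{StatusCode: status, Body: body}
	err := checkStatus(resp)
	return body.sawEOF, err
}

func main() {
	drained, err := drainedOn(http.StatusServiceUnavailable, "<html>busy</html>")
	fmt.Println("drained:", drained, "err:", err)
}
```

Either variant requires adding "io" to the import block of image_validation.go, which the diff above does not currently import.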

Step-by-step proof

  1. fetchRegistryPage calls v.httpClient.Do(req) and receives a 503 response from a struggling registry.
  2. defer resp.Body.Close() is registered but not yet run.
  3. resp.StatusCode != http.StatusOK is true; the function returns an error immediately.
  4. The deferred Close() runs on an unread body (the 503 HTML error page was never consumed).
  5. Go's http.Transport sees an unread body on Close() and marks the connection as non-reusable, closing the underlying TCP socket.
  6. The next pagination call or the next admission webhook invocation must open a fresh TCP connection to the registry service rather than reusing the pooled one.

}

// findImageInServers searches for an OCI image in the servers returned by the registry API.
func findImageInServers(servers []v0.ServerResponse, image string) bool {
for i := range servers {
for j := range servers[i].Server.Packages {
if servers[i].Server.Packages[j].RegistryType == "oci" &&
servers[i].Server.Packages[j].Identifier == image {
return true
}
}
}

return false
}