-
Notifications
You must be signed in to change notification settings - Fork 207
NOT TO BE MERGED! Query registry API instead of ConfigMap for image validation #4720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,13 +9,15 @@ | |
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "net/http" | ||
| "net/url" | ||
| "time" | ||
|
|
||
| corev1 "k8s.io/api/core/v1" | ||
| k8serr "k8s.io/apimachinery/pkg/api/errors" | ||
| v0 "github.com/modelcontextprotocol/registry/pkg/api/v0" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
| "sigs.k8s.io/controller-runtime/pkg/log" | ||
|
|
||
| regtypes "github.com/stacklok/toolhive-core/registry/types" | ||
| mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" | ||
| ) | ||
|
|
||
|
|
@@ -68,17 +70,21 @@ | |
| func NewImageValidator(k8sClient client.Client, namespace string, validation ImageValidation) ImageValidator { | ||
| if validation == ImageValidationRegistryEnforcing { | ||
| return &RegistryEnforcingValidator{ | ||
| client: k8sClient, | ||
| namespace: namespace, | ||
| client: k8sClient, | ||
| namespace: namespace, | ||
| httpClient: &http.Client{Timeout: 10 * time.Second}, | ||
| } | ||
| } | ||
| return &AlwaysAllowValidator{} | ||
| } | ||
|
|
||
| // RegistryEnforcingValidator provides validation against MCPRegistry resources | ||
| // RegistryEnforcingValidator provides validation against MCPRegistry resources. | ||
| // It queries the registry API service (via HTTP) to check whether an image | ||
| // exists in a registry's OCI packages. | ||
| type RegistryEnforcingValidator struct { | ||
| client client.Client | ||
| namespace string | ||
| client client.Client | ||
| namespace string | ||
| httpClient *http.Client | ||
| } | ||
|
|
||
| // ValidateImage checks if an image should be validated and if it exists in registries | ||
|
|
@@ -191,7 +197,8 @@ | |
| return enforcingRegistries | ||
| } | ||
|
|
||
| // checkImageInRegistry checks if an image exists in a specific MCPRegistry | ||
| // checkImageInRegistry checks if an image exists in a specific MCPRegistry by | ||
| // querying the registry API service at the URL stored in the MCPRegistry status. | ||
| func (v *RegistryEnforcingValidator) checkImageInRegistry( | ||
| ctx context.Context, | ||
| mcpRegistry *mcpv1alpha1.MCPRegistry, | ||
|
|
@@ -202,56 +209,101 @@ | |
| return false, nil | ||
| } | ||
|
|
||
| // Get the ConfigMap containing the registry data | ||
| configMapName := mcpRegistry.GetStorageName() | ||
| configMap := &corev1.ConfigMap{} | ||
| if err := v.client.Get(ctx, client.ObjectKey{ | ||
| Name: configMapName, | ||
| Namespace: v.namespace, | ||
| }, configMap); err != nil { | ||
| if k8serr.IsNotFound(err) { | ||
| // ConfigMap not found, registry data not available | ||
| return false, nil | ||
| } | ||
| return false, fmt.Errorf("failed to get ConfigMap %s: %w", configMapName, err) | ||
| // Get the registry API URL from status | ||
| registryURL := mcpRegistry.Status.URL | ||
| if registryURL == "" { | ||
| return false, nil | ||
| } | ||
|
|
||
| // Get the registry data from the ConfigMap | ||
| registryData, exists := configMap.Data["registry.json"] | ||
| if !exists { | ||
| // No registry data in ConfigMap | ||
| return false, nil | ||
| // Query the registry API for all servers | ||
| servers, err := v.listRegistryServers(ctx, registryURL) | ||
| if err != nil { | ||
| return false, fmt.Errorf("failed to query registry API at %s: %w", registryURL, err) | ||
| } | ||
|
|
||
| // Parse the registry data | ||
| var reg regtypes.Registry | ||
| if err := json.Unmarshal([]byte(registryData), ®); err != nil { | ||
| // Invalid registry data | ||
| return false, fmt.Errorf("failed to parse registry data: %w", err) | ||
| return findImageInServers(servers, image), nil | ||
| } | ||
|
|
||
| // listRegistryServers queries the registry API to fetch all servers, handling pagination. | ||
| func (v *RegistryEnforcingValidator) listRegistryServers( | ||
| ctx context.Context, | ||
| registryURL string, | ||
| ) ([]v0.ServerResponse, error) { | ||
| var allServers []v0.ServerResponse | ||
| cursor := "" | ||
|
|
||
| for { | ||
| servers, nextCursor, err := v.fetchRegistryPage(ctx, registryURL, cursor) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| allServers = append(allServers, servers...) | ||
|
|
||
| if nextCursor == "" { | ||
| break | ||
| } | ||
| cursor = nextCursor | ||
|
|
||
| // Safety limit to prevent infinite loops | ||
| if len(allServers) > 10000 { | ||
| return nil, fmt.Errorf("exceeded maximum server limit (10000)") | ||
|
Check warning on line 250 in cmd/thv-operator/pkg/validation/image_validation.go
|
||
| } | ||
| } | ||
|
|
||
| // Search for the image in this registry | ||
| return findImageInRegistry(®, image), nil | ||
| return allServers, nil | ||
| } | ||
|
|
||
| // findImageInRegistry searches for an image in a registry | ||
| func findImageInRegistry(reg *regtypes.Registry, image string) bool { | ||
| // Check top-level servers | ||
| for _, server := range reg.Servers { | ||
| if server.Image == image { | ||
| return true | ||
| // fetchRegistryPage fetches a single page of servers from the registry API. | ||
| func (v *RegistryEnforcingValidator) fetchRegistryPage( | ||
| ctx context.Context, | ||
| registryURL string, | ||
| cursor string, | ||
| ) ([]v0.ServerResponse, string, error) { | ||
| params := url.Values{} | ||
| params.Set("limit", "100") | ||
| if cursor != "" { | ||
| params.Set("cursor", cursor) | ||
| } | ||
|
|
||
| endpoint := fmt.Sprintf("%s/v0.1/servers?%s", registryURL, params.Encode()) | ||
|
|
||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) | ||
| if err != nil { | ||
| return nil, "", fmt.Errorf("failed to create request: %w", err) | ||
| } | ||
|
|
||
| resp, err := v.httpClient.Do(req) | ||
| if err != nil { | ||
| return nil, "", fmt.Errorf("failed to query registry: %w", err) | ||
| } | ||
| defer func() { | ||
| if closeErr := resp.Body.Close(); closeErr != nil { | ||
| log.FromContext(ctx).V(1).Info("Failed to close response body", "error", closeErr) | ||
| } | ||
| }() | ||
|
|
||
| if resp.StatusCode != http.StatusOK { | ||
| return nil, "", fmt.Errorf("registry API returned status %d", resp.StatusCode) | ||
| } | ||
|
|
||
| var listResp v0.ServerListResponse | ||
| if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { | ||
| return nil, "", fmt.Errorf("failed to decode registry API response: %w", err) | ||
| } | ||
|
|
||
| // Check servers in groups | ||
| // TODO: check with Rado or Ria, is this needed? | ||
| for _, group := range reg.Groups { | ||
| for _, server := range group.Servers { | ||
| if server.Image == image { | ||
| return listResp.Servers, listResp.Metadata.NextCursor, nil | ||
|
Check warning on line 295 in cmd/thv-operator/pkg/validation/image_validation.go
|
||
|
Comment on lines
+287
to
+295
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 In fetchRegistryPage(), when resp.StatusCode != http.StatusOK, the function returns early without draining the response body. The deferred resp.Body.Close() closes it unread, preventing Go's HTTP transport from returning the TCP connection to the pool. Add io.Copy(io.Discard, resp.Body) before the early return to allow connection reuse on error paths. Extended reasoning...What the bug is and how it manifests In fetchRegistryPage() (image_validation.go lines 287-295), when resp.StatusCode != http.StatusOK, the code returns an error immediately. A defer closes the body, but the body was never read. Per Go's net/http documentation, the HTTP/1.x transport can only reuse a keep-alive TCP connection if the response body is fully consumed before being closed. Without draining, the connection is closed rather than returned to the pool. The specific code path that triggers it The sequence in fetchRegistryPage is: (1) v.httpClient.Do(req) makes the HTTP call, (2) defer resp.Body.Close() is registered, (3) if resp.StatusCode != http.StatusOK returns an error immediately, (4) the deferred Close() runs on an unread body. On the success path the body is fully consumed by json.NewDecoder(resp.Body).Decode, so only the error path is affected. Why existing code does not prevent it The defer resp.Body.Close() pattern is correct for the success path but insufficient for error paths. Go's transport checks whether the body was fully read when Close() is called to decide if the connection can be returned to the pool. An unread body means the transport cannot safely reuse the connection and must close the TCP socket instead. What the impact would be Each non-200 response from the registry API wastes one TCP connection. Under normal operation this path is never hit. It only matters during registry errors (5xx, 4xx). For a cluster-internal operator admission webhook this is low-impact: non-200 responses should be rare, the connection pool is bounded, and the 10-second client timeout provides a natural backstop. All four verifiers agreed this is a nit-severity issue. How to fix it Add io.Copy(io.Discard, resp.Body) before the early return on non-200 status: Step-by-step proof
|
||
| } | ||
|
|
||
| // findImageInServers searches for an OCI image in the servers returned by the registry API. | ||
| func findImageInServers(servers []v0.ServerResponse, image string) bool { | ||
| for i := range servers { | ||
| for j := range servers[i].Server.Packages { | ||
| if servers[i].Server.Packages[j].RegistryType == "oci" && | ||
| servers[i].Server.Packages[j].Identifier == image { | ||
| return true | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return false | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 The safety limit check in
listRegistryServers()is unreachable for single-page responses:if len(allServers) > 10000is placed after theif nextCursor == "" { break }statement, so a registry returning >10,000 items in a single page (ignoring thelimit=100request parameter) would bypass the cap entirely. The fix is to move the safety check before the break.Extended reasoning...
What the bug is and how it manifests
In
listRegistryServers()(image_validation.go, lines 233–250), after appending a page of results the code checks whether there is a next cursor and breaks immediately if there is none. The 10,000-server safety check lives below that break, so it is never evaluated for the final (or only) page of a response:The specific code path that triggers it
A registry API that ignores the
limit=100query parameter and returns all servers (say, 50,000 of them) in a single JSON response with an emptynextCursorwill cause the loop to: (1) append all 50,000 entries toallServers, then (2) seenextCursor == ""and break — without ever checking the length.Why existing code doesn't prevent it
The
limit=100request parameter is advisory; a non-compliant or compromised registry is free to ignore it. The 10-second HTTP client timeout does bound total transfer time, butjson.NewDecoder(resp.Body).Decode()attempts to consume the entire body before returning — meaning a large single-page payload will be fully allocated in memory before any timeout can interrupt decoding at the application layer.What the impact would be
A misconfigured or malicious registry served at
MCPRegistry.Status.URLcould force the operator to allocate unbounded memory in a single validation call, potentially causing OOM conditions in the operator pod. The comment says 'Safety limit to prevent infinite loops' but the error message says 'exceeded maximum server limit (10000)' — indicating the intent was a hard total-count cap, not just an infinite-loop guard.How to fix it
Move the safety check before the break:
Step-by-step proof
limit=100but returns 20,000 items andnextCursor: "".fetchRegistryPagedecodes the full body and returns 20,000ServerResponseobjects plus an empty cursor.listRegistryServers,allServers = append(allServers, servers...)grows to 20,000 entries — all in memory.if nextCursor == "" { break }is true, so the loop exits.if len(allServers) > 10000check is never evaluated.listRegistryServersreturns all 20,000 entries, and they are passed tofindImageInServersfor scanning.Mitigating factors that keep severity low: the registry URL is set by a cluster operator (trusted source), the
limit=100parameter discourages large responses from well-behaved servers, and the 10-second timeout bounds wall-clock exposure. Nevertheless, the placement is a logical error relative to the stated invariant.