diff --git a/pkg/epp/framework/plugins/datalayer/source/http/datasource.go b/pkg/epp/framework/plugins/datalayer/source/http/datasource.go index 3fe6adaaeb..668c5d9f49 100644 --- a/pkg/epp/framework/plugins/datalayer/source/http/datasource.go +++ b/pkg/epp/framework/plugins/datalayer/source/http/datasource.go @@ -21,8 +21,10 @@ import ( "crypto/tls" "fmt" "io" + "net" "net/url" "reflect" + "strconv" fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer" fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin" @@ -30,18 +32,20 @@ import ( // HTTPDataSource is a data source that receives its data using HTTP client. type HTTPDataSource struct { - typedName fwkplugin.TypedName - scheme string // scheme to use - path string // path to use + typedName fwkplugin.TypedName + scheme string // scheme to use + path string // path to use + metricsPort int // when non-zero, overrides the port in MetricsHost for scraping client Client // client (e.g. a wrapped http.Client) used to get data parser func(io.Reader) (any, error) outputType reflect.Type } -// NewHTTPDataSource returns a new data source, configured with -// the provided scheme, path and certificate verification parameters. -func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pluginType string, +// NewHTTPDataSource returns a new data source configured with the given scheme, path, +// and certificate verification. metricsPort overrides the port in MetricsHost when +// non-zero; pass 0 to use MetricsHost as-is. +func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, metricsPort int, pluginType string, pluginName string, parser func(io.Reader) (any, error), outputType reflect.Type) (*HTTPDataSource, error) { if scheme != "http" && scheme != "https" { return nil, fmt.Errorf("unsupported scheme: %s", scheme) @@ -59,11 +63,12 @@ func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pl Type: pluginType, Name: pluginName, }, - scheme: scheme, - path: path, - client: defaultClient, - parser: parser, - outputType: outputType, + scheme: scheme, + path: path, + metricsPort: metricsPort, + client: defaultClient, + parser: parser, + outputType: outputType, } return dataSrc, nil } @@ -90,9 +95,18 @@ func (dataSrc *HTTPDataSource) Poll(ctx context.Context, ep fwkdl.Endpoint) (any } func (dataSrc *HTTPDataSource) getEndpoint(ep Addressable) *url.URL { + host := ep.GetMetricsHost() + if dataSrc.metricsPort != 0 { + ip, _, err := net.SplitHostPort(host) + if err == nil { + host = net.JoinHostPort(ip, strconv.Itoa(dataSrc.metricsPort)) + } + // If SplitHostPort fails (e.g. host has no port), use MetricsHost unchanged + // so we still attempt a scrape rather than silently dropping the endpoint. + } return &url.URL{ Scheme: dataSrc.scheme, - Host: ep.GetMetricsHost(), + Host: host, Path: dataSrc.path, } } diff --git a/pkg/epp/framework/plugins/datalayer/source/http/datasource_test.go b/pkg/epp/framework/plugins/datalayer/source/http/datasource_test.go new file mode 100644 index 0000000000..32859fa008 --- /dev/null +++ b/pkg/epp/framework/plugins/datalayer/source/http/datasource_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package http + +import ( + "io" + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/types" +) + +// fakeAddressable is a test double for the Addressable interface. +type fakeAddressable struct { + metricsHost string +} + +func (f *fakeAddressable) GetIPAddress() string { return "" } +func (f *fakeAddressable) GetPort() string { return "" } +func (f *fakeAddressable) GetMetricsHost() string { return f.metricsHost } +func (f *fakeAddressable) GetNamespacedName() types.NamespacedName { return types.NamespacedName{Name: "pod", Namespace: "test"} } + +func noopParser(r io.Reader) (any, error) { return nil, nil } + +func TestGetEndpoint(t *testing.T) { + tests := []struct { + name string + metricsHost string + metricsPort int + wantHost string + }{ + { + name: "metricsPort=0 preserves MetricsHost unchanged", + metricsHost: "1.2.3.4:8000", + metricsPort: 0, + wantHost: "1.2.3.4:8000", + }, + { + name: "metricsPort overrides port in MetricsHost", + metricsHost: "1.2.3.4:8000", + metricsPort: 9090, + wantHost: "1.2.3.4:9090", + }, + { + name: "metricsPort with IPv6 address", + metricsHost: "[::1]:8000", + metricsPort: 9090, + wantHost: "[::1]:9090", + }, + { + name: "metricsPort with IPv6 address, no override when port=0", + metricsHost: "[::1]:8000", + metricsPort: 0, + wantHost: "[::1]:8000", + }, + { + name: "malformed host falls back to original MetricsHost", + metricsHost: "not-a-host-with-port", + metricsPort: 9090, + wantHost: "not-a-host-with-port", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ds, err := NewHTTPDataSource("http", "/metrics", false, tc.metricsPort, + "test-type", "test-name", noopParser, reflect.TypeOf("")) + if err != nil { + t.Fatalf("NewHTTPDataSource() error = %v", err) + } + + got := ds.getEndpoint(&fakeAddressable{metricsHost: tc.metricsHost}) + + if diff := cmp.Diff(tc.wantHost, got.Host); diff != "" { + t.Errorf("getEndpoint() host mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/epp/framework/plugins/datalayer/source/metrics/datasource.go b/pkg/epp/framework/plugins/datalayer/source/metrics/datasource.go index 877a0e3646..59d897d232 100644 --- a/pkg/epp/framework/plugins/datalayer/source/metrics/datasource.go +++ b/pkg/epp/framework/plugins/datalayer/source/metrics/datasource.go @@ -48,13 +48,19 @@ type metricsDatasourceParams struct { Path string `json:"path"` // InsecureSkipVerify defines whether model server certificate should be verified or not. InsecureSkipVerify bool `json:"insecureSkipVerify"` + // MetricsPort defines the port to use for scraping metrics from model server pods. + // When set, this overrides the inference port encoded in the endpoint's MetricsHost. + // Useful when the model server exposes metrics on a separate port from inference + // (e.g., vLLM with --metrics-port 9090). + // Defaults to 0, which means the inference port is used. + MetricsPort int `json:"metricsPort"` } // NewHTTPMetricsDataSource constructs a MetricsDataSource with the given scheme and path. // InsecureSkipVerify defaults to true (matching the factory default). // Use this function directly in tests to bypass JSON parameter marshaling. func NewHTTPMetricsDataSource(scheme, path, name string) (*http.HTTPDataSource, error) { - return http.NewHTTPDataSource(scheme, path, defaultMetricsInsecureSkipVerify, + return http.NewHTTPDataSource(scheme, path, defaultMetricsInsecureSkipVerify, 0, MetricsDataSourceType, name, parseMetrics, PrometheusMetricType) } @@ -72,7 +78,11 @@ func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle fw } } - return http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, MetricsDataSourceType, + if cfg.MetricsPort != 0 && (cfg.MetricsPort < 1 || cfg.MetricsPort > 65535) { + return nil, fmt.Errorf("metricsPort must be between 1 and 65535, got %d", cfg.MetricsPort) + } + + return http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, cfg.MetricsPort, MetricsDataSourceType, name, parseMetrics, PrometheusMetricType) } diff --git a/pkg/epp/framework/plugins/datalayer/source/metrics/datasource_test.go b/pkg/epp/framework/plugins/datalayer/source/metrics/datasource_test.go index 501a4172cc..dd1c83efab 100644 --- a/pkg/epp/framework/plugins/datalayer/source/metrics/datasource_test.go +++ b/pkg/epp/framework/plugins/datalayer/source/metrics/datasource_test.go @@ -18,9 +18,11 @@ package metrics import ( "context" + "encoding/json" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/types" fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer" @@ -28,11 +30,11 @@ import ( ) func TestDatasource(t *testing.T) { - _, err := http.NewHTTPDataSource("invalid", "/metrics", true, MetricsDataSourceType, + _, err := http.NewHTTPDataSource("invalid", "/metrics", true, 0, MetricsDataSourceType, "metrics-data-source", parseMetrics, PrometheusMetricType) assert.NotNil(t, err, "expected to fail with invalid scheme") - source, err := http.NewHTTPDataSource("https", "/metrics", true, MetricsDataSourceType, + source, err := http.NewHTTPDataSource("https", "/metrics", true, 0, MetricsDataSourceType, "metrics-data-source", parseMetrics, PrometheusMetricType) assert.Nil(t, err, "failed to create HTTP datasource") @@ -50,3 +52,27 @@ func TestDatasource(t *testing.T) { _, err = source.Poll(ctx, endpoint) assert.NotNil(t, err, "expected to fail polling for metrics") } + +func TestMetricsDataSourceFactory_MetricsPortOverride(t *testing.T) { + params, err := json.Marshal(map[string]any{ + "scheme": "http", + "metricsPort": 9090, + }) + require.NoError(t, err) + + plugin, err := MetricsDataSourceFactory("test-ds", params, nil) + require.NoError(t, err) + + ds, ok := plugin.(fwkdl.PollingDataSource) + require.True(t, ok, "expected MetricsDataSourceFactory to return a PollingDataSource") + + // Poll will fail (no real server), but the error must reference port 9090. + // If metricsPort were ignored, EPP would dial :8000 instead. + endpoint := fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{ + NamespacedName: types.NamespacedName{Name: "pod1", Namespace: "default"}, + MetricsHost: "1.2.3.4:8000", + }, nil) + _, err = ds.Poll(context.Background(), endpoint) + assert.Error(t, err) + assert.Contains(t, err.Error(), "9090", "expected scrape target to use metricsPort 9090, not inference port 8000") +} diff --git a/test/integration/epp/runtime_polling_test.go b/test/integration/epp/runtime_polling_test.go index 0d87157790..11ee7262ca 100644 --- a/test/integration/epp/runtime_polling_test.go +++ b/test/integration/epp/runtime_polling_test.go @@ -86,7 +86,7 @@ func TestRuntimePollingDispatch(t *testing.T) { r := datalayer.NewRuntime(pollingInterval) ext := mocks.NewPollingExtractor("test-extractor") - httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source", + httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source", parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{})) require.NoError(t, err) @@ -138,7 +138,7 @@ func TestRuntimePollingMultipleExtractors(t *testing.T) { ext1 := mocks.NewPollingExtractor("extractor-1") ext2 := mocks.NewPollingExtractor("extractor-2") - httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source", + httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source", parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{})) require.NoError(t, err) @@ -187,7 +187,7 @@ func TestRuntimePollingEndpointLifecycle(t *testing.T) { r := datalayer.NewRuntime(pollingInterval) ext := mocks.NewPollingExtractor("lifecycle-extractor") - httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source", + httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source", parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{})) require.NoError(t, err) @@ -241,7 +241,7 @@ func TestRuntimePollingWithoutExtractors(t *testing.T) { r := datalayer.NewRuntime(50 * time.Millisecond) - httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source", + httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source", parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{})) require.NoError(t, err) @@ -280,7 +280,7 @@ func TestRuntimePollingHTTPError(t *testing.T) { r := datalayer.NewRuntime(pollingInterval) ext := mocks.NewPollingExtractor("error-extractor") - httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source", + httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source", parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{})) require.NoError(t, err)