From c98d742aba814cc31b09ee15d06679b05c4f965d Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 6 Aug 2025 12:42:05 -0700 Subject: [PATCH 1/4] first draft --- pkg/epp/handlers/request.go | 10 +++++ pkg/epp/handlers/server.go | 83 +++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go index 5789b3cc95..14bff31422 100644 --- a/pkg/epp/handlers/request.go +++ b/pkg/epp/handlers/request.go @@ -106,6 +106,16 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He }, }, } + + // Add worker ID header if obtained from FrontEnd service + if reqCtx.WorkerInstanceID != "" { + headers = append(headers, &configPb.HeaderValueOption{ + Header: &configPb.HeaderValue{ + Key: "x-gateway-worker-id", + RawValue: []byte(reqCtx.WorkerInstanceID), + }, + }) + } if reqCtx.RequestSize > 0 { // We need to update the content length header if the body is mutated, see Envoy doc: // https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/http/ext_proc/v3/processing_mode.proto diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 30596606ec..9f57d8b993 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -19,7 +19,9 @@ package handlers import ( "context" "encoding/json" + "fmt" "io" + "net/http" "strings" "time" @@ -50,9 +52,18 @@ func NewStreamingServer(destinationEndpointHintMetadataNamespace, destinationEnd destinationEndpointHintKey: destinationEndpointHintKey, director: director, datastore: datastore, + frontEndAddress: "localhost", // Default FrontEnd address (same sidecar) + frontEndPort: "8081", // Default FrontEnd port + httpClient: &http.Client{Timeout: 30 * time.Second}, } } +// SetFrontEndConfig allows configuration of the FrontEnd service endpoint +func (s *StreamingServer) SetFrontEndConfig(address, port string) { + s.frontEndAddress = address + s.frontEndPort = port +} + type Director interface { HandleRequest(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) HandleResponse(ctx context.Context, reqCtx *RequestContext) (*RequestContext, error) @@ -74,6 +85,11 @@ type StreamingServer struct { destinationEndpointHintMetadataNamespace string datastore Datastore director Director + + // FrontEnd service configuration for worker ID requests + frontEndAddress string + frontEndPort string + httpClient *http.Client } // RequestContext stores context information during the life time of an HTTP request. @@ -109,6 +125,8 @@ type RequestContext struct { respHeaderResp *extProcPb.ProcessingResponse respBodyResp []*extProcPb.ProcessingResponse respTrailerResp *extProcPb.ProcessingResponse + + WorkerInstanceID string // Worker ID from FrontEnd service } type Request struct { @@ -225,6 +243,12 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) break } + // Make blocking HTTP request to FrontEnd service to get worker_instance_id + if err = s.fetchWorkerIDFromFrontEnd(ctx, reqCtx); err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to fetch worker ID from FrontEnd service") + // TODO (atchernych) if FrontEnd call fails ? + } + // Populate the ExtProc protocol responses for the request body. requestBodyBytes, err := json.Marshal(reqCtx.Request.Body) if err != nil { @@ -523,3 +547,62 @@ func buildCommonResponses(bodyBytes []byte, byteLimit int, setEos bool) []*extPr return responses } + +// fetchWorkerIDFromFrontEnd makes a blocking HTTP request to the FrontEnd service +// to obtain the worker_instance_id for the current request +func (s *StreamingServer) fetchWorkerIDFromFrontEnd(ctx context.Context, reqCtx *RequestContext) error { + logger := log.FromContext(ctx) + + // Build FrontEnd service URL + frontEndURL := fmt.Sprintf("http://%s:%s/worker-assignment", s.frontEndAddress, s.frontEndPort) + + // Create empty request (as specified) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, frontEndURL, nil) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to create FrontEnd request") + return fmt.Errorf("failed to create FrontEnd request: %w", err) + } + + // Set appropriate headers + req.Header.Set("Content-Type", "application/json") + + // Make the blocking HTTP request + logger.V(logutil.VERBOSE).Info("Making blocking HTTP request to FrontEnd", "url", frontEndURL) + resp, err := s.httpClient.Do(req) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to call FrontEnd service") + return fmt.Errorf("failed to call FrontEnd service: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + logger.V(logutil.DEFAULT).Error(nil, "FrontEnd service returned non-200 status", "status", resp.StatusCode) + return fmt.Errorf("FrontEnd service returned status %d", resp.StatusCode) + } + + // Read the response body + body, err := io.ReadAll(resp.Body) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to read FrontEnd response body") + return fmt.Errorf("failed to read FrontEnd response body: %w", err) + } + + // Parse JSON response to extract worker_instance_id + var responseData map[string]interface{} + if err := json.Unmarshal(body, &responseData); err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to parse FrontEnd response JSON") + return fmt.Errorf("failed to parse FrontEnd response JSON: %w", err) + } + + // Extract worker_instance_id from response + if workerID, exists := responseData["worker_instance_id"]; exists { + if workerIDStr, ok := workerID.(string); ok && workerIDStr != "" { + reqCtx.WorkerInstanceID = workerIDStr + logger.V(logutil.VERBOSE).Info("Successfully obtained worker ID from FrontEnd", "worker_instance_id", workerIDStr) + return nil + } + } + + logger.V(logutil.DEFAULT).Info("FrontEnd response does not contain valid worker_instance_id", "response_body", string(body)) + return fmt.Errorf("FrontEnd response does not contain valid worker_instance_id") +} From 06a737dce789b3975cf9b7633820f093e9f450cd Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 6 Aug 2025 12:54:31 -0700 Subject: [PATCH 2/4] add nvext annotation --- pkg/epp/handlers/server.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 9f57d8b993..20027c7aba 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -17,6 +17,7 @@ limitations under the License. package handlers import ( + "bytes" "context" "encoding/json" "fmt" @@ -556,8 +557,20 @@ func (s *StreamingServer) fetchWorkerIDFromFrontEnd(ctx context.Context, reqCtx // Build FrontEnd service URL frontEndURL := fmt.Sprintf("http://%s:%s/worker-assignment", s.frontEndAddress, s.frontEndPort) - // Create empty request (as specified) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, frontEndURL, nil) + // Create request body with nvext annotations + requestBody := map[string]interface{}{ + "nvext": map[string]interface{}{ + "annotations": []string{"query_instance_id"}, + }, + } + + requestJSON, err := json.Marshal(requestBody) + if err != nil { + logger.V(logutil.DEFAULT).Error(err, "Failed to marshal request body") + return fmt.Errorf("failed to marshal request body: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, frontEndURL, bytes.NewBuffer(requestJSON)) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Failed to create FrontEnd request") return fmt.Errorf("failed to create FrontEnd request: %w", err) @@ -567,7 +580,9 @@ func (s *StreamingServer) fetchWorkerIDFromFrontEnd(ctx context.Context, reqCtx req.Header.Set("Content-Type", "application/json") // Make the blocking HTTP request - logger.V(logutil.VERBOSE).Info("Making blocking HTTP request to FrontEnd", "url", frontEndURL) + logger.V(logutil.VERBOSE).Info("Making blocking HTTP request to FrontEnd", + "url", frontEndURL, + "body", string(requestJSON)) resp, err := s.httpClient.Do(req) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Failed to call FrontEnd service") From 5103ee3e9d76395d2446c1528a7b5ad2e678435f Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 6 Aug 2025 13:04:42 -0700 Subject: [PATCH 3/4] add reroute header --- pkg/epp/handlers/request.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go index 14bff31422..af6a90381c 100644 --- a/pkg/epp/handlers/request.go +++ b/pkg/epp/handlers/request.go @@ -116,6 +116,14 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He }, }) } + + // Add host rewrite header + headers = append(headers, &configPb.HeaderValueOption{ + Header: &configPb.HeaderValue{ + Key: "host_rewrite_header", + RawValue: []byte("x-gateway-destination-endpoint"), + }, + }) if reqCtx.RequestSize > 0 { // We need to update the content length header if the body is mutated, see Envoy doc: // https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/http/ext_proc/v3/processing_mode.proto From 63e9d6a9503effb584de9e926724a0d7c22520c3 Mon Sep 17 00:00:00 2001 From: Anna Tchernych Date: Wed, 6 Aug 2025 16:21:51 -0700 Subject: [PATCH 4/4] fix the FrontEnd URL --- pkg/epp/handlers/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 20027c7aba..684629a24a 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -555,7 +555,7 @@ func (s *StreamingServer) fetchWorkerIDFromFrontEnd(ctx context.Context, reqCtx logger := log.FromContext(ctx) // Build FrontEnd service URL - frontEndURL := fmt.Sprintf("http://%s:%s/worker-assignment", s.frontEndAddress, s.frontEndPort) + frontEndURL := fmt.Sprintf("http://localhost:%s/v1/chat/completions", s.frontEndPort) // Create request body with nvext annotations requestBody := map[string]interface{}{