Skip to content

Commit 15a7ee9

Browse files
committed
feat: parse SSE streaming responses for response plugins
When the response body is not valid JSON (e.g., SSE/Server-Sent Events from streaming providers like Anthropic), parse the SSE data lines to extract usage and model information. This enables response plugins (usage-tracking, metering) to process streaming responses that were previously skipped with "Failed to parse response body as JSON".

Also fixes two issues with streaming response handling:
1. Always respond to response headers so Envoy proceeds with body chunks (previously returned nil, causing per-message timeout)
2. Send an immediate ack for each non-EoS response body chunk so Envoy continues forwarding subsequent chunks instead of blocking

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
1 parent f9837c9 commit 15a7ee9

2 files changed

Lines changed: 68 additions & 4 deletions

File tree

pkg/handlers/response.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package handlers
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/json"
2223
"fmt"
@@ -44,9 +45,9 @@ func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestConte
4445

4546
if !headers.GetEndOfStream() {
4647
log.FromContext(ctx).V(logutil.VERBOSE).Info("captured response headers, deferring response until body arrives...")
47-
return nil
4848
}
49-
// EndOfStream means no body is expected, return HeadersResponse immediately
49+
// Always respond to response headers so Envoy proceeds with body chunks.
50+
// In STREAMED/FULL_DUPLEX_STREAMED mode, Envoy blocks until we respond.
5051
return []*eppb.ProcessingResponse{
5152
{
5253
Response: &eppb.ProcessingResponse_ResponseHeaders{
@@ -64,8 +65,15 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
6465
}
6566

6667
if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil {
67-
logger.Error(err, "Failed to parse response body as JSON, skipping response plugins")
68-
return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
68+
// Try parsing as SSE (Server-Sent Events) — streaming responses from providers
69+
// like Anthropic use SSE format which isn't valid JSON.
70+
if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil {
71+
reqCtx.Response.Body = sseBody
72+
logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins")
73+
} else {
74+
logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins")
75+
return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
76+
}
6977
}
7078

7179
if err := s.runResponsePlugins(ctx, reqCtx.CycleState, reqCtx.Response); err != nil {
@@ -130,6 +138,52 @@ func (s *Server) HandleResponseTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pr
130138
}, nil
131139
}
132140

141+
// parseSSEResponseBody builds a composite response body from an SSE
// (Server-Sent Events) stream so that response plugins such as usage-tracking
// and metering can process streaming responses.
//
// Every "data:" line whose payload is a JSON object is inspected for a "model"
// string and a "usage" object. These are looked up both at the top level of
// the event (OpenAI-style chunks, Anthropic "message_delta") and inside a
// nested "message" object (Anthropic "message_start", which carries the model
// name and the input token counts there). Usage objects are merged key by key
// across events, with values from later events overwriting earlier ones; within
// a single event, top-level fields take precedence over the nested ones.
//
// Empty payloads, the "[DONE]" sentinel and payloads that are not valid JSON
// objects are skipped: extraction is best-effort and one malformed event must
// not discard the data gathered from the others.
//
// It returns a map holding at most the keys "model" and "usage", or a nil map
// and an error when no event contributed either field.
func parseSSEResponseBody(body []byte) (map[string]any, error) {
	const dataPrefix = "data:"

	result := map[string]any{}

	// merge folds the model and usage fields of a single JSON object into result.
	merge := func(obj map[string]any) {
		if model, ok := obj["model"].(string); ok && model != "" {
			result["model"] = model
		}

		usage, ok := obj["usage"].(map[string]any)
		if !ok {
			// Absent or null usage (common on intermediate chunks).
			return
		}
		merged, _ := result["usage"].(map[string]any)
		if merged == nil {
			merged = make(map[string]any, len(usage))
			result["usage"] = merged
		}
		for k, v := range usage {
			merged[k] = v
		}
	}

	for _, line := range bytes.Split(body, []byte("\n")) {
		// TrimSpace also strips the "\r" left behind by CRLF line endings.
		line = bytes.TrimSpace(line)
		if !bytes.HasPrefix(line, []byte(dataPrefix)) {
			continue
		}
		data := bytes.TrimSpace(line[len(dataPrefix):])
		if len(data) == 0 || string(data) == "[DONE]" {
			continue
		}

		var event map[string]any
		if err := json.Unmarshal(data, &event); err != nil {
			// Deliberately ignored: see the best-effort note in the doc comment.
			continue
		}

		// Nested envelope first so that top-level fields of the same event win.
		if msg, ok := event["message"].(map[string]any); ok {
			merge(msg)
		}
		merge(event)
	}

	if len(result) == 0 {
		return nil, fmt.Errorf("no parseable SSE data events found")
	}

	return result, nil
}
186+
133187
// runResponsePlugins executes response plugins in the order they were registered.
134188
func (s *Server) runResponsePlugins(ctx context.Context, cycleState *plugin.CycleState, response *requesthandling.InferenceResponse) error {
135189
logger := log.FromContext(ctx).V(logutil.DEFAULT)

pkg/handlers/server.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
141141
loggerVerbose.Info("Incoming response body chunk", "EoS", v.ResponseBody.EndOfStream)
142142
responseBody = append(responseBody, v.ResponseBody.Body...)
143143
if !v.ResponseBody.EndOfStream {
144+
// Send an immediate response for this chunk so Envoy continues
145+
// streaming. Without this, Envoy blocks waiting for our response
146+
// and stops forwarding subsequent chunks.
147+
if sendErr := srv.Send(&extProcPb.ProcessingResponse{
148+
Response: &extProcPb.ProcessingResponse_ResponseBody{
149+
ResponseBody: &extProcPb.BodyResponse{},
150+
},
151+
}); sendErr != nil {
152+
return status.Errorf(codes.Unknown, "failed to send streaming response ack: %v", sendErr)
153+
}
144154
continue
145155
}
146156
responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody)

0 commit comments

Comments
 (0)