Skip to content

feat: parse SSE streaming responses for response plugins#138

Closed
noyitz wants to merge 2 commits into
llm-d:mainfrom
noyitz:fix/sse-response-parsing
Closed

feat: parse SSE streaming responses for response plugins#138
noyitz wants to merge 2 commits into
llm-d:mainfrom
noyitz:fix/sse-response-parsing

Conversation

@noyitz

@noyitz noyitz commented May 28, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Parse SSE (Server-Sent Events) streaming responses for response plugins when JSON parsing fails
  • Fix streaming response header handling: always respond so Envoy proceeds with body chunks
  • Fix streaming response body chunk handling: send immediate ack for non-EoS chunks

Problem

When providers like Anthropic return streaming responses (SSE format with text/event-stream), the response body starts with event: not {. The current code fails JSON parsing and skips all response plugins:

Failed to parse response body as JSON, skipping response plugins

Usage-tracking, metering, and any other response plugins never execute for streaming requests. Additionally, HandleResponseHeaders returns nil for streaming, causing Envoy per-message_timeout_exceeded.

Solution

pkg/handlers/response.go:

  1. When JSON parsing fails, try SSE parsing — scan data: lines for JSON objects, extract usage and model fields
  2. Always respond to response headers so Envoy sends body chunks

pkg/handlers/server.go:
3. Send immediate BodyResponse{} ack for non-EoS response body chunks so Envoy continues streaming

Client sees streamed output in real-time. Chunks accumulated in-memory (bounded by max_tokens) and parsed at EoS.

Test plan

  • Anthropic streaming API (claude-opus-4-6, stream:true) — SSE usage extracted
  • Response plugins execute for streaming responses
  • Non-streaming responses unchanged
  • Usage data recorded in PostgreSQL via metering service

Generated with Claude Code

@github-actions github-actions Bot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label May 28, 2026
@noyitz noyitz force-pushed the fix/sse-response-parsing branch from 9e42655 to 15a7ee9 Compare May 28, 2026 20:20
When the response body is not valid JSON (e.g., SSE/Server-Sent Events
from streaming providers like Anthropic), parse the SSE data lines to
extract usage and model information. This enables response plugins
(usage-tracking, metering) to process streaming responses that were
previously skipped with "Failed to parse response body as JSON".

Also fixes two issues with streaming response handling:
1. Always respond to response headers so Envoy proceeds with body
   chunks (previously returned nil, causing per-message timeout)
2. Send an immediate ack for each non-EoS response body chunk so
   Envoy continues forwarding subsequent chunks instead of blocking

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
Comment thread pkg/handlers/response.go Outdated
Comment on lines 67 to 77
if err := json.Unmarshal(responseBodyBytes, &reqCtx.Response.Body); err != nil {
logger.Error(err, "Failed to parse response body as JSON, skipping response plugins")
return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
// Try parsing as SSE (Server-Sent Events) — streaming responses from providers
// like Anthropic use SSE format which isn't valid JSON.
if sseBody, sseErr := parseSSEResponseBody(responseBodyBytes); sseErr == nil && sseBody != nil {
reqCtx.Response.Body = sseBody
logger.V(logutil.VERBOSE).Info("parsed SSE response body for response plugins")
} else {
logger.Error(err, "Failed to parse response body as JSON or SSE, skipping response plugins")
return s.generateEmptyResponseBodyResponse(responseBodyBytes), nil
}
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why try to parse a JSON and fail? Why not look at the content-type header and parse accordingly?

Address review feedback from @shmuelk: instead of trying JSON parse
and falling back to SSE on failure, check the Content-Type response
header upfront to select the correct parser.

- text/event-stream → SSE parser (parseSSEResponseBody)
- anything else → JSON parser (json.Unmarshal)

Also fix streaming tests:
- JSON body tests now use content-type: application/json (not text/event-stream)
- Tests receive response header ack before sending body chunks
- Tests receive chunk acks for non-final streaming chunks
@github-actions github-actions Bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Jun 15, 2026
@noyitz

noyitz commented Jun 16, 2026

Copy link
Copy Markdown
Contributor Author

Closing — with the body mode framework (PR #169) and ChunkProcessor (PR #170), we always use FULL_DUPLEX_STREAMED mode. SSE responses are handled by chunk processors in the plugin layer, not by the framework's HandleResponseBody. The buffered SSE parsing path is no longer needed.

@noyitz noyitz closed this Jun 16, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size/L Denotes a PR that changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants