feat: SSEMarshaler for browser-consumable streaming responses#92
feat: SSEMarshaler for browser-consumable streaming responses#92ankurs wants to merge 2 commits into
Conversation
SSEMarshaler implements runtime.Marshaler for text/event-stream so
server-streaming gateway RPCs can be consumed directly by browser
EventSource clients (e.g. AI/LLM token streams).
Services opt in from PreStart:
core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
The HTTP compression wrapper now excludes text/event-stream from
gzip/zstd via a wrapped DefaultContentTypeFilter — compressed SSE is
buffered by proxies/CDNs and defeats real-time delivery.
JSON payloads use the embedded runtime.JSONPb so field naming matches
the gateway's default responses. SSE is server-to-client only;
Unmarshal and NewDecoder return an error.
📝 WalkthroughWalkthroughThis PR introduces Server-Sent Events (SSE) support to the gRPC-Gateway core package by implementing ChangesServer-Sent Events Marshaler and Compression Integration
Estimated code review effort🎯 2 (Simple) | ⏱️ ~12 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
Adds a new core.SSEMarshaler for browser-consumable text/event-stream responses from grpc-gateway server-streaming RPCs, and updates HTTP compression to avoid compressing SSE responses.
Changes:
- Introduces
SSEMarshalerwith SSE framing, content type, delimiter, encoder, and read-rejection behavior. - Excludes
text/event-streamresponses from gzhttp compression. - Adds tests and generated README documentation for the new marshaler and compression behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| README.md | Documents the new SSEMarshaler public API and usage. |
| marshaler_sse.go | Adds the SSE marshaler implementation. |
| marshaler_sse_test.go | Adds unit tests for SSE marshaling, encoding, delimiter, content type, and interface conformance. |
| compression.go | Adds SSE content-type exclusion to the HTTP compression wrapper. |
| compression_test.go | Adds coverage for ensuring SSE responses are not compressed. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| // Delimiter returns "\n\n", which terminates one SSE frame. | ||
| func (*SSEMarshaler) Delimiter() []byte { | ||
| return sseDelimiter |
| out := make([]byte, 0, len(ssePrefix)+len(body)) | ||
| out = append(out, ssePrefix...) | ||
| out = append(out, body...) |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@marshaler_sse.go`:
- Around line 62-70: The Marshal implementation of SSEMarshaler currently
prepends ssePrefix only once and then appends the raw JSON body from
JSONPb.Marshal, which breaks multiline SSE payloads; change Marshal (method
SSEMarshaler.Marshal) to transform the marshaled body so every line is prefixed
with ssePrefix (for example, prepend ssePrefix to the whole body and replace
every '\n' in body with '\n'+ssePrefix) before returning, ensuring multiline
JSON lines each start with ssePrefix.
In `@README.md`:
- Around line 526-528: The README's fenced code blocks around examples that
include core.RegisterHTTPMarshaler, &core.SSEMarshaler{}, and the
&core.SSEMarshaler{JSONPb: runtime.JSONPb{...}} snippet lack language
identifiers and trigger MD040; update each of those triple-backtick blocks to
include the Go language tag (```go) so the examples for
core.RegisterHTTPMarshaler and the SSEMarshaler/JSONPb snippet are fenced as Go
code.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 39d6f570-8046-438a-b8ca-c10d7247ad99
📒 Files selected for processing (5)
README.mdcompression.gocompression_test.gomarshaler_sse.gomarshaler_sse_test.go
| func (s *SSEMarshaler) Marshal(v any) ([]byte, error) { | ||
| body, err := s.JSONPb.Marshal(v) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| out := make([]byte, 0, len(ssePrefix)+len(body)) | ||
| out = append(out, ssePrefix...) | ||
| out = append(out, body...) | ||
| return out, nil |
There was a problem hiding this comment.
Prefix every SSE payload line, not just the first.
Marshal adds data: once, then appends raw JSON. If runtime.JSONPb is configured to emit multiline JSON, subsequent lines won’t start with data:, and EventSource will parse an incomplete/broken payload.
Proposed fix
package core
import (
+ "bytes"
"errors"
"io"
@@
var (
ssePrefix = []byte("data: ")
+ sseLinePrefix = []byte("\ndata: ")
sseDelimiter = []byte("\n\n")
errSSEReadNotSupported = errors.New("core: SSEMarshaler does not support reading; Server-Sent Events is a server-to-client format")
)
@@
func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
body, err := s.JSONPb.Marshal(v)
if err != nil {
return nil, err
}
- out := make([]byte, 0, len(ssePrefix)+len(body))
+ body = bytes.ReplaceAll(body, []byte("\n"), sseLinePrefix)
+ out := make([]byte, 0, len(ssePrefix)+len(body))
out = append(out, ssePrefix...)
out = append(out, body...)
return out, nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (s *SSEMarshaler) Marshal(v any) ([]byte, error) { | |
| body, err := s.JSONPb.Marshal(v) | |
| if err != nil { | |
| return nil, err | |
| } | |
| out := make([]byte, 0, len(ssePrefix)+len(body)) | |
| out = append(out, ssePrefix...) | |
| out = append(out, body...) | |
| return out, nil | |
| func (s *SSEMarshaler) Marshal(v any) ([]byte, error) { | |
| body, err := s.JSONPb.Marshal(v) | |
| if err != nil { | |
| return nil, err | |
| } | |
| body = bytes.ReplaceAll(body, []byte("\n"), sseLinePrefix) | |
| out := make([]byte, 0, len(ssePrefix)+len(body)) | |
| out = append(out, ssePrefix...) | |
| out = append(out, body...) | |
| return out, nil | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@marshaler_sse.go` around lines 62 - 70, The Marshal implementation of
SSEMarshaler currently prepends ssePrefix only once and then appends the raw
JSON body from JSONPb.Marshal, which breaks multiline SSE payloads; change
Marshal (method SSEMarshaler.Marshal) to transform the marshaled body so every
line is prefixed with ssePrefix (for example, prepend ssePrefix to the whole
body and replace every '\n' in body with '\n'+ssePrefix) before returning,
ensuring multiline JSON lines each start with ssePrefix.
| ``` | ||
| core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{}) | ||
| ``` |
There was a problem hiding this comment.
Add language identifiers to fenced code blocks.
The two new fenced blocks are missing language tags (MD040) and can fail markdown lint.
Suggested fix
-```
+```go
core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})- +go
&core.SSEMarshaler{JSONPb: runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
}}
</details>
Also applies to: 536-540
<details>
<summary>🧰 Tools</summary>
<details>
<summary>🪛 markdownlint-cli2 (0.22.1)</summary>
[warning] 526-526: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
</details>
</details>
<details>
<summary>🤖 Prompt for AI Agents</summary>
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @README.md around lines 526 - 528, The README's fenced code blocks around
examples that include core.RegisterHTTPMarshaler, &core.SSEMarshaler{}, and the
&core.SSEMarshaler{JSONPb: runtime.JSONPb{...}} snippet lack language
identifiers and trigger MD040; update each of those triple-backtick blocks to
include the Go language tag (```go) so the examples for
core.RegisterHTTPMarshaler and the SSEMarshaler/JSONPb snippet are fenced as Go
code.
</details>
<!-- fingerprinting:phantom:triton:hawk -->
<!-- This is an auto-generated comment by CodeRabbit -->
Server-streaming gateway RPCs are now SSE-consumable out of the box for clients sending Accept: text/event-stream. The marshaler is auto-registered in initHTTP alongside the existing application/proto and JSON builtin options, matching the established "batteries included with Disable* opt-out" pattern (DISABLE_HTTP_COMPRESSION, DISABLE_NEW_RELIC, DISABLE_ZSTD_COMPRESSION). Set DISABLE_SSE_MARSHALER=true to suppress the registration — useful for services that want to register a custom SSE marshaler via RegisterHTTPMarshaler. Service-registered marshalers still win on the same MIME (last-write-wins inside grpc-gateway), so a custom variant can replace the default without setting the disable flag. Extracted the mux option assembly into buildHTTPMuxOptions so the toggle behavior can be tested directly via runtime.MarshalerForRequest without standing up the full HTTP server.
Summary
core.SSEMarshaler— aruntime.Marshalerthat emitstext/event-streamframes so server-streaming gateway RPCs are directly consumable by browserEventSourceclients (AI/LLM token streams, progressive loading, etc.).Disable*opt-out" pattern (DISABLE_HTTP_COMPRESSION,DISABLE_NEW_RELIC,DISABLE_ZSTD_COMPRESSION). SetDISABLE_SSE_MARSHALER=trueto suppress.text/event-streamfrom the gzhttp compression wrapper — compressed SSE is buffered by proxies/CDNs and breaks real-time delivery.Out of the box, any server-streaming gateway RPC responds with SSE framing when the client sends
Accept: text/event-stream, and with newline-delimited JSON otherwise.Design notes
runtime.JSONPbso JSON payload formatting matches the gateway's default JSON responses (field names, enum encoding, etc.).runtime.Delimited(\n\nbetween frames) andruntime.StreamContentType(returnstext/event-streamfor streaming responses).UnmarshalandNewDecoderreturn a sentinel error rather than silently accepting bogus input.registeredServeMuxOptions()), so a service can still overridetext/event-streamwith a custom marshaler viaRegisterHTTPMarshalerwithout setting the disable flag. Last-write-wins on the same MIME inside grpc-gateway.gzhttp.DefaultContentTypeFilterrather than replacing it, preserving existing behavior for non-SSE responses.buildHTTPMuxOptionsso the toggle is testable directly viaruntime.MarshalerForRequestwithout standing up the full HTTP server.Behavior change for existing services
On upgrade, existing services start serving
text/event-streamfor any server-streaming RPC when clients request it. Low risk (only triggers on content-type negotiation, no new endpoints exposed). Services that want the previous behavior setDISABLE_SSE_MARSHALER=true.Test plan
make test— full core suite green (covers SSE marshal/encoder/delimiter/content-type, gateway interface assertions, encoder write-error propagation, compression exclusion with bare andcharset=utf-8SSE content types, and the new toggle test usingruntime.MarshalerForRequest).make lint— golangci-lint clean (pre-existing vulncheck flag forgolang.org/x/netv0.52.0 is unrelated to this change).make doc— both READMEs regenerated.coreis tagged.