Skip to content

feat: SSEMarshaler for browser-consumable streaming responses#92

Open
ankurs wants to merge 2 commits into
mainfrom
feat/sse-marshaler
Open

feat: SSEMarshaler for browser-consumable streaming responses#92
ankurs wants to merge 2 commits into
mainfrom
feat/sse-marshaler

Conversation

@ankurs
Copy link
Copy Markdown
Member

@ankurs ankurs commented May 17, 2026

Summary

  • Adds core.SSEMarshaler — a runtime.Marshaler that emits text/event-stream frames so server-streaming gateway RPCs are directly consumable by browser EventSource clients (AI/LLM token streams, progressive loading, etc.).
  • Auto-registers the SSE marshaler by default, matching the existing "batteries included with Disable* opt-out" pattern (DISABLE_HTTP_COMPRESSION, DISABLE_NEW_RELIC, DISABLE_ZSTD_COMPRESSION). Set DISABLE_SSE_MARSHALER=true to suppress.
  • Excludes text/event-stream from the gzhttp compression wrapper — compressed SSE is buffered by proxies/CDNs and breaks real-time delivery.

Out of the box, any server-streaming gateway RPC responds with SSE framing when the client sends Accept: text/event-stream, and with newline-delimited JSON otherwise.

Design notes

  • Marshaler embeds runtime.JSONPb so JSON payload formatting matches the gateway's default JSON responses (field names, enum encoding, etc.).
  • Implements runtime.Delimited (\n\n between frames) and runtime.StreamContentType (returns text/event-stream for streaming responses).
  • SSE is server-to-client only — Unmarshal and NewDecoder return a sentinel error rather than silently accepting bogus input.
  • The auto-registration is prepended to mux options (before registeredServeMuxOptions()), so a service can still override text/event-stream with a custom marshaler via RegisterHTTPMarshaler without setting the disable flag. Last-write-wins on the same MIME inside grpc-gateway.
  • Compression exclusion composes with gzhttp.DefaultContentTypeFilter rather than replacing it, preserving existing behavior for non-SSE responses.
  • Mux option assembly extracted into buildHTTPMuxOptions so the toggle is testable directly via runtime.MarshalerForRequest without standing up the full HTTP server.

Behavior change for existing services

On upgrade, existing services start serving text/event-stream for any server-streaming RPC when clients request it. Low risk (only triggers on content-type negotiation, no new endpoints exposed). Services that want the previous behavior set DISABLE_SSE_MARSHALER=true.

Test plan

  • make test — full core suite green (covers SSE marshal/encoder/delimiter/content-type, gateway interface assertions, encoder write-error propagation, compression exclusion with bare and charset=utf-8 SSE content types, and the new toggle test using runtime.MarshalerForRequest).
  • make lint — golangci-lint clean (pre-existing vulncheck flag for golang.org/x/net v0.52.0 is unrelated to this change).
  • make doc — both READMEs regenerated.
  • Manual e2e via cookiecutter once core is tagged.

SSEMarshaler implements runtime.Marshaler for text/event-stream so
server-streaming gateway RPCs can be consumed directly by browser
EventSource clients (e.g. AI/LLM token streams).

Services opt in from PreStart:

  core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})

The HTTP compression wrapper now excludes text/event-stream from
gzip/zstd via a wrapped DefaultContentTypeFilter — compressed SSE is
buffered by proxies/CDNs and defeats real-time delivery.

JSON payloads use the embedded runtime.JSONPb so field naming matches
the gateway's default responses. SSE is server-to-client only;
Unmarshal and NewDecoder return an error.
Copilot AI review requested due to automatic review settings May 17, 2026 15:13
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 17, 2026

📝 Walkthrough

Walkthrough

This PR introduces Server-Sent Events (SSE) support to the gRPC-Gateway core package by implementing SSEMarshaler, excluding SSE from HTTP compression, and documenting the new marshaler type with comprehensive test coverage.

Changes

Server-Sent Events Marshaler and Compression Integration

Layer / File(s) Summary
SSEMarshaler Type and Methods
marshaler_sse.go
Defines SSEMarshaler struct embedding runtime.JSONPb and implements marshaling interfaces: ContentType and StreamContentType return text/event-stream, Marshal prefixes JSON with data: without trailing newline, Delimiter() returns \n\n, Unmarshal and NewDecoder return errors since SSE is server-to-client only, and NewEncoder produces properly formatted SSE output.
SSEMarshaler Validation Tests
marshaler_sse_test.go
Comprehensive test suite validates content type reporting, SSE frame delimiter (\n\n), marshal output structure (prefix and no trailing newline), encoder wire format (multiple frames with separators), unmarshal/decoder error handling, interface conformance with runtime.Marshaler/runtime.Delimited/runtime.StreamContentType, and write error propagation.
HTTP Compression Exclusion for SSE
compression.go, compression_test.go
Configures HTTP compression wrapper to exclude text/event-stream responses: adds mime import, defines sseMediaType constant, integrates excludeSSEContentTypeFilter into newHTTPCompressionWrapper to parse Content-Type and bypass compression for SSE variants including charset parameters. Test verifies both plain and parameterized SSE media types remain uncompressed while other types compress normally.
API Reference Documentation
README.md
Updates package index and adds comprehensive API reference for SSEMarshaler describing type behavior, registration via RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{}), client opt-in via Accept: text/event-stream header, frame formatting rules, and explicit server-to-client semantics.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Possibly related PRs

  • go-coldbrew/core#88: Extended newHTTPCompressionWrapper and RegisterHTTPMarshaler plumbing that this PR builds upon to exclude text/event-stream from compression and wire SSEMarshaler registration.

Poem

🐰 A marshaler hops through the event stream so bright,
With data: prefixes and delimiters tight,
SSE frames dance in \n\n harmony,
While compression steps back—"Not for me!"
Streams flow server-ward, a whimsical sight! 🌊

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately summarizes the main change: introducing SSEMarshaler for Server-Sent Events streaming responses that can be consumed directly by browsers.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/sse-marshaler

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new core.SSEMarshaler for browser-consumable text/event-stream responses from grpc-gateway server-streaming RPCs, and updates HTTP compression to avoid compressing SSE responses.

Changes:

  • Introduces SSEMarshaler with SSE framing, content type, delimiter, encoder, and read-rejection behavior.
  • Excludes text/event-stream responses from gzhttp compression.
  • Adds tests and generated README documentation for the new marshaler and compression behavior.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
README.md Documents the new SSEMarshaler public API and usage.
marshaler_sse.go Adds the SSE marshaler implementation.
marshaler_sse_test.go Adds unit tests for SSE marshaling, encoding, delimiter, content type, and interface conformance.
compression.go Adds SSE content-type exclusion to the HTTP compression wrapper.
compression_test.go Adds coverage for ensuring SSE responses are not compressed.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread marshaler_sse.go

// Delimiter returns "\n\n", which terminates one SSE frame.
func (*SSEMarshaler) Delimiter() []byte {
return sseDelimiter
Comment thread marshaler_sse.go
Comment on lines +67 to +69
out := make([]byte, 0, len(ssePrefix)+len(body))
out = append(out, ssePrefix...)
out = append(out, body...)
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@marshaler_sse.go`:
- Around line 62-70: The Marshal implementation of SSEMarshaler currently
prepends ssePrefix only once and then appends the raw JSON body from
JSONPb.Marshal, which breaks multiline SSE payloads; change Marshal (method
SSEMarshaler.Marshal) to transform the marshaled body so every line is prefixed
with ssePrefix (for example, prepend ssePrefix to the whole body and replace
every '\n' in body with '\n'+ssePrefix) before returning, ensuring multiline
JSON lines each start with ssePrefix.

In `@README.md`:
- Around line 526-528: The README's fenced code blocks around examples that
include core.RegisterHTTPMarshaler, &core.SSEMarshaler{}, and the
&core.SSEMarshaler{JSONPb: runtime.JSONPb{...}} snippet lack language
identifiers and trigger MD040; update each of those triple-backtick blocks to
include the Go language tag (```go) so the examples for
core.RegisterHTTPMarshaler and the SSEMarshaler/JSONPb snippet are fenced as Go
code.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 39d6f570-8046-438a-b8ca-c10d7247ad99

📥 Commits

Reviewing files that changed from the base of the PR and between 9c3fafe and f4069b6.

📒 Files selected for processing (5)
  • README.md
  • compression.go
  • compression_test.go
  • marshaler_sse.go
  • marshaler_sse_test.go

Comment thread marshaler_sse.go
Comment on lines +62 to +70
func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
body, err := s.JSONPb.Marshal(v)
if err != nil {
return nil, err
}
out := make([]byte, 0, len(ssePrefix)+len(body))
out = append(out, ssePrefix...)
out = append(out, body...)
return out, nil
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Prefix every SSE payload line, not just the first.

Marshal adds data: once, then appends raw JSON. If runtime.JSONPb is configured to emit multiline JSON, subsequent lines won’t start with data:, and EventSource will parse an incomplete/broken payload.

Proposed fix
 package core
 
 import (
+	"bytes"
 	"errors"
 	"io"
@@
 var (
 	ssePrefix              = []byte("data: ")
+	sseLinePrefix          = []byte("\ndata: ")
 	sseDelimiter           = []byte("\n\n")
 	errSSEReadNotSupported = errors.New("core: SSEMarshaler does not support reading; Server-Sent Events is a server-to-client format")
 )
@@
 func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
 	body, err := s.JSONPb.Marshal(v)
 	if err != nil {
 		return nil, err
 	}
-	out := make([]byte, 0, len(ssePrefix)+len(body))
+	body = bytes.ReplaceAll(body, []byte("\n"), sseLinePrefix)
+	out := make([]byte, 0, len(ssePrefix)+len(body))
 	out = append(out, ssePrefix...)
 	out = append(out, body...)
 	return out, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
body, err := s.JSONPb.Marshal(v)
if err != nil {
return nil, err
}
out := make([]byte, 0, len(ssePrefix)+len(body))
out = append(out, ssePrefix...)
out = append(out, body...)
return out, nil
func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
body, err := s.JSONPb.Marshal(v)
if err != nil {
return nil, err
}
body = bytes.ReplaceAll(body, []byte("\n"), sseLinePrefix)
out := make([]byte, 0, len(ssePrefix)+len(body))
out = append(out, ssePrefix...)
out = append(out, body...)
return out, nil
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@marshaler_sse.go` around lines 62 - 70, The Marshal implementation of
SSEMarshaler currently prepends ssePrefix only once and then appends the raw
JSON body from JSONPb.Marshal, which breaks multiline SSE payloads; change
Marshal (method SSEMarshaler.Marshal) to transform the marshaled body so every
line is prefixed with ssePrefix (for example, prepend ssePrefix to the whole
body and replace every '\n' in body with '\n'+ssePrefix) before returning,
ensuring multiline JSON lines each start with ssePrefix.

Comment thread README.md
Comment on lines +526 to +528
```
core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
```
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add language identifiers to fenced code blocks.

The two new fenced blocks are missing language tags (MD040) and can fail markdown lint.

Suggested fix
-```
+```go
 core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})

- +go
&core.SSEMarshaler{JSONPb: runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
}}

</details>


Also applies to: 536-540

<details>
<summary>🧰 Tools</summary>

<details>
<summary>🪛 markdownlint-cli2 (0.22.1)</summary>

[warning] 526-526: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

</details>

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @README.md around lines 526 - 528, The README's fenced code blocks around
examples that include core.RegisterHTTPMarshaler, &core.SSEMarshaler{}, and the
&core.SSEMarshaler{JSONPb: runtime.JSONPb{...}} snippet lack language
identifiers and trigger MD040; update each of those triple-backtick blocks to
include the Go language tag (```go) so the examples for
core.RegisterHTTPMarshaler and the SSEMarshaler/JSONPb snippet are fenced as Go
code.


</details>

<!-- fingerprinting:phantom:triton:hawk -->

<!-- This is an auto-generated comment by CodeRabbit -->

Server-streaming gateway RPCs are now SSE-consumable out of the box for
clients sending Accept: text/event-stream. The marshaler is auto-registered
in initHTTP alongside the existing application/proto and JSON builtin
options, matching the established "batteries included with Disable* opt-out"
pattern (DISABLE_HTTP_COMPRESSION, DISABLE_NEW_RELIC, DISABLE_ZSTD_COMPRESSION).

Set DISABLE_SSE_MARSHALER=true to suppress the registration — useful for
services that want to register a custom SSE marshaler via
RegisterHTTPMarshaler. Service-registered marshalers still win on the same
MIME (last-write-wins inside grpc-gateway), so a custom variant can replace
the default without setting the disable flag.

Extracted the mux option assembly into buildHTTPMuxOptions so the toggle
behavior can be tested directly via runtime.MarshalerForRequest without
standing up the full HTTP server.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants