Background
Server-Sent Events (SSE) is an HTTP-based server-to-client streaming protocol that is gaining popularity thanks to its simplicity, HTTP compatibility, and adoption by newer protocols like MCP.
Goa already makes it possible to implement a Server-Sent Events HTTP endpoint in a Goa service by defining a design that complies with the standard and bypassing the generation of the response body encoder and decoder via SkipResponseBodyEncodeDecode:
Method("stream", func() {
	Description("Stream events using Server-Sent Events")
	HTTP(func() {
		GET("/events/stream")
		SkipResponseBodyEncodeDecode()
	})
})
Dummy implementation:
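// Assumes imports: context, errors, io, net/http.
type sseReader struct{}

// Stream implements the Goa generated endpoint method.
func (s *Service) Stream(ctx context.Context) (resp io.ReadCloser, err error) {
	return &sseReader{}, nil
}

// WriteTo is called with the http.ResponseWriter as w when the response
// body is copied, because sseReader implements io.WriterTo.
func (r *sseReader) WriteTo(w io.Writer) (n int64, err error) {
	rw, ok := w.(http.ResponseWriter)
	if !ok {
		return 0, errors.New("sseReader: writer is not an http.ResponseWriter")
	}
	rw.Header().Set("Content-Type", "text/event-stream")
	rw.Header().Set("Cache-Control", "no-cache")
	// An SSE event is terminated by a blank line.
	written, err := rw.Write([]byte("data: foo\n\n"))
	return int64(written), err
}

// Read is a dummy implementation to satisfy the io.ReadCloser interface.
// WriteTo is used instead.
func (r *sseReader) Read(_ []byte) (n int, err error) {
	return 0, io.EOF
}

// Close satisfies the io.ReadCloser interface.
func (r *sseReader) Close() error {
	return nil
}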
While this works, it requires intimate knowledge of the SSE protocol, the Go http stdlib package, and Goa's SkipResponseBodyEncodeDecode behavior. This proposal adds native support for SSE to Goa, making it trivial to add SSE endpoints to Goa services.
DSL
This proposal introduces a new DSL function ServerSentEvents which tells Goa that a streaming endpoint should be implemented using SSE instead of the default WebSocket implementation:
var Event = Type("Event", func() {
	Description("A notification message sent via SSE")
	Attribute("message", String, "Message body")
	Attribute("timestamp", Int, "Unix timestamp")
	Required("message", "timestamp")
})
Method("stream", func() {
	Description("Stream events using Server-Sent Events")
	StreamingResult(Event) // Specifies a streaming endpoint
	HTTP(func() {
		GET("/events/stream")
		ServerSentEvents() // Use SSE instead of WebSocket
		// Will stream events with the shape:
		// data: {"message":<message>,"timestamp":<timestamp>}
	})
})
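As a rough illustration of the default mapping described in the comment above, the sketch below JSON-encodes a whole result value into the "data" field of a single SSE event. The Go Event struct and the formatDefaultEvent helper are hypothetical stand-ins written for this example, not Goa generated code, and the actual generated encoder may differ.

```go
package main

import (
	"encoding/json"
)

// Event mirrors the "Event" design type above (hypothetical hand-written struct).
type Event struct {
	Message   string `json:"message"`
	Timestamp int    `json:"timestamp"`
}

// formatDefaultEvent renders ev as a single SSE event whose "data" field
// holds the JSON encoding of the whole result. The trailing blank line
// terminates the event.
func formatDefaultEvent(ev Event) (string, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	return "data: " + string(b) + "\n\n", nil
}
```

Calling formatDefaultEvent(Event{Message: "hello", Timestamp: 1700000000}) yields one "data:" line followed by an empty line, which is what an SSE client needs to dispatch the event.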
ServerSentEvents also makes it possible to specify how to populate the SSE event fields explicitly:
ServerSentEvents("message") // Use the message field to populate the "data" field of SSE events
ServerSentEvents(func() {
	SSEEventData("message") // Use result type "message" attribute to populate "data" SSE event field
	SSEEventType("type") // Use result type "type" attribute to populate "event" SSE event field
	SSEEventID("id") // Use result type "id" attribute to populate "id" SSE event field
	SSEEventRetry("retry") // Use result type "retry" attribute to populate "retry" SSE event field
})
The above is equivalent to:
ServerSentEvents("message", func() {
	SSEEventType("type") // Use result type "type" attribute to populate "event" SSE event field
	SSEEventID("id") // Use result type "id" attribute to populate "id" SSE event field
	SSEEventRetry("retry") // Use result type "retry" attribute to populate "retry" SSE event field
})
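To make the field mapping concrete, here is a minimal sketch of how the four attributes could end up on the wire in the SSE format. The Notification struct and formatEvent helper are hypothetical names for this example. Omitting empty or zero attributes is an assumption of the sketch, not something the proposal specifies.

```go
package main

import (
	"strconv"
	"strings"
)

// Notification is a hypothetical result type with the attributes used above.
type Notification struct {
	Message string // mapped to the "data" field
	Type    string // mapped to the "event" field
	ID      string // mapped to the "id" field
	Retry   int    // mapped to the "retry" field (milliseconds)
}

// formatEvent serializes n using the SSE wire format. Empty or zero
// attributes are omitted (an assumption of this sketch), and multi-line
// data is split into one "data:" line per line of text.
func formatEvent(n Notification) string {
	var sb strings.Builder
	if n.Type != "" {
		sb.WriteString("event: " + n.Type + "\n")
	}
	if n.ID != "" {
		sb.WriteString("id: " + n.ID + "\n")
	}
	if n.Retry > 0 {
		sb.WriteString("retry: " + strconv.Itoa(n.Retry) + "\n")
	}
	for _, line := range strings.Split(n.Message, "\n") {
		sb.WriteString("data: " + line + "\n")
	}
	sb.WriteString("\n") // a blank line terminates the event
	return sb.String()
}
```

Splitting the data on newlines matters because a raw newline inside a "data:" line would otherwise be read by the client as the end of the field.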
Finally, the SSERequestID function makes it possible to map a payload attribute to the "Last-Event-Id" request header:
ServerSentEvents(func() {
	SSERequestID("startID") // Use payload "startID" attribute to populate "Last-Event-Id" request header
})
Generated Code
The generated endpoint method follows the same signature as the WebSocket case:
Stream(context.Context, *StreamPayload, StreamServerStream) (err error)
Where StreamPayload contains any request payload (including the attribute initialized with the value of the "Last-Event-Id" request header if present and defined in the design) and StreamServerStream is the same interface used for WebSockets:
type StreamServerStream interface {
	// Send streams instances of <result>.
	Send(<result>) error
	// SendWithContext streams instances of <result> with context.
	SendWithContext(context.Context, <result>) error
	// Close closes the stream.
	Close() error
}
Where <result> is the type given to the StreamingResult DSL function.
The generated Goa service method can be implemented as follows:
func (s *Service) Stream(ctx context.Context, p *svc.StreamPayload, stream svc.StreamServerStream) error {
	for {
		select {
		case <-ctx.Done():
			// Stop streaming when the request context is canceled.
			return nil
		case ev := <-s.events:
			if err := stream.Send(ev); err != nil {
				return err
			}
		}
	}
}
The code above assumes that svc is the generated service package and that the Service struct has an events field: a channel on which the business logic publishes events.
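Because the service method depends only on the stream interface, it can be exercised without an HTTP server. The sketch below uses an in-memory fake stream to do so. Event, EventStream (a reduced stand-in for the generated stream interface), recorder, and runOnce are all hypothetical names introduced for this example.

```go
package main

import "context"

// Event and EventStream are hand-written stand-ins for the generated
// result type and stream interface (assumptions for this sketch).
type Event struct{ Message string }

type EventStream interface {
	Send(*Event) error
	Close() error
}

// Service forwards events published on its events channel.
type Service struct{ events chan *Event }

// Stream sends events until the context is canceled or Send fails.
func (s *Service) Stream(ctx context.Context, stream EventStream) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case ev := <-s.events:
			if err := stream.Send(ev); err != nil {
				return err
			}
		}
	}
}

// recorder is an in-memory EventStream that records what was sent.
type recorder struct{ sent []string }

func (r *recorder) Send(ev *Event) error {
	r.sent = append(r.sent, ev.Message)
	return nil
}

func (r *recorder) Close() error { return nil }

// runOnce publishes two events, cancels the context, waits for Stream to
// return, and reports what the recorder saw.
func runOnce() ([]string, error) {
	s := &Service{events: make(chan *Event)}
	rec := &recorder{}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- s.Stream(ctx, rec) }()
	s.events <- &Event{Message: "first"}
	s.events <- &Event{Message: "second"}
	cancel()
	err := <-done
	return rec.sent, err
}
```

The events channel is unbuffered, so each publish completes only once Stream has received the event, and waiting on done ensures both events were sent before the recorder is read.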
Given the code above, Goa:
- Writes the default SSE headers. These can be overridden using an HTTP middleware: the generated code won't overwrite headers that already exist.
- Marshals the business logic event into an SSE event following the attribute names defined in the design. The event data attribute gets marshaled using the configured HTTP encoder if it is not a primitive type.
- Streams the result to the response body.
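The three steps above can be sketched as a small SSE-backed stream. This is an illustration under stated assumptions, not the generated code: sseStream, setDefault, and demo are hypothetical names, the default headers are assumed to be Content-Type and Cache-Control, JSON stands in for the configured HTTP encoder, and string data is assumed to fit on a single line.

```go
package main

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
)

// sseStream is a hypothetical SSE-backed stream, not the generated code.
type sseStream struct {
	w           http.ResponseWriter
	wroteHeader bool
}

// setDefault sets a header only if it has not already been set,
// for example by an HTTP middleware.
func setDefault(h http.Header, key, value string) {
	if h.Get(key) == "" {
		h.Set(key, value)
	}
}

// Send writes the default headers once, marshals v into the "data" field,
// and flushes the event to the client when the writer supports it.
func (s *sseStream) Send(v any) error {
	if !s.wroteHeader {
		h := s.w.Header()
		setDefault(h, "Content-Type", "text/event-stream") // assumed default
		setDefault(h, "Cache-Control", "no-cache")         // assumed default
		s.w.WriteHeader(http.StatusOK)
		s.wroteHeader = true
	}
	data, ok := v.(string) // strings are written as-is (assumed single-line)
	if !ok {
		b, err := json.Marshal(v) // JSON stands in for the configured encoder
		if err != nil {
			return err
		}
		data = string(b)
	}
	if _, err := s.w.Write([]byte("data: " + data + "\n\n")); err != nil {
		return err
	}
	if f, ok := s.w.(http.Flusher); ok {
		f.Flush()
	}
	return nil
}

// demo runs Send against an in-memory recorder whose Cache-Control header
// was already set, as a middleware would, and returns the headers and body.
func demo() (contentType, cacheControl, body string, err error) {
	rec := httptest.NewRecorder()
	rec.Header().Set("Cache-Control", "no-store")
	s := &sseStream{w: rec}
	if err = s.Send(map[string]string{"message": "hi"}); err != nil {
		return
	}
	err = s.Send("plain")
	return rec.Header().Get("Content-Type"), rec.Header().Get("Cache-Control"), rec.Body.String(), err
}
```

Flushing after every event is what makes the response behave as a stream: without it, the events could sit in a buffer until the handler returns.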