Skip to content

Commit 0e61eba

Browse files
authored
Add SSE Support (#3697)
* Initial work on adding SSE support Added DSL and expressions. * Add server side streaming This commit adds server side rendering of SSE responses. TBD: * Handling the incoming Last-Request-Id * Client side handling * Initial complete implementation of SSE * Fix dup check
1 parent 0580e64 commit 0e61eba

50 files changed

Lines changed: 2160 additions & 51 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ cmd/goa/goa
1616

1717
# DeepSource cruft
1818
cover.out
19-
bin/
19+
20+
# MacOS cruft
21+
**/.DS_Store

dsl/sse.go

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
package dsl
2+
3+
import (
4+
"goa.design/goa/v3/eval"
5+
"goa.design/goa/v3/expr"
6+
)
7+
8+
// ServerSentEvents specifies that a streaming endpoint should use the
9+
// Server-Sent Events protocol for streaming instead of WebSockets. It can be
10+
// used in four ways:
11+
//
12+
// 1. ServerSentEvents(): StreamingResult type is used directly as the event
13+
// "data" field (serialized into JSON if not a primitive type)
14+
// 2. ServerSentEvents("attributeName"): The specified attribute is used as the
15+
// event "data" field (serialized into JSON if not a primitive type)
16+
// 3. ServerSentEvents(func() { ... }): Custom mapping of attributes to event
17+
// fields
18+
// 4. ServerSentEvents("attributeName", func() { ... }): Define attribute name
19+
// used as the "data" field and custom mapping for others.
20+
//
21+
// ServerSentEvents can appear in an API HTTP expression (to specify SSE for all streaming
22+
// methods in the API), in a Service HTTP expression (to specify SSE for all streaming
23+
// methods in the service), or in a Method HTTP expression. When specified at the
24+
// API or service level, any method with a StreamingPayload will fall back to using WebSockets
25+
// as SSE only supports server-to-client streaming.
26+
//
27+
// See SSEEventData, SSEEventID, SSEEventType, SSEEventRetry for more details on
28+
// mapping result attributes to event fields. See SSERequestID for more details on
29+
// mapping payload attributes to the Last-Event-ID request header.
30+
//
31+
// Example:
32+
//
33+
// var Notification = Type("Notification", func() {
34+
// Attribute("message", String)
35+
// Attribute("timestamp", String)
36+
// Required("message", "timestamp")
37+
// })
38+
//
39+
// var _ = Service("events", func() {
40+
// HTTP(func() {
41+
// ServerSentEvents() // All streaming methods in this service use SSE by default
42+
// })
43+
//
44+
// // Simple method with just data field
45+
// Method("stream", func() {
46+
// StreamingResult(Notification)
47+
// HTTP(func() {
48+
// GET("/events") // Messages are sent as {"data": {"message": <message>, "timestamp": <timestamp>}}
49+
// })
50+
// })
51+
// })
52+
//
53+
// var _ = Service("other", func() {
54+
// // Method using WebSockets
55+
// Method("stream", func() {
56+
// StreamingResult(Notification)
57+
// HTTP(func() {
58+
// GET("/websocket")
59+
// })
60+
// })
61+
//
62+
// // Method using SSE
63+
// Method("stream", func() {
64+
// Payload(func() {
65+
// Attribute("id", String)
66+
// })
67+
// StreamingResult(Notification)
68+
// HTTP(func() {
69+
// ServerSentEvents(func() { // Use SSE for this method
70+
// SSERequestID("id") // Use payload "id" field to set "Last-Event-Id" request header
71+
// SSEEventID("timestamp") // Use result "timestamp" attribute for "id" event field
72+
// SSEEventData("message") // Use result "message" attribute for "data" event field
73+
// })
74+
// GET("/sse") // Messages are sent as {"id": <timestamp>, "data": <message>}
75+
// })
76+
// })
77+
// })
78+
func ServerSentEvents(args ...any) {
79+
if len(args) > 2 {
80+
eval.TooManyArgError()
81+
return
82+
}
83+
84+
var fn func()
85+
var dataField string
86+
if len(args) > 0 {
87+
switch actual := args[0].(type) {
88+
case func():
89+
fn = actual
90+
case string:
91+
dataField = actual
92+
case nil:
93+
// Use the entire result as data field
94+
default:
95+
eval.InvalidArgError("function or string", args[0])
96+
return
97+
}
98+
if len(args) == 2 {
99+
if fn != nil {
100+
eval.TooManyArgError()
101+
return
102+
}
103+
var ok bool
104+
fn, ok = args[1].(func())
105+
if !ok {
106+
eval.InvalidArgError("function", args[1])
107+
return
108+
}
109+
}
110+
}
111+
112+
sse := &expr.HTTPSSEExpr{
113+
DataField: dataField,
114+
}
115+
116+
switch actual := eval.Current().(type) {
117+
case *expr.HTTPExpr:
118+
actual.SSE = sse
119+
case *expr.HTTPServiceExpr:
120+
actual.SSE = sse
121+
case *expr.HTTPEndpointExpr:
122+
actual.SSE = sse
123+
default:
124+
eval.IncompatibleDSL()
125+
}
126+
127+
if fn != nil {
128+
eval.Execute(fn, sse)
129+
}
130+
}
131+
132+
// SSERequestID defines the attribute of the Payload type that provides the
133+
// Last-Event-ID request header value. The attribute must exist in the Payload
134+
// type and must be of type String.
135+
//
136+
// SSERequestID must appear in a `ServerSentEvents` expression.
137+
//
138+
// SSERequestID accepts a single argument: the name of the attribute of the
139+
// Payload type that provides the Last-Event-ID request header value.
140+
//
141+
// Example:
142+
//
143+
// Method("stream", func() {
144+
// Payload(func() {
145+
// Attribute("id", String)
146+
// })
147+
// StreamingResult(Notification)
148+
// HTTP(func() {
149+
// GET("/events")
150+
// ServerSentEvents(func() { // Use SSE for this method
151+
// SSERequestID("id") // Use payload "id" field to set "Last-Event-Id" request header
152+
// SSEEventID("timestamp") // Use result "timestamp" attribute for "id" event field
153+
// SSEEventData("message") // Use result "message" attribute for "data" event field
154+
// })
155+
// })
156+
// })
157+
func SSERequestID(name string) {
158+
if name == "" {
159+
eval.ReportError("request ID field name cannot be empty")
160+
return
161+
}
162+
sse, ok := eval.Current().(*expr.HTTPSSEExpr)
163+
if !ok {
164+
eval.IncompatibleDSL()
165+
return
166+
}
167+
sse.RequestIDField = name
168+
}
169+
170+
// SSEEventData defines the attribute of the StreamingResult type that provides the
171+
// data field for a Server-Sent Event. The attribute must exist in the
172+
// StreamingResult type.
173+
//
174+
// SSEEventData must appear in a `ServerSentEvents` expression.
175+
//
176+
// SSEEventData accepts a single argument: the name of the attribute of the
177+
// StreamingResult type that provides the data field for a Server-Sent Event.
178+
//
179+
// Example:
180+
//
181+
// Method("stream", func() {
182+
// StreamingResult(Payload)
183+
// HTTP(func() {
184+
// GET("/events")
185+
// ServerSentEvents(func() {
186+
// SSEEventData("message") // Use payload "message" attribute for SSE data field, other attributes are ignored
187+
// })
188+
// })
189+
// })
190+
func SSEEventData(name string) {
191+
if name == "" {
192+
eval.ReportError("data field name cannot be empty")
193+
return
194+
}
195+
sse, ok := eval.Current().(*expr.HTTPSSEExpr)
196+
if !ok {
197+
eval.IncompatibleDSL()
198+
return
199+
}
200+
sse.DataField = name
201+
}
202+
203+
// SSEEventID defines the attribute of the StreamingResult type that provides the
204+
// id field for a Server-Sent Event. The attribute must exist in the
205+
// StreamingResult type and must be of type String.
206+
//
207+
// SSEEventID must appear in a `ServerSentEvents` expression.
208+
//
209+
// SSEEventID accepts a single argument: the name of the attribute of the
210+
// StreamingResult type that provides the id field for a Server-Sent Event.
211+
//
212+
// Example:
213+
//
214+
// Method("stream", func() {
215+
// StreamingResult(Payload)
216+
// HTTP(func() {
217+
// GET("/events")
218+
// ServerSentEvents(func() {
219+
// SSEEventID("timestamp") // Use "timestamp" attribute for SSE id field
220+
// })
221+
// })
222+
// })
223+
func SSEEventID(name string) {
224+
if name == "" {
225+
eval.ReportError("id field name cannot be empty")
226+
return
227+
}
228+
sse, ok := eval.Current().(*expr.HTTPSSEExpr)
229+
if !ok {
230+
eval.IncompatibleDSL()
231+
return
232+
}
233+
sse.IDField = name
234+
}
235+
236+
// SSEEventType defines the attribute of the StreamingResult type that provides the
237+
// event field (event type) for a Server-Sent Event. The attribute must exist in the
238+
// StreamingResult type and must be of type String.
239+
//
240+
// SSEEventType must appear in a `ServerSentEvents` expression.
241+
//
242+
// SSEEventType accepts a single argument: the name of the attribute of the
243+
// StreamingResult type that provides the event field for a Server-Sent Event.
244+
//
245+
// Example:
246+
//
247+
// Method("stream", func() {
248+
// StreamingResult(Payload)
249+
// HTTP(func() {
250+
// GET("/events")
251+
// ServerSentEvents(func() {
252+
// SSEEventType("type") // Use payload "type" attribute for SSE event field
253+
// })
254+
// })
255+
// })
256+
func SSEEventType(name string) {
257+
if name == "" {
258+
eval.ReportError("event field name cannot be empty")
259+
return
260+
}
261+
sse, ok := eval.Current().(*expr.HTTPSSEExpr)
262+
if !ok {
263+
eval.IncompatibleDSL()
264+
return
265+
}
266+
sse.EventField = name
267+
}
268+
269+
// SSEEventRetry defines the attribute of the StreamingResult type that provides
270+
// the retry field for a Server-Sent Event. The attribute must exist in the
271+
// StreamingResult type and must be of type Int or UInt.
272+
//
273+
// SSEEventRetry must appear in a `ServerSentEvents` expression.
274+
//
275+
// SSEEventRetry accepts a single argument: the name of the attribute of the
276+
// StreamingResult type that provides the retry field for a Server-Sent Event.
277+
//
278+
// Example:
279+
//
280+
// Method("stream", func() {
281+
// StreamingResult(Notification)
282+
// HTTP(func() {
283+
// GET("/events")
284+
// ServerSentEvents(func() {
285+
// SSEEventRetry("retry") // Use "retry" attribute for SSE retry field
286+
// })
287+
// })
288+
// })
289+
func SSEEventRetry(name string) {
290+
if name == "" {
291+
eval.ReportError("retry field name cannot be empty")
292+
return
293+
}
294+
sse, ok := eval.Current().(*expr.HTTPSSEExpr)
295+
if !ok {
296+
eval.IncompatibleDSL()
297+
return
298+
}
299+
sse.RetryField = name
300+
}

0 commit comments

Comments
 (0)