Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions http/codegen/templates/server_handler_init.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ func {{ .HandlerInit }}(
errhandler(ctx, w, err)
}
} else {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
}
Expand All @@ -140,9 +138,7 @@ func {{ .HandlerInit }}(
return
}
if _, err := io.Copy(w, buf); err != nil {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
{{- else }}
Expand Down Expand Up @@ -255,9 +251,7 @@ func {{ .HandlerInit }}(
errhandler(ctx, w, err)
}
} else {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
}
Expand All @@ -284,9 +278,7 @@ func {{ .HandlerInit }}(
{{- end }}
{{- if .Method.SkipResponseBodyEncodeDecode }}
if _, err := io.Copy(w, buf); err != nil {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
{{- end }}
Expand Down
8 changes: 3 additions & 5 deletions http/codegen/templates/server_sse.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *{{ .SSE.StructName }}) {{ .SSE.SendWithContextName }}(ctx context.Conte
{{- else }}
res := v
{{- end }}

{{ if .SSE.IDField }}
if id := res.{{ .SSE.IDField }}; id != "" {
fmt.Fprintf(s.w, "id: %s\n", id)
Expand Down Expand Up @@ -66,10 +66,8 @@ func (s *{{ .SSE.StructName }}) {{ .SSE.SendWithContextName }}(ctx context.Conte
{{- end }}
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
return nil
http.NewResponseController(s.w).Flush()
return nil
}

{{ comment "Close is a no-op for SSE. We keep the method for compatibility with other stream types." }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler(
errhandler(ctx, w, err)
}
} else {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
}
Expand All @@ -64,9 +62,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler(
return
}
if _, err := io.Copy(w, buf); err != nil {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
})
Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-all-fields.golden
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ func (s *SSEAllFieldsMethodServerStream) SendWithContext(ctx context.Context, v
data = string(byts)
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-bool.golden
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ func (s *SSEBoolMethodServerStream) SendWithContext(ctx context.Context, v bool)
data = string(byts)
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-data-field.golden
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ func (s *SSEDataFieldMethodServerStream) SendWithContext(ctx context.Context, v
data = dataField
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-data-id-field.golden
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ func (s *SSEDataIDFieldMethodServerStream) SendWithContext(ctx context.Context,
data = dataField
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-int.golden
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func (s *SSEIntMethodServerStream) SendWithContext(ctx context.Context, v int) e
data = fmt.Sprintf("%d", res)
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-object.golden
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func (s *SSEObjectMethodServerStream) SendWithContext(ctx context.Context, v *ss
data = string(byts)
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-request-id.golden
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ func (s *SSERequestIDMethodServerStream) SendWithContext(ctx context.Context, v
data = res
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions http/codegen/testdata/golden/sse-string.golden
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ func (s *SSEStringMethodServerStream) SendWithContext(ctx context.Context, v str
data = res
fmt.Fprintf(s.w, "data: %s\n\n", data)

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
return nil
}

Expand Down
8 changes: 2 additions & 6 deletions http/codegen/testdata/handler_init_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler(
errhandler(ctx, w, err)
}
} else {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
}
Expand All @@ -318,9 +316,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler(
return
}
if _, err := io.Copy(w, buf); err != nil {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(w).Flush()
panic(http.ErrAbortHandler) // too late to write an error
}
})
Expand Down
7 changes: 2 additions & 5 deletions http/middleware/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,9 @@ func (w *ResponseCapture) Write(b []byte) (int, error) {
return n, err
}

// Flush implements the http.Flusher interface if the underlying response
// writer supports it.
// Flush call http.ResponseController Flush() method
func (w *ResponseCapture) Flush() {
if f, ok := w.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
_ = http.NewResponseController(w.ResponseWriter).Flush()
}

// Push implements the http.Pusher interface if the underlying response
Expand Down
30 changes: 14 additions & 16 deletions jsonrpc/codegen/templates/sse_server_stream.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type {{ .SSE.StructName }} struct {
encoder func(context.Context, http.ResponseWriter) goahttp.Encoder
// w is the HTTP response writer
w http.ResponseWriter
// r is the HTTP request
// r is the HTTP request
r *http.Request
// requestID is the JSON-RPC request ID for sending final response
requestID any
Expand Down Expand Up @@ -39,9 +39,7 @@ func (s *{{ lowerInitial .SSE.StructName }}EventWriter) Write(data []byte) (int,
func (s *{{ lowerInitial .SSE.StructName }}EventWriter) finish() {
if s.started {
s.w.Write([]byte("\n\n"))
if f, ok := s.w.(http.Flusher); ok {
f.Flush()
}
http.NewResponseController(s.w).Flush()
}
}

Expand All @@ -55,27 +53,27 @@ func (s *{{ .SSE.StructName }}) Send(ctx context.Context, event {{ .ServicePkgNa
return fmt.Errorf("stream closed")
}
s.mu.Unlock()

{{ comment "Type assert to the specific result type" }}
result, ok := event.({{ .SSE.EventTypeRef }})
if !ok {
return fmt.Errorf("unexpected event type: %T", event)
}

{{- if and .Result (index .Result.Responses 0).ServerBody (index (index .Result.Responses 0).ServerBody 0).Init }}
{{ comment "Convert to response body type for proper JSON encoding" }}
body := {{ (index (index .Result.Responses 0).ServerBody 0).Init.Name }}(result)
{{- else }}
body := result
{{- end }}

{{ comment "Send as notification (no ID)" }}
message := map[string]any{
"jsonrpc": "2.0",
"method": {{ printf "%q" .Method.Name }},
"params": body,
}

return s.sendSSEEvent("notification", message)
}

Expand All @@ -91,13 +89,13 @@ func (s *{{ .SSE.StructName }}) SendAndClose(ctx context.Context, event {{ .Serv
}
s.closed = true
s.mu.Unlock()

{{ comment "Type assert to the specific result type" }}
result, ok := event.({{ .SSE.EventTypeRef }})
if !ok {
return fmt.Errorf("unexpected event type: %T", event)
}

{{ comment "Determine the ID to use for the response" }}
var id any = s.requestID
{{- if .Result.IDAttribute }}
Expand All @@ -117,21 +115,21 @@ func (s *{{ .SSE.StructName }}) SendAndClose(ctx context.Context, event {{ .Serv
}
{{- end }}
{{- end }}

{{- if and .Result (index .Result.Responses 0).ServerBody (index (index .Result.Responses 0).ServerBody 0).Init }}
{{ comment "Convert to response body type for proper JSON encoding" }}
body := {{ (index (index .Result.Responses 0).ServerBody 0).Init.Name }}(result)
{{- else }}
body := result
{{- end }}

{{ comment "Send as response with ID" }}
message := map[string]any{
"jsonrpc": "2.0",
"id": id,
"result": body,
}

return s.sendSSEEvent("response", message)
}

Expand Down Expand Up @@ -189,12 +187,12 @@ func (s *{{ .SSE.StructName }}) sendSSEEvent(eventType string, v any) error {

// Create SSE event writer that wraps the response writer
ew := &{{ lowerInitial .SSE.StructName }}EventWriter{w: s.w, eventType: eventType}

// Create encoder with the event writer and encode the value
err := s.encoder(context.Background(), ew).Encode(v)

// Finish the SSE event (adds newlines and flushes)
ew.finish()

return err
}
Loading
Loading