From d91ef4ce0c6ad42b0f17814718c0e355d810c3df Mon Sep 17 00:00:00 2001 From: John Zhang Date: Thu, 14 Aug 2025 12:37:25 +1000 Subject: [PATCH] replace Flusher interface with http.ResponseController Flush method Signed-off-by: John Zhang --- .../templates/server_handler_init.go.tpl | 16 +++------- http/codegen/templates/server_sse.go.tpl | 8 ++--- ...skip response body encode decode.go.golden | 8 ++--- .../testdata/golden/sse-all-fields.golden | 4 +-- http/codegen/testdata/golden/sse-bool.golden | 4 +-- .../testdata/golden/sse-data-field.golden | 4 +-- .../testdata/golden/sse-data-id-field.golden | 4 +-- http/codegen/testdata/golden/sse-int.golden | 4 +-- .../codegen/testdata/golden/sse-object.golden | 4 +-- .../testdata/golden/sse-request-id.golden | 4 +-- .../codegen/testdata/golden/sse-string.golden | 4 +-- .../testdata/handler_init_functions.go | 8 ++--- http/middleware/capture.go | 7 ++--- .../templates/sse_server_stream.go.tpl | 30 +++++++++---------- .../templates/sse_server_stream_impl.go.tpl | 24 +++++++-------- .../testdata/golden/jsonrpc-sse-object.golden | 4 +-- .../testdata/golden/jsonrpc-sse-string.golden | 4 +-- 17 files changed, 48 insertions(+), 93 deletions(-) diff --git a/http/codegen/templates/server_handler_init.go.tpl b/http/codegen/templates/server_handler_init.go.tpl index 33fea8fbaa..1dc01273a6 100644 --- a/http/codegen/templates/server_handler_init.go.tpl +++ b/http/codegen/templates/server_handler_init.go.tpl @@ -117,9 +117,7 @@ func {{ .HandlerInit }}( errhandler(ctx, w, err) } } else { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } } @@ -140,9 +138,7 @@ func {{ .HandlerInit }}( return } if _, err := io.Copy(w, buf); err != nil { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } {{- else }} @@ -255,9 +251,7 @@ func {{ .HandlerInit }}( errhandler(ctx, w, err) } } else { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } } @@ -284,9 +278,7 @@ func {{ .HandlerInit }}( {{- end }} {{- if .Method.SkipResponseBodyEncodeDecode }} if _, err := io.Copy(w, buf); err != nil { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } {{- end }} diff --git a/http/codegen/templates/server_sse.go.tpl b/http/codegen/templates/server_sse.go.tpl index a8ceffa5fc..9c3b008eb8 100644 --- a/http/codegen/templates/server_sse.go.tpl +++ b/http/codegen/templates/server_sse.go.tpl @@ -38,7 +38,7 @@ func (s *{{ .SSE.StructName }}) {{ .SSE.SendWithContextName }}(ctx context.Conte {{- else }} res := v {{- end }} - + {{ if .SSE.IDField }} if id := res.{{ .SSE.IDField }}; id != "" { fmt.Fprintf(s.w, "id: %s\n", id) @@ -66,10 +66,8 @@ func (s *{{ .SSE.StructName }}) {{ .SSE.SendWithContextName }}(ctx context.Conte {{- end }} fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } - return nil + http.NewResponseController(s.w).Flush() + return nil } {{ comment "Close is a no-op for SSE. We keep the method for compatibility with other stream types." }} diff --git a/http/codegen/testdata/golden/handler_skip response body encode decode.go.golden b/http/codegen/testdata/golden/handler_skip response body encode decode.go.golden index 96ad3de729..aea06dfb77 100644 --- a/http/codegen/testdata/golden/handler_skip response body encode decode.go.golden +++ b/http/codegen/testdata/golden/handler_skip response body encode decode.go.golden @@ -41,9 +41,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler( errhandler(ctx, w, err) } } else { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } } @@ -64,9 +62,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler( return } if _, err := io.Copy(w, buf); err != nil { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } }) diff --git a/http/codegen/testdata/golden/sse-all-fields.golden b/http/codegen/testdata/golden/sse-all-fields.golden index c73fea05c4..80a5e0b26b 100644 --- a/http/codegen/testdata/golden/sse-all-fields.golden +++ b/http/codegen/testdata/golden/sse-all-fields.golden @@ -55,9 +55,7 @@ func (s *SSEAllFieldsMethodServerStream) SendWithContext(ctx context.Context, v data = string(byts) fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-bool.golden b/http/codegen/testdata/golden/sse-bool.golden index b58d798caf..a68ac4da4f 100644 --- a/http/codegen/testdata/golden/sse-bool.golden +++ b/http/codegen/testdata/golden/sse-bool.golden @@ -41,9 +41,7 @@ func (s *SSEBoolMethodServerStream) SendWithContext(ctx context.Context, v bool) data = string(byts) fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-data-field.golden b/http/codegen/testdata/golden/sse-data-field.golden index 037abf457c..4b2f1d20e7 100644 --- a/http/codegen/testdata/golden/sse-data-field.golden +++ b/http/codegen/testdata/golden/sse-data-field.golden @@ -41,9 +41,7 @@ func (s *SSEDataFieldMethodServerStream) SendWithContext(ctx context.Context, v data = dataField fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-data-id-field.golden b/http/codegen/testdata/golden/sse-data-id-field.golden index b4a44383bd..49848b1758 100644 --- a/http/codegen/testdata/golden/sse-data-id-field.golden +++ b/http/codegen/testdata/golden/sse-data-id-field.golden @@ -45,9 +45,7 @@ func (s *SSEDataIDFieldMethodServerStream) SendWithContext(ctx context.Context, data = dataField fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-int.golden b/http/codegen/testdata/golden/sse-int.golden index d789c17921..0c24fd8d9a 100644 --- a/http/codegen/testdata/golden/sse-int.golden +++ b/http/codegen/testdata/golden/sse-int.golden @@ -37,9 +37,7 @@ func (s *SSEIntMethodServerStream) SendWithContext(ctx context.Context, v int) e data = fmt.Sprintf("%d", res) fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-object.golden b/http/codegen/testdata/golden/sse-object.golden index a5f03b2cbd..77cb4a576b 100644 --- a/http/codegen/testdata/golden/sse-object.golden +++ b/http/codegen/testdata/golden/sse-object.golden @@ -43,9 +43,7 @@ func (s *SSEObjectMethodServerStream) SendWithContext(ctx context.Context, v *ss data = string(byts) fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-request-id.golden b/http/codegen/testdata/golden/sse-request-id.golden index 96890d35ce..42c40099b6 100644 --- a/http/codegen/testdata/golden/sse-request-id.golden +++ b/http/codegen/testdata/golden/sse-request-id.golden @@ -38,9 +38,7 @@ func (s *SSERequestIDMethodServerStream) SendWithContext(ctx context.Context, v data = res fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/golden/sse-string.golden b/http/codegen/testdata/golden/sse-string.golden index e013da59c4..e57e893482 100644 --- a/http/codegen/testdata/golden/sse-string.golden +++ b/http/codegen/testdata/golden/sse-string.golden @@ -38,9 +38,7 @@ func (s *SSEStringMethodServerStream) SendWithContext(ctx context.Context, v str data = res fmt.Fprintf(s.w, "data: %s\n\n", data) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() return nil } diff --git a/http/codegen/testdata/handler_init_functions.go b/http/codegen/testdata/handler_init_functions.go index 05dd88a188..67928c6913 100644 --- a/http/codegen/testdata/handler_init_functions.go +++ b/http/codegen/testdata/handler_init_functions.go @@ -295,9 +295,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler( errhandler(ctx, w, err) } } else { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } } @@ -318,9 +316,7 @@ func NewMethodSkipResponseBodyEncodeDecodeHandler( return } if _, err := io.Copy(w, buf); err != nil { - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(w).Flush() panic(http.ErrAbortHandler) // too late to write an error } }) diff --git a/http/middleware/capture.go b/http/middleware/capture.go index 018f5b3324..7370096220 100644 --- a/http/middleware/capture.go +++ b/http/middleware/capture.go @@ -41,12 +41,9 @@ func (w *ResponseCapture) Write(b []byte) (int, error) { return n, err } -// Flush implements the http.Flusher interface if the underlying response -// writer supports it. +// Flush call http.ResponseController Flush() method func (w *ResponseCapture) Flush() { - if f, ok := w.ResponseWriter.(http.Flusher); ok { - f.Flush() - } + _ = http.NewResponseController(w.ResponseWriter).Flush() } // Push implements the http.Pusher interface if the underlying response diff --git a/jsonrpc/codegen/templates/sse_server_stream.go.tpl b/jsonrpc/codegen/templates/sse_server_stream.go.tpl index 18f310697f..085654ab15 100644 --- a/jsonrpc/codegen/templates/sse_server_stream.go.tpl +++ b/jsonrpc/codegen/templates/sse_server_stream.go.tpl @@ -6,7 +6,7 @@ type {{ .SSE.StructName }} struct { encoder func(context.Context, http.ResponseWriter) goahttp.Encoder // w is the HTTP response writer w http.ResponseWriter - // r is the HTTP request + // r is the HTTP request r *http.Request // requestID is the JSON-RPC request ID for sending final response requestID any @@ -39,9 +39,7 @@ func (s *{{ lowerInitial .SSE.StructName }}EventWriter) Write(data []byte) (int, func (s *{{ lowerInitial .SSE.StructName }}EventWriter) finish() { if s.started { s.w.Write([]byte("\n\n")) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() } } @@ -55,27 +53,27 @@ func (s *{{ .SSE.StructName }}) Send(ctx context.Context, event {{ .ServicePkgNa return fmt.Errorf("stream closed") } s.mu.Unlock() - + {{ comment "Type assert to the specific result type" }} result, ok := event.({{ .SSE.EventTypeRef }}) if !ok { return fmt.Errorf("unexpected event type: %T", event) } - + {{- if and .Result (index .Result.Responses 0).ServerBody (index (index .Result.Responses 0).ServerBody 0).Init }} {{ comment "Convert to response body type for proper JSON encoding" }} body := {{ (index (index .Result.Responses 0).ServerBody 0).Init.Name }}(result) {{- else }} body := result {{- end }} - + {{ comment "Send as notification (no ID)" }} message := map[string]any{ "jsonrpc": "2.0", "method": {{ printf "%q" .Method.Name }}, "params": body, } - + return s.sendSSEEvent("notification", message) } @@ -91,13 +89,13 @@ func (s *{{ .SSE.StructName }}) SendAndClose(ctx context.Context, event {{ .Serv } s.closed = true s.mu.Unlock() - + {{ comment "Type assert to the specific result type" }} result, ok := event.({{ .SSE.EventTypeRef }}) if !ok { return fmt.Errorf("unexpected event type: %T", event) } - + {{ comment "Determine the ID to use for the response" }} var id any = s.requestID {{- if .Result.IDAttribute }} @@ -117,21 +115,21 @@ func (s *{{ .SSE.StructName }}) SendAndClose(ctx context.Context, event {{ .Serv } {{- end }} {{- end }} - + {{- if and .Result (index .Result.Responses 0).ServerBody (index (index .Result.Responses 0).ServerBody 0).Init }} {{ comment "Convert to response body type for proper JSON encoding" }} body := {{ (index (index .Result.Responses 0).ServerBody 0).Init.Name }}(result) {{- else }} body := result {{- end }} - + {{ comment "Send as response with ID" }} message := map[string]any{ "jsonrpc": "2.0", "id": id, "result": body, } - + return s.sendSSEEvent("response", message) } @@ -189,12 +187,12 @@ func (s *{{ .SSE.StructName }}) sendSSEEvent(eventType string, v any) error { // Create SSE event writer that wraps the response writer ew := &{{ lowerInitial .SSE.StructName }}EventWriter{w: s.w, eventType: eventType} - + // Create encoder with the event writer and encode the value err := s.encoder(context.Background(), ew).Encode(v) - + // Finish the SSE event (adds newlines and flushes) ew.finish() - + return err } \ No newline at end of file diff --git a/jsonrpc/codegen/templates/sse_server_stream_impl.go.tpl b/jsonrpc/codegen/templates/sse_server_stream_impl.go.tpl index d3777e4f6c..fcb54c6711 100644 --- a/jsonrpc/codegen/templates/sse_server_stream_impl.go.tpl +++ b/jsonrpc/codegen/templates/sse_server_stream_impl.go.tpl @@ -35,9 +35,7 @@ func (s *sseEventWriter) Write(data []byte) (int, error) { func (s *sseEventWriter) finish() { if s.started { s.w.Write([]byte("\n\n")) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() } } @@ -56,16 +54,16 @@ func (s *{{ lowerInitial .Service.StructName }}SSEStream) initSSEHeaders() { // sendSSEEvent sends a single SSE event by creating an encoder that writes to the event writer func (s *{{ lowerInitial .Service.StructName }}SSEStream) sendSSEEvent(eventType string, v any) error { s.initSSEHeaders() - + // Create SSE event writer that wraps the response writer ew := &sseEventWriter{w: s.w, eventType: eventType} - + // Create encoder with the event writer and encode the value err := s.encoder(context.Background(), ew).Encode(v) - + // Finish the SSE event (adds newlines and flushes) ew.finish() - + return err } @@ -100,7 +98,7 @@ func (s *{{ lowerInitial .Service.StructName }}SSEStream) Send(ctx context.Conte {{- else }} body := v {{- end }} - + {{ comment "Check if this is a notification or response by looking for ID field" }} var id string var isResponse bool @@ -121,10 +119,10 @@ func (s *{{ lowerInitial .Service.StructName }}SSEStream) Send(ctx context.Conte } {{- end }} {{- end }} - + var message map[string]any var eventType string - + if isResponse { {{ comment "Send as response with ID" }} resp := jsonrpc.MakeSuccessResponse(id, body) @@ -143,7 +141,7 @@ func (s *{{ lowerInitial .Service.StructName }}SSEStream) Send(ctx context.Conte } eventType = "notification" } - + return s.sendSSEEvent(eventType, message) {{- end }} {{- end }} @@ -160,7 +158,7 @@ func (s *{{ lowerInitial .Service.StructName }}SSEStream) SendError(ctx context. code := jsonrpc.InternalError message := err.Error() var data any - + if errors.As(err, &en) { switch en.GoaErrorName() { case "invalid_params": @@ -171,7 +169,7 @@ func (s *{{ lowerInitial .Service.StructName }}SSEStream) SendError(ctx context. code = jsonrpc.InternalError } } - + return s.sendError(ctx, id, code, message, data) } {{- end }} \ No newline at end of file diff --git a/jsonrpc/codegen/testdata/golden/jsonrpc-sse-object.golden b/jsonrpc/codegen/testdata/golden/jsonrpc-sse-object.golden index ef52b81c84..e68df613d4 100644 --- a/jsonrpc/codegen/testdata/golden/jsonrpc-sse-object.golden +++ b/jsonrpc/codegen/testdata/golden/jsonrpc-sse-object.golden @@ -40,9 +40,7 @@ func (s *streamServerStreamEventWriter) Write(data []byte) (int, error) { func (s *streamServerStreamEventWriter) finish() { if s.started { s.w.Write([]byte("\n\n")) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() } } diff --git a/jsonrpc/codegen/testdata/golden/jsonrpc-sse-string.golden b/jsonrpc/codegen/testdata/golden/jsonrpc-sse-string.golden index f20e0fd2a6..ddd8eb390b 100644 --- a/jsonrpc/codegen/testdata/golden/jsonrpc-sse-string.golden +++ b/jsonrpc/codegen/testdata/golden/jsonrpc-sse-string.golden @@ -40,9 +40,7 @@ func (s *streamServerStreamEventWriter) Write(data []byte) (int, error) { func (s *streamServerStreamEventWriter) finish() { if s.started { s.w.Write([]byte("\n\n")) - if f, ok := s.w.(http.Flusher); ok { - f.Flush() - } + http.NewResponseController(s.w).Flush() } }