Skip to content

Commit 5e980c7

Browse files
authored
fix(sse): flush response headers before user handler runs (#1038)
1 parent 39f388e commit 5e980c7

2 files changed

Lines changed: 80 additions & 0 deletions

File tree

sse/sse.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an
132132
return &huma.StreamResponse{
133133
Body: func(ctx huma.Context) {
134134
ctx.SetHeader("Content-Type", "text/event-stream")
135+
// Commit response headers immediately so the client's
136+
// EventSource.onopen fires without waiting for the first event.
137+
ctx.SetStatus(http.StatusOK)
135138
bw := ctx.BodyWriter()
136139
encoder := json.NewEncoder(bw)
137140

@@ -149,6 +152,9 @@ func Register[I any](api huma.API, op huma.Operation, eventTypeMap map[string]an
149152
break
150153
}
151154
}
155+
if flusher != nil {
156+
flusher.Flush()
157+
}
152158

153159
var deadliner writeDeadliner
154160
deadlineCheck := bw

sse/sse_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"net/http"
7+
"slices"
78
"testing"
89
"time"
910

@@ -57,6 +58,35 @@ func (w *WrappedDeadliner) SetWriteDeadline(t time.Time) error {
5758
return w.deadlineErr
5859
}
5960

61+
// orderedWriter records the order in which WriteHeader, Write, and Flush
62+
// are called so tests can assert headers are committed before body writes.
63+
type orderedWriter struct {
64+
events []string
65+
status int
66+
}
67+
68+
func (w *orderedWriter) Header() http.Header {
69+
return http.Header{}
70+
}
71+
72+
func (w *orderedWriter) Write(p []byte) (int, error) {
73+
w.events = append(w.events, "write")
74+
return len(p), nil
75+
}
76+
77+
func (w *orderedWriter) WriteHeader(statusCode int) {
78+
w.status = statusCode
79+
w.events = append(w.events, "writeHeader")
80+
}
81+
82+
func (w *orderedWriter) Flush() {
83+
w.events = append(w.events, "flush")
84+
}
85+
86+
func (w *orderedWriter) SetWriteDeadline(t time.Time) error {
87+
return nil
88+
}
89+
6090
type sseTest struct {
6191
Title string
6292
TestFunc func(t *testing.T)
@@ -131,6 +161,50 @@ data: {"error": "encode error: json: unsupported type: chan int"}
131161
api.Adapter().ServeHTTP(w, req)
132162
},
133163
},
164+
{
165+
Title: "sse flushes headers before first event",
166+
TestFunc: func(t *testing.T) {
167+
_, api := humatest.New(t)
168+
started := make(chan struct{})
169+
release := make(chan struct{})
170+
sse.Register(api, huma.Operation{
171+
OperationID: "sse",
172+
Method: http.MethodGet,
173+
Path: "/sse",
174+
}, map[string]any{
175+
"message": &DefaultMessage{},
176+
}, func(ctx context.Context, input *struct{}, send sse.Sender) {
177+
close(started)
178+
<-release
179+
})
180+
181+
w := &orderedWriter{}
182+
req, _ := http.NewRequest(http.MethodGet, "/sse", nil)
183+
done := make(chan struct{})
184+
go func() {
185+
api.Adapter().ServeHTTP(w, req)
186+
close(done)
187+
}()
188+
<-started
189+
// At this point the user handler is running but has not sent any
190+
// events yet. Headers must already be committed and flushed so
191+
// that EventSource.onopen fires on the client.
192+
eventsBeforeRelease := append([]string(nil), w.events...)
193+
close(release)
194+
<-done
195+
196+
assert.Equal(t, http.StatusOK, w.status)
197+
require.Contains(t, eventsBeforeRelease, "writeHeader",
198+
"WriteHeader must be called before the user handler blocks")
199+
require.Contains(t, eventsBeforeRelease, "flush",
200+
"Flush must be called before the user handler blocks")
201+
whIdx := slices.Index(eventsBeforeRelease, "writeHeader")
202+
flushIdx := slices.Index(eventsBeforeRelease, "flush")
203+
assert.Less(t, whIdx, flushIdx, "WriteHeader must precede Flush")
204+
assert.NotContains(t, eventsBeforeRelease, "write",
205+
"no body write should occur before the user handler sends an event")
206+
},
207+
},
134208
{
135209
Title: "sse stable event order in openapi",
136210
TestFunc: func(t *testing.T) {

0 commit comments

Comments
 (0)