Skip to content

Commit f6f4fbf

Browse files
authored
[patch] fixed bug in SSE event loop exiting (#56)
[patch] implemented Stringer for SSE event types
1 parent 2eec771 commit f6f4fbf

2 files changed

Lines changed: 23 additions & 10 deletions

File tree

extensions/sse/client.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ const (
3737
eTypeClient
3838
)
3939

40+
func (et eventType) String() string {
41+
switch et {
42+
case eTypeNewClient:
43+
return "new_client"
44+
case eTypeClientList:
45+
return "client_list"
46+
case eTypeRemoveClient:
47+
return "remove_client"
48+
case eTypeActiveClientCount:
49+
return "active_client_count"
50+
}
51+
return "unknown"
52+
}
53+
4054
type event struct {
4155
Type eventType
4256
ClientID string
@@ -70,14 +84,12 @@ func (cs *Clients) listener(events <-chan event) {
7084

7185
case eTypeRemoveClient:
7286
cli := cs.clients[ev.ClientID]
73-
if cli == nil {
87+
if cli != nil {
88+
// Ctx.Done() is needed to close its streaming handler
89+
cli.Ctx.Done()
7490
ev.Response <- nil
75-
return
7691
}
7792

78-
close(cli.Msg)
79-
cli.Ctx.Done()
80-
8193
delete(cs.clients, ev.ClientID)
8294
ev.Response <- nil
8395

@@ -114,9 +126,12 @@ func (cs *Clients) Range(f func(cli *Client)) {
114126
}
115127

116128
response := <-rch
117-
for i := range response.Clients {
118-
f(response.Clients[i])
119-
}
129+
// running in Go routine to not block the event listener
130+
go func() {
131+
for i := range response.Clients {
132+
f(response.Clients[i])
133+
}
134+
}()
120135
}
121136

122137
func (cs *Clients) Remove(clientID string) int {

extensions/sse/sse.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,13 @@ func (sse *SSE) Handler(w http.ResponseWriter, r *http.Request) error {
3737
if !hasFlusher {
3838
return sse.UnsupportedMessage(w, r)
3939
}
40-
4140
header := w.Header()
4241
header.Set("Content-Type", "text/event-stream")
4342
header.Set("Connection", "keep-alive")
4443
header.Set("X-Accel-Buffering", "no")
4544
w.WriteHeader(http.StatusOK)
4645

4746
ctx := r.Context()
48-
4947
clientID := r.Header.Get(sse.ClientIDHeader)
5048
client := sse.NewClient(ctx, w, clientID)
5149
defer sse.RemoveClient(ctx, clientID)

0 commit comments

Comments
 (0)