Skip to content

Commit da3f5b1

Browse files
committed
fix: goroutine leak
1 parent a755327 commit da3f5b1

8 files changed

Lines changed: 87 additions & 54 deletions

File tree

ws/client.go

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ type Client struct {
3434
}
3535

3636
type ClientMessage struct {
37-
Info ClientInfo
38-
Incoming Event
37+
Info ClientInfo
38+
SkipConnectedCheck bool
39+
Incoming Event
3940
}
4041

4142
type ClientInfo struct {
@@ -44,7 +45,6 @@ type ClientInfo struct {
4445
Authenticated bool
4546
AuthenticatedUser string
4647
Write chan outgoing.Message
47-
Close chan string
4848
Addr net.IP
4949
}
5050

@@ -63,36 +63,46 @@ func newClient(conn *websocket.Conn, req *http.Request, read chan ClientMessage,
6363
RoomID: "",
6464
Addr: ip,
6565
Write: make(chan outgoing.Message, 1),
66-
Close: make(chan string, 1),
6766
},
6867
read: read,
6968
}
7069
client.debug().Msg("WebSocket New Connection")
71-
conn.SetCloseHandler(func(code int, text string) error {
72-
message := websocket.FormatCloseMessage(code, text)
73-
client.debug().Str("reason", text).Int("code", code).Msg("WebSocket Close")
74-
return conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(writeWait))
75-
})
7670
return client
7771
}
7872

79-
// Close closes the connection.
80-
func (c *Client) Close() {
73+
// CloseOnError closes the connection.
74+
func (c *Client) CloseOnError(code int, reason string) {
8175
c.once.Do(func() {
82-
c.conn.Close()
8376
go func() {
8477
c.read <- ClientMessage{
85-
Info: c.info,
86-
Incoming: &Disconnected{},
78+
Info: c.info,
79+
Incoming: &Disconnected{
80+
Code: code,
81+
Reason: reason,
82+
},
8783
}
8884
}()
85+
c.writeCloseMessage(code, reason)
8986
})
9087
}
9188

89+
func (c *Client) CloseOnDone(code int, reason string) {
90+
c.once.Do(func() {
91+
c.writeCloseMessage(code, reason)
92+
})
93+
}
94+
95+
func (c *Client) writeCloseMessage(code int, reason string) {
96+
message := websocket.FormatCloseMessage(code, reason)
97+
c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(writeWait))
98+
c.conn.Close()
99+
}
100+
92101
// startWriteHandler starts listening on the client connection. As we do not need anything from the client,
93102
// we ignore incoming messages. Leaves the loop on errors.
94103
func (c *Client) startReading(pongWait time.Duration) {
95-
defer c.Close()
104+
defer c.CloseOnError(websocket.CloseNormalClosure, "Reader Routine Closed")
105+
96106
_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
97107
c.conn.SetPongHandler(func(appData string) error {
98108
_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
@@ -101,17 +111,17 @@ func (c *Client) startReading(pongWait time.Duration) {
101111
for {
102112
t, m, err := c.conn.NextReader()
103113
if err != nil {
104-
c.printWebSocketError("read", err)
114+
c.CloseOnError(websocket.CloseNormalClosure, "read error: "+err.Error())
105115
return
106116
}
107117
if t == websocket.BinaryMessage {
108-
_ = c.conn.CloseHandler()(websocket.CloseUnsupportedData, fmt.Sprintf("unsupported binary message type: %s", err))
118+
c.CloseOnError(websocket.CloseUnsupportedData, "unsupported binary message type")
109119
return
110120
}
111121

112122
incoming, err := ReadTypedIncoming(m)
113123
if err != nil {
114-
_ = c.conn.CloseHandler()(websocket.CloseNormalClosure, fmt.Sprintf("malformed message: %s", err))
124+
c.CloseOnError(websocket.CloseUnsupportedData, fmt.Sprintf("malformed message: %s", err))
115125
return
116126
}
117127
c.debug().Interface("event", fmt.Sprintf("%T", incoming)).Interface("payload", incoming).Msg("WebSocket Receive")
@@ -125,38 +135,26 @@ func (c *Client) startReading(pongWait time.Duration) {
125135
// * on errors exit the loop.
126136
func (c *Client) startWriteHandler(pingPeriod time.Duration) {
127137
pingTicker := time.NewTicker(pingPeriod)
128-
129-
dead := false
130-
conClosed := func() {
131-
dead = true
132-
c.Close()
133-
pingTicker.Stop()
134-
}
135-
defer conClosed()
138+
defer pingTicker.Stop()
136139
defer func() {
137140
c.debug().Msg("WebSocket Done")
138141
}()
142+
defer c.conn.Close()
139143
for {
140144
select {
141-
case reason := <-c.info.Close:
142-
if reason == CloseDone {
143-
return
144-
} else {
145-
_ = c.conn.CloseHandler()(websocket.CloseNormalClosure, reason)
146-
conClosed()
147-
}
148145
case message := <-c.info.Write:
149-
if dead {
150-
c.debug().Msg("WebSocket write on dead connection")
151-
continue
146+
if msg, ok := message.(outgoing.CloseWriter); ok {
147+
c.debug().Str("reason", msg.Reason).Int("code", msg.Code).Msg("WebSocket Close")
148+
c.CloseOnDone(msg.Code, msg.Reason)
149+
return
152150
}
153151

154152
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
155153
typed, err := ToTypedOutgoing(message)
156154
c.debug().Interface("event", typed.Type).Interface("payload", typed.Payload).Msg("WebSocket Send")
157155
if err != nil {
158156
c.debug().Err(err).Msg("could not get typed message, exiting connection.")
159-
conClosed()
157+
c.CloseOnError(websocket.CloseNormalClosure, "malformed outgoing "+err.Error())
160158
continue
161159
}
162160

@@ -165,14 +163,14 @@ func (c *Client) startWriteHandler(pingPeriod time.Duration) {
165163
}
166164

167165
if err := writeJSON(c.conn, typed); err != nil {
168-
conClosed()
169166
c.printWebSocketError("write", err)
167+
c.CloseOnError(websocket.CloseNormalClosure, "write error"+err.Error())
170168
}
171169
case <-pingTicker.C:
172170
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
173171
if err := ping(c.conn); err != nil {
174-
conClosed()
175172
c.printWebSocketError("ping", err)
173+
c.CloseOnError(websocket.CloseNormalClosure, "ping timeout")
176174
}
177175
}
178176
}

ws/event_connected.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package ws
2+
3+
type Connected struct{}
4+
5+
func (e Connected) Execute(rooms *Rooms, current ClientInfo) error {
6+
rooms.connected[current.ID] = true
7+
return nil
8+
}

ws/event_create.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ func (e *Create) Execute(rooms *Rooms, current ClientInfo) error {
7171
Owner: true,
7272
Addr: current.Addr,
7373
Write: current.Write,
74-
Close: current.Close,
7574
},
7675
},
7776
}

ws/event_disconnected.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,41 @@ package ws
33
import (
44
"bytes"
55

6+
"github.com/gorilla/websocket"
67
"github.com/screego/server/ws/outgoing"
78
)
89

9-
type Disconnected struct{}
10+
type Disconnected struct {
11+
Code int
12+
Reason string
13+
}
1014

1115
func (e *Disconnected) Execute(rooms *Rooms, current ClientInfo) error {
16+
e.executeNoError(rooms, current)
17+
return nil
18+
}
19+
20+
func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
21+
delete(rooms.connected, current.ID)
22+
current.Write <- outgoing.CloseWriter{Code: e.Code, Reason: e.Reason}
23+
1224
if current.RoomID == "" {
13-
return nil
25+
return
1426
}
1527

1628
room, ok := rooms.Rooms[current.RoomID]
1729
if !ok {
1830
// room may already be removed
19-
return nil
31+
return
2032
}
2133

2234
user, ok := room.Users[current.ID]
2335

2436
if !ok {
2537
// room may already be removed
26-
return nil
38+
return
2739
}
2840

29-
current.Close <- CloseDone
3041
delete(room.Users, current.ID)
3142
usersLeftTotal.Inc()
3243

@@ -49,18 +60,19 @@ func (e *Disconnected) Execute(rooms *Rooms, current ClientInfo) error {
4960

5061
if user.Owner && room.CloseOnOwnerLeave {
5162
for _, member := range room.Users {
52-
member.Close <- CloseOwnerLeft
63+
delete(rooms.connected, member.ID)
64+
member.Write <- outgoing.CloseWriter{Code: websocket.CloseNormalClosure, Reason: CloseOwnerLeft}
5365
}
5466
rooms.closeRoom(current.RoomID)
55-
return nil
67+
return
5668
}
5769

5870
if len(room.Users) == 0 {
5971
rooms.closeRoom(current.RoomID)
60-
return nil
72+
return
6173
}
6274

6375
room.notifyInfoChanged()
6476

65-
return nil
77+
return
6678
}

ws/event_join.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ func (e *Join) Execute(rooms *Rooms, current ClientInfo) error {
3939
Owner: false,
4040
Addr: current.Addr,
4141
Write: current.Write,
42-
Close: current.Close,
4342
}
4443
room.notifyInfoChanged()
4544
usersJoinedTotal.Inc()

ws/outgoing/messages.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,12 @@ const (
9696
ConnectionSTUN ConnectionMode = "stun"
9797
ConnectionTURN ConnectionMode = "turn"
9898
)
99+
100+
type CloseWriter struct {
101+
Code int
102+
Reason string
103+
}
104+
105+
func (CloseWriter) Type() string {
106+
return "closewriter"
107+
}

ws/room.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,5 +136,4 @@ type User struct {
136136
Streaming bool
137137
Owner bool
138138
Write chan<- outgoing.Message
139-
Close chan<- string
140139
}

ws/rooms.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/gorilla/websocket"
11+
"github.com/rs/xid"
1112
"github.com/rs/zerolog/log"
1213
"github.com/screego/server/auth"
1314
"github.com/screego/server/config"
@@ -19,6 +20,7 @@ func NewRooms(tServer turn.Server, users *auth.Users, conf config.Config) *Rooms
1920
return &Rooms{
2021
Rooms: map[string]*Room{},
2122
Incoming: make(chan ClientMessage),
23+
connected: map[xid.ID]bool{},
2224
turnServer: tServer,
2325
users: users,
2426
config: conf,
@@ -49,6 +51,7 @@ type Rooms struct {
4951
users *auth.Users
5052
config config.Config
5153
r *rand.Rand
54+
connected map[xid.ID]bool
5255
}
5356

5457
func (r *Rooms) RandUserName() string {
@@ -70,16 +73,22 @@ func (r *Rooms) Upgrade(w http.ResponseWriter, req *http.Request) {
7073

7174
user, loggedIn := r.users.CurrentUser(req)
7275
c := newClient(conn, req, r.Incoming, user, loggedIn, r.config.TrustProxyHeaders)
76+
r.Incoming <- ClientMessage{Info: c.info, Incoming: Connected{}, SkipConnectedCheck: true}
7377

7478
go c.startReading(time.Second * 20)
7579
go c.startWriteHandler(time.Second * 5)
7680
}
7781

7882
func (r *Rooms) Start() {
79-
for {
80-
msg := <-r.Incoming
83+
for msg := range r.Incoming {
84+
if !msg.SkipConnectedCheck && !r.connected[msg.Info.ID] {
85+
log.Debug().Interface("event", fmt.Sprintf("%T", msg.Incoming)).Interface("payload", msg.Incoming).Msg("WebSocket Ignore")
86+
continue
87+
}
88+
8189
if err := msg.Incoming.Execute(r, msg.Info); err != nil {
82-
msg.Info.Close <- err.Error()
90+
dis := Disconnected{Code: websocket.CloseNormalClosure, Reason: err.Error()}
91+
dis.executeNoError(r, msg.Info)
8392
}
8493
}
8594
}

0 commit comments

Comments
 (0)