Skip to content

Commit 0f283ca

Browse files
committed
refactor
1 parent bf2419e commit 0f283ca

1 file changed

Lines changed: 12 additions & 8 deletions

File tree

go/stream.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,11 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
136136
if err != nil {
137137
return
138138
}
139-
go s.monitorConn(conn)
140-
// Lock is aquired here to prevent race condition with Close() occuring
141-
// during this background reconnect attempt
142-
s.closingMutex.Lock()
143-
s.conns = append(s.conns, conn)
144-
s.closingMutex.Unlock()
139+
s.addConn(conn)
145140
}()
146141
continue
147142
}
148-
go s.monitorConn(conn)
149-
s.conns = append(s.conns, conn)
143+
s.addConn(conn)
150144
}
151145

152146
// Only fail if we couldn't connect to ANY origins
@@ -170,6 +164,16 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
170164
return s, nil
171165
}
172166

167+
// addConn adds a connection to the monitoring goroutine aquires the closingMutex to append the connection
168+
// to the stream's connection list safely. It uses the closingMutex to prevent a race conditions with Close()
169+
// but is also using this mutex to prevent race conditions with other goroutines appending to conns.
170+
func (s *stream) addConn(conn *wsConn) {
171+
go s.monitorConn(conn)
172+
s.closingMutex.Lock()
173+
s.conns = append(s.conns, conn)
174+
s.closingMutex.Unlock()
175+
}
176+
173177
func (s *stream) pingConn(ctx context.Context, conn *wsConn) {
174178
ticker := time.NewTicker(time.Second * 2)
175179
defer ticker.Stop()

0 commit comments

Comments
 (0)