@@ -81,6 +81,7 @@ type stream struct {
8181 streamCtxCancel context.CancelFunc
8282 closeError atomic.Value
8383 connStatusCallback func (isConneccted bool , host string , origin string )
84+ connMu sync.Mutex
8485
8586 waterMarkMu sync.Mutex
8687 waterMark map [string ]uint64
@@ -137,12 +138,17 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
137138 return
138139 }
139140 go s .monitorConn (conn )
141+ s .connMu .Lock ()
140142 s .conns = append (s .conns , conn )
143+ s .connMu .Unlock ()
141144 }()
142145 continue
146+ } else {
147+ s .connMu .Lock ()
148+ s .conns = append (s .conns , conn )
149+ s .connMu .Unlock ()
150+ go s .monitorConn (conn )
143151 }
144- go s .monitorConn (conn )
145- s .conns = append (s .conns , conn )
146152 }
147153
148154 // Only fail if we couldn't connect to ANY origins
@@ -159,7 +165,9 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
159165 return nil , err
160166 }
161167 go s .monitorConn (conn )
168+ s .connMu .Lock ()
162169 s .conns = append (s .conns , conn )
170+ s .connMu .Unlock ()
163171 s .stats .configuredConnections .Add (1 )
164172 }
165173
@@ -214,12 +222,11 @@ func (s *stream) monitorConn(conn *wsConn) {
214222 cancel ()
215223 // `Add(^uint64(0))` will decrement activeConnections
216224 s .stats .activeConnections .Add (^ uint64 (0 ))
217- if s .connStatusCallback != nil {
218- go s .connStatusCallback (false , conn .host , conn .origin )
219- }
220-
221225 // check for stream close conditions before reconnect attempts
222226 if ctxErr := s .streamCtx .Err (); ctxErr != nil || s .closed .Load () {
227+ if s .connStatusCallback != nil {
228+ s .connStatusCallback (false , conn .host , conn .origin )
229+ }
223230 if ctxErr != nil {
224231 s .config .logInfo (
225232 "client: stream websocket %s context done: %s" ,
@@ -230,6 +237,10 @@ func (s *stream) monitorConn(conn *wsConn) {
230237 return
231238 }
232239
240+ if s .connStatusCallback != nil {
241+ go s .connStatusCallback (false , conn .host , conn .origin )
242+ }
243+
233244 // reconnect protocol
234245 if s .stats .activeConnections .Load () == 0 {
235246 s .stats .fullReconnects .Add (1 )
@@ -339,9 +350,12 @@ func (s *stream) Close() (err error) {
339350 s .closingMutex .Lock ()
340351 defer s .closingMutex .Unlock ()
341352
353+ s .connMu .Lock ()
342354 for x := 0 ; x < len (s .conns ); x ++ {
343355 _ = s .conns [x ].close ()
344356 }
357+ s .connMu .Unlock ()
358+
345359 close (s .output )
346360 // return a pending error
347361 if err , ok := s .closeError .Load ().(error ); ok {
0 commit comments