Skip to content

Commit 007d8b6

Browse files
committed
fix: clear subscriberConn in runMux defer to prevent stale reference and msgType refactor
1 parent 4e5b5f1 commit 007d8b6

2 files changed

Lines changed: 13 additions & 7 deletions

File tree

pkg/pubsub/mode.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,14 @@ func (m *GSOCEphemeralMode) CreateSubscriberConn(stream p2p.Stream, overlay swar
375375

376376
// runMux reads broker messages from the shared p2p stream and broadcasts each to all
377377
// registered WS sessions. It exits when the stream closes or returns an error.
378+
// On exit it immediately clears m.subscriberConn so new Connect calls open a fresh stream.
378379
func (m *GSOCEphemeralMode) runMux(stream p2p.Stream) {
379-
defer m.subscriberConn.closeAll()
380+
defer func() {
381+
m.subscriberConn.closeAll()
382+
m.mu.Lock()
383+
m.subscriberConn = nil
384+
m.mu.Unlock()
385+
}()
380386
for {
381387
msg, err := m.ReadBrokerMessage(stream)
382388
if err != nil {
@@ -397,14 +403,13 @@ func (m *GSOCEphemeralMode) GetSubscriberConn() *SubscriberConn {
397403
return m.subscriberConn
398404
}
399405

400-
// RemoveSubscriberConn decrements the ref count for the connection.
401-
// When the last WS session exits, it closes the stream, stopping the mux goroutine.
406+
// RemoveSubscriberConn decrements the ref count for conn.
407+
// When the last WS session exits it closes the stream, stopping the mux goroutine.
408+
// If the mux already died and cleared m.subscriberConn, refs are still tracked on conn
409+
// so the stream is closed exactly once when refs reach zero.
402410
func (m *GSOCEphemeralMode) RemoveSubscriberConn(conn *SubscriberConn) {
403411
m.mu.Lock()
404412
defer m.mu.Unlock()
405-
if m.subscriberConn != conn {
406-
return
407-
}
408413
conn.refs--
409414
if conn.refs <= 0 {
410415
m.subscriberConn = nil

pkg/pubsub/pubsub.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ const (
3434
ModeGSOCEphemeral ModeID = 1
3535

3636
// Service-level broker message types.
37-
MsgTypePing byte = 0x03
37+
// 0x01 is reserved for ping across all modes; mode-specific types start at 0x02.
38+
MsgTypePing byte = 0x01
3839

3940
// streamPingInterval is how often the broker sends a keepalive ping to each subscriber.
4041
streamPingInterval = 30 * time.Second

0 commit comments

Comments
 (0)