Skip to content

Commit 8f59008

Browse files
Marko Petzoldclaude
andcommitted
wamp,transport: add Peer.Done() for cooperative shutdown; idempotent Close
Adds a Done() <-chan struct{} method to the wamp.Peer interface. Done is closed when Close is called, before the channel returned by Send is closed. Senders that may race with Close can now select on both Send and Done to detect peer closure cooperatively: select { case peer.Send() <- msg: case <-peer.Done(): return // peer is closing; abandon } The previous contract was: bare `peer.Send() <- msg` racing with Close panics with "send on closed channel". That hazard is now documented in the Peer interface doc and an opt-in safe pattern is provided. (Migrating call sites to the cooperative pattern is a follow-up; the realm-shutdown path in router/realm.go already serialises Close with sends via its actor goroutine, so the panic vector there is structural rather than user-facing.) Also makes Close idempotent on all three peer implementations (localPeer, rawSocketPeer, websocketPeer) via sync.Once. Calling Close more than once is now a safe no-op rather than a "close of closed channel" panic. The previous implementations either had no guard at all (localPeer, rawSocketPeer) or a racy select-with-default early-out (websocketPeer). Tests: - transport: TestLocalPeerDoneClosesOnClose, TestLocalPeerDoneIndependentPerSide, TestLocalPeerCloseIdempotent — pin the Done() and idempotency contract on localPeer. - transport: existing TestCloseWebsocketPeer continues to verify websocketPeer Close idempotency (now genuinely safe via sync.Once instead of the previous racy select). Test scaffolding (testPeer in router/broker_test.go and wamp/peer_test.go) updated to satisfy the new interface method. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8b4f740 commit 8f59008

7 files changed

Lines changed: 214 additions & 54 deletions

File tree

router/broker_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,20 @@ import (
1111
)
1212

1313
type testPeer struct {
14-
in chan wamp.Message
14+
in chan wamp.Message
15+
done chan struct{}
1516
}
1617

1718
func newTestPeer() *testPeer {
1819
return &testPeer{
19-
in: make(chan wamp.Message, 1),
20+
in: make(chan wamp.Message, 1),
21+
done: make(chan struct{}),
2022
}
2123
}
2224

2325
func (p *testPeer) Recv() <-chan wamp.Message { return p.in }
2426
func (p *testPeer) Send() chan<- wamp.Message { return p.in }
27+
func (p *testPeer) Done() <-chan struct{} { return p.done }
2528
func (p *testPeer) Close() {}
2629

2730
func (p *testPeer) IsLocal() bool { return true }

transport/localpeer.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package transport
22

33
import (
4+
"sync"
5+
46
"github.com/gammazero/nexus/v3/wamp"
57
)
68

@@ -32,9 +34,9 @@ func LinkedPeersQSize(queueSize int) (wamp.Peer, wamp.Peer) {
3234
cToR := make(chan wamp.Message)
3335

3436
// router reads from and writes to client
35-
r := &localPeer{rd: cToR, wr: rToC}
37+
r := &localPeer{rd: cToR, wr: rToC, done: make(chan struct{})}
3638
// client reads from and writes to router
37-
c := &localPeer{rd: rToC, wr: cToR}
39+
c := &localPeer{rd: rToC, wr: cToR, done: make(chan struct{})}
3840

3941
return c, r
4042
}
@@ -43,6 +45,9 @@ func LinkedPeersQSize(queueSize int) (wamp.Peer, wamp.Peer) {
4345
type localPeer struct {
4446
rd <-chan wamp.Message
4547
wr chan<- wamp.Message
48+
49+
done chan struct{}
50+
closeOnce sync.Once
4651
}
4752

4853
// IsLocal returns true is the wamp.Peer is a localPeer.
@@ -54,6 +59,19 @@ func (p *localPeer) Recv() <-chan wamp.Message { return p.rd }
5459
// Send returns the peer's outbound message channel.
5560
func (p *localPeer) Send() chan<- wamp.Message { return p.wr }
5661

57-
// Close closes the outgoing channel, waking any readers waiting on data from
58-
// this peer.
59-
func (p *localPeer) Close() { close(p.wr) }
62+
// Done returns a channel that is closed when the peer is closing. It
63+
// is closed before the Send channel, so a goroutine selecting on both
64+
// Send and Done is guaranteed to wake via Done before the runtime
65+
// observes the Send channel as closed.
66+
func (p *localPeer) Done() <-chan struct{} { return p.done }
67+
68+
// Close closes the outgoing channel, waking any readers waiting on
69+
// data from this peer. Idempotent and safe to call concurrently — the
70+
// first invocation closes Done first, then the outbound channel; later
71+
// invocations are no-ops.
72+
func (p *localPeer) Close() {
73+
p.closeOnce.Do(func() {
74+
close(p.done)
75+
close(p.wr)
76+
})
77+
}

transport/localpeer_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,70 @@ func TestBlockOnBlockedRouter(t *testing.T) {
8888
<-done
8989
}
9090

91+
// TestLocalPeerDoneClosesOnClose verifies the Peer.Done() contract:
92+
// Close fires the Done channel before unwinding the rest of the
93+
// shutdown sequence, so consumers selecting on Done observe peer
94+
// closure cooperatively.
95+
func TestLocalPeerDoneClosesOnClose(t *testing.T) {
96+
c, _ := transport.LinkedPeers()
97+
98+
// Done starts open.
99+
select {
100+
case <-c.Done():
101+
require.FailNow(t, "Done must not be closed before Close")
102+
default:
103+
}
104+
105+
c.Close()
106+
107+
// Done is closed after Close.
108+
select {
109+
case <-c.Done():
110+
case <-time.After(time.Second):
111+
require.FailNow(t, "Done must be closed after Close")
112+
}
113+
}
114+
115+
// TestLocalPeerDoneIndependentPerSide verifies the two peers in a
116+
// LinkedPeers pair have independent Done signals — closing one does
117+
// not signal the other.
118+
func TestLocalPeerDoneIndependentPerSide(t *testing.T) {
119+
c, r := transport.LinkedPeers()
120+
c.Close()
121+
122+
select {
123+
case <-c.Done():
124+
default:
125+
require.FailNow(t, "client peer Done must be closed after client Close")
126+
}
127+
128+
select {
129+
case <-r.Done():
130+
require.FailNow(t, "router peer Done must not be closed by client Close")
131+
default:
132+
}
133+
134+
r.Close()
135+
select {
136+
case <-r.Done():
137+
default:
138+
require.FailNow(t, "router peer Done must be closed after router Close")
139+
}
140+
}
141+
142+
// TestLocalPeerCloseIdempotent verifies that calling Close more than
143+
// once is a safe no-op rather than a "close of closed channel" panic.
144+
// The realm shutdown ordering relies on this: a session whose handler
145+
// closed its peer naturally may also be in the snapshot of
146+
// pending-to-close peers that the realm walks at the end of close().
147+
func TestLocalPeerCloseIdempotent(t *testing.T) {
148+
c, r := transport.LinkedPeers()
149+
c.Close()
150+
require.NotPanics(t, func() { c.Close() }, "second Close on client peer must not panic")
151+
require.NotPanics(t, func() { r.Close() }, "Close on router peer must not panic")
152+
require.NotPanics(t, func() { r.Close() }, "second Close on router peer must not panic")
153+
}
154+
91155
func BenchmarkClientToRouter(b *testing.B) {
92156
c, r := transport.LinkedPeers()
93157

transport/rawsocketpeer.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"net"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/gammazero/nexus/v3/stdlog"
@@ -35,6 +36,8 @@ type rawSocketPeer struct {
3536

3637
writerDone chan struct{}
3738

39+
closeOnce sync.Once
40+
3841
log stdlog.StdLog
3942
}
4043

@@ -174,27 +177,39 @@ func (rs *rawSocketPeer) Recv() <-chan wamp.Message { return rs.rd }
174177

175178
func (rs *rawSocketPeer) Send() chan<- wamp.Message { return rs.wr }
176179

180+
// Done returns a channel that is closed when the peer is closing. It
181+
// is closed before the Send channel, so a goroutine selecting on both
182+
// Send and Done is guaranteed to wake via Done before the runtime
183+
// observes the Send channel as closed.
184+
func (rs *rawSocketPeer) Done() <-chan struct{} { return rs.closed }
185+
177186
func (rs *rawSocketPeer) IsLocal() bool { return false }
178187

179-
// Close closes the rawsocket peer. This closes the local send channel, and
180-
// sends a close control message to the socket to tell the other side to close.
188+
// Close closes the rawsocket peer. Idempotent and safe to call
189+
// concurrently — the first invocation closes Done first, then unwinds
190+
// sendHandler/recvHandler/socket; later invocations are no-ops.
181191
//
182192
// *** Do not call Send after calling Close. ***
183193
func (rs *rawSocketPeer) Close() {
184-
// Tell sendHandler to exit, and discard any queued messages. Do not close
185-
// wr channel in case there are incoming messages during close.
186-
rs.cancelSender()
187-
<-rs.writerDone
188-
close(rs.wr)
189-
for range rs.wr {
190-
}
191-
192-
// Tell recvHandler to close.
193-
close(rs.closed)
194+
rs.closeOnce.Do(func() {
195+
// Close the Done channel first so any Send goroutine that
196+
// selected on it can wake and abandon before the wr channel
197+
// is closed.
198+
close(rs.closed)
199+
200+
// Tell sendHandler to exit, and discard any queued messages.
201+
// Do not close wr channel in case there are incoming messages
202+
// during close.
203+
rs.cancelSender()
204+
<-rs.writerDone
205+
close(rs.wr)
206+
for range rs.wr {
207+
}
194208

195-
// Ignore errors since socket may have been closed by other side first in
196-
// response to a goodbye message.
197-
_ = rs.conn.Close()
209+
// Ignore errors since socket may have been closed by other
210+
// side first in response to a goodbye message.
211+
_ = rs.conn.Close()
212+
})
198213
}
199214

200215
// sendHandler pulls messages from the write channel, and pushes them to the

transport/websocketpeer.go

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net"
88
"net/http"
99
"net/url"
10+
"sync"
1011
"sync/atomic"
1112
"time"
1213

@@ -92,6 +93,8 @@ type websocketPeer struct {
9293
recvDone chan struct{}
9394
writerDone chan struct{}
9495

96+
closeOnce sync.Once
97+
9598
log stdlog.StdLog
9699
}
97100

@@ -228,40 +231,45 @@ func (w *websocketPeer) Send() chan<- wamp.Message { return w.wr }
228231

229232
func (w *websocketPeer) IsLocal() bool { return false }
230233

231-
// Close closes the websocket peer. This closes the local send channel, and
232-
// sends a close control message to the websocket to tell the other side to
233-
// close.
234+
// Close closes the websocket peer. Idempotent and safe to call
235+
// concurrently — the first invocation closes Done first, then unwinds
236+
// sendHandler/recvHandler/socket; later invocations are no-ops.
234237
//
235238
// *** Do not call Send after calling Close. ***
236239
func (w *websocketPeer) Close() {
237-
select {
238-
case <-w.closed:
239-
return
240-
default:
241-
}
242-
243-
// Tell sendHandler to exit and discard any queued messages. Do not close
244-
// wr channel in case there are incoming messages during close.
245-
w.cancelSender()
246-
<-w.writerDone
247-
close(w.wr)
248-
for range w.wr {
249-
}
250-
251-
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "goodbye")
240+
w.closeOnce.Do(func() {
241+
// Close the Done channel first so any Send goroutine that
242+
// selected on it can wake and abandon before the wr channel
243+
// is closed.
244+
close(w.closed)
245+
246+
// Tell sendHandler to exit and discard any queued messages.
247+
// Do not close wr channel in case there are incoming messages
248+
// during close.
249+
w.cancelSender()
250+
<-w.writerDone
251+
close(w.wr)
252+
for range w.wr {
253+
}
252254

253-
// Tell recvHandler to close.
254-
close(w.closed)
255+
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "goodbye")
255256

256-
// Ignore errors since websocket may have been closed by other side first
257-
// in response to a goodbye message.
258-
_ = w.conn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(ctrlTimeout))
259-
_ = w.conn.Close()
257+
// Ignore errors since websocket may have been closed by other
258+
// side first in response to a goodbye message.
259+
_ = w.conn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(ctrlTimeout))
260+
_ = w.conn.Close()
260261

261-
// Wait for the recvHandler goroutine to exit.
262-
<-w.recvDone
262+
// Wait for the recvHandler goroutine to exit.
263+
<-w.recvDone
264+
})
263265
}
264266

267+
// Done returns a channel that is closed when the peer is closing. It
268+
// is closed before the Send channel, so a goroutine selecting on both
269+
// Send and Done is guaranteed to wake via Done before the runtime
270+
// observes the Send channel as closed.
271+
func (w *websocketPeer) Done() <-chan struct{} { return w.closed }
272+
265273
// sendHandler pulls messages from the write channel, and pushes them to the
266274
// websocket.
267275
func (w *websocketPeer) sendHandler() {

wamp/peer.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,65 @@ import (
66
)
77

88
// Peer is the interface implemented by endpoints communicating via WAMP.
9+
//
10+
// # Lifecycle and concurrent shutdown
11+
//
12+
// Close is idempotent and safe to call concurrently with itself. It is
13+
// NOT fully safe to call concurrently with Send for callers that use
14+
// the bare-channel form `peer.Send() <- msg`: the underlying outbound
15+
// channel must be closed at some point so the partner peer's Recv
16+
// loop can terminate, and a parked send racing that close panics
17+
// regardless of any synchronisation the implementation does internally
18+
// — that is a Go runtime constraint, not an implementation detail.
19+
//
20+
// Done() exists to make the cooperative pattern available:
21+
//
22+
// select {
23+
// case peer.Send() <- msg:
24+
// case <-peer.Done():
25+
// return // peer is closing; abandon
26+
// }
27+
//
28+
// Implementations close Done before closing the Send channel, so a
29+
// goroutine entering this select after Close has begun observes Done
30+
// ready and exits cleanly without ever attempting the send. A goroutine
31+
// already parked in this select when Close starts wakes via Done in
32+
// the typical case — but if the partner peer's Recv buffer happens
33+
// to have room at the same instant, the runtime may still pick the
34+
// Send case; callers that need ironclad safety should arrange for
35+
// their Send goroutines to exit before calling Close (e.g. via a
36+
// goroutine lifecycle wait group). This is the same pattern the
37+
// router's realm-shutdown ordering already follows internally.
38+
//
39+
// Bare `peer.Send() <- msg` (no select) racing with Close panics, by
40+
// design — fixing it would require either an extra forwarder
41+
// goroutine per peer (with its own goroutine-leak hazard) or an API
42+
// change to a method-form Send that takes a mutex. Both have higher
43+
// architectural cost than the documented hazard. Callers that may
44+
// race with Close should always use the cooperative pattern above.
945
type Peer interface {
10-
// Closes the peer connection and the channel returned from Recv().
46+
// Close closes the peer connection and the channel returned from
47+
// Recv(). Close is idempotent; calling it more than once is a
48+
// no-op. Done is closed before the Send channel is closed,
49+
// see the cooperative shutdown pattern in the package doc.
1150
Close()
1251

1352
// IsLocal returns true if the session is local.
1453
IsLocal() bool
1554

16-
// Recv returns a channel of messages from the peer.
55+
// Recv returns a channel of messages from the peer. The channel
56+
// is closed when the remote side closes its connection, or when
57+
// Close is called on this peer.
1758
Recv() <-chan Message
1859

19-
// Send returns the peer's outgoing message channel.
60+
// Send returns the peer's outgoing message channel. See the
61+
// package doc for the cooperative-shutdown pattern that makes
62+
// Send safe to use concurrently with Close.
2063
Send() chan<- Message
64+
65+
// Done returns a channel that is closed when Close is called.
66+
// Senders use it to detect peer closure cooperatively.
67+
Done() <-chan struct{}
2168
}
2269

2370
// RecvTimeout receives a message from a peer within the specified time.

0 commit comments

Comments
 (0)