Skip to content

Commit a9eb0ca

Browse files
Marko Petzoldclaude
andcommitted
test/transport: rawsocket frame-edge + ws keepalive coverage; spec-strict deser-error
Adds wire-byte-level tests for rawsocket recvHandler edge cases and the websocket keepalive missed-pong path, plus tightens the rawsocket deserialize-error path to match WAMP §5.3.1. Behavior change: - rawsocket recvHandler previously skipped a frame whose body failed deserialization (`continue MsgLoop`) and kept reading. The original code carried a `// TODO: something more than merely logging?` comment hinting this was permissive. Per WAMP §5.3.1 a malformed payload is a protocol violation and the receiver MUST abort. Changed to log + close conn + return — same teardown as the existing io.ReadFull error path. Senders that need fault-tolerance for occasional malformed frames must reconnect. Tests (transport/rawsocketpeer_test.go via net.Pipe + AcceptRawSocket): - C3 EOFAfterHandshake: remote drops conn immediately after handshake; recvHandler observes EOF, defer-closes Recv. - C3 HandshakeFailureDoesNotLeak: server-side handshake fails on garbage magic byte; AcceptRawSocket returns error without starting any goroutines. - C1 OversizedFrame: declared length exceeds recvLimit; recvHandler closes conn via the size-limit branch. - C2 TruncatedFrame: declared body length 100, only 50 bytes sent; io.ReadFull errors and recvHandler exits cleanly. - C5 DeserializerErrorAborts: malformed payload triggers conn close and recvHandler exit (the strict §5.3.1 path this commit introduces). - C9 PingStorm: 1000 PINGs of varying sizes are echoed back inline (no per-ping goroutine), then a regular HELLO is delivered through the same connection. - C9 PingPong: PONG frames are silently consumed (per spec §5.4.4); a subsequent HELLO is delivered. ws (transport/websocketpeer_test.go via a pingCountingConn mock): - C10 KeepAliveMissedPongs: silent peer (no pong responses) is detected after 2 unanswered pings; sendHandlerKeepAlive calls conn.Close, recvHandler unblocks and defer-closes Recv, peer.Close stays idempotent. Race detector clean over count=10. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8b4f740 commit a9eb0ca

3 files changed

Lines changed: 391 additions & 2 deletions

File tree

transport/rawsocketpeer.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,13 @@ MsgLoop:
285285
}
286286
msg, err = rs.serializer.Deserialize(buf)
287287
if err != nil {
288-
// TODO: something more than merely logging?
288+
// A malformed payload is a protocol violation per
289+
// WAMP §5.3.1; abort the connection rather than
290+
// silently skipping the frame. Same teardown shape
291+
// as the io.ReadFull error path above.
289292
rs.log.Println("Cannot deserialize peer message:", err)
290-
continue MsgLoop
293+
_ = rs.conn.Close()
294+
return
291295
}
292296
case 1: // PING
293297
header[0] = 0x02

transport/rawsocketpeer_test.go

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
package transport_test
2+
3+
import (
4+
"io"
5+
"log"
6+
"net"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/gammazero/nexus/v3/transport"
14+
"github.com/gammazero/nexus/v3/transport/serialize"
15+
"github.com/gammazero/nexus/v3/wamp"
16+
)
17+
18+
// rawSocketMagic is the RawSocket protocol magic byte (spec §5.4.1).
19+
const rawSocketMagic = 0x7f
20+
21+
// newPipedRawSocket sets up a server-side rawSocketPeer backed by a
22+
// net.Pipe(). The client side of the pipe is returned to the caller
23+
// for raw-byte-level driving of test scenarios. The handshake bytes
24+
// (4 bytes each direction) are exchanged before this returns, so the
25+
// caller is at the message-frame boundary.
26+
//
27+
// The recvLimit is rounded UP to the nearest power-of-2 ≥ 512; pass
28+
// 1 to get the smallest available limit (512).
29+
func newPipedRawSocket(t *testing.T, recvLimit int) (clientConn net.Conn, peer wamp.Peer) {
30+
t.Helper()
31+
client, server := net.Pipe()
32+
33+
// Client-side handshake: magic + (sendLimit<<4 | json) + 0 + 0.
34+
// We pick sendLimit=0 (which becomes 2^9=512) and serialization=1 (JSON).
35+
handshakeDone := make(chan struct{})
36+
go func() {
37+
defer close(handshakeDone)
38+
if _, err := client.Write([]byte{rawSocketMagic, 0<<4 | 1, 0, 0}); err != nil {
39+
t.Errorf("client handshake write: %v", err)
40+
return
41+
}
42+
var resp [4]byte
43+
if _, err := io.ReadFull(client, resp[:]); err != nil {
44+
t.Errorf("client handshake read: %v", err)
45+
return
46+
}
47+
if resp[0] != rawSocketMagic {
48+
t.Errorf("client handshake: got magic %#x, want %#x", resp[0], rawSocketMagic)
49+
}
50+
}()
51+
52+
logger := log.New(io.Discard, "", 0)
53+
peer, err := transport.AcceptRawSocket(server, logger, recvLimit, 0)
54+
require.NoError(t, err)
55+
56+
select {
57+
case <-handshakeDone:
58+
case <-time.After(time.Second):
59+
require.FailNow(t, "client handshake did not complete")
60+
}
61+
return client, peer
62+
}
63+
64+
// writeFrame writes a 4-byte length prefix + body to conn. The first
65+
// byte of the prefix is the WAMP rawsocket type byte (0=msg, 1=ping,
66+
// 2=pong); the next 3 bytes are the body length in big-endian.
67+
func writeFrame(t *testing.T, conn net.Conn, frameType byte, body []byte) {
68+
t.Helper()
69+
header := []byte{
70+
frameType,
71+
byte((len(body) >> 16) & 0xff),
72+
byte((len(body) >> 8) & 0xff),
73+
byte(len(body) & 0xff),
74+
}
75+
if _, err := conn.Write(header); err != nil {
76+
t.Fatalf("write header: %v", err)
77+
}
78+
if len(body) > 0 {
79+
if _, err := conn.Write(body); err != nil {
80+
t.Fatalf("write body: %v", err)
81+
}
82+
}
83+
}
84+
85+
// TestRawSocketC3_EOFAfterHandshake verifies that the rawSocketPeer
86+
// recvHandler/sendHandler exit cleanly when the remote side closes
87+
// the connection immediately after the handshake — no goroutine leak.
88+
// Adjacent to gammazero/nexus#242, which fixed two similar leak
89+
// vectors on the websocket peer.
90+
func TestRawSocketC3_EOFAfterHandshake(t *testing.T) {
91+
client, peer := newPipedRawSocket(t, 1)
92+
t.Cleanup(func() { peer.Close() })
93+
94+
// Remote drops the connection without sending any WAMP frame.
95+
require.NoError(t, client.Close())
96+
97+
// recvHandler should observe EOF, defer-close peer.Recv(), and
98+
// the router-side observer (us) reads zero/closed.
99+
select {
100+
case msg, ok := <-peer.Recv():
101+
require.False(t, ok, "expected closed Recv, got msg %v", msg)
102+
case <-time.After(time.Second):
103+
require.FailNow(t, "peer.Recv() did not close after remote EOF")
104+
}
105+
}
106+
107+
// TestRawSocketC1_OversizedFrame verifies that the rawSocketPeer
108+
// rejects a frame whose declared length exceeds the negotiated
109+
// recvLimit. The spec (§5.4) allows the receiver to close the
110+
// connection on protocol violation.
111+
func TestRawSocketC1_OversizedFrame(t *testing.T) {
112+
const recvLimit = 1 // → 2^9 = 512 bytes
113+
client, peer := newPipedRawSocket(t, recvLimit)
114+
t.Cleanup(func() { peer.Close() })
115+
116+
// Declare a body length of 2048 — bigger than the 512-byte limit.
117+
// Don't bother sending the body; recvHandler should reject on the
118+
// header alone.
119+
header := []byte{0x00, 0x00, 0x08, 0x00}
120+
_, err := client.Write(header)
121+
require.NoError(t, err)
122+
123+
// recvHandler should close the connection. peer.Recv() should
124+
// close as a result.
125+
select {
126+
case msg, ok := <-peer.Recv():
127+
require.False(t, ok, "expected closed Recv, got msg %v", msg)
128+
case <-time.After(time.Second):
129+
require.FailNow(t, "peer.Recv() did not close after oversized frame")
130+
}
131+
_ = client.Close()
132+
}
133+
134+
// TestRawSocketC2_TruncatedFrame verifies that the rawSocketPeer
135+
// handles a frame whose body is short of the declared length. The
136+
// underlying io.ReadFull should return ErrUnexpectedEOF; recvHandler
137+
// logs and exits.
138+
func TestRawSocketC2_TruncatedFrame(t *testing.T) {
139+
client, peer := newPipedRawSocket(t, 4) // 2^13 = 8192
140+
t.Cleanup(func() { peer.Close() })
141+
142+
// Declare a body length of 100, then send only 50 bytes before
143+
// closing the conn. recvHandler will block in io.ReadFull
144+
// expecting 100 and observe the EOF after 50.
145+
header := []byte{0x00, 0x00, 0x00, 100}
146+
_, err := client.Write(header)
147+
require.NoError(t, err)
148+
_, err = client.Write(make([]byte, 50))
149+
require.NoError(t, err)
150+
require.NoError(t, client.Close())
151+
152+
select {
153+
case msg, ok := <-peer.Recv():
154+
require.False(t, ok, "expected closed Recv, got msg %v", msg)
155+
case <-time.After(time.Second):
156+
require.FailNow(t, "peer.Recv() did not close after truncated frame")
157+
}
158+
}
159+
160+
// TestRawSocketC5_DeserializerErrorAborts pins WAMP §5.3.1 strict
161+
// behavior: when a frame's body is well-framed but its payload fails
162+
// deserialization, recvHandler logs the error, closes the connection,
163+
// and exits — same teardown as a torn read. The peer's Recv channel
164+
// closes; no further frames are processed even if more arrive on the
165+
// wire.
166+
//
167+
// Prior to the recvHandler fix in this commit, recvHandler silently
168+
// skipped the bad frame (`continue MsgLoop`) and kept reading —
169+
// permissive but at odds with the spec.
170+
func TestRawSocketC5_DeserializerErrorAborts(t *testing.T) {
171+
client, peer := newPipedRawSocket(t, 4) // 2^13 = 8192
172+
t.Cleanup(func() { peer.Close() })
173+
174+
// First (and only) frame the server processes: body is
175+
// `not valid json {` — well-framed but fails JSON
176+
// deserialization.
177+
writeFrame(t, client, 0x00, []byte("not valid json {"))
178+
179+
// peer.Recv() must close (recvHandler returned via the new
180+
// abort-on-deserialize-error path). A subsequent frame is
181+
// irrelevant — we already proved recvHandler stopped.
182+
select {
183+
case msg, ok := <-peer.Recv():
184+
require.False(t, ok, "expected closed Recv after malformed frame, got msg %v", msg)
185+
case <-time.After(time.Second):
186+
require.FailNow(t, "peer.Recv() did not close after malformed frame")
187+
}
188+
_ = client.Close()
189+
}
190+
191+
// TestRawSocketC9_PingStorm verifies the rawSocketPeer handles a
192+
// rapid burst of PING frames inline (no per-ping goroutine spawn,
193+
// no memory growth, connection stays responsive). After the storm,
194+
// a regular WAMP message must still be delivered.
195+
func TestRawSocketC9_PingStorm(t *testing.T) {
196+
client, peer := newPipedRawSocket(t, 6) // 2^15 = 32768
197+
t.Cleanup(func() { peer.Close() })
198+
199+
const pingCount = 1000
200+
201+
// Concurrently consume PONGs from the server side so the
202+
// client's send buffer doesn't fill up (net.Pipe is unbuffered).
203+
pongDone := make(chan struct{})
204+
go func() {
205+
defer close(pongDone)
206+
for range pingCount {
207+
var resp [4]byte
208+
if _, err := io.ReadFull(client, resp[:]); err != nil {
209+
return
210+
}
211+
if resp[0] != 0x02 {
212+
t.Errorf("expected PONG (type 0x02), got %#x", resp[0])
213+
return
214+
}
215+
length := int(resp[1])<<16 | int(resp[2])<<8 | int(resp[3])
216+
if length > 0 {
217+
if _, err := io.CopyN(io.Discard, client, int64(length)); err != nil {
218+
return
219+
}
220+
}
221+
}
222+
}()
223+
224+
// Burst PINGs of varying sizes (1, 2, ..., pingCount bytes
225+
// modulo a small max so we stay well within recvLimit).
226+
for i := 1; i <= pingCount; i++ {
227+
size := (i % 64) + 1
228+
writeFrame(t, client, 0x01, make([]byte, size))
229+
}
230+
231+
// Wait for all PONGs.
232+
select {
233+
case <-pongDone:
234+
case <-time.After(5 * time.Second):
235+
require.FailNow(t, "PING storm did not produce expected PONG count")
236+
}
237+
238+
// Connection must still be responsive: send a HELLO and observe
239+
// it delivered to peer.Recv().
240+
hello := wamp.Hello{Realm: "test", Details: wamp.Dict{}}
241+
helloBytes, err := (&serialize.JSONSerializer{}).Serialize(&hello)
242+
require.NoError(t, err)
243+
writeFrame(t, client, 0x00, helloBytes)
244+
245+
select {
246+
case msg, ok := <-peer.Recv():
247+
require.True(t, ok, "Recv closed unexpectedly after PING storm")
248+
_, isHello := msg.(*wamp.Hello)
249+
require.True(t, isHello, "expected HELLO after PING storm, got %T", msg)
250+
case <-time.After(time.Second):
251+
require.FailNow(t, "peer.Recv() did not deliver post-storm HELLO")
252+
}
253+
_ = client.Close()
254+
}
255+
256+
// TestRawSocketC9_PingPong verifies that PONG frames received from
257+
// the remote side are silently consumed (per spec §5.4.4): the
258+
// peer's recvHandler discards the body and continues. This is the
259+
// inverse of the PING test — exercises the case 2 branch.
260+
func TestRawSocketC9_PingPong(t *testing.T) {
261+
client, peer := newPipedRawSocket(t, 4)
262+
t.Cleanup(func() { peer.Close() })
263+
264+
// Send a PONG frame followed by a HELLO.
265+
writeFrame(t, client, 0x02, []byte("opaque pong payload"))
266+
hello := wamp.Hello{Realm: "test", Details: wamp.Dict{}}
267+
helloBytes, err := (&serialize.JSONSerializer{}).Serialize(&hello)
268+
require.NoError(t, err)
269+
writeFrame(t, client, 0x00, helloBytes)
270+
271+
select {
272+
case msg, ok := <-peer.Recv():
273+
require.True(t, ok, "Recv closed after PONG")
274+
_, isHello := msg.(*wamp.Hello)
275+
require.True(t, isHello, "expected HELLO after PONG discard, got %T", msg)
276+
case <-time.After(time.Second):
277+
require.FailNow(t, "peer.Recv() did not deliver post-PONG HELLO")
278+
}
279+
_ = client.Close()
280+
}
281+
282+
// TestRawSocketC3_HandshakeFailureDoesNotLeak verifies that when the
283+
// server-side handshake itself fails (e.g. client sends garbage in
284+
// the magic byte), AcceptRawSocket returns an error and no peer is
285+
// constructed — no goroutines started, nothing to leak.
286+
func TestRawSocketC3_HandshakeFailureDoesNotLeak(t *testing.T) {
287+
client, server := net.Pipe()
288+
defer client.Close()
289+
290+
var wg sync.WaitGroup
291+
wg.Add(1)
292+
go func() {
293+
defer wg.Done()
294+
// Garbage in magic byte → server should error out.
295+
_, _ = client.Write([]byte{0x00, 0x00, 0x00, 0x00})
296+
}()
297+
298+
logger := log.New(io.Discard, "", 0)
299+
_, err := transport.AcceptRawSocket(server, logger, 1, 0)
300+
require.Error(t, err, "expected handshake error on bad magic")
301+
302+
wg.Wait()
303+
}

0 commit comments

Comments
 (0)