Skip to content

Commit 0239da7

Browse files
Marko Petzoldclaude
andcommitted
test/transport: rawsocket frame-edge regression coverage
Adds wire-byte-level tests for the rawsocketpeer's recvHandler edge cases via net.Pipe + AcceptRawSocket. All pin existing behavior; none expose new bugs (which is itself worth knowing given that the analogous websocket tests were absent before gammazero#242). - C3 EOFAfterHandshake: remote drops conn immediately after handshake; recvHandler observes EOF, defer-closes Recv. No goroutine leak. - C3 HandshakeFailureDoesNotLeak: server-side handshake fails on garbage magic byte; AcceptRawSocket returns error without starting any goroutines. - C1 OversizedFrame: declared length exceeds recvLimit; recvHandler closes the conn and exits via the size-limit branch. Recv channel closes. - C2 TruncatedFrame: declared body length 100, only 50 bytes sent; io.ReadFull errors and recvHandler exits cleanly. - C5 DeserializerError: well-framed but malformed payload is silently skipped (`continue MsgLoop`); a subsequent valid frame is delivered. Pins the current permissive behavior. Note: WAMP §5.3.1 calls protocol violations a hard-error condition; the permissive `continue` is at odds with strict spec compliance and worth revisiting separately. - C9 PingStorm: 1000 PINGs of varying sizes are echoed back inline (no per-ping goroutine), then a regular HELLO is delivered through the same connection. Verifies the inline `io.CopyN(conn, conn)` echo path scales. - C9 PingPong: PONG frames are silently consumed (per spec §5.4.4); a subsequent HELLO is delivered. Each test uses a tiny `newPipedRawSocket` helper that does the 4-byte handshake exchange via net.Pipe and returns the client-side conn for raw-byte driving. Race detector clean over count=10. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8b4f740 commit 0239da7

1 file changed

Lines changed: 309 additions & 0 deletions

File tree

transport/rawsocketpeer_test.go

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package transport_test
2+
3+
import (
4+
"io"
5+
"log"
6+
"net"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/gammazero/nexus/v3/transport"
14+
"github.com/gammazero/nexus/v3/transport/serialize"
15+
"github.com/gammazero/nexus/v3/wamp"
16+
)
17+
18+
// rawSocketMagic is the RawSocket protocol magic byte (spec §5.4.1).
19+
const rawSocketMagic = 0x7f
20+
21+
// newPipedRawSocket sets up a server-side rawSocketPeer backed by a
22+
// net.Pipe(). The client side of the pipe is returned to the caller
23+
// for raw-byte-level driving of test scenarios. The handshake bytes
24+
// (4 bytes each direction) are exchanged before this returns, so the
25+
// caller is at the message-frame boundary.
26+
//
27+
// The recvLimit is rounded UP to the nearest power-of-2 ≥ 512; pass
28+
// 1 to get the smallest available limit (512).
29+
func newPipedRawSocket(t *testing.T, recvLimit int) (clientConn net.Conn, peer wamp.Peer) {
30+
t.Helper()
31+
client, server := net.Pipe()
32+
33+
// Client-side handshake: magic + (sendLimit<<4 | json) + 0 + 0.
34+
// We pick sendLimit=0 (which becomes 2^9=512) and serialization=1 (JSON).
35+
handshakeDone := make(chan struct{})
36+
go func() {
37+
defer close(handshakeDone)
38+
if _, err := client.Write([]byte{rawSocketMagic, 0<<4 | 1, 0, 0}); err != nil {
39+
t.Errorf("client handshake write: %v", err)
40+
return
41+
}
42+
var resp [4]byte
43+
if _, err := io.ReadFull(client, resp[:]); err != nil {
44+
t.Errorf("client handshake read: %v", err)
45+
return
46+
}
47+
if resp[0] != rawSocketMagic {
48+
t.Errorf("client handshake: got magic %#x, want %#x", resp[0], rawSocketMagic)
49+
}
50+
}()
51+
52+
logger := log.New(io.Discard, "", 0)
53+
peer, err := transport.AcceptRawSocket(server, logger, recvLimit, 0)
54+
require.NoError(t, err)
55+
56+
select {
57+
case <-handshakeDone:
58+
case <-time.After(time.Second):
59+
require.FailNow(t, "client handshake did not complete")
60+
}
61+
return client, peer
62+
}
63+
64+
// writeFrame writes a 4-byte length prefix + body to conn. The first
65+
// byte of the prefix is the WAMP rawsocket type byte (0=msg, 1=ping,
66+
// 2=pong); the next 3 bytes are the body length in big-endian.
67+
func writeFrame(t *testing.T, conn net.Conn, frameType byte, body []byte) {
68+
t.Helper()
69+
header := []byte{
70+
frameType,
71+
byte((len(body) >> 16) & 0xff),
72+
byte((len(body) >> 8) & 0xff),
73+
byte(len(body) & 0xff),
74+
}
75+
if _, err := conn.Write(header); err != nil {
76+
t.Fatalf("write header: %v", err)
77+
}
78+
if len(body) > 0 {
79+
if _, err := conn.Write(body); err != nil {
80+
t.Fatalf("write body: %v", err)
81+
}
82+
}
83+
}
84+
85+
// TestRawSocketC3_EOFAfterHandshake verifies that the rawSocketPeer
86+
// recvHandler/sendHandler exit cleanly when the remote side closes
87+
// the connection immediately after the handshake — no goroutine leak.
88+
// Adjacent to gammazero/nexus#242, which fixed two similar leak
89+
// vectors on the websocket peer.
90+
func TestRawSocketC3_EOFAfterHandshake(t *testing.T) {
91+
client, peer := newPipedRawSocket(t, 1)
92+
t.Cleanup(func() { peer.Close() })
93+
94+
// Remote drops the connection without sending any WAMP frame.
95+
require.NoError(t, client.Close())
96+
97+
// recvHandler should observe EOF, defer-close peer.Recv(), and
98+
// the router-side observer (us) reads zero/closed.
99+
select {
100+
case msg, ok := <-peer.Recv():
101+
require.False(t, ok, "expected closed Recv, got msg %v", msg)
102+
case <-time.After(time.Second):
103+
require.FailNow(t, "peer.Recv() did not close after remote EOF")
104+
}
105+
}
106+
107+
// TestRawSocketC1_OversizedFrame verifies that the rawSocketPeer
108+
// rejects a frame whose declared length exceeds the negotiated
109+
// recvLimit. The spec (§5.4) allows the receiver to close the
110+
// connection on protocol violation.
111+
func TestRawSocketC1_OversizedFrame(t *testing.T) {
112+
const recvLimit = 1 // → 2^9 = 512 bytes
113+
client, peer := newPipedRawSocket(t, recvLimit)
114+
t.Cleanup(func() { peer.Close() })
115+
116+
// Declare a body length of 2048 — bigger than the 512-byte limit.
117+
// Don't bother sending the body; recvHandler should reject on the
118+
// header alone.
119+
header := []byte{0x00, 0x00, 0x08, 0x00}
120+
_, err := client.Write(header)
121+
require.NoError(t, err)
122+
123+
// recvHandler should close the connection. peer.Recv() should
124+
// close as a result.
125+
select {
126+
case msg, ok := <-peer.Recv():
127+
require.False(t, ok, "expected closed Recv, got msg %v", msg)
128+
case <-time.After(time.Second):
129+
require.FailNow(t, "peer.Recv() did not close after oversized frame")
130+
}
131+
_ = client.Close()
132+
}
133+
134+
// TestRawSocketC2_TruncatedFrame verifies that the rawSocketPeer
135+
// handles a frame whose body is short of the declared length. The
136+
// underlying io.ReadFull should return ErrUnexpectedEOF; recvHandler
137+
// logs and exits.
138+
func TestRawSocketC2_TruncatedFrame(t *testing.T) {
139+
client, peer := newPipedRawSocket(t, 4) // 2^13 = 8192
140+
t.Cleanup(func() { peer.Close() })
141+
142+
// Declare a body length of 100, then send only 50 bytes before
143+
// closing the conn. recvHandler will block in io.ReadFull
144+
// expecting 100 and observe the EOF after 50.
145+
header := []byte{0x00, 0x00, 0x00, 100}
146+
_, err := client.Write(header)
147+
require.NoError(t, err)
148+
_, err = client.Write(make([]byte, 50))
149+
require.NoError(t, err)
150+
require.NoError(t, client.Close())
151+
152+
select {
153+
case msg, ok := <-peer.Recv():
154+
require.False(t, ok, "expected closed Recv, got msg %v", msg)
155+
case <-time.After(time.Second):
156+
require.FailNow(t, "peer.Recv() did not close after truncated frame")
157+
}
158+
}
159+
160+
// TestRawSocketC5_DeserializerError pins the current behavior:
161+
// when a frame's body is well-framed but its payload fails
162+
// deserialization, recvHandler logs the error and *continues* the
163+
// MsgLoop (does not abort the connection). A subsequent valid frame
164+
// is delivered normally.
165+
//
166+
// WAMP §5.3.1 calls protocol violations a hard-error condition that
167+
// SHOULD trigger an ABORT. nexus's permissive behavior is at odds
168+
// with that — pinning it here makes any future strict-mode change
169+
// visible.
170+
func TestRawSocketC5_DeserializerError(t *testing.T) {
171+
client, peer := newPipedRawSocket(t, 4) // 2^13 = 8192
172+
t.Cleanup(func() { peer.Close() })
173+
174+
// First frame: body is `not valid json {` — well-framed but
175+
// fails JSON deserialization.
176+
writeFrame(t, client, 0x00, []byte("not valid json {"))
177+
178+
// Second frame: a valid HELLO message in JSON form.
179+
hello := wamp.Hello{Realm: "test", Details: wamp.Dict{}}
180+
helloBytes, err := (&serialize.JSONSerializer{}).Serialize(&hello)
181+
require.NoError(t, err)
182+
writeFrame(t, client, 0x00, helloBytes)
183+
184+
// peer.Recv() should yield the HELLO. The bad first frame is
185+
// silently skipped per current behavior.
186+
select {
187+
case msg, ok := <-peer.Recv():
188+
require.True(t, ok, "Recv closed unexpectedly")
189+
_, isHello := msg.(*wamp.Hello)
190+
require.True(t, isHello, "expected HELLO after deserializer error skip, got %T", msg)
191+
case <-time.After(time.Second):
192+
require.FailNow(t, "peer.Recv() did not deliver post-error HELLO")
193+
}
194+
_ = client.Close()
195+
}
196+
197+
// TestRawSocketC9_PingStorm verifies the rawSocketPeer handles a
198+
// rapid burst of PING frames inline (no per-ping goroutine spawn,
199+
// no memory growth, connection stays responsive). After the storm,
200+
// a regular WAMP message must still be delivered.
201+
func TestRawSocketC9_PingStorm(t *testing.T) {
202+
client, peer := newPipedRawSocket(t, 6) // 2^15 = 32768
203+
t.Cleanup(func() { peer.Close() })
204+
205+
const pingCount = 1000
206+
207+
// Concurrently consume PONGs from the server side so the
208+
// client's send buffer doesn't fill up (net.Pipe is unbuffered).
209+
pongDone := make(chan struct{})
210+
go func() {
211+
defer close(pongDone)
212+
for range pingCount {
213+
var resp [4]byte
214+
if _, err := io.ReadFull(client, resp[:]); err != nil {
215+
return
216+
}
217+
if resp[0] != 0x02 {
218+
t.Errorf("expected PONG (type 0x02), got %#x", resp[0])
219+
return
220+
}
221+
length := int(resp[1])<<16 | int(resp[2])<<8 | int(resp[3])
222+
if length > 0 {
223+
if _, err := io.CopyN(io.Discard, client, int64(length)); err != nil {
224+
return
225+
}
226+
}
227+
}
228+
}()
229+
230+
// Burst PINGs of varying sizes (1, 2, ..., pingCount bytes
231+
// modulo a small max so we stay well within recvLimit).
232+
for i := 1; i <= pingCount; i++ {
233+
size := (i % 64) + 1
234+
writeFrame(t, client, 0x01, make([]byte, size))
235+
}
236+
237+
// Wait for all PONGs.
238+
select {
239+
case <-pongDone:
240+
case <-time.After(5 * time.Second):
241+
require.FailNow(t, "PING storm did not produce expected PONG count")
242+
}
243+
244+
// Connection must still be responsive: send a HELLO and observe
245+
// it delivered to peer.Recv().
246+
hello := wamp.Hello{Realm: "test", Details: wamp.Dict{}}
247+
helloBytes, err := (&serialize.JSONSerializer{}).Serialize(&hello)
248+
require.NoError(t, err)
249+
writeFrame(t, client, 0x00, helloBytes)
250+
251+
select {
252+
case msg, ok := <-peer.Recv():
253+
require.True(t, ok, "Recv closed unexpectedly after PING storm")
254+
_, isHello := msg.(*wamp.Hello)
255+
require.True(t, isHello, "expected HELLO after PING storm, got %T", msg)
256+
case <-time.After(time.Second):
257+
require.FailNow(t, "peer.Recv() did not deliver post-storm HELLO")
258+
}
259+
_ = client.Close()
260+
}
261+
262+
// TestRawSocketC9_PingPong verifies that PONG frames received from
263+
// the remote side are silently consumed (per spec §5.4.4): the
264+
// peer's recvHandler discards the body and continues. This is the
265+
// inverse of the PING test — exercises the case 2 branch.
266+
func TestRawSocketC9_PingPong(t *testing.T) {
267+
client, peer := newPipedRawSocket(t, 4)
268+
t.Cleanup(func() { peer.Close() })
269+
270+
// Send a PONG frame followed by a HELLO.
271+
writeFrame(t, client, 0x02, []byte("opaque pong payload"))
272+
hello := wamp.Hello{Realm: "test", Details: wamp.Dict{}}
273+
helloBytes, err := (&serialize.JSONSerializer{}).Serialize(&hello)
274+
require.NoError(t, err)
275+
writeFrame(t, client, 0x00, helloBytes)
276+
277+
select {
278+
case msg, ok := <-peer.Recv():
279+
require.True(t, ok, "Recv closed after PONG")
280+
_, isHello := msg.(*wamp.Hello)
281+
require.True(t, isHello, "expected HELLO after PONG discard, got %T", msg)
282+
case <-time.After(time.Second):
283+
require.FailNow(t, "peer.Recv() did not deliver post-PONG HELLO")
284+
}
285+
_ = client.Close()
286+
}
287+
288+
// TestRawSocketC3_HandshakeFailureDoesNotLeak verifies that when the
289+
// server-side handshake itself fails (e.g. client sends garbage in
290+
// the magic byte), AcceptRawSocket returns an error and no peer is
291+
// constructed — no goroutines started, nothing to leak.
292+
func TestRawSocketC3_HandshakeFailureDoesNotLeak(t *testing.T) {
293+
client, server := net.Pipe()
294+
defer client.Close()
295+
296+
var wg sync.WaitGroup
297+
wg.Add(1)
298+
go func() {
299+
defer wg.Done()
300+
// Garbage in magic byte → server should error out.
301+
_, _ = client.Write([]byte{0x00, 0x00, 0x00, 0x00})
302+
}()
303+
304+
logger := log.New(io.Discard, "", 0)
305+
_, err := transport.AcceptRawSocket(server, logger, 1, 0)
306+
require.Error(t, err, "expected handshake error on bad magic")
307+
308+
wg.Wait()
309+
}

0 commit comments

Comments
 (0)