Skip to content

Commit 4e85f0a

Browse files
committed
feat(websocket): add event helper package
1 parent af4409d commit 4e85f0a

7 files changed

Lines changed: 112 additions & 69 deletions

File tree

v3/socketio/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.25.0
44

55
require (
66
github.com/fasthttp/websocket v1.5.12
7-
github.com/gofiber/contrib/v3/websocket v1.0.0
7+
github.com/gofiber/contrib/v3/websocket v1.1.4
88
github.com/gofiber/fiber/v3 v3.2.0
99
github.com/google/uuid v1.6.0
1010
github.com/stretchr/testify v1.11.1

v3/socketio/legacy/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ legacy.On(legacy.EventMessage, func(ep *legacy.EventPayload) {
3131

3232
app.Get("/ws", legacy.New(func(kws *legacy.Websocket) {}))
3333
```
34+
35+
Set tuning globals such as `PongTimeout`, `SendQueueSize`, or `MaxSendRetry` on `github.com/gofiber/contrib/v3/websocket/event` directly before accepting connections.

v3/socketio/legacy/legacy.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
package legacy
66

77
import (
8-
"time"
9-
108
"github.com/gofiber/contrib/v3/websocket"
119
wsevent "github.com/gofiber/contrib/v3/websocket/event"
1210
"github.com/gofiber/fiber/v3"
@@ -41,18 +39,10 @@ var (
4139
ErrorUUIDDuplication = wsevent.ErrorUUIDDuplication
4240
)
4341

44-
var (
45-
PongTimeout = wsevent.PongTimeout
46-
RetrySendTimeout = wsevent.RetrySendTimeout
47-
MaxSendRetry = wsevent.MaxSendRetry
48-
ReadTimeout = wsevent.ReadTimeout
49-
)
50-
5142
// New returns the legacy plain WebSocket event handler.
5243
//
5344
// Deprecated: use github.com/gofiber/contrib/v3/websocket/event.New directly.
5445
func New(callback func(kws *Websocket), config ...websocket.Config) fiber.Handler {
55-
syncConfig()
5646
return wsevent.New(callback, config...)
5747
}
5848

@@ -90,17 +80,3 @@ func Broadcast(message []byte, mType ...int) {
9080
func Fire(event string, data []byte) {
9181
wsevent.Fire(event, data)
9282
}
93-
94-
func syncConfig() {
95-
wsevent.PongTimeout = durationOrDefault(PongTimeout, wsevent.PongTimeout)
96-
wsevent.RetrySendTimeout = durationOrDefault(RetrySendTimeout, wsevent.RetrySendTimeout)
97-
wsevent.MaxSendRetry = MaxSendRetry
98-
wsevent.ReadTimeout = durationOrDefault(ReadTimeout, wsevent.ReadTimeout)
99-
}
100-
101-
func durationOrDefault(value, fallback time.Duration) time.Duration {
102-
if value == 0 {
103-
return fallback
104-
}
105-
return value
106-
}

v3/socketio/legacy/legacy_test.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package legacy
22

33
import (
4+
"errors"
45
"net"
56
"testing"
67
"time"
@@ -47,7 +48,7 @@ func TestLegacyPlainWebSocketEventShim(t *testing.T) {
4748
},
4849
HandshakeTimeout: 5 * time.Second,
4950
}
50-
conn, _, err := dialer.Dial("ws://"+ln.Addr().String(), nil)
51+
conn, err := dialWebSocket(t, dialer, "ws://"+ln.Addr().String())
5152
require.NoError(t, err)
5253
defer func() { _ = conn.Close() }()
5354

@@ -58,3 +59,21 @@ func TestLegacyPlainWebSocketEventShim(t *testing.T) {
5859
require.Equal(t, TextMessage, messageType)
5960
require.Equal(t, "legacy:ping", string(message))
6061
}
62+
63+
func dialWebSocket(t *testing.T, dialer *websocket.Dialer, url string) (*websocket.Conn, error) {
64+
t.Helper()
65+
66+
var lastErr error
67+
for range 20 {
68+
conn, _, err := dialer.Dial(url, nil)
69+
if err == nil {
70+
return conn, nil
71+
}
72+
lastErr = err
73+
time.Sleep(10 * time.Millisecond)
74+
}
75+
if lastErr == nil {
76+
lastErr = errors.New("websocket dial did not run")
77+
}
78+
return nil, lastErr
79+
}

v3/websocket/event/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ func Broadcast(message []byte, mType ...int)
3535
func Fire(name string, data []byte)
3636
```
3737

38+
## Configuration
39+
40+
Set these package-level variables before accepting connections.
41+
42+
| Variable | Default | Description |
43+
|:-------------------|:--------|:------------|
44+
| `PongTimeout` | `1s` | Interval for server-sent WebSocket pong frames. |
45+
| `RetrySendTimeout` | `20ms` | Backoff between retries while the connection is not ready. |
46+
| `MaxSendRetry` | `5` | Max retries for transient socket write readiness issues. |
47+
| `SendQueueSize` | `100` | Per-connection outbound message queue capacity. |
48+
| `ReadTimeout` | `10ms` | Deprecated; reads now block until a frame arrives or the connection closes. |
49+
3850
## Example
3951

4052
```go

v3/websocket/event/event.go

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ var (
6464
RetrySendTimeout = 20 * time.Millisecond
6565
// MaxSendRetry defines the max retries for transient socket write issues.
6666
MaxSendRetry = 5
67-
// ReadTimeout controls the pause between read attempts.
67+
// SendQueueSize controls the per-connection outbound message queue size.
68+
SendQueueSize = 100
69+
// ReadTimeout is deprecated and no longer used; reads block until a
70+
// message arrives or the connection is closed.
6871
ReadTimeout = 10 * time.Millisecond
6972
)
7073

@@ -214,9 +217,7 @@ func (l *safeListeners) get(event string) []eventCallback {
214217
return make([]eventCallback, 0)
215218
}
216219

217-
ret := make([]eventCallback, 0, len(l.list[event]))
218-
ret = append(ret, l.list[event]...)
219-
return ret
220+
return append([]eventCallback(nil), l.list[event]...)
220221
}
221222

222223
var listeners = safeListeners{
@@ -241,7 +242,7 @@ func New(callback func(kws *Websocket), config ...websocket.Config) fiber.Handle
241242
Cookies: func(key string, defaultValue ...string) string {
242243
return c.Cookies(key, defaultValue...)
243244
},
244-
queue: make(chan message, 100),
245+
queue: make(chan message, sendQueueSize()),
245246
done: make(chan struct{}, 1),
246247
attributes: make(map[string]interface{}),
247248
isAlive: true,
@@ -422,6 +423,7 @@ func (kws *Websocket) Emit(message []byte, mType ...int) {
422423
func (kws *Websocket) Close() {
423424
kws.write(CloseMessage, []byte("Connection closed"))
424425
kws.fireEvent(EventClose, nil, nil)
426+
kws.disconnected(nil)
425427
}
426428

427429
// IsAlive reports whether the connection is active.
@@ -475,15 +477,25 @@ func (kws *Websocket) send(ctx context.Context) {
475477
case msg := <-kws.queue:
476478
if !kws.hasConn() {
477479
if msg.retries <= MaxSendRetry {
478-
go func(msg message) {
479-
time.Sleep(RetrySendTimeout)
480-
msg.retries++
481-
select {
482-
case kws.queue <- msg:
483-
case <-ctx.Done():
484-
case <-kws.done:
485-
}
486-
}(msg)
480+
retryTimer := time.NewTimer(RetrySendTimeout)
481+
select {
482+
case <-retryTimer.C:
483+
case <-ctx.Done():
484+
stopTimer(retryTimer)
485+
return
486+
case <-kws.done:
487+
stopTimer(retryTimer)
488+
return
489+
}
490+
491+
msg.retries++
492+
select {
493+
case kws.queue <- msg:
494+
case <-ctx.Done():
495+
return
496+
case <-kws.done:
497+
return
498+
}
487499
}
488500
continue
489501
}
@@ -522,42 +534,35 @@ func (kws *Websocket) run() {
522534
wg.Wait()
523535
}
524536

525-
func (kws *Websocket) read(ctx context.Context) {
526-
timeoutTicker := time.NewTicker(ReadTimeout)
527-
defer timeoutTicker.Stop()
537+
func (kws *Websocket) read(_ context.Context) {
528538
for {
529-
select {
530-
case <-timeoutTicker.C:
531-
if !kws.hasConn() {
532-
continue
533-
}
534-
535-
mType, msg, err := kws.Conn.ReadMessage()
539+
if !kws.hasConn() {
540+
return
541+
}
536542

537-
if mType == PingMessage {
538-
kws.fireEvent(EventPing, nil, nil)
539-
continue
540-
}
543+
mType, msg, err := kws.Conn.ReadMessage()
541544

542-
if mType == PongMessage {
543-
kws.fireEvent(EventPong, nil, nil)
544-
continue
545-
}
545+
if mType == PingMessage {
546+
kws.fireEvent(EventPing, nil, nil)
547+
continue
548+
}
546549

547-
if mType == CloseMessage {
548-
kws.disconnected(nil)
549-
return
550-
}
550+
if mType == PongMessage {
551+
kws.fireEvent(EventPong, nil, nil)
552+
continue
553+
}
551554

552-
if err != nil {
553-
kws.disconnected(err)
554-
return
555-
}
555+
if mType == CloseMessage {
556+
kws.disconnected(nil)
557+
return
558+
}
556559

557-
kws.fireEvent(EventMessage, msg, nil)
558-
case <-ctx.Done():
560+
if err != nil {
561+
kws.disconnected(err)
559562
return
560563
}
564+
565+
kws.fireEvent(EventMessage, msg, nil)
561566
}
562567
}
563568

@@ -597,6 +602,22 @@ func (kws *Websocket) randomUUID() string {
597602
return uuid.New().String()
598603
}
599604

605+
func sendQueueSize() int {
606+
if SendQueueSize <= 0 {
607+
return 1
608+
}
609+
return SendQueueSize
610+
}
611+
612+
func stopTimer(timer *time.Timer) {
613+
if !timer.Stop() {
614+
select {
615+
case <-timer.C:
616+
default:
617+
}
618+
}
619+
}
620+
600621
func fireGlobalEvent(event string, data []byte, err error) {
601622
for _, kws := range pool.all() {
602623
kws.fireEvent(event, data, err)

v3/websocket/event/event_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,19 @@ func TestWebsocketSetUUIDUpdatesPool(t *testing.T) {
306306
require.Equal(t, kws, poolEntry)
307307
}
308308

309+
func TestWebsocketCloseRemovesConnectionFromPool(t *testing.T) {
310+
resetState()
311+
312+
kws := createWS()
313+
pool.set(kws)
314+
315+
kws.Close()
316+
317+
require.False(t, kws.IsAlive())
318+
_, err := pool.get(kws.GetUUID())
319+
require.ErrorIs(t, err, ErrorInvalidConnection)
320+
}
321+
309322
func createWS() *Websocket {
310323
kws := &Websocket{
311324
Conn: nil,
@@ -321,7 +334,7 @@ func createWS() *Websocket {
321334
Cookies: func(key string, defaultValue ...string) string {
322335
return ""
323336
},
324-
queue: make(chan message),
337+
queue: make(chan message, 1),
325338
done: make(chan struct{}, 1),
326339
attributes: make(map[string]interface{}),
327340
isAlive: true,

0 commit comments

Comments
 (0)