Skip to content

Commit 7ff72e3

Browse files
committed
refactor: modularize frontend state with event-driven UI updates
Backend: extract Client, Handler, ProtocolError from monolithic Hub into separate modules under internal/signal/. Frontend: introduce state/ layer (roomState, mediaState, peersState) with subscribe/notify mechanism. State mutations automatically trigger UI refresh via coarse-grained subscriptions with anti-reentrancy guard. Remove 21 manual updateControls() calls scattered across controllers. New modules: protocol/message.js (shared contract), webrtc/peerState.js (Perfect Negotiation), controllers/chat.js (DataChannel messaging). Tests: 65 passing (was 34), including subscribe/notify unit tests.
1 parent 1bce5e7 commit 7ff72e3

26 files changed

Lines changed: 2992 additions & 1043 deletions

CONTEXT.md

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# LessUp WebRTC 领域词汇表
2+
3+
本文档定义项目的核心领域概念,用于指导架构讨论和代码命名。
4+
5+
---
6+
7+
## 核心领域
8+
9+
### Room(房间)
10+
11+
一个虚拟的通信空间,客户端加入后可以与其他成员进行 WebRTC 通话。
12+
13+
- 每个房间有唯一的名称(最长 64 字符)
14+
- 房间自动创建(首个客户端加入时)和销毁(最后一个离开时)
15+
- 房间容量上限:50 个客户端
16+
- 系统房间上限:1000 个房间
17+
18+
### Client(客户端)
19+
20+
连接到信令服务器的 WebSocket 连接,代表一个用户的信令身份。
21+
22+
- 客户端 ID:用户提供的唯一标识符(最长 64 字符)
23+
- 连接 ID:服务器分配的内部标识符(用于日志追踪)
24+
- 身份绑定:首次加入房间后,ID 和房间不可更改
25+
26+
### Peer(对等方)
27+
28+
WebRTC 连接的远端参与者,存储在本地状态中。
29+
30+
- 每个 Peer 对应一个 RTCPeerConnection
31+
- 包含连接状态、媒体流、UI 元素引用
32+
- 与特定 Client ID 关联(同一用户可能有多条 Peer 连接)
33+
34+
### Signal(信令)
35+
36+
用于建立 WebRTC 连接的消息交换机制。
37+
38+
- 通过 WebSocket 传输
39+
- 消息类型:join、offer、answer、candidate、hangup
40+
- 信令服务器只转发,不解析 SDP/ICE 内容
41+
42+
### Media Stream(媒体流)
43+
44+
音视频数据的实时流。
45+
46+
- 本地流:来自摄像头/麦克风的采集
47+
- 远端流:从 Peer 接收的流
48+
- 屏幕流:来自屏幕共享的流
49+
50+
### DataChannel(数据通道)
51+
52+
WebRTC 的双向数据传输通道。
53+
54+
- 用于聊天消息传输
55+
- 在发起呼叫时创建
56+
- 支持 open/message/close 事件
57+
58+
---
59+
60+
## 系统边界
61+
62+
### 后端(Go)
63+
64+
| 模块 | 职责 |
65+
|:-----|:-----|
66+
| Signal Hub | 房间管理、消息路由、身份验证 |
67+
| Client | WebSocket 连接生命周期、速率限制 |
68+
| Message | 信令消息结构定义、类型常量 |
69+
| ProtocolError | 协议错误领域模型、错误码定义 |
70+
71+
### 前端(JavaScript)
72+
73+
| 模块 | 职责 |
74+
|:-----|:-----|
75+
| app.js | 应用入口、控制器组装 |
76+
| state/ | 领域状态管理(roomState、mediaState、peersState) |
77+
| signaling.js | WebSocket 连接、消息处理 |
78+
| peers.js | Peer 连接管理、SDP/ICE 协商 |
79+
| chat.js | DataChannel 聊天消息收发 |
80+
| media.js | 媒体流管理、录制 |
81+
| ui.js | DOM 操作、视图渲染 |
82+
| stats.js | WebRTC 统计信息收集 |
83+
84+
---
85+
86+
## 状态机
87+
88+
### 房间状态(roomState)
89+
90+
```
91+
idle → connecting → joined ⇄ reconnecting → idle
92+
```
93+
94+
### Peer 连接状态(connectionState)
95+
96+
```
97+
new → connecting → connected → disconnected → closed/failed
98+
```
99+
100+
---
101+
102+
## 错误码
103+
104+
信令服务器定义的错误码:
105+
106+
| 错误码 | 含义 |
107+
|:-------|:-----|
108+
| `invalid_id` | 客户端 ID 格式无效 |
109+
| `invalid_room` | 房间名格式无效 |
110+
| `invalid_join` | 加入请求缺少必要字段 |
111+
| `duplicate_id` | 房间内 ID 已存在 |
112+
| `room_full` | 房间已满(50 人) |
113+
| `room_limit_reached` | 系统房间上限(1000) |
114+
| `room_missing` | 房间不存在 |
115+
| `already_joined` | 已在房间中 |
116+
| `identity_locked` | 身份已绑定不可更改 |
117+
| `not_joined` | 未加入房间 |
118+
| `invalid_target` | 目标客户端无效 |
119+
| `target_not_found` | 目标客户端不在房间 |
120+
| `membership_lost` | 客户端已从房间移除 |
121+
| `unknown_type` | 消息类型不支持 |
122+
| `rate_limited` | 消息发送过快 |

internal/signal/client.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package signal
2+
3+
import (
4+
"errors"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"github.com/gorilla/websocket"
10+
)
11+
12+
// Client represents a connected WebSocket client.
13+
type Client struct {
14+
mu sync.RWMutex
15+
id string
16+
room string
17+
connID uint64
18+
conn *websocket.Conn
19+
send chan Message
20+
closed chan struct{}
21+
closeOnce sync.Once
22+
// Rate limiting
23+
msgCount int
24+
msgWindow time.Time
25+
rateLimited bool
26+
}
27+
28+
// identity returns the client's ID and room.
29+
func (c *Client) identity() (userID, userRoom string) {
30+
c.mu.RLock()
31+
defer c.mu.RUnlock()
32+
return c.id, c.room
33+
}
34+
35+
// setIdentity sets the client's ID and room.
36+
func (c *Client) setIdentity(id, room string) {
37+
c.mu.Lock()
38+
c.id = id
39+
c.room = room
40+
c.mu.Unlock()
41+
}
42+
43+
// setRoom sets the client's room.
44+
func (c *Client) setRoom(room string) {
45+
c.mu.Lock()
46+
c.room = room
47+
c.mu.Unlock()
48+
}
49+
50+
// checkRateLimit implements token bucket rate limiting.
51+
// Returns true if the message should be allowed, false if rate limited.
52+
// Allows burst up to RateLimitBurst (50), then enforces MaxMessagesPerSecond (30/sec).
53+
func (c *Client) checkRateLimit() bool {
54+
c.mu.Lock()
55+
defer c.mu.Unlock()
56+
57+
now := time.Now()
58+
// Reset window if more than 1 second has passed
59+
if now.Sub(c.msgWindow) >= time.Second {
60+
c.msgWindow = now
61+
c.msgCount = 0
62+
c.rateLimited = false
63+
}
64+
65+
c.msgCount++
66+
67+
// First check: absolute burst limit (hard cap at 50)
68+
if c.msgCount > RateLimitBurst {
69+
if !c.rateLimited {
70+
c.rateLimited = true
71+
log.Printf("signal: rate limiting client conn=%d (burst exceeded)", c.connID)
72+
}
73+
return false
74+
}
75+
76+
// Second check: per-second rate limit after initial burst window
77+
// Only enforce if we're past the burst allowance (30) within the first second
78+
if c.msgCount > MaxMessagesPerSecond && now.Sub(c.msgWindow) < time.Second {
79+
if !c.rateLimited {
80+
c.rateLimited = true
81+
log.Printf("signal: rate limiting client conn=%d (rate exceeded)", c.connID)
82+
}
83+
return false
84+
}
85+
86+
return true
87+
}
88+
89+
// sendError sends an error message to the client.
90+
func (c *Client) sendError(err *ProtocolError) error {
91+
_, room := c.identity()
92+
return c.enqueue(Message{Type: MsgTypeError, Room: room, Code: err.Code, Error: err.Message})
93+
}
94+
95+
// enqueue queues a message for sending to the client.
96+
func (c *Client) enqueue(msg Message) error {
97+
select {
98+
case <-c.closed:
99+
return errClientClosed
100+
default:
101+
}
102+
103+
timer := time.NewTimer(SendTimeout)
104+
105+
select {
106+
case <-c.closed:
107+
timer.Stop()
108+
return errClientClosed
109+
case c.send <- msg:
110+
// Drain timer to prevent resource leak
111+
if !timer.Stop() {
112+
select {
113+
case <-timer.C:
114+
default:
115+
}
116+
}
117+
return nil
118+
case <-timer.C:
119+
return errors.New("send timeout")
120+
}
121+
}
122+
123+
// close closes the client's WebSocket connection.
124+
func (c *Client) close() {
125+
c.closeOnce.Do(func() {
126+
close(c.closed)
127+
if c.conn == nil {
128+
return
129+
}
130+
defer func() {
131+
if r := recover(); r != nil {
132+
log.Printf("signal: conn close panic conn=%d: %v", c.connID, r)
133+
}
134+
}()
135+
_ = c.conn.Close()
136+
})
137+
}
138+
139+
// writePump handles sending messages to the WebSocket connection.
140+
func (c *Client) writePump() {
141+
ticker := time.NewTicker(PingPeriod)
142+
defer ticker.Stop()
143+
144+
for {
145+
select {
146+
case <-c.closed:
147+
return
148+
case msg := <-c.send:
149+
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
150+
log.Printf("signal: set write deadline failed conn=%d: %v", c.connID, err)
151+
c.close()
152+
return
153+
}
154+
if err := c.conn.WriteJSON(msg); err != nil {
155+
id, room := c.identity()
156+
log.Printf("signal: write message error room=%s id=%s conn=%d: %v", room, id, c.connID, err)
157+
c.close()
158+
return
159+
}
160+
case <-ticker.C:
161+
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
162+
log.Printf("signal: set ping deadline failed conn=%d: %v", c.connID, err)
163+
c.close()
164+
return
165+
}
166+
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
167+
id, room := c.identity()
168+
log.Printf("signal: ping failed room=%s id=%s conn=%d: %v", room, id, c.connID, err)
169+
c.close()
170+
return
171+
}
172+
}
173+
}
174+
}

internal/signal/errors.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package signal
2+
3+
// ProtocolError 表示信令协议级别的错误,包含错误码和可读消息。
4+
// 错误码用于客户端程序化处理,消息用于人类阅读。
5+
type ProtocolError struct {
6+
Code string
7+
Message string
8+
}
9+
10+
func (e *ProtocolError) Error() string { return e.Message }
11+
12+
// 信令协议错误定义
13+
// 这些错误码与 openspec/specs/signaling/spec.md 中定义的错误码保持同步。
14+
var (
15+
// 客户端身份相关错误
16+
ErrInvalidID = &ProtocolError{Code: "invalid_id", Message: "invalid client id"}
17+
ErrDuplicateID = &ProtocolError{Code: "duplicate_id", Message: "client id already exists in room"}
18+
ErrIdentityLocked = &ProtocolError{Code: "identity_locked", Message: "connection identity is immutable"}
19+
20+
// 房间相关错误
21+
ErrInvalidRoom = &ProtocolError{Code: "invalid_room", Message: "invalid room name"}
22+
ErrRoomFull = &ProtocolError{Code: "room_full", Message: "room is full"}
23+
ErrRoomLimitReached = &ProtocolError{Code: "room_limit_reached", Message: "room limit reached"}
24+
ErrRoomMissing = &ProtocolError{Code: "room_missing", Message: "room no longer exists"}
25+
26+
// 加入相关错误
27+
ErrInvalidJoin = &ProtocolError{Code: "invalid_join", Message: "room and id are required"}
28+
ErrAlreadyJoined = &ProtocolError{Code: "already_joined", Message: "leave the current room before joining another"}
29+
ErrNotJoined = &ProtocolError{Code: "not_joined", Message: "join a room first"}
30+
31+
// 消息路由相关错误
32+
ErrInvalidTarget = &ProtocolError{Code: "invalid_target", Message: "invalid target client"}
33+
ErrTargetNotFound = &ProtocolError{Code: "target_not_found", Message: "target client is not in the room"}
34+
ErrMembershipLost = &ProtocolError{Code: "membership_lost", Message: "client is no longer registered in room"}
35+
36+
// 消息处理相关错误
37+
ErrUnknownType = &ProtocolError{Code: "unknown_type", Message: "unsupported message type"}
38+
ErrRateLimited = &ProtocolError{Code: "rate_limited", Message: "too many messages, please slow down"}
39+
)

0 commit comments

Comments
 (0)