Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 119 additions & 4 deletions crates/net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ pub enum NetEndpoint {
/// 已升级为中继的 peer 集合。这些 peer 的 RTCPeerConnection 已被关闭,
/// 后续 server→peer 字节走信令 WS 二进制帧。
relayed_peers: HashSet<u32>,
/// 每个中继 peer 的本地发送限速器。
/// Worker 仍是最终裁决者;这里提前节流,避免高视距 ChunkSnapshot
/// 在一帧内打爆 DO 的 `rate_limit`。
relay_rate_limiters: HashMap<u32, RelayRateLimiter>,
/// 已经发出 relay_request 但还在等服务端 RelayActive 的 peer。
/// 用于避免对同一 peer 重复请求升级。
relay_requested: HashSet<u32>,
Expand Down Expand Up @@ -208,6 +212,7 @@ impl NetEndpoint {
peer_to_entity: HashMap::new(),
host_self_entity_id: None,
relayed_peers: HashSet::new(),
relay_rate_limiters: HashMap::new(),
relay_requested: HashSet::new(),
negotiation_started_ms: HashMap::new(),
};
Expand Down Expand Up @@ -368,6 +373,7 @@ impl NetEndpoint {
peer_to_entity,
host_self_entity_id,
relayed_peers,
relay_rate_limiters,
signaling,
..
} = self
Expand All @@ -377,7 +383,8 @@ impl NetEndpoint {

let mut unsent: Vec<OutboundMessage> = Vec::new();

for msg in msgs {
let mut iter = msgs.into_iter();
while let Some(msg) = iter.next() {
let plan = plan_route(&msg, peer_to_entity, *host_self_entity_id);
// 先准备字节(同一条消息发给多个 peer 时复用编码结果)
let bytes = if plan.peers_to_send.is_empty() {
Expand All @@ -394,7 +401,7 @@ impl NetEndpoint {
let channel = transport::channel_for_server_message(&msg.message);

// 流控检查:若任一目标 peer 的 reliable DC 因 bufferedAmount 暂停,整条消息推迟。
// 中继 peer 不受 DC bufferedAmount 影响(WS 自带反压)
// 中继 peer 走下面的本地令牌桶;两类流控都在发送前检查,避免半条消息已发再回滚
if let Some(_b) = &bytes
&& channel == crate::transport::ChannelKind::Reliable
{
Expand All @@ -404,7 +411,38 @@ impl NetEndpoint {
});
if any_paused {
unsent.push(msg);
continue;
unsent.extend(iter);
break;
}
}

if bytes.is_some() {
let relayed_targets: Vec<u32> = plan
.peers_to_send
.iter()
.copied()
.filter(|pid| relayed_peers.contains(pid))
.collect();
let now = now_ms();
for pid in &relayed_targets {
relay_rate_limiters
.entry(*pid)
.or_insert_with(|| RelayRateLimiter::new(DEFAULT_RELAY_MAX_RATE, now));
}
let relay_budget_exhausted = relayed_targets.iter().any(|pid| {
relay_rate_limiters
.get_mut(pid)
.is_some_and(|limiter| !limiter.has_token(now))
});
if relay_budget_exhausted {
unsent.push(msg);
unsent.extend(iter);
break;
}
for pid in &relayed_targets {
if let Some(limiter) = relay_rate_limiters.get_mut(pid) {
limiter.consume_token(now);
}
}
}

Expand Down Expand Up @@ -460,6 +498,7 @@ impl NetEndpoint {
ice_servers,
session,
relayed_peers,
relay_rate_limiters,
relay_requested,
negotiation_started_ms,
..
Expand All @@ -471,6 +510,7 @@ impl NetEndpoint {
ice_servers,
session,
relayed_peers,
relay_rate_limiters,
relay_requested,
negotiation_started_ms,
&mut peer_msg_handler,
Expand Down Expand Up @@ -509,6 +549,50 @@ const CONNECTED_SESSION: RoomSession = RoomSession::Connected;

/// 协商超时阈值:从 PeerJoined 起 15s 仍未 Connected → 触发中继升级。
const NEGOTIATION_TIMEOUT_MS: f64 = 15_000.0;
const DEFAULT_RELAY_MAX_RATE: u32 = 200;
const RELAY_CLIENT_RATE_HEADROOM: f64 = 0.8;

/// Worker 中继按"消息条数"做令牌桶限流。Host 高视距会在一帧内产生上百个
/// ChunkSnapshot;本地先按服务端下发的 `max_rate` 留出余量发送,避免触发
/// `relay_closed{reason:"rate_limit"}`。
#[derive(Clone, Debug)]
pub struct RelayRateLimiter {
capacity: f64,
refill_per_ms: f64,
tokens: f64,
last_refill_ms: f64,
}

impl RelayRateLimiter {
fn new(max_rate: u32, now_ms: f64) -> Self {
let safe_rate = ((max_rate.max(1) as f64) * RELAY_CLIENT_RATE_HEADROOM).max(1.0);
Self {
capacity: safe_rate,
refill_per_ms: safe_rate / 1000.0,
tokens: safe_rate,
last_refill_ms: now_ms,
}
}

fn has_token(&mut self, now_ms: f64) -> bool {
self.refill(now_ms);
self.tokens >= 1.0
}

fn consume_token(&mut self, now_ms: f64) {
self.refill(now_ms);
self.tokens = (self.tokens - 1.0).max(0.0);
}

fn refill(&mut self, now_ms: f64) {
let elapsed = now_ms - self.last_refill_ms;
if elapsed <= 0.0 {
return;
}
self.tokens = (self.tokens + elapsed * self.refill_per_ms).min(self.capacity);
self.last_refill_ms = now_ms;
}
}

/// 取当前 performance.now() 毫秒;浏览器外(如 Node 测试)返回 0。
fn now_ms() -> f64 {
Expand All @@ -530,6 +614,7 @@ fn poll_host(
ice_servers: &mut Vec<IceServerConfig>,
session: &mut RoomSession,
relayed_peers: &mut HashSet<u32>,
relay_rate_limiters: &mut HashMap<u32, RelayRateLimiter>,
relay_requested: &mut HashSet<u32>,
negotiation_started_ms: &mut HashMap<u32, f64>,
peer_msg_handler: &mut Option<&mut dyn FnMut(u32, ClientMessage)>,
Expand Down Expand Up @@ -571,6 +656,7 @@ fn poll_host(
pending.remove(&peer_id);
negotiation_started_ms.remove(&peer_id);
relayed_peers.remove(&peer_id);
relay_rate_limiters.remove(&peer_id);
relay_requested.remove(&peer_id);
// Phase 5:通知 client 端做 host_unregister_peer + server.remove_player
out.push(RoomEvent::RemoteLeft { peer_id });
Expand Down Expand Up @@ -609,7 +695,9 @@ fn poll_host(
// 信令关闭对已建立的 PC 没影响(设计如此);不切 session
log::info!("[net/host] signaling socket closed");
}
SignalingEvent::RelayActive { peer_id, .. } => {
SignalingEvent::RelayActive {
peer_id, max_rate, ..
} => {
// 服务端确认中继对建立。关闭对应 PeerConnection(释放资源),把 peer 移入 relayed_peers。
relay_requested.remove(&peer_id);
if let Some(pc) = peers.remove(&peer_id) {
Expand All @@ -618,6 +706,7 @@ fn poll_host(
pending.remove(&peer_id);
negotiation_started_ms.remove(&peer_id);
if relayed_peers.insert(peer_id) {
relay_rate_limiters.insert(peer_id, RelayRateLimiter::new(max_rate, now_ms()));
log::info!("[net/host] peer {peer_id} upgraded to relay");
out.push(RoomEvent::PeerRelayed { peer_id });
// 中继视为已连通,PeerCount 也包含 relayed
Expand All @@ -630,6 +719,7 @@ fn poll_host(
SignalingEvent::RelayClosed { peer_id, reason } => {
log::info!("[net/host] relay closed peer {peer_id}: {reason}");
relayed_peers.remove(&peer_id);
relay_rate_limiters.remove(&peer_id);
relay_requested.remove(&peer_id);
// 直连 PC 之前已关闭;若不在 relayed_peers 也不在 peers,就把 peer 当作离线
out.push(RoomEvent::RemoteLeft { peer_id });
Expand Down Expand Up @@ -1066,4 +1156,29 @@ mod tests {
assert!(!plan.send_to_local);
assert!(plan.peers_to_send.is_empty());
}

#[test]
fn relay_rate_limiter_keeps_headroom_under_worker_cap() {
let mut limiter = RelayRateLimiter::new(200, 0.0);

for _ in 0..160 {
assert!(limiter.has_token(0.0));
limiter.consume_token(0.0);
}
assert!(!limiter.has_token(0.0));

// 80% 余量下补充速率为 160/s;6ms 不足 1 条,7ms 可以再发 1 条。
assert!(!limiter.has_token(6.0));
assert!(limiter.has_token(7.0));
}

#[test]
fn relay_rate_limiter_small_rate_still_progresses() {
let mut limiter = RelayRateLimiter::new(1, 0.0);

assert!(limiter.has_token(0.0));
limiter.consume_token(0.0);
assert!(!limiter.has_token(999.0));
assert!(limiter.has_token(1000.0));
}
}
2 changes: 2 additions & 0 deletions docs/modules/net.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl SignalingClient {
}
```

`RelayActive.max_rate` 会被 Host 端记录为本地中继发送令牌桶的上限;`host_route_outbox()` 在真正调用 `send_relay_binary()` 前预扣 token,预算不足时把当前及后续 outbox 消息推迟到下一帧,避免高视距 ChunkSnapshot 突发触发 Worker 的 `rate_limit`。

**注意**:`web_sys::WebSocket` 的事件回调通过 `Closure<dyn FnMut(...)>` 注册,回调内只能 `inbox.borrow_mut().push_back(event)`,不能直接做长时间工作(保持回调短)。

---
Expand Down
1 change: 1 addition & 0 deletions docs/networking/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ if let Some(full) = assembler.ingest(pos, frag_index, frag_total, payload) {
- `PeerConnection::send()` 发送后检查 `bufferedAmount`,超过 **1 MB 高水位**时设置暂停标志
- 暂停后该 peer 的 reliable 消息被推迟到下帧发送,等待 `onbufferedamountlow` 事件清除暂停标志
- `host_route_outbox()` 返回流控阻塞的未发送消息,由 `flush_server_outbox()` 重新入队 server.outbox
- 中继模式额外使用 Worker 下发的 `max_rate` 做客户端本地令牌桶节流(默认按 80% 留余量);高视距产生的大量 `ChunkSnapshot` 会分帧发送,避免触发 `relay_closed{reason:"rate_limit"}`

---

Expand Down
2 changes: 1 addition & 1 deletion docs/networking/signaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ DO 收到客户端帧后:
| 单 payload 大小 | ≤ 64 KB(不含 4B 头) | `relay_closed{reason:"msg_too_large"}` |
| 发送速率 | 200 msg/s 令牌桶 | `relay_closed{reason:"rate_limit"}` |

参数在 `relay_active` 中下发(`max_msg_size` / `max_rate`),客户端可读取但当前仅作展示用,限速由 DO 强制执行
参数在 `relay_active` 中下发(`max_msg_size` / `max_rate`)。DO 仍做最终强制限制;客户端 Host 端也会按 `max_rate` 建本地令牌桶并留约 20% 余量,把高视距产生的大量 `ChunkSnapshot` 分帧送出,避免正常预载触发 `rate_limit`

### 关闭与清理

Expand Down
Loading