diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index 635a094..6c90abc 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -145,6 +145,10 @@ pub enum NetEndpoint { /// 已升级为中继的 peer 集合。这些 peer 的 RTCPeerConnection 已被关闭, /// 后续 server→peer 字节走信令 WS 二进制帧。 relayed_peers: HashSet, + /// 每个中继 peer 的本地发送限速器。 + /// Worker 仍是最终裁决者;这里提前节流,避免高视距 ChunkSnapshot + /// 在一帧内打爆 DO 的 `rate_limit`。 + relay_rate_limiters: HashMap, /// 已经发出 relay_request 但还在等服务端 RelayActive 的 peer。 /// 用于避免对同一 peer 重复请求升级。 relay_requested: HashSet, @@ -208,6 +212,7 @@ impl NetEndpoint { peer_to_entity: HashMap::new(), host_self_entity_id: None, relayed_peers: HashSet::new(), + relay_rate_limiters: HashMap::new(), relay_requested: HashSet::new(), negotiation_started_ms: HashMap::new(), }; @@ -368,6 +373,7 @@ impl NetEndpoint { peer_to_entity, host_self_entity_id, relayed_peers, + relay_rate_limiters, signaling, .. } = self @@ -377,7 +383,8 @@ impl NetEndpoint { let mut unsent: Vec = Vec::new(); - for msg in msgs { + let mut iter = msgs.into_iter(); + while let Some(msg) = iter.next() { let plan = plan_route(&msg, peer_to_entity, *host_self_entity_id); // 先准备字节(同一条消息发给多个 peer 时复用编码结果) let bytes = if plan.peers_to_send.is_empty() { @@ -394,7 +401,7 @@ impl NetEndpoint { let channel = transport::channel_for_server_message(&msg.message); // 流控检查:若任一目标 peer 的 reliable DC 因 bufferedAmount 暂停,整条消息推迟。 - // 中继 peer 不受 DC bufferedAmount 影响(WS 自带反压)。 + // 中继 peer 走下面的本地令牌桶;两类流控都在发送前检查,避免半条消息已发再回滚。 if let Some(_b) = &bytes && channel == crate::transport::ChannelKind::Reliable { @@ -404,7 +411,38 @@ impl NetEndpoint { }); if any_paused { unsent.push(msg); - continue; + unsent.extend(iter); + break; + } + } + + if bytes.is_some() { + let relayed_targets: Vec = plan + .peers_to_send + .iter() + .copied() + .filter(|pid| relayed_peers.contains(pid)) + .collect(); + let now = now_ms(); + for pid in &relayed_targets { + relay_rate_limiters + .entry(*pid) + .or_insert_with(|| RelayRateLimiter::new(DEFAULT_RELAY_MAX_RATE, now)); + } + let relay_budget_exhausted = relayed_targets.iter().any(|pid| { + relay_rate_limiters + .get_mut(pid) + .is_some_and(|limiter| !limiter.has_token(now)) + }); + if relay_budget_exhausted { + unsent.push(msg); + unsent.extend(iter); + break; + } + for pid in &relayed_targets { + if let Some(limiter) = relay_rate_limiters.get_mut(pid) { + limiter.consume_token(now); + } } } @@ -460,6 +498,7 @@ impl NetEndpoint { ice_servers, session, relayed_peers, + relay_rate_limiters, relay_requested, negotiation_started_ms, .. @@ -471,6 +510,7 @@ impl NetEndpoint { ice_servers, session, relayed_peers, + relay_rate_limiters, relay_requested, negotiation_started_ms, &mut peer_msg_handler, @@ -509,6 +549,50 @@ const CONNECTED_SESSION: RoomSession = RoomSession::Connected; /// 协商超时阈值:从 PeerJoined 起 15s 仍未 Connected → 触发中继升级。 const NEGOTIATION_TIMEOUT_MS: f64 = 15_000.0; +const DEFAULT_RELAY_MAX_RATE: u32 = 200; +const RELAY_CLIENT_RATE_HEADROOM: f64 = 0.8; + +/// Worker 中继按"消息条数"做令牌桶限流。Host 高视距会在一帧内产生上百个 +/// ChunkSnapshot;本地先按服务端下发的 `max_rate` 留出余量发送,避免触发 +/// `relay_closed{reason:"rate_limit"}`。 +#[derive(Clone, Debug)] +pub struct RelayRateLimiter { + capacity: f64, + refill_per_ms: f64, + tokens: f64, + last_refill_ms: f64, +} + +impl RelayRateLimiter { + fn new(max_rate: u32, now_ms: f64) -> Self { + let safe_rate = ((max_rate.max(1) as f64) * RELAY_CLIENT_RATE_HEADROOM).max(1.0); + Self { + capacity: safe_rate, + refill_per_ms: safe_rate / 1000.0, + tokens: safe_rate, + last_refill_ms: now_ms, + } + } + + fn has_token(&mut self, now_ms: f64) -> bool { + self.refill(now_ms); + self.tokens >= 1.0 + } + + fn consume_token(&mut self, now_ms: f64) { + self.refill(now_ms); + self.tokens = (self.tokens - 1.0).max(0.0); + } + + fn refill(&mut self, now_ms: f64) { + let elapsed = now_ms - self.last_refill_ms; + if elapsed <= 0.0 { + return; + } + self.tokens = (self.tokens + elapsed * self.refill_per_ms).min(self.capacity); + self.last_refill_ms = now_ms; + } +} /// 取当前 performance.now() 毫秒;浏览器外(如 Node 测试)返回 0。 fn now_ms() -> f64 { @@ -530,6 +614,7 @@ fn poll_host( ice_servers: &mut Vec, session: &mut RoomSession, relayed_peers: &mut HashSet, + relay_rate_limiters: &mut HashMap, relay_requested: &mut HashSet, negotiation_started_ms: &mut HashMap, peer_msg_handler: &mut Option<&mut dyn FnMut(u32, ClientMessage)>, @@ -571,6 +656,7 @@ fn poll_host( pending.remove(&peer_id); negotiation_started_ms.remove(&peer_id); relayed_peers.remove(&peer_id); + relay_rate_limiters.remove(&peer_id); relay_requested.remove(&peer_id); // Phase 5:通知 client 端做 host_unregister_peer + server.remove_player out.push(RoomEvent::RemoteLeft { peer_id }); @@ -609,7 +695,9 @@ fn poll_host( // 信令关闭对已建立的 PC 没影响(设计如此);不切 session log::info!("[net/host] signaling socket closed"); } - SignalingEvent::RelayActive { peer_id, .. } => { + SignalingEvent::RelayActive { + peer_id, max_rate, .. + } => { // 服务端确认中继对建立。关闭对应 PeerConnection(释放资源),把 peer 移入 relayed_peers。 relay_requested.remove(&peer_id); if let Some(pc) = peers.remove(&peer_id) { @@ -618,6 +706,7 @@ fn poll_host( pending.remove(&peer_id); negotiation_started_ms.remove(&peer_id); if relayed_peers.insert(peer_id) { + relay_rate_limiters.insert(peer_id, RelayRateLimiter::new(max_rate, now_ms())); log::info!("[net/host] peer {peer_id} upgraded to relay"); out.push(RoomEvent::PeerRelayed { peer_id }); // 中继视为已连通,PeerCount 也包含 relayed @@ -630,6 +719,7 @@ fn poll_host( SignalingEvent::RelayClosed { peer_id, reason } => { log::info!("[net/host] relay closed peer {peer_id}: {reason}"); relayed_peers.remove(&peer_id); + relay_rate_limiters.remove(&peer_id); relay_requested.remove(&peer_id); // 直连 PC 之前已关闭;若不在 relayed_peers 也不在 peers,就把 peer 当作离线 out.push(RoomEvent::RemoteLeft { peer_id }); @@ -1066,4 +1156,29 @@ mod tests { assert!(!plan.send_to_local); assert!(plan.peers_to_send.is_empty()); } + + #[test] + fn relay_rate_limiter_keeps_headroom_under_worker_cap() { + let mut limiter = RelayRateLimiter::new(200, 0.0); + + for _ in 0..160 { + assert!(limiter.has_token(0.0)); + limiter.consume_token(0.0); + } + assert!(!limiter.has_token(0.0)); + + // 80% 余量下补充速率为 160/s;6ms 不足 1 条,7ms 可以再发 1 条。 + assert!(!limiter.has_token(6.0)); + assert!(limiter.has_token(7.0)); + } + + #[test] + fn relay_rate_limiter_small_rate_still_progresses() { + let mut limiter = RelayRateLimiter::new(1, 0.0); + + assert!(limiter.has_token(0.0)); + limiter.consume_token(0.0); + assert!(!limiter.has_token(999.0)); + assert!(limiter.has_token(1000.0)); + } } diff --git a/docs/modules/net.md b/docs/modules/net.md index 6657b2f..8d34070 100644 --- a/docs/modules/net.md +++ b/docs/modules/net.md @@ -174,6 +174,8 @@ impl SignalingClient { } ``` +`RelayActive.max_rate` 会被 Host 端记录为本地中继发送令牌桶的上限;`host_route_outbox()` 在真正调用 `send_relay_binary()` 前预扣 token,预算不足时把当前及后续 outbox 消息推迟到下一帧,避免高视距 ChunkSnapshot 突发触发 Worker 的 `rate_limit`。 + **注意**:`web_sys::WebSocket` 的事件回调通过 `Closure` 注册,回调内只能 `inbox.borrow_mut().push_back(event)`,不能直接做长时间工作(保持回调短)。 --- diff --git a/docs/networking/protocol.md b/docs/networking/protocol.md index 7c3d170..071e90d 100644 --- a/docs/networking/protocol.md +++ b/docs/networking/protocol.md @@ -180,6 +180,7 @@ if let Some(full) = assembler.ingest(pos, frag_index, frag_total, payload) { - `PeerConnection::send()` 发送后检查 `bufferedAmount`,超过 **1 MB 高水位**时设置暂停标志 - 暂停后该 peer 的 reliable 消息被推迟到下帧发送,等待 `onbufferedamountlow` 事件清除暂停标志 - `host_route_outbox()` 返回流控阻塞的未发送消息,由 `flush_server_outbox()` 重新入队 server.outbox +- 中继模式额外使用 Worker 下发的 `max_rate` 做客户端本地令牌桶节流(默认按 80% 留余量);高视距产生的大量 `ChunkSnapshot` 会分帧发送,避免触发 `relay_closed{reason:"rate_limit"}` --- diff --git a/docs/networking/signaling.md b/docs/networking/signaling.md index ad02877..71019d6 100644 --- a/docs/networking/signaling.md +++ b/docs/networking/signaling.md @@ -405,7 +405,7 @@ DO 收到客户端帧后: | 单 payload 大小 | ≤ 64 KB(不含 4B 头) | `relay_closed{reason:"msg_too_large"}` | | 发送速率 | 200 msg/s 令牌桶 | `relay_closed{reason:"rate_limit"}` | -参数在 `relay_active` 中下发(`max_msg_size` / `max_rate`),客户端可读取但当前仅作展示用,限速由 DO 强制执行。 +参数在 `relay_active` 中下发(`max_msg_size` / `max_rate`)。DO 仍做最终强制限制;客户端 Host 端也会按 `max_rate` 建本地令牌桶并留约 20% 余量,把高视距产生的大量 `ChunkSnapshot` 分帧送出,避免正常预载触发 `rate_limit`。 ### 关闭与清理