Skip to content

Commit f55ac80

Browse files
committed
feat: #1 optimize backpressure handling with improved buffer sizes and data types
1 parent 7f511ec commit f55ac80

9 files changed

Lines changed: 48 additions & 29 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
[![Dialga](https://img.pokemondb.net/sprites/black-white/anim/normal/dialga.gif)](https://pokemondb.net/pokedex/dialga)
1313
[![Palkia](https://img.pokemondb.net/sprites/black-white/anim/normal/palkia.gif)](https://pokemondb.net/pokedex/palkia)
1414

15-
Networking in games is pure suffering.
15+
***Networking in games is pure suffering.***
1616

1717
You start with “just a simple relay server.” _(let alone authoritative servers)_
1818
Just a few sockets. A couple messages. Clean.
@@ -81,4 +81,4 @@ TODO
8181
- Add tuned buffering and improve performance by reducing serialization and deserialization where possible (zero-copy,
8282
etc.)
8383
- Fix the DAMN BACKPRESSURE, CHANNEL GETS FULL AND THEN EVERYTHING BLOWS UP
84-
- Somehow increase TCP write throughput, the only bottleneck right now is TCP write, which is fucking I/O bound!!
84+
- Somehow increase TCP write throughput, the only bottleneck right now is TCP write, which is fucking I/O bound!!

examples/bot.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use wincode::{SchemaRead, SchemaWrite};
1111

1212
const ADDR: &str = "127.0.0.1:7777";
1313
const ROOM: &str = "test-room";
14-
const BOT_COUNT: usize = 144;
15-
const TICK_RATE: u64 = 8;
14+
const BOT_COUNT: usize = 64;
15+
const TICK_RATE: u64 = 24;
1616
const MAX_SPEED: f32 = 400.0;
1717
const SCREEN: f32 = 600.0;
1818

examples/daemon.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::Result;
2+
use dashmap::DashMap;
23
use ghost_sync::{info, warn, Server, ServerHandle, ServerHandler, Uuid};
34
use std::net::SocketAddr;
45
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -13,26 +14,31 @@ struct Daemon {
1314
}
1415

1516
struct Metrics {
16-
total_connections: AtomicUsize,
1717
start_time: Instant,
18+
total_connections: AtomicUsize,
19+
// Key - ClientID, Value - Number of frames dropped
20+
backpressure_tracker: DashMap<Uuid, AtomicUsize>,
1821
}
1922

2023
impl Daemon {
2124
fn new() -> Self {
2225
Self {
2326
metrics: Metrics {
24-
total_connections: AtomicUsize::new(0),
2527
start_time: Instant::now(),
28+
total_connections: AtomicUsize::new(0),
29+
backpressure_tracker: DashMap::with_capacity(1024),
2630
},
2731
}
2832
}
2933
}
3034

31-
struct DaemonHandler(Arc<Daemon>);
35+
struct DaemonHandler {
36+
daemon: Arc<Daemon>,
37+
}
3238

3339
impl DaemonHandler {
34-
fn new(demon: Arc<Daemon>) -> Self {
35-
Self(demon)
40+
fn new(daemon: Arc<Daemon>) -> Self {
41+
Self { daemon }
3642
}
3743
}
3844

@@ -46,7 +52,7 @@ struct RoomMeta {
4652
impl ServerHandler for DaemonHandler {
4753
fn on_connect(&self, addr: SocketAddr) -> bool {
4854
let count = self
49-
.0
55+
.daemon
5056
.metrics
5157
.total_connections
5258
.fetch_add(1, Ordering::Relaxed)
@@ -77,6 +83,12 @@ impl ServerHandler for DaemonHandler {
7783
"Backpressure: Client {} in room '{}' has a full write channel, frame dropped",
7884
client_id, room_id
7985
);
86+
let tracker = &self.daemon.metrics.backpressure_tracker;
87+
_ = tracker
88+
.entry(client_id)
89+
.or_insert_with(|| AtomicUsize::new(0))
90+
.fetch_add(1, Ordering::Relaxed)
91+
+ 1;
8092
}
8193
}
8294

examples/test_daemon.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ def main():
4444
room_count = resp.count("clients\n")
4545
print(f"Room count: {room_count}")
4646

47-
print("> CREATE_ROOM test-room")
47+
print("> CREATE_ROOM foo-bar-room")
4848
print(send_cmd(sock, "CREATE_ROOM\ntest-room\n"))
4949

5050
print("> ROOM_CLIENTS test-room")
5151
print(send_cmd(sock, "ROOM_CLIENTS\ntest-room\n"))
5252

53-
print("> DELETE_ROOM test-room")
53+
print("> DELETE_ROOM foo-bar-room")
5454
print(send_cmd(sock, "DELETE_ROOM\ntest-room\n"))
5555

5656
print("> METRICS")

src/client.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::Bytes;
12
use tokio::io::{BufReader, BufWriter};
23
use tokio::net::TcpStream;
34
use uuid::Uuid;
@@ -109,7 +110,10 @@ impl Client {
109110
ServerWire::PlayerJoined { client_id } => ServerEvent::PlayerJoined { client_id },
110111
ServerWire::PlayerLeft { client_id } => ServerEvent::PlayerLeft { client_id },
111112
ServerWire::Error(msg) => ServerEvent::Error(msg),
112-
ServerWire::Broadcast { sender_id, data } => ServerEvent::Broadcast { sender_id, data },
113+
ServerWire::Broadcast { sender_id, data } => ServerEvent::Broadcast {
114+
sender_id,
115+
data: Bytes::from(data),
116+
},
113117
// Ping/Pong are handled internally before reaching here
114118
_ => unreachable!("ping/pong should be handled in recv()"),
115119
}
@@ -155,8 +159,8 @@ impl ClientBuilder {
155159
let (read_half, write_half) = stream.into_split();
156160

157161
Ok(Client {
158-
reader: BufReader::with_capacity(2 * 1024 * 1024, read_half),
159-
writer: BufWriter::with_capacity(1024 * 1024, write_half),
162+
reader: BufReader::with_capacity(4 * 1024 * 1024, read_half),
163+
writer: BufWriter::with_capacity(6 * 1024 * 1024, write_half),
160164
max_payload: self.max_payload,
161165
client_id: None,
162166
})

src/protocol.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::types::Result;
2+
use bytes::Bytes;
23
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
34
use wincode::config::DefaultConfig;
45
use wincode::SchemaWrite;
@@ -21,7 +22,7 @@ where
2122

2223
/// Read a length-prefixed frame and return its raw (not deserialized) payload bytes, with a max payload size check.
2324
#[inline(always)]
24-
pub async fn read_frame_raw<R>(reader: &mut R, max_payload: usize) -> Result<Vec<u8>>
25+
pub async fn read_frame_raw<R>(reader: &mut R, max_payload: usize) -> Result<Bytes>
2526
where
2627
R: AsyncRead + Unpin,
2728
{
@@ -38,18 +39,18 @@ where
3839

3940
let mut payload = vec![0u8; len];
4041
reader.read_exact(&mut payload).await?;
41-
Ok(payload)
42+
Ok(Bytes::from(payload))
4243
}
4344

4445
/// Write a raw byte slice as a length-prefixed frame.
4546
#[inline(always)]
46-
pub async fn write_frame_raw<W>(writer: &mut W, payload: &[u8]) -> Result<()>
47+
pub async fn write_frame_raw<W>(writer: &mut W, payload: Bytes) -> Result<()>
4748
where
4849
W: AsyncWrite + Unpin,
4950
{
5051
let len = payload.len() as u32;
5152
writer.write_all(&len.to_be_bytes()).await?;
52-
writer.write_all(payload).await?;
53+
writer.write_all(&payload).await?;
5354
writer.flush().await?;
5455
Ok(())
5556
}

src/server.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@ impl Server {
366366
let client_id = Uuid::new_v4();
367367
stream.set_nodelay(true).ok();
368368
let (read_half, write_half) = stream.into_split();
369-
let mut reader = tokio::io::BufReader::with_capacity(1024 * 1024, read_half);
370-
let mut writer = tokio::io::BufWriter::with_capacity(4 * 1024 * 1024, write_half);
369+
let mut reader = tokio::io::BufReader::with_capacity(6 * 1024 * 1024, read_half);
370+
let mut writer = tokio::io::BufWriter::with_capacity(12 * 1024 * 1024, write_half);
371371

372372
// This is the main bottleneck for backpressure handling. Each client has a dedicated writer task
373373
// that receives frames to send via this channel. If the channel is full, we know the client is falling behind and can drop frames or disconnect as needed.
@@ -376,7 +376,7 @@ impl Server {
376376
// Spawn writer task
377377
let writer_handle = tokio::spawn(async move {
378378
while let Some(frame) = write_rx.recv().await {
379-
if let Err(e) = protocol::write_frame_raw(&mut writer, &frame).await {
379+
if let Err(e) = protocol::write_frame_raw(&mut writer, frame).await {
380380
#[allow(clippy::needless_ifs)]
381381
if !e.is_connection_closed() {}
382382
warn!("write error: {e}");
@@ -617,6 +617,7 @@ impl Server {
617617
#[inline(always)]
618618
async fn send_to_client(&self, client_id: Uuid, tx: &mpsc::Sender<Bytes>, msg: &ServerWire) {
619619
if let Ok(payload) = wincode::serialize(msg) {
620+
// If the channel is full, we drop the frame and call the backpressure hook.
620621
if tx.try_send(Bytes::from(payload)).is_err() {
621622
if let Some(state) = self.clients.get(&client_id) {
622623
if let Some(ref room_id) = state.room_id {
@@ -711,7 +712,7 @@ impl ServerBuilder {
711712

712713
/// Build the server. Call [`Server::run`] to start accepting connections.
713714
pub fn build(self) -> Server {
714-
let (tx, _) = broadcast::channel(1);
715+
let (tx, _) = broadcast::channel(4);
715716
Server {
716717
config: self.config.clone(),
717718
handler: self.handler,

src/types.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::Bytes;
12
use std::time::Duration;
23
use uuid::Uuid;
34
use wincode::{SchemaRead, SchemaWrite};
@@ -37,7 +38,7 @@ pub enum ServerEvent {
3738
/// Server error.
3839
Error(String),
3940
/// Relayed data from a peer.
40-
Broadcast { sender_id: Uuid, data: Vec<u8> },
41+
Broadcast { sender_id: Uuid, data: Bytes },
4142
}
4243

4344
/// Server configuration.
@@ -74,7 +75,7 @@ impl Default for ServerConfig {
7475
// Non-divisible defaults to avoid idle_timeout and ping_interval
7576
idle_timeout: Duration::from_secs(31),
7677
ping_interval: Duration::from_secs(13),
77-
channel_capacity: 64,
78+
channel_capacity: 1024,
7879
}
7980
}
8081
}

tests/integration.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#![allow(deprecated)]
2-
use std::sync::{Arc, Mutex};
3-
use std::time::Duration;
4-
2+
use bytes::Bytes;
53
use ghost_sync::{Client, Server, ServerEvent, SyncError};
64
use rand::rngs::StdRng;
75
use rand::{RngExt, SeedableRng};
6+
use std::sync::{Arc, Mutex};
7+
use std::time::Duration;
88
use tokio::sync::{mpsc, watch};
99

1010
/// Helper: start a server on a random port with a "test" room.
@@ -97,7 +97,7 @@ async fn broadcast_relay() {
9797
// A should receive it
9898
match a.recv().await.unwrap() {
9999
Some(ServerEvent::Broadcast { data, .. }) => {
100-
assert_eq!(&data, b"hello");
100+
assert_eq!(data, Bytes::from_static(b"hello"));
101101
}
102102
other => panic!("expected Broadcast, got: {:?}", other.is_some()),
103103
}

0 commit comments

Comments
 (0)