Skip to content

Commit b0f9e7c

Browse files
committed
update deps, improve payload handling using Bytes O(N) -> O(1), and expose some configs to public
1 parent baf0a30 commit b0f9e7c

6 files changed

Lines changed: 53 additions & 35 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,4 @@ jobs:
4848
uses: Swatinem/rust-cache@v2
4949

5050
- name: Run tests
51-
run: cargo nextest run --all-targets --no-fail-fast
51+
run: cargo nextest run --all-targets

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@ homepage = "https:/github.com/ronakgh97/ghost-sync"
1010
documentation = "https:/github.com/ronakgh97/ghost-sync/master/README.md"
1111

1212
[dependencies]
13-
tokio = { version = "1.50.0", features = ["full", "test-util"] }
13+
tokio = { version = "1.50.0", features = ["full"] }
1414
dashmap = "6.1.0"
1515
anyhow = "1.0.102"
1616
wincode = { version = "0.4.8", features = ["derive", "uuid"] }
1717
uuid = { version = "1.22.0", features = ["v4"] }
1818
bytes = "1.11.1"
1919
colored = "3.1.1"
2020
dotenv = "0.15.0"
21-
chrono = "0.4.44"
21+
chrono = "0.4.44"
22+
23+
[dev-dependencies]
24+
tokio = { version = "1.50.0", features = ["test-util"] }

src/client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl Client {
4949
}
5050

5151
/// Receive the next server event.
52-
/// Ping/pong keepalive is handled internally — the user never sees it.
52+
/// Ping/pong keepalive is handled internally
5353
/// Returns `Ok(None)` on clean disconnect.
5454
pub async fn recv(&mut self) -> Result<Option<ServerEvent>, SyncError> {
5555
loop {
@@ -104,7 +104,7 @@ pub struct ClientBuilder {
104104
impl ClientBuilder {
105105
pub fn new() -> Self {
106106
Self {
107-
max_payload: 64 * 1024,
107+
max_payload: 256 * 1024,
108108
}
109109
}
110110
}
@@ -134,8 +134,8 @@ impl ClientBuilder {
134134
let (read_half, write_half) = stream.into_split();
135135

136136
Ok(Client {
137-
reader: BufReader::new(read_half),
138-
writer: BufWriter::new(write_half),
137+
reader: BufReader::with_capacity(64 * 1024, read_half),
138+
writer: BufWriter::with_capacity(64 * 1024, write_half),
139139
max_payload: self.max_payload,
140140
})
141141
}

src/room.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
use crate::types::{Result, ServerWire, SyncError};
1+
use bytes::Bytes;
22
use dashmap::DashMap;
33
use tokio::sync::mpsc;
44
use uuid::Uuid;
55

6-
/// A broadcast frame to be sent to a client.
7-
pub type BroadcastFrame = Vec<u8>;
6+
use crate::types::{Result, ServerWire, SyncError};
7+
8+
/// A broadcast frame to be sent to a client. `Bytes` is reference-counted,
9+
/// so cloning for multiple recipients is an Arc bump, not a memcpy.
10+
pub type BroadcastFrame = Bytes;
811

912
/// A single room holding connected clients.
1013
///
@@ -46,19 +49,21 @@ impl Room {
4649
#[inline(always)]
4750
pub async fn broadcast(&self, sender: Uuid, msg: &ServerWire) -> Vec<Uuid> {
4851
let payload = match wincode::serialize(msg) {
49-
Ok(p) => p,
52+
Ok(p) => Bytes::from(p),
5053
Err(_) => return Vec::new(),
5154
};
52-
self.broadcast_raw(sender, &payload).await
55+
self.broadcast_raw(sender, payload).await
5356
}
5457

55-
/// Send raw bytes to all clients except `sender`.
58+
/// Send pre-serialized bytes to all clients except `sender`.
5659
/// Returns the UUIDs of clients whose write channels were full (frame dropped).
60+
///
61+
/// Cloning `Bytes` is an Arc pointer bump — no data copy regardless of payload size.
5762
#[inline(always)]
58-
pub async fn broadcast_raw(&self, sender: Uuid, payload: &[u8]) -> Vec<Uuid> {
63+
pub async fn broadcast_raw(&self, sender: Uuid, payload: Bytes) -> Vec<Uuid> {
5964
let mut dropped = Vec::new();
6065
for entry in self.clients.iter() {
61-
if *entry.key() != sender && entry.value().try_send(payload.to_vec()).is_err() {
66+
if *entry.key() != sender && entry.value().try_send(payload.clone()).is_err() {
6267
dropped.push(*entry.key());
6368
}
6469
}
@@ -69,11 +74,9 @@ impl Room {
6974
#[inline(always)]
7075
pub async fn send_to(&self, id: &Uuid, msg: &ServerWire) {
7176
if let Some(tx) = self.clients.get(id) {
72-
let payload = match wincode::serialize(msg) {
73-
Ok(p) => p,
74-
Err(_) => return,
75-
};
76-
let _ = tx.try_send(payload);
77+
if let Ok(payload) = wincode::serialize(msg) {
78+
let _ = tx.try_send(Bytes::from(payload));
79+
}
7780
}
7881
}
7982

@@ -90,7 +93,7 @@ pub struct RoomManager {
9093
}
9194

9295
impl RoomManager {
93-
/// Init a room with 64s pre-allocated
96+
/// Init a room manager with 64 pre-allocated slots.
9497
pub fn new() -> Self {
9598
Self {
9699
rooms: DashMap::with_capacity(64),

src/server.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22
use std::time::Duration;
33

4+
use bytes::Bytes;
45
use dashmap::DashMap;
56
use tokio::io::BufWriter;
67
use tokio::net::{TcpListener, TcpStream};
@@ -52,7 +53,7 @@ pub struct Server {
5253
/// Handle to a running server.
5354
///
5455
/// Provides runtime control: shutdown and room management.
55-
/// Rooms can be created and deleted at any time new clients will see the
56+
/// Rooms can be created and deleted at any time, new clients will see the
5657
/// updated room list, and existing clients in deleted rooms get errors on
5758
/// their next broadcast or join attempt.
5859
pub struct ServerHandle {
@@ -102,18 +103,13 @@ impl Server {
102103
}
103104

104105
/// Create a room. Call this before [`Server::run`].
105-
///
106106
/// Clients can only join rooms that have been explicitly created.
107-
///
108-
/// # Errors
109-
///
110107
/// Returns [`SyncError::RoomAlreadyExists`] if a room with this ID exists.
111108
pub fn create_room(&self, id: &str) -> Result<()> {
112109
self.rooms.create(id)
113110
}
114111

115112
/// Delete a room. Call this before [`Server::run`].
116-
///
117113
/// Returns `true` if the room existed.
118114
pub fn delete_room(&self, id: &str) -> bool {
119115
self.rooms.delete(id)
@@ -122,7 +118,7 @@ impl Server {
122118
/// Start accepting connections.
123119
///
124120
/// Binds the TCP listener synchronously. If the bind fails, the error is
125-
/// returned immediately — nothing is spawned on failure.
121+
/// returned immediately
126122
///
127123
/// On success, spawns the accept loop on the current runtime and returns
128124
/// a [`ServerHandle`] for shutdown and runtime action
@@ -188,14 +184,14 @@ impl Server {
188184
Ok(handle)
189185
}
190186

191-
#[inline(always)]
187+
#[inline]
192188
async fn handle_connection(self: &Arc<Self>, stream: TcpStream) -> Result<()> {
193189
let client_id = Uuid::new_v4();
194190
let (read_half, write_half) = stream.into_split();
195191
let mut reader = tokio::io::BufReader::new(read_half);
196192
let mut writer = BufWriter::new(write_half);
197193

198-
let (write_tx, mut write_rx) = mpsc::channel::<Vec<u8>>(64);
194+
let (write_tx, mut write_rx) = mpsc::channel::<Bytes>(self.config.channel_capacity);
199195

200196
// Spawn writer task
201197
let writer_handle = tokio::spawn(async move {
@@ -222,11 +218,12 @@ impl Server {
222218
result
223219
}
224220

221+
#[inline(always)]
225222
async fn client_loop(
226223
self: &Arc<Self>,
227224
client_id: Uuid,
228225
reader: &mut tokio::io::BufReader<tokio::net::tcp::OwnedReadHalf>,
229-
write_tx: &mpsc::Sender<Vec<u8>>,
226+
write_tx: &mpsc::Sender<Bytes>,
230227
) -> Result<()> {
231228
let mut shutdown_rx = self.shutdown_tx.subscribe();
232229
let mut ping_interval = tokio::time::interval(self.config.ping_interval);
@@ -289,7 +286,7 @@ impl Server {
289286
self: &Arc<Self>,
290287
client_id: Uuid,
291288
msg: ClientWire,
292-
write_tx: &mpsc::Sender<Vec<u8>>,
289+
write_tx: &mpsc::Sender<Bytes>,
293290
) -> Result<()> {
294291
match msg {
295292
ClientWire::JoinRoom { room_id } => {
@@ -375,9 +372,10 @@ impl Server {
375372
};
376373
let payload = wincode::serialize(&broadcast)
377374
.map_err(|e| SyncError::Protocol(format!("serialize failed: {:?}", e)))?;
375+
let payload = Bytes::from(payload);
378376

379377
if let Some(room) = self.rooms.get(&room_id) {
380-
let dropped = room.broadcast_raw(client_id, &payload).await;
378+
let dropped = room.broadcast_raw(client_id, payload).await;
381379
for id in dropped {
382380
self.handler.on_backpressure(id, &room_id);
383381
}
@@ -392,6 +390,7 @@ impl Server {
392390
Ok(())
393391
}
394392

393+
#[inline]
395394
async fn cleanup_client(self: &Arc<Self>, client_id: Uuid, room_id: &str) {
396395
let notify = ServerWire::PlayerLeft { client_id };
397396
if let Some(room) = self.rooms.get(room_id) {
@@ -407,9 +406,9 @@ impl Server {
407406
}
408407

409408
#[inline(always)]
410-
async fn send_to_client(&self, client_id: Uuid, tx: &mpsc::Sender<Vec<u8>>, msg: &ServerWire) {
409+
async fn send_to_client(&self, client_id: Uuid, tx: &mpsc::Sender<Bytes>, msg: &ServerWire) {
411410
if let Ok(payload) = wincode::serialize(msg) {
412-
if tx.try_send(payload).is_err() {
411+
if tx.try_send(Bytes::from(payload)).is_err() {
413412
if let Some(state) = self.clients.get(&client_id) {
414413
if let Some(ref room_id) = state.room_id {
415414
self.handler.on_backpressure(client_id, room_id);
@@ -484,6 +483,14 @@ impl ServerBuilder {
484483
self
485484
}
486485

486+
/// Per-client write channel capacity. Frames are dropped when full
487+
/// (with `on_backpressure` hook). Higher values buffer more for bursty
488+
/// games; lower values keep latency tight. Default: 64.
489+
pub fn channel_capacity(mut self, n: usize) -> Self {
490+
self.config.channel_capacity = n;
491+
self
492+
}
493+
487494
/// Set a custom event handler for lifecycle hooks.
488495
pub fn handler(mut self, h: impl ServerHandler) -> Self {
489496
self.handler = Arc::new(h);

src/types.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ pub struct ServerConfig {
5050
/// How often the server pings clients. If no Pong arrives before the
5151
/// next ping tick, the client is disconnected with [`SyncError::PingTimeout`].
5252
pub ping_interval: Duration,
53+
/// Per-client write channel capacity. Frames are dropped when the channel
54+
/// is full (with `on_backpressure` hook). Higher values buffer more for
55+
/// bursty games; lower values keep latency tight.
56+
pub channel_capacity: usize,
5357
}
5458

5559
impl Default for ServerConfig {
@@ -61,6 +65,7 @@ impl Default for ServerConfig {
6165
// Non-divisible defaults to avoid idle_timeout and ping_interval
6266
idle_timeout: Duration::from_secs(31),
6367
ping_interval: Duration::from_secs(13),
68+
channel_capacity: 64,
6469
}
6570
}
6671
}

0 commit comments

Comments
 (0)