Skip to content

Commit 0203008

Browse files
committed
messages::serialize: take/put buffers from/into a SerializeBufferPool
1 parent b07f22e commit 0203008

10 files changed

Lines changed: 193 additions & 64 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api-messages/src/websocket.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -858,25 +858,19 @@ pub fn decide_compression(len: usize, compression: Compression) -> Compression {
858858
}
859859
}
860860

861-
pub fn brotli_compress(bytes: &[u8], out: &mut Vec<u8>) {
862-
let reader = &mut &bytes[..];
863-
864-
// The default Brotli buffer size.
865-
const BUFFER_SIZE: usize = 4096;
861+
pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) {
866862
// We are optimizing for compression speed,
867863
// so we choose the lowest (fastest) level of compression.
868864
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
869865
// for large `SubscriptionUpdate` messages at this level.
870-
const COMPRESSION_LEVEL: u32 = 1;
871-
// The default value for an internal compression parameter.
872-
// See `BrotliEncoderParams` for more details.
873-
const LG_WIN: u32 = 22;
866+
const COMPRESSION_LEVEL: i32 = 1;
874867

875-
let mut encoder = brotli::CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);
876-
877-
encoder
878-
.read_to_end(out)
879-
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
868+
let params = brotli::enc::BrotliEncoderParams {
869+
quality: COMPRESSION_LEVEL,
870+
..<_>::default()
871+
};
872+
let reader = &mut &bytes[..];
873+
brotli::BrotliCompress(reader, out, &params).expect("should be able to BrotliCompress");
880874
}
881875

882876
pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
@@ -885,10 +879,10 @@ pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
885879
Ok(decompressed)
886880
}
887881

888-
pub fn gzip_compress(bytes: &[u8], out: &mut Vec<u8>) {
882+
pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) {
889883
let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast());
890884
encoder.write_all(bytes).unwrap();
891-
encoder.finish().expect("Failed to gzip compress `bytes`");
885+
encoder.finish().expect("should be able to gzip compress `bytes`");
892886
}
893887

894888
pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {

crates/client-api/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use async_trait::async_trait;
55
use axum::response::ErrorResponse;
66
use http::StatusCode;
77

8+
use spacetimedb::client::messages::SerializeBufferPool;
89
use spacetimedb::client::ClientActorIndex;
910
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
1011
use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
@@ -39,6 +40,8 @@ pub trait NodeDelegate: Send + Sync {
3940
/// The [`Host`] is spawned implicitly if not already running.
4041
async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>>;
4142
fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir;
43+
44+
fn websocket_send_serialize_buffer_pool(&self) -> &Arc<SerializeBufferPool>;
4245
}
4346

4447
/// Client view of a running module.
@@ -371,6 +374,10 @@ impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
371374
fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
372375
(**self).module_logs_dir(replica_id)
373376
}
377+
378+
fn websocket_send_serialize_buffer_pool(&self) -> &Arc<SerializeBufferPool> {
379+
(**self).websocket_send_serialize_buffer_pool()
380+
}
374381
}
375382

376383
pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {

crates/client-api/src/routes/subscribe.rs

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::collections::VecDeque;
22
use std::mem;
33
use std::pin::{pin, Pin};
4+
use std::sync::Arc;
45
use std::time::Duration;
56

67
use axum::extract::{Path, Query, State};
@@ -14,12 +15,14 @@ use futures::{Future, FutureExt, SinkExt, StreamExt};
1415
use http::{HeaderValue, StatusCode};
1516
use scopeguard::ScopeGuard;
1617
use serde::Deserialize;
17-
use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage};
18+
use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage, SerializeBufferPool};
1819
use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, Protocol};
20+
use spacetimedb::execution_context::WorkloadType;
1921
use spacetimedb::host::module_host::ClientConnectedError;
2022
use spacetimedb::host::NoSuchModule;
2123
use spacetimedb::util::also_poll;
2224
use spacetimedb::worker_metrics::WORKER_METRICS;
25+
use spacetimedb::Identity;
2326
use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
2427
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
2528
use std::time::Instant;
@@ -125,6 +128,8 @@ where
125128
name: ctx.client_actor_index().next_client_name(),
126129
};
127130

131+
let serialize_buffer_pool = ctx.websocket_send_serialize_buffer_pool().clone();
132+
128133
let ws_config = WebSocketConfig::default()
129134
.max_message_size(Some(0x2000000))
130135
.max_frame_size(None)
@@ -146,7 +151,7 @@ where
146151
None => log::debug!("New client connected from unknown ip"),
147152
}
148153

149-
let actor = |client, sendrx| ws_client_actor(client, ws, sendrx);
154+
let actor = |client, sendrx| ws_client_actor(client, ws, sendrx, serialize_buffer_pool);
150155
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
151156
{
152157
Ok(s) => s,
@@ -180,13 +185,18 @@ where
180185

181186
const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
182187

183-
async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: mpsc::Receiver<SerializableMessage>) {
188+
async fn ws_client_actor(
189+
client: ClientConnection,
190+
ws: WebSocketStream,
191+
sendrx: mpsc::Receiver<SerializableMessage>,
192+
serialize_buffer_pool: Arc<SerializeBufferPool>,
193+
) {
184194
// ensure that even if this task gets cancelled, we always cleanup the connection
185195
let mut client = scopeguard::guard(client, |client| {
186196
tokio::spawn(client.disconnect());
187197
});
188198

189-
ws_client_actor_inner(&mut client, ws, sendrx).await;
199+
ws_client_actor_inner(&mut client, ws, sendrx, &serialize_buffer_pool).await;
190200

191201
ScopeGuard::into_inner(client).disconnect().await;
192202
}
@@ -203,6 +213,7 @@ async fn ws_client_actor_inner(
203213
client: &mut ClientConnection,
204214
mut ws: WebSocketStream,
205215
mut sendrx: mpsc::Receiver<SerializableMessage>,
216+
serialize_buffer_pool: &SerializeBufferPool,
206217
) {
207218
let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
208219
let mut got_pong = true;
@@ -298,31 +309,31 @@ async fn ws_client_actor_inner(
298309
// even though the websocket RFC allows it. should we fork tungstenite?
299310
log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]);
300311
} else {
301-
let send_all = async {
302-
for msg in rx_buf.drain(..n) {
303-
let workload = msg.workload();
304-
let num_rows = msg.num_rows();
305-
306-
let msg = datamsg_to_wsmsg(serialize(msg, client.config));
307-
308-
// These metrics should be updated together,
309-
// or not at all.
310-
if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
311-
WORKER_METRICS
312-
.websocket_sent_num_rows
313-
.with_label_values(&addr, &workload)
314-
.observe(num_rows as f64);
315-
WORKER_METRICS
316-
.websocket_sent_msg_size
317-
.with_label_values(&addr, &workload)
318-
.observe(msg.len() as f64);
312+
let send_all = async {
313+
for msg in rx_buf.drain(..n) {
314+
let workload = msg.workload();
315+
let num_rows = msg.num_rows();
316+
317+
// Serialize the message, report metrics,
318+
// and keep a handle to the buffer.
319+
let msg_data = serialize(serialize_buffer_pool, msg, client.config);
320+
report_ws_sent_metrics(&addr, workload, num_rows, &msg_data);
321+
let msg_alloc = msg_data.allocation();
322+
323+
// Buffer the message without necessarily sending it.
324+
ws.feed(datamsg_to_wsmsg(msg_data)).await?;
325+
326+
// At this point,
327+
// the underlying allocation of `msg_data` should have a single referent
328+
// and this should be `msg_alloc`.
329+
// We can put this back into our pool.
330+
let msg_alloc = msg_alloc.try_into_mut()
331+
.expect("should have a unique referent to `msg_alloc`");
332+
serialize_buffer_pool.put(msg_alloc);
319333
}
320-
// feed() buffers the message, but does not necessarily send it
321-
ws.feed(msg).await?;
322-
}
323-
// now we flush all the messages to the socket
324-
ws.flush().await
325-
};
334+
// now we flush all the messages to the socket
335+
ws.flush().await
336+
};
326337
// Flush the websocket while continuing to poll the `handle_queue`,
327338
// to avoid deadlocks or delays due to enqueued futures holding resources.
328339
let send_all = also_poll(send_all, make_progress(&mut current_message));
@@ -393,10 +404,24 @@ async fn ws_client_actor_inner(
393404
if let Err(e) = res {
394405
if let MessageHandleError::Execution(err) = e {
395406
log::error!("{err:#}");
396-
let msg = serialize(err, client.config);
397-
if let Err(error) = ws.send(datamsg_to_wsmsg(msg)).await {
407+
// Serialize the message and keep a handle to the buffer.
408+
let msg_data = serialize(serialize_buffer_pool, err, client.config);
409+
let msg_alloc = msg_data.allocation();
410+
411+
// Buffer the message without necessarily sending it.
412+
if let Err(error) = ws.send(datamsg_to_wsmsg(msg_data)).await {
398413
log::warn!("Websocket send error: {error}")
399414
}
415+
416+
// At this point,
417+
// the underlying allocation of `msg_data` should have a single referent
418+
// and this should be `msg_alloc`.
419+
// We can put this back into our pool.
420+
let msg_alloc = msg_alloc
421+
.try_into_mut()
422+
.expect("should have a unique referent to `msg_alloc`");
423+
serialize_buffer_pool.put(msg_alloc);
424+
400425
continue;
401426
}
402427
log::debug!("Client caused error on text message: {}", e);
@@ -460,6 +485,27 @@ impl ClientMessage {
460485
}
461486
}
462487

488+
/// Report metrics on sent rows and message sizes to a websocket client.
489+
fn report_ws_sent_metrics(
490+
addr: &Identity,
491+
workload: Option<WorkloadType>,
492+
num_rows: Option<usize>,
493+
msg_ws: &DataMessage,
494+
) {
495+
// These metrics should be updated together,
496+
// or not at all.
497+
if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
498+
WORKER_METRICS
499+
.websocket_sent_num_rows
500+
.with_label_values(addr, &workload)
501+
.observe(num_rows as f64);
502+
WORKER_METRICS
503+
.websocket_sent_msg_size
504+
.with_label_values(addr, &workload)
505+
.observe(msg_ws.len() as f64);
506+
}
507+
}
508+
463509
fn datamsg_to_wsmsg(msg: DataMessage) -> WsMessage {
464510
match msg {
465511
DataMessage::Text(text) => WsMessage::Text(bytestring_to_utf8bytes(text)),

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ bytes.workspace = true
4848
bytestring.workspace = true
4949
chrono.workspace = true
5050
crossbeam-channel.workspace = true
51+
crossbeam-queue.workspace = true
5152
derive_more.workspace = true
5253
dirs.workspace = true
5354
enum-as-inner.workspace = true

crates/core/src/client/client_connection.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,17 +232,27 @@ impl From<Vec<u8>> for DataMessage {
232232
}
233233

234234
impl DataMessage {
235+
/// Returns the number of bytes this message consists of.
235236
pub fn len(&self) -> usize {
236237
match self {
237-
DataMessage::Text(s) => s.len(),
238-
DataMessage::Binary(b) => b.len(),
238+
Self::Text(s) => s.len(),
239+
Self::Binary(b) => b.len(),
239240
}
240241
}
241242

243+
/// Is the message empty?
242244
#[must_use]
243245
pub fn is_empty(&self) -> bool {
244246
self.len() == 0
245247
}
248+
249+
/// Returns a handle to the underlying allocation of the message without consuming it.
250+
pub fn allocation(&self) -> Bytes {
251+
match self {
252+
DataMessage::Text(alloc) => alloc.as_bytes().clone(),
253+
DataMessage::Binary(alloc) => alloc.clone(),
254+
}
255+
}
246256
}
247257

248258
// if a client racks up this many messages in the queue without ACK'ing

0 commit comments

Comments
 (0)