Skip to content

Commit d7aabba

Browse files
committed
messages::serialize: use 4 KiB for tx update compression threshold
1 parent c3afc17 commit d7aabba

8 files changed

Lines changed: 48 additions & 60 deletions

File tree

crates/bench/benches/subscription.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use criterion::{black_box, criterion_group, criterion_main, Criterion};
2+
use spacetimedb::db::relational_db::RelationalDB;
23
use spacetimedb::error::DBError;
34
use spacetimedb::execution_context::Workload;
45
use spacetimedb::host::module_host::DatabaseTableUpdate;
@@ -9,7 +10,6 @@ use spacetimedb::subscription::query::compile_read_only_queryset;
910
use spacetimedb::subscription::subscription::ExecutionSet;
1011
use spacetimedb::subscription::tx::DeltaTx;
1112
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
12-
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
1313
use spacetimedb_bench::database::BenchDatabase as _;
1414
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
1515
use spacetimedb_execution::pipelined::PipelinedProject;
@@ -151,14 +151,7 @@ fn eval(c: &mut Criterion) {
151151
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
152152
let query: ExecutionSet = query.into();
153153

154-
b.iter(|| {
155-
drop(black_box(query.eval::<BsatnFormat>(
156-
&raw.db,
157-
&tx,
158-
None,
159-
Compression::None,
160-
)))
161-
})
154+
b.iter(|| drop(black_box(query.eval::<BsatnFormat>(&raw.db, &tx, None))))
162155
});
163156
};
164157

crates/client-api-messages/src/websocket.rs

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,12 @@ pub trait WebsocketFormat: Sized {
9494
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
9595

9696
/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
97-
/// This allows some formats to e.g., compress the update.
98-
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
97+
///
98+
/// We don't compress individual table updates anymore for any format.
99+
/// Previously we did, but the benefits, if any, were unclear.
100+
/// Note that each message is still compressed before being sent to clients,
101+
/// but we no longer have to hold a tx lock when doing so.
102+
fn into_query_update(qu: QueryUpdate<Self>) -> Self::QueryUpdate;
99103
}
100104

101105
/// Messages sent from the client to the server.
@@ -770,7 +774,7 @@ impl WebsocketFormat for JsonFormat {
770774

771775
type QueryUpdate = QueryUpdate<Self>;
772776

773-
fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
777+
fn into_query_update(qu: QueryUpdate<Self>) -> Self::QueryUpdate {
774778
qu
775779
}
776780
}
@@ -813,24 +817,8 @@ impl WebsocketFormat for BsatnFormat {
813817

814818
type QueryUpdate = CompressableQueryUpdate<Self>;
815819

816-
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
817-
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();
818-
819-
match decide_compression(qu_len_would_have_been, compression) {
820-
Compression::None => CompressableQueryUpdate::Uncompressed(qu),
821-
Compression::Brotli => {
822-
let bytes = bsatn::to_vec(&qu).unwrap();
823-
let mut out = Vec::new();
824-
brotli_compress(&bytes, &mut out);
825-
CompressableQueryUpdate::Brotli(out.into())
826-
}
827-
Compression::Gzip => {
828-
let bytes = bsatn::to_vec(&qu).unwrap();
829-
let mut out = Vec::new();
830-
gzip_compress(&bytes, &mut out);
831-
CompressableQueryUpdate::Gzip(out.into())
832-
}
833-
}
820+
fn into_query_update(qu: QueryUpdate<Self>) -> Self::QueryUpdate {
821+
CompressableQueryUpdate::Uncompressed(qu)
834822
}
835823
}
836824

@@ -846,13 +834,9 @@ pub enum Compression {
846834
Gzip,
847835
}
848836

849-
pub fn decide_compression(len: usize, compression: Compression) -> Compression {
850-
/// The threshold beyond which we start to compress messages.
851-
/// 1KiB was chosen without measurement.
852-
/// TODO(perf): measure!
853-
const COMPRESS_THRESHOLD: usize = 1024;
854-
855-
if len > COMPRESS_THRESHOLD {
837+
/// Based on the `len` of a message and a `threshold`, potentially clamp `compression` to `None`.
838+
pub fn decide_compression(len: usize, threshold: usize, compression: Compression) -> Compression {
839+
if len > threshold {
856840
compression
857841
} else {
858842
Compression::None

crates/core/src/client/messages.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use bytes::{BufMut, Bytes, BytesMut};
77
use bytestring::ByteString;
88
use derive_more::From;
99
use spacetimedb_client_api_messages::websocket::{
10-
BsatnFormat, Compression, FormatSwitch, JsonFormat, OneOffTable, RowListLen, WebsocketFormat,
10+
BsatnFormat, Compression, FormatSwitch, JsonFormat, OneOffTable, RowListLen, ServerMessage, WebsocketFormat,
1111
SERVER_MSG_COMPRESSION_TAG_BROTLI, SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE,
1212
};
1313
use spacetimedb_lib::identity::RequestId;
@@ -147,8 +147,30 @@ pub fn serialize(
147147
bsatn::to_writer(w.into_inner(), &msg).unwrap()
148148
});
149149

150+
/// The threshold beyond which we start to compress messages for non-update messages.
151+
/// This is 1 KiB currently.
152+
const COMPRESS_THRESHOLD_OTHER: usize = 1024;
153+
/// The threshold beyond which we start to compress messages for update messages.
154+
/// This is 4 KiB currently.
155+
const COMPRESS_THRESHOLD_UPDATE: usize = 4 * COMPRESS_THRESHOLD_OTHER;
156+
// Route to the correct compression threshold.
157+
let threshold = match msg {
158+
ServerMessage::TransactionUpdate(_) | ServerMessage::TransactionUpdateLight(_) => {
159+
COMPRESS_THRESHOLD_UPDATE
160+
}
161+
162+
ServerMessage::InitialSubscription(_)
163+
| ServerMessage::IdentityToken(_)
164+
| ServerMessage::OneOffQueryResponse(_)
165+
| ServerMessage::SubscribeApplied(_)
166+
| ServerMessage::UnsubscribeApplied(_)
167+
| ServerMessage::SubscriptionError(_)
168+
| ServerMessage::SubscribeMultiApplied(_)
169+
| ServerMessage::UnsubscribeMultiApplied(_) => COMPRESS_THRESHOLD_OTHER,
170+
};
171+
150172
// Conditionally compress the message.
151-
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
173+
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), threshold, config.compression) {
152174
Compression::None => buffer.uncompressed(),
153175
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress),
154176
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress),

crates/core/src/host/module_host.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use derive_more::From;
2727
use indexmap::IndexSet;
2828
use itertools::Itertools;
2929
use prometheus::{Histogram, IntGauge};
30-
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
30+
use spacetimedb_client_api_messages::websocket::{ByteListLen, OneOffTable, QueryUpdate, WebsocketFormat};
3131
use spacetimedb_data_structures::error_stream::ErrorStream;
3232
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3333
use spacetimedb_execution::pipelined::PipelinedProject;
@@ -139,11 +139,7 @@ impl UpdatesRelValue<'_> {
139139
let num_rows = nr_del + nr_ins;
140140
let num_bytes = deletes.num_bytes() + inserts.num_bytes();
141141
let qu = QueryUpdate { deletes, inserts };
142-
// We don't compress individual table updates.
143-
// Previously we were, but the benefits, if any, were unclear.
144-
// Note, each message is still compressed before being sent to clients,
145-
// but we no longer have to hold a tx lock when doing so.
146-
let cqu = F::into_query_update(qu, Compression::None);
142+
let cqu = F::into_query_update(qu);
147143
(cqu, num_rows, num_bytes)
148144
}
149145
}

crates/core/src/subscription/execution_unit.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue,
88
use crate::messages::websocket::TableUpdate;
99
use crate::util::slow::SlowQueryLogger;
1010
use crate::vm::{build_query, TxMode};
11-
use spacetimedb_client_api_messages::websocket::{
12-
Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
13-
};
11+
use spacetimedb_client_api_messages::websocket::{QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat};
1412
use spacetimedb_lib::db::error::AuthError;
1513
use spacetimedb_lib::relation::DbTable;
1614
use spacetimedb_lib::{Identity, ProductValue};
@@ -242,7 +240,6 @@ impl ExecutionUnit {
242240
tx: &Tx,
243241
sql: &str,
244242
slow_query_threshold: Option<Duration>,
245-
compression: Compression,
246243
) -> Option<TableUpdate<F>> {
247244
let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx.workload()).log_guard();
248245

@@ -255,7 +252,7 @@ impl ExecutionUnit {
255252
(!inserts.is_empty()).then(|| {
256253
let deletes = F::List::default();
257254
let qu = QueryUpdate { deletes, inserts };
258-
let update = F::into_query_update(qu, compression);
255+
let update = F::into_query_update(qu);
259256
TableUpdate::new(
260257
self.return_table(),
261258
self.return_name(),

crates/core/src/subscription/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use module_subscription_manager::Plan;
55
use prometheus::IntCounter;
66
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
77
use spacetimedb_client_api_messages::websocket::{
8-
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
8+
ByteListLen, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
99
};
1010
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1111
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
@@ -147,10 +147,7 @@ where
147147
inserts: empty,
148148
},
149149
};
150-
// We will compress the outer server message,
151-
// after we release the tx lock.
152-
// There's no need to compress the inner table update too.
153-
let update = F::into_query_update(qu, Compression::None);
150+
let update = F::into_query_update(qu);
154151
(
155152
TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }),
156153
metrics,

crates/core/src/subscription/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ mod tests {
160160
use crate::vm::tests::create_table_with_rows;
161161
use crate::vm::DbProgram;
162162
use itertools::Itertools;
163-
use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression};
163+
use spacetimedb_client_api_messages::websocket::BsatnFormat;
164164
use spacetimedb_lib::bsatn;
165165
use spacetimedb_lib::db::auth::{StAccess, StTableType};
166166
use spacetimedb_lib::error::ResultTest;
@@ -353,7 +353,7 @@ mod tests {
353353
total_tables: usize,
354354
rows: &[ProductValue],
355355
) -> ResultTest<()> {
356-
let result = s.eval::<BsatnFormat>(db, tx, None, Compression::Brotli).tables;
356+
let result = s.eval::<BsatnFormat>(db, tx, None).tables;
357357
assert_eq!(
358358
result.len(),
359359
total_tables,

crates/core/src/subscription/subscription.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::sql::ast::SchemaViewer;
3232
use crate::vm::{build_query, TxMode};
3333
use anyhow::Context;
3434
use itertools::Either;
35-
use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat};
35+
use spacetimedb_client_api_messages::websocket::WebsocketFormat;
3636
use spacetimedb_data_structures::map::HashSet;
3737
use spacetimedb_lib::db::auth::{StAccess, StTableType};
3838
use spacetimedb_lib::db::error::AuthError;
@@ -516,14 +516,13 @@ impl ExecutionSet {
516516
db: &RelationalDB,
517517
tx: &Tx,
518518
slow_query_threshold: Option<Duration>,
519-
compression: Compression,
520519
) -> ws::DatabaseUpdate<F> {
521520
// evaluate each of the execution units in this ExecutionSet in parallel
522521
let tables = self
523522
.exec_units
524523
// if you need eval to run single-threaded for debugging, change this to .iter()
525524
.iter()
526-
.filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression))
525+
.filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold))
527526
.collect();
528527
ws::DatabaseUpdate { tables }
529528
}

0 commit comments

Comments
 (0)