Skip to content

Commit a9fd5e6

Browse files
Pipeline the websocket send path (#5051)
# Description of Changes In keeping with the "pipeline everything" ethos, I've replaced `recv` with `recv_many` on `ClientConnectionReceiver` so that a client connection's websocket send path works on a batch of messages at a time, if possible. # API and ABI breaking changes None # Expected complexity level and risk 1.5 # Testing Refactor
1 parent ca949e5 commit a9fd5e6

3 files changed

Lines changed: 103 additions & 61 deletions

File tree

crates/client-api/src/routes/subscribe.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,13 +1064,13 @@ enum UnorderedWsMessage {
10641064
/// Abstraction over [`ClientConnectionReceiver`], so tests can use a plain
10651065
/// [`mpsc::Receiver`].
10661066
trait Receiver<T> {
1067-
fn recv(&mut self) -> impl Future<Output = Option<T>> + Send;
1067+
fn recv_many(&mut self, buf: &mut Vec<T>, max: usize) -> impl Future<Output = usize> + Send;
10681068
fn close(&mut self);
10691069
}
10701070

10711071
impl Receiver<OutboundMessage> for ClientConnectionReceiver {
1072-
async fn recv(&mut self) -> Option<OutboundMessage> {
1073-
ClientConnectionReceiver::recv(self).await
1072+
async fn recv_many(&mut self, buf: &mut Vec<OutboundMessage>, max: usize) -> usize {
1073+
ClientConnectionReceiver::recv_many(self, buf, max).await
10741074
}
10751075

10761076
fn close(&mut self) {
@@ -1079,8 +1079,8 @@ impl Receiver<OutboundMessage> for ClientConnectionReceiver {
10791079
}
10801080

10811081
impl<T: Send> Receiver<T> for mpsc::Receiver<T> {
1082-
async fn recv(&mut self) -> Option<T> {
1083-
mpsc::Receiver::recv(self).await
1082+
async fn recv_many(&mut self, buf: &mut Vec<T>, max: usize) -> usize {
1083+
mpsc::Receiver::recv_many(self, buf, max).await
10841084
}
10851085

10861086
fn close(&mut self) {
@@ -1148,6 +1148,8 @@ async fn ws_send_loop_inner<T, U, Encoder>(
11481148
// The default frame size is 4KiB, hence we write in batches of 32KiB.
11491149
const FRAME_BATCH_SIZE: usize = 8;
11501150
let mut frames_batch = Vec::with_capacity(FRAME_BATCH_SIZE);
1151+
const MESSAGE_BATCH_SIZE: usize = ClientConnectionReceiver::DEFAULT_RECV_MANY_LIMIT;
1152+
let mut message_batch = Vec::new();
11511153
let (frames_tx, mut frames_rx) = mpsc::unbounded_channel();
11521154

11531155
let (encode_tx, encode_rx) = mpsc::unbounded_channel();
@@ -1262,12 +1264,15 @@ async fn ws_send_loop_inner<T, U, Encoder>(
12621264
// Take on more work.
12631265
//
12641266
// Branch is disabled if we already sent a close frame.
1265-
Some(message) = messages.recv(), if !closed => {
1266-
encode_tx
1267-
.send(message.into())
1268-
// `ws_encode_task` shouldn't terminate until
1269-
// `encode_tx` is dropped, except by panicking.
1270-
.expect("encode task panicked");
1267+
n = messages.recv_many(&mut message_batch, MESSAGE_BATCH_SIZE), if !closed => {
1268+
log::trace!("encoding batch of {n} messages");
1269+
for message in message_batch.drain(..n) {
1270+
encode_tx
1271+
.send(message.into())
1272+
// `ws_encode_task` shouldn't terminate until
1273+
// `encode_tx` is dropped, except by panicking.
1274+
.expect("encode task panicked");
1275+
}
12711276
},
12721277

12731278
}

crates/core/src/client/client_connection.rs

Lines changed: 85 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use bytes::Bytes;
1919
use bytestring::ByteString;
2020
use derive_more::From;
2121
use futures::prelude::*;
22+
use log::warn;
2223
use prometheus::{Histogram, IntCounter, IntGauge};
2324
use spacetimedb_auth::identity::{ConnectionAuthCtx, SpacetimeIdentityClaims};
2425
use spacetimedb_client_api_messages::websocket::{common as ws_common, v1 as ws_v1, v2 as ws_v2};
@@ -29,7 +30,7 @@ use spacetimedb_lib::Identity;
2930
use tokio::sync::mpsc::error::{SendError, TrySendError};
3031
use tokio::sync::{mpsc, oneshot, watch};
3132
use tokio::task::AbortHandle;
32-
use tracing::{trace, warn};
33+
use tracing::trace;
3334

3435
#[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)]
3536
pub enum Protocol {
@@ -154,11 +155,13 @@ impl DurableOffsetSupply for Arc<RelationalDB> {
154155
pub struct ClientConnectionReceiver {
155156
confirmed_reads: bool,
156157
channel: MeteredReceiver<ClientUpdate>,
157-
current: Option<ClientUpdate>,
158+
pending: Vec<ClientUpdate>,
158159
offset_supply: Box<dyn DurableOffsetSupply>,
159160
}
160161

161162
impl ClientConnectionReceiver {
163+
pub const DEFAULT_RECV_MANY_LIMIT: usize = 4096;
164+
162165
fn new(
163166
confirmed_reads: bool,
164167
channel: MeteredReceiver<ClientUpdate>,
@@ -167,82 +170,115 @@ impl ClientConnectionReceiver {
167170
Self {
168171
confirmed_reads,
169172
channel,
170-
current: None,
173+
pending: Vec::new(),
171174
offset_supply: Box::new(offset_supply),
172175
}
173176
}
174177

175-
/// Receive the next message from this channel.
176-
///
177-
/// If this method returns `None`, the channel is closed and no more messages
178-
/// are in the internal buffers. No more messages can ever be received from
179-
/// the channel.
178+
#[cfg(test)]
179+
pub(crate) async fn recv(&mut self) -> Option<OutboundMessage> {
180+
let mut buf = Vec::with_capacity(1);
181+
(self.recv_many(&mut buf, 1).await != 0).then(|| buf.remove(0))
182+
}
183+
184+
/// Receive multiple messages from this channel.
180185
///
181186
/// Messages are returned immediately if:
182187
///
183-
/// - The (internal) [`ClientUpdate`] does not have a `tx_offset`
188+
/// - The [`ClientUpdate`] does not have a `tx_offset`
184189
/// (such as for error messages).
185-
/// - The client hasn't requested confirmed reads (i.e.
186-
/// [`ClientConfig::confirmed_reads`] is `false`).
190+
/// - The client hasn't requested confirmed reads
191+
/// (i.e. [`ClientConfig::confirmed_reads`] is `false`).
187192
/// - The database is configured to not persist transactions.
188193
///
189-
/// Otherwise, the update's `tx_offset` is compared against the module's
194+
/// Otherwise, the last `tx_offset` in the batch is compared against the module's
190195
/// durable offset. If the durable offset is behind the `tx_offset`, the
191196
/// method waits until it catches up before returning the message.
192197
///
193198
/// If the database is shut down while waiting for the durable offset,
194-
/// `None` is returned. In this case, no more messages can ever be received
199+
/// 0 is returned. In this case, no more messages can ever be received
195200
/// from the channel.
196201
///
202+
/// For non-zero values of `max`, this method will never return `0` unless the
203+
/// input channel has been closed and there are no pending messages, or if the
204+
/// database goes away. This indicates that no further values can ever be received
205+
/// from this `Receiver`.
206+
///
197207
/// # Cancel safety
198208
///
199209
/// This method is cancel safe, as long as `self` is not dropped.
200210
///
201-
/// If `recv` is used in a [`tokio::select!`] statement, it may get
211+
/// If `recv_many` is used in a [`tokio::select!`] statement, it may get
202212
/// cancelled while waiting for the durable offset to catch up. At this
203-
/// point, it has already received a value from the underlying channel.
204-
/// This value is stored internally, so calling `recv` again will not lose
205-
/// data.
206-
//
207-
// TODO: Can we make a cancel-safe `recv_many` with confirmed reads semantics?
208-
pub async fn recv(&mut self) -> Option<OutboundMessage> {
209-
let ClientUpdate { tx_offset, message } = match self.current.take() {
210-
None => self.channel.recv().await?,
211-
Some(update) => update,
212-
};
213+
/// point, it has already received values from the underlying channel.
214+
/// These values are stored internally, so calling `recv_many` again will
215+
/// not lose data.
216+
pub async fn recv_many(&mut self, buf: &mut Vec<OutboundMessage>, max: usize) -> usize {
217+
// If there are no pending updates and the input channel has been closed,
218+
// no more messages can be received from this receiver.
219+
if max == 0 || (self.pending.is_empty() && self.channel.recv_many(&mut self.pending, max).await == 0) {
220+
return 0;
221+
}
222+
223+
// If we don't have to wait for txns to be made durable,
224+
// drain the pending updates.
213225
if !self.confirmed_reads {
214-
return Some(message);
226+
return self.drain_pending(buf, max);
227+
}
228+
229+
// If we do have to wait for txns to be made durable,
230+
// but the next client update doesn't have a tx offset,
231+
// there's no reason to wait - just send it.
232+
if !self.pending_update_has_offset() {
233+
return self.drain_pending(buf, 1);
215234
}
216235

217-
if let Some(tx_offset) = tx_offset {
218-
match self.offset_supply.durable_offset() {
219-
Ok(Some(mut durable)) => {
220-
// Store the current update in case we get cancelled while
221-
// waiting for the durable offset.
222-
self.current = Some(ClientUpdate {
223-
tx_offset: Some(tx_offset),
224-
message,
225-
});
226-
trace!("waiting for offset {tx_offset} to become durable");
227-
durable
228-
.wait_for(tx_offset)
229-
.await
230-
.inspect_err(|_| {
231-
warn!("database went away while waiting for durable offset");
232-
})
233-
.ok()?;
234-
self.current.take().map(|update| update.message)
236+
// Otherwise, grab the next offset that we should wait for.
237+
let (n, wait_for_offset) = self.next_confirmed_reads_batch(max);
238+
239+
match self.offset_supply.durable_offset() {
240+
Ok(Some(mut durable)) => {
241+
trace!("waiting for offset {wait_for_offset} to become durable");
242+
if durable.wait_for(wait_for_offset).await.is_err() {
243+
warn!("database went away while waiting for durable offset");
244+
return 0;
235245
}
236-
// Database shut down or crashed.
237-
Err(NoSuchModule) => None,
238-
// In-memory database.
239-
Ok(None) => Some(message),
246+
self.drain_pending(buf, n)
240247
}
241-
} else {
242-
Some(message)
248+
// Database shut down or crashed.
249+
Err(NoSuchModule) => 0,
250+
// In-memory database.
251+
Ok(None) => self.drain_pending(buf, max),
243252
}
244253
}
245254

255+
/// Compute the next batch of pending client updates that have a tx offset.
256+
/// What is the size of the batch and what is the max offset?
257+
fn next_confirmed_reads_batch(&self, max: usize) -> (usize, TxOffset) {
258+
self.pending
259+
.iter()
260+
.take(max)
261+
.map_while(|update| update.tx_offset)
262+
.fold((0, 0), |(count, max_offset), tx_offset| {
263+
(count + 1, max_offset.max(tx_offset))
264+
})
265+
}
266+
267+
/// Drain the pending [`ClientUpdate`]s, up to `max, into `buf`.
268+
fn drain_pending(&mut self, buf: &mut Vec<OutboundMessage>, max: usize) -> usize {
269+
let n = self.pending.len().min(max);
270+
buf.reserve(n);
271+
buf.extend(self.pending.drain(..n).map(|u| u.message));
272+
n
273+
}
274+
275+
/// Does the next pending update have a tx offset?
276+
///
277+
/// Assumes that [`Self::pending`] is not empty.
278+
fn pending_update_has_offset(&self) -> bool {
279+
self.pending.first().is_some_and(|update| update.tx_offset.is_some())
280+
}
281+
246282
/// Close the receiver without dropping it.
247283
///
248284
/// This is used to notify the [`ClientConnectionSender`] that the receiver

crates/testing/src/modules.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ impl ModuleHandle {
101101
}
102102

103103
pub async fn recv_message(&mut self) -> Option<OutboundMessage> {
104-
self.receiver.recv().await
104+
let mut buf = Vec::with_capacity(1);
105+
(self.receiver.recv_many(&mut buf, 1).await != 0).then(|| buf.remove(0))
105106
}
106107

107108
pub async fn recv_reducer_update(&mut self, request_id: RequestId) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)