Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 61 additions & 44 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::host::module_host::{DatabaseTableUpdate, ModuleEvent, UpdatesRelValue
use crate::messages::websocket::{self as ws, TableUpdate};
use crate::subscription::delta::eval_delta;
use crate::worker_metrics::WORKER_METRICS;
use core::mem;
use hashbrown::hash_map::OccupiedError;
use hashbrown::{HashMap, HashSet};
use parking_lot::RwLock;
Expand Down Expand Up @@ -1318,6 +1319,14 @@ struct SendWorker {
///
/// If `Some`, this type's `drop` method will do `remove_label_values` to clean up the metric on exit.
database_identity_to_clean_up_metric: Option<Identity>,

/// A map (re)used by [`SendWorker::send_one_computed_queries`]
/// to avoid creating new allocations.
table_updates_client_id_table_id: HashMap<(ClientId, TableId), SwitchedTableUpdate>,

/// A map (re)used by [`SendWorker::send_one_computed_queries`]
/// to avoid creating new allocations.
table_updates_client_id: HashMap<ClientId, SwitchedDbUpdate>,
}

impl Drop for SendWorker {
Expand Down Expand Up @@ -1369,6 +1378,8 @@ impl SendWorker {
queue_length_metric,
clients: Default::default(),
database_identity_to_clean_up_metric,
table_updates_client_id_table_id: <_>::default(),
table_updates_client_id: <_>::default(),
}
}

Expand Down Expand Up @@ -1415,7 +1426,7 @@ impl SendWorker {
}

fn send_one_computed_queries(
&self,
&mut self,
ComputedQueries {
updates,
errs,
Expand All @@ -1429,50 +1440,52 @@ impl SendWorker {

let span = tracing::info_span!("eval_incr_group_messages_by_client");

let mut eval = updates
// Reuse the aggregation maps from the worker.
let client_table_id_updates = mem::take(&mut self.table_updates_client_id_table_id);
let client_id_updates = mem::take(&mut self.table_updates_client_id);

// For each subscriber, aggregate all the updates for the same table.
// That is, we build a map `(subscriber_id, table_id) -> updates`.
// A particular subscriber uses only one format,
// so their `TableUpdate` will contain either JSON (`Protocol::Text`)
// or BSATN (`Protocol::Binary`).
let mut client_table_id_updates = updates
.into_iter()
// Filter out clients whose subscriptions failed
.filter(|upd| !clients_with_errors.contains(&upd.id))
// For each subscriber, aggregate all the updates for the same table.
// That is, we build a map `(subscriber_id, table_id) -> updates`.
// A particular subscriber uses only one format,
// so their `TableUpdate` will contain either JSON (`Protocol::Text`)
// or BSATN (`Protocol::Binary`).
.fold(
HashMap::<(ClientId, TableId), SwitchedTableUpdate>::new(),
|mut tables, upd| {
match tables.entry((upd.id, upd.table_id)) {
Entry::Occupied(mut entry) => match entry.get_mut().zip_mut(upd.update) {
Bsatn((tbl_upd, update)) => tbl_upd.push(update),
Json((tbl_upd, update)) => tbl_upd.push(update),
},
Entry::Vacant(entry) => drop(entry.insert(match upd.update {
Bsatn(update) => Bsatn(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
Json(update) => Json(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
})),
}
tables
},
)
.into_iter()
// Each client receives a single list of updates per transaction.
// So before sending the updates to each client,
// we must stitch together the `TableUpdate*`s into an aggregated list.
.fold(
HashMap::<ClientId, SwitchedDbUpdate>::new(),
|mut updates, ((id, _), update)| {
let entry = updates.entry(id);
let entry = entry.or_insert_with(|| match &update {
Bsatn(_) => Bsatn(<_>::default()),
Json(_) => Json(<_>::default()),
});
match entry.zip_mut(update) {
Bsatn((list, elem)) => list.tables.push(elem),
Json((list, elem)) => list.tables.push(elem),
}
updates
},
);
// Do the aggregation.
.fold(client_table_id_updates, |mut tables, upd| {
match tables.entry((upd.id, upd.table_id)) {
Entry::Occupied(mut entry) => match entry.get_mut().zip_mut(upd.update) {
Bsatn((tbl_upd, update)) => tbl_upd.push(update),
Json((tbl_upd, update)) => tbl_upd.push(update),
},
Entry::Vacant(entry) => drop(entry.insert(match upd.update {
Bsatn(update) => Bsatn(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
Json(update) => Json(TableUpdate::new(upd.table_id, (&*upd.table_name).into(), update)),
})),
}
tables
});

// Each client receives a single list of updates per transaction.
// So before sending the updates to each client,
// we must stitch together the `TableUpdate*`s into an aggregated list.
let mut client_id_updates = client_table_id_updates
.drain()
// Do the aggregation.
.fold(client_id_updates, |mut updates, ((id, _), update)| {
let entry = updates.entry(id);
let entry = entry.or_insert_with(|| match &update {
Bsatn(_) => Bsatn(<_>::default()),
Json(_) => Json(<_>::default()),
});
match entry.zip_mut(update) {
Bsatn((list, elem)) => list.tables.push(elem),
Json((list, elem)) => list.tables.push(elem),
}
updates
});

drop(clients_with_errors);
drop(span);
Expand All @@ -1487,7 +1500,7 @@ impl SendWorker {
// That is, in the case of the caller, we don't respect the light setting.
if let Some(caller) = caller {
let caller_id = (caller.id.identity, caller.id.connection_id);
let database_update = eval
let database_update = client_id_updates
.remove(&caller_id)
.map(|update| SubscriptionUpdateMessage::from_event_and_update(&event, update))
.unwrap_or_else(|| {
Expand All @@ -1501,7 +1514,7 @@ impl SendWorker {
}

// Send all the other updates.
for (id, update) in eval {
for (id, update) in client_id_updates.drain() {
let database_update = SubscriptionUpdateMessage::from_event_and_update(&event, update);
let client = self.clients[&id].outbound_ref.clone();
// Conditionally send out a full update or a light one otherwise.
Expand All @@ -1510,6 +1523,10 @@ impl SendWorker {
send_to_client(&client, message);
}

// Put back the aggregation maps into the worker.
self.table_updates_client_id_table_id = client_table_id_updates;
self.table_updates_client_id = client_id_updates;

// Send error messages and mark clients for removal
for (id, message) in errs {
if let Some(client) = self.clients.get(&id) {
Expand Down
Loading