Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -158,6 +159,10 @@ impl ClientConnectionSender {
Self::dummy_with_channel(id, config).0
}

pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}

/// Send a message to the client. For data-related messages, you should probably use
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
pub fn send_message(&self, message: impl Into<SerializableMessage>) -> Result<(), ClientSendError> {
Expand All @@ -175,7 +180,7 @@ impl ClientConnectionSender {
// the channel, so forcibly kick the client
tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
self.abort_handle.abort();
self.cancelled.store(true, Relaxed);
self.cancelled.store(true, Ordering::Relaxed);
return Err(ClientSendError::Cancelled);
}
Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),
Expand Down
27 changes: 21 additions & 6 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,12 +1209,7 @@ impl SubscriptionManager {
SingleQueryUpdate { update, num_rows }
}

// filter out clients that've dropped
let clients_for_query = qstate.all_clients().filter(|id| {
self.clients
.get(*id)
.is_some_and(|info| !info.dropped.load(Ordering::Acquire))
});
let clients_for_query = qstate.all_clients();

match eval_delta(tx, &mut acc.metrics, plan) {
Err(err) => {
Expand Down Expand Up @@ -1293,6 +1288,16 @@ struct SendWorkerClient {
outbound_ref: Client,
}

impl SendWorkerClient {
fn is_dropped(&self) -> bool {
self.dropped.load(Ordering::Relaxed)
}

fn is_cancelled(&self) -> bool {
self.outbound_ref.is_cancelled()
}
}

/// Asynchronous background worker which aggregates each of the clients' updates from a [`ComputedQueries`]
/// into `DbUpdate`s and then sends them to the clients' WebSocket workers.
///
Expand Down Expand Up @@ -1339,6 +1344,14 @@ impl Drop for SendWorker {
}
}

impl SendWorker {
fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
self.clients
.get(client_id)
.is_some_and(|client| client.is_cancelled() || client.is_dropped())
}
}

#[derive(Debug, Clone)]
pub struct BroadcastQueue(SenderWithGauge<SendWorkerMessage>);

Expand Down Expand Up @@ -1451,6 +1464,8 @@ impl SendWorker {
// or BSATN (`Protocol::Binary`).
let mut client_table_id_updates = updates
.into_iter()
// Filter out dropped or cancelled clients
.filter(|upd| !self.is_client_dropped_or_cancelled(&upd.id))
// Filter out clients whose subscriptions failed
.filter(|upd| !clients_with_errors.contains(&upd.id))
// Do the aggregation.
Expand Down
Loading