Skip to content

Commit be6d305

Browse files
Filter out dropped clients in the send worker (#2899)
1 parent 03721b4 commit be6d305

2 files changed

Lines changed: 27 additions & 7 deletions

File tree

crates/core/src/client/client_connection.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::VecDeque;
22
use std::ops::Deref;
3+
use std::sync::atomic::Ordering;
34
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
45
use std::sync::Arc;
56
use std::time::Instant;
@@ -158,6 +159,10 @@ impl ClientConnectionSender {
158159
Self::dummy_with_channel(id, config).0
159160
}
160161

162+
pub fn is_cancelled(&self) -> bool {
163+
self.cancelled.load(Ordering::Relaxed)
164+
}
165+
161166
/// Send a message to the client. For data-related messages, you should probably use
162167
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
163168
pub fn send_message(&self, message: impl Into<SerializableMessage>) -> Result<(), ClientSendError> {
@@ -175,7 +180,7 @@ impl ClientConnectionSender {
175180
// the channel, so forcibly kick the client
176181
tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
177182
self.abort_handle.abort();
178-
self.cancelled.store(true, Relaxed);
183+
self.cancelled.store(true, Ordering::Relaxed);
179184
return Err(ClientSendError::Cancelled);
180185
}
181186
Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,12 +1208,7 @@ impl SubscriptionManager {
12081208
SingleQueryUpdate { update, num_rows }
12091209
}
12101210

1211-
// filter out clients that've dropped
1212-
let clients_for_query = qstate.all_clients().filter(|id| {
1213-
self.clients
1214-
.get(*id)
1215-
.is_some_and(|info| !info.dropped.load(Ordering::Acquire))
1216-
});
1211+
let clients_for_query = qstate.all_clients();
12171212

12181213
match eval_delta(tx, &mut acc.metrics, plan) {
12191214
Err(err) => {
@@ -1292,6 +1287,16 @@ struct SendWorkerClient {
12921287
outbound_ref: Client,
12931288
}
12941289

1290+
impl SendWorkerClient {
1291+
fn is_dropped(&self) -> bool {
1292+
self.dropped.load(Ordering::Relaxed)
1293+
}
1294+
1295+
fn is_cancelled(&self) -> bool {
1296+
self.outbound_ref.is_cancelled()
1297+
}
1298+
}
1299+
12951300
/// Asynchronous background worker which aggregates each of the clients' updates from a [`ComputedQueries`]
12961301
/// into `DbUpdate`s and then sends them to the clients' WebSocket workers.
12971302
///
@@ -1330,6 +1335,14 @@ impl Drop for SendWorker {
13301335
}
13311336
}
13321337

1338+
impl SendWorker {
1339+
fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
1340+
self.clients
1341+
.get(client_id)
1342+
.is_some_and(|client| client.is_cancelled() || client.is_dropped())
1343+
}
1344+
}
1345+
13331346
#[derive(Debug, Clone)]
13341347
pub struct BroadcastQueue(SenderWithGauge<SendWorkerMessage>);
13351348

@@ -1431,6 +1444,8 @@ impl SendWorker {
14311444

14321445
let mut eval = updates
14331446
.into_iter()
1447+
// Filter out dropped or cancelled clients
1448+
.filter(|upd| !self.is_client_dropped_or_cancelled(&upd.id))
14341449
// Filter out clients whose subscriptions failed
14351450
.filter(|upd| !clients_with_errors.contains(&upd.id))
14361451
// For each subscriber, aggregate all the updates for the same table.

0 commit comments

Comments
 (0)