Skip to content

Commit 1262c0c

Browse files
Filter out dropped clients in the send worker
1 parent c3afc17 commit 1262c0c

2 files changed

Lines changed: 26 additions & 6 deletions

File tree

crates/core/src/client/client_connection.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::VecDeque;
22
use std::ops::Deref;
3+
use std::sync::atomic::Ordering;
34
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
45
use std::sync::Arc;
56
use std::time::Instant;
@@ -158,6 +159,10 @@ impl ClientConnectionSender {
158159
Self::dummy_with_channel(id, config).0
159160
}
160161

162+
pub fn is_cancelled(&self) -> bool {
163+
self.cancelled.load(Ordering::Acquire)
164+
}
165+
161166
/// Send a message to the client. For data-related messages, you should probably use
162167
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
163168
pub fn send_message(&self, message: impl Into<SerializableMessage>) -> Result<(), ClientSendError> {

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,12 +1197,7 @@ impl SubscriptionManager {
11971197
SingleQueryUpdate { update, num_rows }
11981198
}
11991199

1200-
// filter out clients that've dropped
1201-
let clients_for_query = qstate.all_clients().filter(|id| {
1202-
self.clients
1203-
.get(*id)
1204-
.is_some_and(|info| !info.dropped.load(Ordering::Acquire))
1205-
});
1200+
let clients_for_query = qstate.all_clients();
12061201

12071202
match eval_delta(tx, &mut acc.metrics, plan) {
12081203
Err(err) => {
@@ -1281,6 +1276,16 @@ struct SendWorkerClient {
12811276
outbound_ref: Client,
12821277
}
12831278

1279+
impl SendWorkerClient {
1280+
fn is_dropped(&self) -> bool {
1281+
self.dropped.load(Ordering::Acquire)
1282+
}
1283+
1284+
fn is_cancelled(&self) -> bool {
1285+
self.outbound_ref.is_cancelled()
1286+
}
1287+
}
1288+
12841289
/// Asynchronous background worker which aggregates each of the clients' updates from a [`ComputedQueries`]
12851290
/// into `DbUpdate`s and then sends them to the clients' WebSocket workers.
12861291
///
@@ -1319,6 +1324,14 @@ impl Drop for SendWorker {
13191324
}
13201325
}
13211326

1327+
impl SendWorker {
1328+
fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool {
1329+
self.clients
1330+
.get(client_id)
1331+
.is_some_and(|client| client.is_cancelled() || client.is_dropped())
1332+
}
1333+
}
1334+
13221335
#[derive(Debug, Clone)]
13231336
pub struct BroadcastQueue(SenderWithGauge<SendWorkerMessage>);
13241337

@@ -1420,6 +1433,8 @@ impl SendWorker {
14201433

14211434
let mut eval = updates
14221435
.into_iter()
1436+
// Filter out dropped or cancelled clients
1437+
.filter(|upd| !self.is_client_dropped_or_cancelled(&upd.id))
14231438
// Filter out clients whose subscriptions failed
14241439
.filter(|upd| !clients_with_errors.contains(&upd.id))
14251440
// For each subscriber, aggregate all the updates for the same table.

0 commit comments

Comments
 (0)