@@ -1209,12 +1209,7 @@ impl SubscriptionManager {
12091209 SingleQueryUpdate { update, num_rows }
12101210 }
12111211
1212- // filter out clients that've dropped
1213- let clients_for_query = qstate. all_clients ( ) . filter ( |id| {
1214- self . clients
1215- . get ( * id)
1216- . is_some_and ( |info| !info. dropped . load ( Ordering :: Acquire ) )
1217- } ) ;
1212+ let clients_for_query = qstate. all_clients ( ) ;
12181213
12191214 match eval_delta ( tx, & mut acc. metrics , plan) {
12201215 Err ( err) => {
@@ -1293,6 +1288,16 @@ struct SendWorkerClient {
12931288 outbound_ref : Client ,
12941289}
12951290
1291+ impl SendWorkerClient {
1292+ fn is_dropped ( & self ) -> bool {
1293+ self . dropped . load ( Ordering :: Relaxed )
1294+ }
1295+
1296+ fn is_cancelled ( & self ) -> bool {
1297+ self . outbound_ref . is_cancelled ( )
1298+ }
1299+ }
1300+
12961301/// Asynchronous background worker which aggregates each of the clients' updates from a [`ComputedQueries`]
12971302/// into `DbUpdate`s and then sends them to the clients' WebSocket workers.
12981303///
@@ -1339,6 +1344,14 @@ impl Drop for SendWorker {
13391344 }
13401345}
13411346
1347+ impl SendWorker {
1348+ fn is_client_dropped_or_cancelled ( & self , client_id : & ClientId ) -> bool {
1349+ self . clients
1350+ . get ( client_id)
1351+ . is_some_and ( |client| client. is_cancelled ( ) || client. is_dropped ( ) )
1352+ }
1353+ }
1354+
13421355#[ derive( Debug , Clone ) ]
13431356pub struct BroadcastQueue ( SenderWithGauge < SendWorkerMessage > ) ;
13441357
@@ -1451,6 +1464,8 @@ impl SendWorker {
14511464 // or BSATN (`Protocol::Binary`).
14521465 let mut client_table_id_updates = updates
14531466 . into_iter ( )
1467+ // Filter out dropped or cancelled clients
1468+ . filter ( |upd| !self . is_client_dropped_or_cancelled ( & upd. id ) )
14541469 // Filter out clients whose subscriptions failed
14551470 . filter ( |upd| !clients_with_errors. contains ( & upd. id ) )
14561471 // Do the aggregation.
0 commit comments