Skip to content

Commit 1c8b321

Browse files
committed
Respond to Mazdak's review
1 parent 27931af commit 1c8b321

1 file changed

Lines changed: 27 additions & 8 deletions

File tree

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,11 @@ struct ClientInfo {
111111
subscription_ref_count: HashMap<QueryHash, usize>,
112112
// This should be removed when we migrate to SubscribeSingle.
113113
legacy_subscriptions: HashSet<QueryHash>,
114-
// This flag is set if an error occurs during a tx update.
115-
// It will be cleaned up async or on resubscribe.
114+
/// This flag is set if an error occurs during a tx update.
115+
/// It will be cleaned up async or on resubscribe.
116+
///
117+
/// [`Arc`]ed so that this can be updated by the [`SendWorker`]
118+
/// and observed by [`SubscriptionManager::remove_dropped_clients`].
116119
dropped: Arc<AtomicBool>,
117120
}
118121

@@ -450,13 +453,25 @@ struct ComputedQueries {
450453
caller: Option<Arc<ClientConnectionSender>>,
451454
}
452455

456+
/// Message sent by the [`SubscriptionManager`] to the [`SendWorker`].
453457
enum SendWorkerMessage {
458+
/// A transaction has completed and the [`SubscriptionManager`] has evaluated the incremental queries,
459+
/// so the [`SendWorker`] should broadcast them to clients.
454460
Broadcast(ComputedQueries),
461+
462+
/// A new client has been registered in the [`SubscriptionManager`],
463+
/// so the [`SendWorker`] should also record its existence.
455464
AddClient {
456465
client_id: ClientId,
466+
/// Shared handle on the `dropped` flag in the [`Subscriptionmanager`]'s [`ClientInfo`].
467+
///
468+
/// Will be updated by [`SendWorker::run`] and read by [`SubscriptionManager::remove_dropped_clients`].
457469
dropped: Arc<AtomicBool>,
458470
outbound_ref: Client,
459471
},
472+
473+
/// A client previously added by a [`Self::AddClient`] message has been removed from the [`SubscriptionManager`],
474+
/// so the [`SendWorker`] should also forget it.
460475
RemoveClient(ClientId),
461476
}
462477

@@ -553,15 +568,14 @@ impl SubscriptionManager {
553568
})
554569
}
555570

571+
/// Remove a [`ClientInfo`] from the `clients` map,
572+
/// and broadcast a message along `send_worker_tx` that the [`SendWorker`] should also remove it.
556573
fn remove_client_and_inform_send_worker(&mut self, client_id: ClientId) -> Option<ClientInfo> {
557-
if let Some(client) = self.clients.remove(&client_id) {
574+
self.clients.remove(&client_id).inspect(|_| {
558575
self.send_worker_tx
559576
.send(SendWorkerMessage::RemoveClient(client_id))
560577
.expect("send worker has panicked, or otherwise dropped its recv queue!");
561-
Some(client)
562-
} else {
563-
None
564-
}
578+
})
565579
}
566580

567581
pub fn num_unique_queries(&self) -> usize {
@@ -1118,6 +1132,11 @@ impl SubscriptionManager {
11181132
}
11191133

11201134
struct SendWorkerClient {
1135+
/// This flag is set if an error occurs during a tx update.
1136+
/// It will be cleaned up async or on resubscribe.
1137+
///
1138+
/// [`Arc`]ed so that this can be updated by [`Self::run`]
1139+
/// and observed by [`SubscriptionManager::remove_dropped_clients`].
11211140
dropped: Arc<AtomicBool>,
11221141
outbound_ref: Client,
11231142
}
@@ -1141,7 +1160,7 @@ struct SendWorker {
11411160
/// Mirror of the [`SubscriptionManager`]'s `clients` map local to this actor.
11421161
///
11431162
/// Updated by [`SendWorkerMessage::AddClient`] and [`SendWorkerMessage::RemoveClient`] messages
1144-
/// send along `self.rx`.
1163+
/// sent along `self.rx`.
11451164
clients: HashMap<ClientId, SendWorkerClient>,
11461165

11471166
/// The `Identity` which labels the `queue_length_metric`.

0 commit comments

Comments
 (0)