Skip to content

Commit b80c21b

Browse files
committed
Send other subscription messages through the broadcast queue to keep ordering.
1 parent 1c8b321 commit b80c21b

4 files changed

Lines changed: 239 additions & 110 deletions

File tree

crates/client-api-messages/src/websocket.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ pub struct TableUpdate<F: WebsocketFormat> {
623623
}
624624

625625
/// Computed update for a single query, annotated with the number of matching rows.
626+
#[derive(Debug)]
626627
pub struct SingleQueryUpdate<F: WebsocketFormat> {
627628
pub update: F::QueryUpdate,
628629
pub num_rows: u64,

crates/core/src/host/host_controller.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::messages::control_db::{Database, HostType};
1313
use crate::module_host_context::ModuleCreationContext;
1414
use crate::replica_context::ReplicaContext;
1515
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
16-
use crate::subscription::module_subscription_manager::SubscriptionManager;
16+
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
1717
use crate::util::{asyncify, spawn_rayon};
1818
use crate::worker_metrics::WORKER_METRICS;
1919
use anyhow::{anyhow, ensure, Context};
@@ -526,11 +526,17 @@ async fn make_replica_ctx(
526526
relational_db: Arc<RelationalDB>,
527527
) -> anyhow::Result<ReplicaContext> {
528528
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
529-
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::for_database(
530-
database.database_identity,
529+
let send_worker_queue = spawn_send_worker(Some(database.database_identity));
530+
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
531+
send_worker_queue.clone(),
531532
)));
532533
let downgraded = Arc::downgrade(&subscriptions);
533-
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);
534+
let subscriptions = ModuleSubscriptions::new(
535+
relational_db.clone(),
536+
subscriptions,
537+
send_worker_queue,
538+
database.owner_identity,
539+
);
534540

535541
// If an error occurs when evaluating a subscription,
536542
// we mark each client that was affected,

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 119 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::execution_unit::QueryHash;
2-
use super::module_subscription_manager::{Plan, SubscriptionGaugeStats, SubscriptionManager};
2+
use super::module_subscription_manager::{
3+
spawn_send_worker, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
4+
};
35
use super::query::compile_query_with_hashes;
46
use super::tx::DeltaTx;
57
use super::{collect_table_update, TableUpdateType};
@@ -40,6 +42,7 @@ pub struct ModuleSubscriptions {
4042
/// If taking a lock (tx) on the db at the same time, ALWAYS lock the db first.
4143
/// You will deadlock otherwise.
4244
subscriptions: Subscriptions,
45+
broadcast_queue: BroadcastQueue,
4346
owner_identity: Identity,
4447
stats: Arc<SubscriptionGauges>,
4548
}
@@ -133,13 +136,19 @@ macro_rules! return_on_err_with_sql {
133136
}
134137

135138
impl ModuleSubscriptions {
136-
pub fn new(relational_db: Arc<RelationalDB>, subscriptions: Subscriptions, owner_identity: Identity) -> Self {
139+
pub fn new(
140+
relational_db: Arc<RelationalDB>,
141+
subscriptions: Subscriptions,
142+
broadcast_queue: BroadcastQueue,
143+
owner_identity: Identity,
144+
) -> Self {
137145
let db = &relational_db.database_identity();
138146
let stats = Arc::new(SubscriptionGauges::new(db));
139147

140148
Self {
141149
relational_db,
142150
subscriptions,
151+
broadcast_queue,
143152
owner_identity,
144153
stats,
145154
}
@@ -156,9 +165,11 @@ impl ModuleSubscriptions {
156165
/// Construct a new [`ModuleSubscriptions`] for use in testing,
157166
/// running its send worker on the dynamically enclosing [`tokio::runtime::Runtime`]
158167
pub fn for_test_enclosing_runtime(db: Arc<RelationalDB>) -> ModuleSubscriptions {
168+
let send_worker_queue = spawn_send_worker(None);
159169
ModuleSubscriptions::new(
160170
db,
161171
SubscriptionManager::for_test_without_metrics_arc_rwlock(),
172+
send_worker_queue,
162173
Identity::ZERO,
163174
)
164175
}
@@ -263,15 +274,18 @@ impl ModuleSubscriptions {
263274
) -> Result<Option<ExecutionMetrics>, DBError> {
264275
// Send an error message to the client
265276
let send_err_msg = |message| {
266-
sender.send_message(SubscriptionMessage {
267-
request_id: Some(request.request_id),
268-
query_id: Some(request.query_id),
269-
timer: Some(timer),
270-
result: SubscriptionResult::Error(SubscriptionError {
271-
table_id: None,
272-
message,
273-
}),
274-
})
277+
self.broadcast_queue.send_client_message(
278+
sender.clone(),
279+
SubscriptionMessage {
280+
request_id: Some(request.request_id),
281+
query_id: Some(request.query_id),
282+
timer: Some(timer),
283+
result: SubscriptionResult::Error(SubscriptionError {
284+
table_id: None,
285+
message,
286+
}),
287+
},
288+
)
275289
};
276290

277291
let sql = request.query;
@@ -323,16 +337,19 @@ impl ModuleSubscriptions {
323337
// thread it's possible for messages to get sent to the client out of order. If you do
324338
// spawn in another thread messages will need to be buffered until the state is sent out
325339
// on the wire
326-
let _ = sender.send_message(SubscriptionMessage {
327-
request_id: Some(request.request_id),
328-
query_id: Some(request.query_id),
329-
timer: Some(timer),
330-
result: SubscriptionResult::Subscribe(SubscriptionRows {
331-
table_id: query.subscribed_table_id(),
332-
table_name: query.subscribed_table_name().into(),
333-
table_rows,
334-
}),
335-
});
340+
let _ = self.broadcast_queue.send_client_message(
341+
sender.clone(),
342+
SubscriptionMessage {
343+
request_id: Some(request.request_id),
344+
query_id: Some(request.query_id),
345+
timer: Some(timer),
346+
result: SubscriptionResult::Subscribe(SubscriptionRows {
347+
table_id: query.subscribed_table_id(),
348+
table_name: query.subscribed_table_name().into(),
349+
table_rows,
350+
}),
351+
},
352+
);
336353
Ok(Some(metrics))
337354
}
338355

@@ -345,15 +362,18 @@ impl ModuleSubscriptions {
345362
) -> Result<Option<ExecutionMetrics>, DBError> {
346363
// Send an error message to the client
347364
let send_err_msg = |message| {
348-
sender.send_message(SubscriptionMessage {
349-
request_id: Some(request.request_id),
350-
query_id: Some(request.query_id),
351-
timer: Some(timer),
352-
result: SubscriptionResult::Error(SubscriptionError {
353-
table_id: None,
354-
message,
355-
}),
356-
})
365+
self.broadcast_queue.send_client_message(
366+
sender.clone(),
367+
SubscriptionMessage {
368+
request_id: Some(request.request_id),
369+
query_id: Some(request.query_id),
370+
timer: Some(timer),
371+
result: SubscriptionResult::Error(SubscriptionError {
372+
table_id: None,
373+
message,
374+
}),
375+
},
376+
)
357377
};
358378

359379
let mut subscriptions = self.subscriptions.write();
@@ -383,16 +403,19 @@ impl ModuleSubscriptions {
383403
send_err_msg
384404
);
385405

386-
let _ = sender.send_message(SubscriptionMessage {
387-
request_id: Some(request.request_id),
388-
query_id: Some(request.query_id),
389-
timer: Some(timer),
390-
result: SubscriptionResult::Unsubscribe(SubscriptionRows {
391-
table_id: query.subscribed_table_id(),
392-
table_name: query.subscribed_table_name().into(),
393-
table_rows,
394-
}),
395-
});
406+
let _ = self.broadcast_queue.send_client_message(
407+
sender.clone(),
408+
SubscriptionMessage {
409+
request_id: Some(request.request_id),
410+
query_id: Some(request.query_id),
411+
timer: Some(timer),
412+
result: SubscriptionResult::Unsubscribe(SubscriptionRows {
413+
table_id: query.subscribed_table_id(),
414+
table_name: query.subscribed_table_name().into(),
415+
table_rows,
416+
}),
417+
},
418+
);
396419
Ok(Some(metrics))
397420
}
398421

@@ -406,15 +429,18 @@ impl ModuleSubscriptions {
406429
) -> Result<Option<ExecutionMetrics>, DBError> {
407430
// Send an error message to the client
408431
let send_err_msg = |message| {
409-
sender.send_message(SubscriptionMessage {
410-
request_id: Some(request.request_id),
411-
query_id: Some(request.query_id),
412-
timer: Some(timer),
413-
result: SubscriptionResult::Error(SubscriptionError {
414-
table_id: None,
415-
message,
416-
}),
417-
})
432+
self.broadcast_queue.send_client_message(
433+
sender.clone(),
434+
SubscriptionMessage {
435+
request_id: Some(request.request_id),
436+
query_id: Some(request.query_id),
437+
timer: Some(timer),
438+
result: SubscriptionResult::Error(SubscriptionError {
439+
table_id: None,
440+
message,
441+
}),
442+
},
443+
)
418444
};
419445

420446
// Always lock the db before the subscription lock to avoid deadlocks.
@@ -445,12 +471,15 @@ impl ModuleSubscriptions {
445471
None
446472
);
447473

448-
let _ = sender.send_message(SubscriptionMessage {
449-
request_id: Some(request.request_id),
450-
query_id: Some(request.query_id),
451-
timer: Some(timer),
452-
result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
453-
});
474+
let _ = self.broadcast_queue.send_client_message(
475+
sender,
476+
SubscriptionMessage {
477+
request_id: Some(request.request_id),
478+
query_id: Some(request.query_id),
479+
timer: Some(timer),
480+
result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
481+
},
482+
);
454483

455484
Ok(Some(metrics))
456485
}
@@ -534,15 +563,18 @@ impl ModuleSubscriptions {
534563
) -> Result<Option<ExecutionMetrics>, DBError> {
535564
// Send an error message to the client
536565
let send_err_msg = |message| {
537-
let _ = sender.send_message(SubscriptionMessage {
538-
request_id: Some(request.request_id),
539-
query_id: Some(request.query_id),
540-
timer: Some(timer),
541-
result: SubscriptionResult::Error(SubscriptionError {
542-
table_id: None,
543-
message,
544-
}),
545-
});
566+
let _ = self.broadcast_queue.send_client_message(
567+
sender.clone(),
568+
SubscriptionMessage {
569+
request_id: Some(request.request_id),
570+
query_id: Some(request.query_id),
571+
timer: Some(timer),
572+
result: SubscriptionResult::Error(SubscriptionError {
573+
table_id: None,
574+
message,
575+
}),
576+
},
577+
);
546578
};
547579

548580
let num_queries = request.query_strings.len();
@@ -585,12 +617,16 @@ impl ModuleSubscriptions {
585617
// thread it's possible for messages to get sent to the client out of order. If you do
586618
// spawn in another thread messages will need to be buffered until the state is sent out
587619
// on the wire
588-
let _ = sender.send_message(SubscriptionMessage {
589-
request_id: Some(request.request_id),
590-
query_id: Some(request.query_id),
591-
timer: Some(timer),
592-
result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
593-
});
620+
621+
let _ = self.broadcast_queue.send_client_message(
622+
sender.clone(),
623+
SubscriptionMessage {
624+
request_id: Some(request.request_id),
625+
query_id: Some(request.query_id),
626+
timer: Some(timer),
627+
result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
628+
},
629+
);
594630

595631
Ok(Some(metrics))
596632
}
@@ -647,11 +683,14 @@ impl ModuleSubscriptions {
647683
// thread it's possible for messages to get sent to the client out of order. If you do
648684
// spawn in another thread messages will need to be buffered until the state is sent out
649685
// on the wire
650-
let _ = sender.send_message(SubscriptionUpdateMessage {
651-
database_update,
652-
request_id: Some(subscription.request_id),
653-
timer: Some(timer),
654-
});
686+
let _ = self.broadcast_queue.send_client_message(
687+
sender,
688+
SubscriptionUpdateMessage {
689+
database_update,
690+
request_id: Some(subscription.request_id),
691+
timer: Some(timer),
692+
},
693+
);
655694

656695
Ok(metrics)
657696
}
@@ -715,7 +754,8 @@ impl ModuleSubscriptions {
715754
event: Some(event.clone()),
716755
database_update: SubscriptionUpdateMessage::default_for_protocol(client.config.protocol, None),
717756
};
718-
let _ = client.send_message(message);
757+
758+
let _ = self.broadcast_queue.send_client_message(client, message);
719759
} else {
720760
log::trace!("Reducer failed but there is no client to send the failure to!")
721761
}
@@ -748,7 +788,7 @@ mod tests {
748788
use crate::error::DBError;
749789
use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall};
750790
use crate::messages::websocket as ws;
751-
use crate::subscription::module_subscription_manager::SubscriptionManager;
791+
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
752792
use crate::subscription::query::compile_read_only_query;
753793
use crate::subscription::TableUpdateType;
754794
use hashbrown::HashMap;
@@ -780,9 +820,11 @@ mod tests {
780820
let client = ClientActorId::for_test(Identity::ZERO);
781821
let config = ClientConfig::for_test();
782822
let sender = Arc::new(ClientConnectionSender::dummy(client, config));
823+
let send_worker_queue = spawn_send_worker(None);
783824
let module_subscriptions = ModuleSubscriptions::new(
784825
db.clone(),
785826
SubscriptionManager::for_test_without_metrics_arc_rwlock(),
827+
send_worker_queue,
786828
owner,
787829
);
788830

0 commit comments

Comments
 (0)