Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ pub struct TableUpdate<F: WebsocketFormat> {
}

/// Computed update for a single query, annotated with the number of matching rows.
#[derive(Debug)]
pub struct SingleQueryUpdate<F: WebsocketFormat> {
pub update: F::QueryUpdate,
pub num_rows: u64,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ impl ClientConnectionSender {
Self::dummy_with_channel(id, config).0
}

/// Send a message to the client. For data-related messages, you should probably use
/// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order.
pub fn send_message(&self, message: impl Into<SerializableMessage>) -> Result<(), ClientSendError> {
self.send(message.into())
}
Expand Down
14 changes: 10 additions & 4 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::messages::control_db::{Database, HostType};
use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::SubscriptionManager;
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
use crate::util::asyncify;
use crate::util::jobs::{JobCore, JobCores};
use crate::worker_metrics::WORKER_METRICS;
Expand Down Expand Up @@ -545,11 +545,17 @@ async fn make_replica_ctx(
relational_db: Arc<RelationalDB>,
) -> anyhow::Result<ReplicaContext> {
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::for_database(
database.database_identity,
let send_worker_queue = spawn_send_worker(Some(database.database_identity));
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::new(
send_worker_queue.clone(),
)));
let downgraded = Arc::downgrade(&subscriptions);
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);
let subscriptions = ModuleSubscriptions::new(
relational_db.clone(),
subscriptions,
send_worker_queue,
database.owner_identity,
);

// If an error occurs when evaluating a subscription,
// we mark each client that was affected,
Expand Down
Loading
Loading