From 254c65248a8364b6513dacf3e8f7e1ab8f5e8232 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 20 Jun 2025 15:10:03 +0200 Subject: [PATCH 1/4] Move the blocking metrics function on to a blocking thread. --- crates/core/src/host/host_controller.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 38acc94bee9..a75749f92d1 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1063,8 +1063,13 @@ async fn metric_reporter(replica_ctx: Arc) { .with_label_values(&replica_ctx.database_identity); loop { - let disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage()); - replica_ctx.update_gauges(); + // We spawn a blocking task here because this grabs blocking locks. + { + let ctx = replica_ctx.clone(); + let _ = tokio::task::spawn_blocking(move || ctx.update_gauges()).await; + } + let ctx = replica_ctx.clone(); + let disk_usage = tokio::task::block_in_place(move || ctx.total_disk_usage()); if let Some(num_bytes) = disk_usage.durability { message_log_size.set(num_bytes as i64); } From b41604418390147ab123cc627e2d754ac4102fd0 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 20 Jun 2025 16:11:56 +0200 Subject: [PATCH 2/4] Use a blocking thread to grab locks in the scheduler. --- crates/core/src/host/scheduler.rs | 96 +++++++++++++++---------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index ae46ca36a73..5a8ba870075 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -15,17 +15,17 @@ use tokio::sync::mpsc; use tokio::time::Instant; use tokio_util::time::delay_queue::{self, DelayQueue, Expired}; -use crate::db::datastore::locking_tx_datastore::MutTxId; -use crate::db::datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID}; -use crate::db::datastore::traits::IsolationLevel; -use crate::db::relational_db::RelationalDB; -use crate::execution_context::Workload; - use super::module_host::ModuleEvent; use super::module_host::ModuleFunctionCall; use super::module_host::{CallReducerParams, WeakModuleHost}; use super::module_host::{DatabaseUpdate, EventStatus}; use super::{ModuleHost, ReducerArgs, ReducerCallError}; +use crate::db::datastore::locking_tx_datastore::MutTxId; +use crate::db::datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID}; +use crate::db::datastore::traits::IsolationLevel; +use crate::db::relational_db::RelationalDB; +use crate::execution_context::Workload; +use crate::util::asyncify; #[derive(Copy, Clone, Eq, PartialEq, Hash)] pub struct ScheduledReducerId { @@ -388,7 +388,7 @@ impl SchedulerActor { // delete the scheduled reducer row if its not repeated reducer Ok(_) | Err(_) => { if let Some(id) = id { - self.delete_scheduled_reducer_row(&db, id, module_host_clone).await; + let _ = self.delete_scheduled_reducer_row(&db, id, module_host_clone).await; } } } @@ -398,16 +398,50 @@ impl SchedulerActor { }; } - /// Handle repeated schedule by adding it back to queue - /// return true if it is repeated schedule - fn handle_repeated_schedule( + async fn delete_scheduled_reducer_row( &mut self, + db: &RelationalDB, id: ScheduledReducerId, - schedule_row: &RowRef<'_>, - ) -> Result { - let schedule_at = read_schedule_at(schedule_row, id.at_column)?; + module_host: ModuleHost, + ) { + let host_clone = module_host.clone(); + let db = db.clone(); + let schedule_at = asyncify(move || { + let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + + match get_schedule_row_mut(&tx, &db, id) { + Ok(schedule_row) => { + if let Ok(schedule_at) = read_schedule_at(&schedule_row, id.at_column) { + // If the schedule is an interval, we handle it as a repeated schedule + if let ScheduleAt::Interval(_) = schedule_at { + return Some(schedule_at); + } + let row_ptr = schedule_row.pointer(); + db.delete(&mut tx, id.table_id, [row_ptr]); + + commit_and_broadcast_deletion_event(tx, host_clone); + } else { + log::debug!( + "Failed to read 'scheduled_at' from row: table_id {}, schedule_id {}", + id.table_id, + id.schedule_id + ); + } + } + Err(_) => { + log::debug!( + "Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}", + id.table_id, + id.schedule_id + ); + } + } + None + }) + .await; - if let ScheduleAt::Interval(dur) = schedule_at { + // If this was repeated, we need to add it back to the queue. + if let Some(ScheduleAt::Interval(dur)) = schedule_at { let key = self.queue.insert( QueueItem::Id { id, @@ -416,40 +450,6 @@ impl SchedulerActor { dur.to_duration().unwrap_or(Duration::ZERO), ); self.key_map.insert(id, key); - Ok(true) - } else { - Ok(false) - } - } - - async fn delete_scheduled_reducer_row( - &mut self, - db: &RelationalDB, - id: ScheduledReducerId, - module_host: ModuleHost, - ) { - let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - - match get_schedule_row_mut(&tx, db, id) { - Ok(schedule_row) => { - if let Ok(is_repeated) = self.handle_repeated_schedule(id, &schedule_row) { - if is_repeated { - return; // Do not delete entry for repeated reducer - } - - let row_ptr = schedule_row.pointer(); - db.delete(&mut tx, id.table_id, [row_ptr]); - - commit_and_broadcast_deletion_event(tx, module_host); - } - } - Err(_) => { - log::debug!( - "Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}", - id.table_id, - id.schedule_id - ); - } } } } From 2d88caddcb6580b9d65a756aaf08125aedef4043 Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 20 Jun 2025 16:14:27 +0200 Subject: [PATCH 3/4] Remove nonsense. --- crates/core/src/host/scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 5a8ba870075..1114ff8b476 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -388,7 +388,7 @@ impl SchedulerActor { // delete the scheduled reducer row if its not repeated reducer Ok(_) | Err(_) => { if let Some(id) = id { - let _ = self.delete_scheduled_reducer_row(&db, id, module_host_clone).await; + self.delete_scheduled_reducer_row(&db, id, module_host_clone).await; } } } From fda9681037fdbf3638de7507bc62c979cc19904e Mon Sep 17 00:00:00 2001 From: Jeffrey Dallatezza Date: Fri, 20 Jun 2025 16:58:27 +0200 Subject: [PATCH 4/4] Combine metrics things into one spawn_blocking --- crates/core/src/host/host_controller.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index a75749f92d1..9b280385fc1 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1063,18 +1063,19 @@ async fn metric_reporter(replica_ctx: Arc) { .with_label_values(&replica_ctx.database_identity); loop { - // We spawn a blocking task here because this grabs blocking locks. - { - let ctx = replica_ctx.clone(); - let _ = tokio::task::spawn_blocking(move || ctx.update_gauges()).await; - } let ctx = replica_ctx.clone(); - let disk_usage = tokio::task::block_in_place(move || ctx.total_disk_usage()); - if let Some(num_bytes) = disk_usage.durability { - message_log_size.set(num_bytes as i64); - } - if let Some(num_bytes) = disk_usage.logs { - module_log_file_size.set(num_bytes as i64); + // We spawn a blocking task here because this grabs blocking locks. + let disk_usage_future = tokio::task::spawn_blocking(move || { + ctx.update_gauges(); + ctx.total_disk_usage() + }); + if let Ok(disk_usage) = disk_usage_future.await { + if let Some(num_bytes) = disk_usage.durability { + message_log_size.set(num_bytes as i64); + } + if let Some(num_bytes) = disk_usage.logs { + module_log_file_size.set(num_bytes as i64); + } } tokio::time::sleep(STORAGE_METERING_INTERVAL).await; }