diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index 08f0935c3b..b9f87908b9 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -47,6 +47,7 @@ use crate::lsp::input_boundaries::InputBoundariesParams; use crate::lsp::input_boundaries::InputBoundariesResponse; use crate::lsp::main_loop::Event; use crate::lsp::main_loop::GlobalState; +use crate::lsp::main_loop::LoopHandles; use crate::lsp::main_loop::TokioUnboundedSender; use crate::lsp::statement_range; use crate::lsp::statement_range::StatementRangeParams; @@ -232,9 +233,9 @@ struct Backend { /// Channel for communication with the main loop. events_tx: TokioUnboundedSender, - /// Handle to main loop. Drop it to cancel the loop, all associated tasks, - /// and drop all owned state. - _main_loop: tokio::task::JoinSet<()>, + /// Handle to the LSP loops. Drop it to shut the loops down and drop all + /// owned state. + _main_loop: LoopHandles, } impl Backend { diff --git a/crates/ark/src/lsp/handler.rs b/crates/ark/src/lsp/handler.rs index b4ef46a1d5..04b96ed32f 100644 --- a/crates/ark/src/lsp/handler.rs +++ b/crates/ark/src/lsp/handler.rs @@ -39,7 +39,8 @@ impl Lsp { ) -> Self { let rt = Builder::new_multi_thread() .enable_all() - // One for the main loop and one spare + // Workers serve tower-lsp, the auxiliary loop, and the diagnostics + // queue. The main loop runs on its own thread. .worker_threads(2) // Used for diagnostics .max_blocking_threads(2) diff --git a/crates/ark/src/lsp/main_loop.rs b/crates/ark/src/lsp/main_loop.rs index 6546841a3c..902548be9e 100644 --- a/crates/ark/src/lsp/main_loop.rs +++ b/crates/ark/src/lsp/main_loop.rs @@ -26,8 +26,11 @@ use oak_scan::ScanRequest; use oak_scan::ScanScheduler; use oak_semantic::library::Library; use stdext::result::ResultExt; +use stdext::spawn; +use tokio::runtime::Handle; use tokio::sync::mpsc; use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel; +use tokio::sync::oneshot; use tokio::task::JoinHandle; use tower_lsp::jsonrpc; use tower_lsp::lsp_types; @@ -157,6 +160,21 @@ pub(crate) struct GlobalState { events_rx: TokioUnboundedReceiver, } +/// Owns the running LSP loops. Dropping it shuts them down. +/// +/// - The auxiliary loop is a runtime task, aborted when `_aux_loop` drops. +/// - The main loop runs on its own thread. Dropping `_main_shutdown_tx` closes +/// the channel the loop selects on, so it breaks, drops the owned +/// `GlobalState`, and the thread exits. We hold the thread's handle but don't +/// join it on drop (it winds down on its own), so dropping never blocks the +/// caller. +#[derive(Debug)] +pub(crate) struct LoopHandles { + _main_loop: std::thread::JoinHandle<()>, + _main_shutdown_tx: oneshot::Sender<()>, + _aux_loop: tokio::task::JoinSet<()>, +} + /// Non-cloneable, per-session state mutated only by exclusive handlers. /// Sits alongside [`WorldState`] (which is cloneable for snapshot /// handlers); state that can't be cloned lives here instead. @@ -265,37 +283,66 @@ impl GlobalState { self.events_tx.clone() } - /// Start the main and auxiliary loops + /// Start the main and auxiliary loops. /// - /// Returns a `JoinSet` that holds onto all tasks and state owned by the - /// event loop. Drop it to cancel everything and shut down the service. - pub(crate) fn start(self) -> tokio::task::JoinSet<()> { - let mut set = tokio::task::JoinSet::<()>::new(); - - // Spawn latency-sensitive auxiliary loop. Must be first to initialise - // global transmission channel. - let aux = AuxiliaryState::new(self.client.clone()); - set.spawn(async move { aux.start().await }); - - // Spawn main loop - set.spawn(async move { self.main_loop().await }); + /// The returned [`LoopHandles`] owns everything the loops need. Drop it to + /// shut the loops down and release the owned state. + pub(crate) fn start(self) -> LoopHandles { + let mut aux = tokio::task::JoinSet::<()>::new(); + + // The auxiliary loop is fully async and never blocks. Must be started + // first to initialise the global transmission channel. + let aux_state = AuxiliaryState::new(self.client.clone()); + aux.spawn(async move { aux_state.start().await }); + + // Since the main loop owns the Salsa DB and writes to it, we run on its + // own thread instead of a Tokio worker. Salsa writes are potentially + // blocking until the writer gains exclusive access. If background tasks + // holding clones of the DB are stuck on the same thread as the main + // loop, the LSP deadlocks. This can be avoided by wrapping writes in + // `block_in_place()` but the safer structure is to have it run on an OS + // thread that we're in control of. + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let handle = Handle::current(); + let main_loop = spawn!("oak-main-loop", move || { + handle.block_on(self.main_loop(shutdown_rx)); + }); - set + LoopHandles { + _main_shutdown_tx: shutdown_tx, + _aux_loop: aux, + _main_loop: main_loop, + } } /// Run main loop /// /// This takes ownership of all global state and handles one by one LSP /// requests, notifications, and other internal events. - async fn main_loop(mut self) { + async fn main_loop(mut self, mut shutdown_rx: oneshot::Receiver<()>) { loop { - let event = self.next_event().await; - if let Err(err) = self.handle_event(event).await { - lsp::log_error!("Failure while handling event:\n{err:?}") + tokio::select! { + _ = &mut shutdown_rx => { + lsp::log_info!("Main loop stopping: handle dropped"); + break; + }, + event = self.events_rx.recv() => { + let Some(event) = event else { + lsp::log_info!("Main loop stopping: event channel closed"); + break; + }; + if let Err(err) = self.handle_event(event).await { + lsp::log_error!("Failure while handling event:\n{err:?}") + } + } } } } + /// Pull the next event off the channel. Only the test pump uses this; the + /// running loop selects on the channel directly so it can also watch for + /// shutdown. + #[cfg(test)] async fn next_event(&mut self) -> Event { self.events_rx.recv().await.unwrap() } @@ -329,6 +376,10 @@ impl GlobalState { Event::Lsp(msg) => match msg { LspMessage::Notification(notif) => { lsp::log_info!("{notif:#?}"); + lsp::log_info!( + "Entering notification handler with {n} outstanding Salsa db holds", + n = self.world.db.outstanding_holds() + ); match notif { LspNotification::Initialized(_params) => { @@ -449,6 +500,8 @@ impl GlobalState { }, Event::OakScanCompleted(scan) => { + lsp::log_info!("Received `OakScanCompleted`"); + // This scan ran on a background task, but it sends its result // back here so the write happens on the main loop. Keep it that // way: Only the main loop should write to the oak DB (not @@ -475,6 +528,12 @@ impl GlobalState { scan, &editor_owned, ); + lsp::log_info!( + "Dispatching {n} followup scan requests with {n_holds} outstanding Salsa db holds", + n = followups.len(), + n_holds = self.world.db.outstanding_holds(), + ); + dispatch_scan_requests(&self.events_tx, followups); // Warm the workspace index once the scan settles. Editor @@ -486,13 +545,15 @@ impl GlobalState { } }, } + lsp::log_info!("Finished handling event in {}ms", loop_tick.elapsed().as_millis()); - // TODO Make this threshold configurable by the client + // TODO: Make this threshold configurable by the client if loop_tick.elapsed() > std::time::Duration::from_millis(50) { - lsp::log_info!("Handler took {}ms", loop_tick.elapsed().as_millis()); + lsp::log_info!("Handler took more than 50ms"); } if salsa::plumbing::current_revision(&self.world.db) != old_revision { + lsp::log_info!("World state revision advanced"); diagnostics_refresh_all(&self.world); } @@ -937,11 +998,10 @@ async fn process_diagnostics_queue(mut rx: mpsc::UnboundedReceiver) { - tracing::trace!("Processing {n} diagnostic tasks", n = batch.len()); - // Deduplicate tasks by keeping only the last one for each URI. We use a // `HashMap` so only the last insertion is retained. This is effectively a // way of cancelling diagnostics tasks for outdated documents. @@ -950,6 +1010,9 @@ fn process_diagnostics_batch(batch: Vec) { .map(|task| (task.file.wire_url().clone(), task)) .collect(); + tracing::trace!("Processing {n} diagnostic tasks", n = batch.len()); + lsp::log_info!("Processing {n} diagnostic tasks", n = batch.len()); + // Each file is its own blocking task. `spawn_blocking()` catches salsa // cancellation, so a pass cancelled by a concurrent edit just produces no // event. The publish happens via the returned [`AuxiliaryEvent`]. @@ -979,8 +1042,16 @@ fn refresh_diagnostics(task: RefreshDiagnosticsTask) -> RefreshDiagnosticsResult .components() .any(|c| c.as_os_str() == "testthat"); + let now = std::time::Instant::now(); + lsp::log_info!("Generating diagnostics for file: {uri}"); + let diagnostics = generate_diagnostics(file.file(), state, testthat); + lsp::log_info!( + "Finished diagnostics for file: {uri} in {:.0?}", + now.elapsed() + ); + RefreshDiagnosticsResult { uri, diagnostics, @@ -1027,7 +1098,10 @@ pub(crate) fn diagnostics_refresh_all(state: &WorldState) { /// passes spawned by that same write force the same memos and finish the job. fn warm_workspace_index(db: OakDatabase) { spawn_blocking(move || { + let now = std::time::Instant::now(); + lsp::log_info!("Starting workspace index warmup"); indexer::warm(&db); + lsp::log_info!("Finished workspace index warmup ({:.0?})", now.elapsed()); Ok(None) }) } diff --git a/crates/oak_db/src/storage.rs b/crates/oak_db/src/storage.rs index b41060887e..0c14546436 100644 --- a/crates/oak_db/src/storage.rs +++ b/crates/oak_db/src/storage.rs @@ -20,12 +20,21 @@ pub struct OakDatabase { library_roots: Arc>, orphan_root: Arc>, stale_root: Arc>, + // Clone counter that represents how many background readers have cloned the database + holds: Arc<()>, } impl OakDatabase { pub fn new() -> Self { Self::default() } + + // Number of live clones of this db (always >= 1, the caller itself). A + // write through `&mut db` parks until this reaches 1, so a value > 1 here + // means a write right now would block on that many outstanding handles. + pub fn outstanding_holds(&self) -> usize { + Arc::strong_count(&self.holds) + } } #[salsa::db]