-
Notifications
You must be signed in to change notification settings - Fork 29
Move main LSP loop off Tokio to an owned thread #1277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
41a8f17
bb1c10a
c2a9cfc
aed4d43
a104d6a
ee19e88
dcc5514
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,8 +26,11 @@ use oak_scan::ScanRequest; | |
| use oak_scan::ScanScheduler; | ||
| use oak_semantic::library::Library; | ||
| use stdext::result::ResultExt; | ||
| use stdext::spawn; | ||
| use tokio::runtime::Handle; | ||
| use tokio::sync::mpsc; | ||
| use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel; | ||
| use tokio::sync::oneshot; | ||
| use tokio::task::JoinHandle; | ||
| use tower_lsp::jsonrpc; | ||
| use tower_lsp::lsp_types; | ||
|
|
@@ -157,6 +160,21 @@ pub(crate) struct GlobalState { | |
| events_rx: TokioUnboundedReceiver<Event>, | ||
| } | ||
|
|
||
| /// Owns the running LSP loops. Dropping it shuts them down. | ||
| /// | ||
| /// - The auxiliary loop is a runtime task, aborted when `_aux_loop` drops. | ||
| /// - The main loop runs on its own thread. Dropping `_main_shutdown_tx` closes | ||
| /// the channel the loop selects on, so it breaks, drops the owned | ||
| /// `GlobalState`, and the thread exits. We hold the thread's handle but don't | ||
| /// join it on drop (it winds down on its own), so dropping never blocks the | ||
| /// caller. | ||
| #[derive(Debug)] | ||
| pub(crate) struct LoopHandles { | ||
| _main_loop: std::thread::JoinHandle<()>, | ||
| _main_shutdown_tx: oneshot::Sender<()>, | ||
| _aux_loop: tokio::task::JoinSet<()>, | ||
| } | ||
|
|
||
| /// Non-cloneable, per-session state mutated only by exclusive handlers. | ||
| /// Sits alongside [`WorldState`] (which is cloneable for snapshot | ||
| /// handlers); state that can't be cloned lives here instead. | ||
|
|
@@ -265,37 +283,66 @@ impl GlobalState { | |
| self.events_tx.clone() | ||
| } | ||
|
|
||
| /// Start the main and auxiliary loops | ||
| /// Start the main and auxiliary loops. | ||
| /// | ||
| /// Returns a `JoinSet` that holds onto all tasks and state owned by the | ||
| /// event loop. Drop it to cancel everything and shut down the service. | ||
| pub(crate) fn start(self) -> tokio::task::JoinSet<()> { | ||
| let mut set = tokio::task::JoinSet::<()>::new(); | ||
|
|
||
| // Spawn latency-sensitive auxiliary loop. Must be first to initialise | ||
| // global transmission channel. | ||
| let aux = AuxiliaryState::new(self.client.clone()); | ||
| set.spawn(async move { aux.start().await }); | ||
|
|
||
| // Spawn main loop | ||
| set.spawn(async move { self.main_loop().await }); | ||
| /// The returned [`LoopHandles`] owns everything the loops need. Drop it to | ||
| /// shut the loops down and release the owned state. | ||
| pub(crate) fn start(self) -> LoopHandles { | ||
| let mut aux = tokio::task::JoinSet::<()>::new(); | ||
|
|
||
| // The auxiliary loop is fully async and never blocks. Must be started | ||
| // first to initialise the global transmission channel. | ||
| let aux_state = AuxiliaryState::new(self.client.clone()); | ||
| aux.spawn(async move { aux_state.start().await }); | ||
|
|
||
| // Since the main loop owns the Salsa DB and writes to it, we run it on its | ||
| // own thread instead of a Tokio worker. Salsa writes are potentially | ||
| // blocking until the writer gains exclusive access. If background tasks | ||
| // holding clones of the DB are stuck on the same thread as the main | ||
| // loop, the LSP deadlocks. This can be avoided by wrapping writes in | ||
| // `block_in_place()` but the safer structure is to have it run on an OS | ||
| // thread that we're in control of. | ||
| let (shutdown_tx, shutdown_rx) = oneshot::channel(); | ||
| let handle = Handle::current(); | ||
| let main_loop = spawn!("oak-main-loop", move || { | ||
| handle.block_on(self.main_loop(shutdown_rx)); | ||
| }); | ||
|
|
||
| set | ||
| LoopHandles { | ||
| _main_shutdown_tx: shutdown_tx, | ||
| _aux_loop: aux, | ||
| _main_loop: main_loop, | ||
| } | ||
| } | ||
|
|
||
| /// Run main loop | ||
| /// | ||
| /// This takes ownership of all global state and handles LSP requests, | ||
| /// notifications, and other internal events one by one. | ||
| async fn main_loop(mut self) { | ||
| async fn main_loop(mut self, mut shutdown_rx: oneshot::Receiver<()>) { | ||
| loop { | ||
| let event = self.next_event().await; | ||
| if let Err(err) = self.handle_event(event).await { | ||
| lsp::log_error!("Failure while handling event:\n{err:?}") | ||
| tokio::select! { | ||
| _ = &mut shutdown_rx => { | ||
| lsp::log_info!("Main loop stopping: handle dropped"); | ||
| break; | ||
| }, | ||
| event = self.events_rx.recv() => { | ||
| let Some(event) = event else { | ||
| lsp::log_info!("Main loop stopping: event channel closed"); | ||
| break; | ||
| }; | ||
| if let Err(err) = self.handle_event(event).await { | ||
| lsp::log_error!("Failure while handling event:\n{err:?}") | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Pull the next event off the channel. Only the test pump uses this; the | ||
| /// running loop selects on the channel directly so it can also watch for | ||
| /// shutdown. | ||
| #[cfg(test)] | ||
|
Comment on lines
+342
to
+345
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would just remove this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's keep it as a token of the nicer loop structure |
||
| async fn next_event(&mut self) -> Event { | ||
| self.events_rx.recv().await.unwrap() | ||
| } | ||
|
|
@@ -329,6 +376,10 @@ impl GlobalState { | |
| Event::Lsp(msg) => match msg { | ||
| LspMessage::Notification(notif) => { | ||
| lsp::log_info!("{notif:#?}"); | ||
| lsp::log_info!( | ||
| "Entering notification handler with {n} outstanding Salsa db holds", | ||
| n = self.world.db.outstanding_holds() | ||
| ); | ||
|
Comment on lines
+379
to
+382
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do we want to know this on every iteration? Why do we want to know this at all? It feels like it'll be expected for the db to have outstanding holds in background threads (e.g. it's working on diagnostics when we get a did-change update).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not every iteration; this is only when we're about to write to the DB. This is meant to help debug hangs from user logs; we can remove it later once proven stable. |
||
|
|
||
| match notif { | ||
| LspNotification::Initialized(_params) => { | ||
|
|
@@ -449,6 +500,8 @@ impl GlobalState { | |
| }, | ||
|
|
||
| Event::OakScanCompleted(scan) => { | ||
| lsp::log_info!("Received `OakScanCompleted`"); | ||
|
|
||
| // This scan ran on a background task, but it sends its result | ||
| // back here so the write happens on the main loop. Keep it that | ||
| // way: Only the main loop should write to the oak DB (not | ||
|
|
@@ -475,6 +528,12 @@ impl GlobalState { | |
| scan, | ||
| &editor_owned, | ||
| ); | ||
| lsp::log_info!( | ||
| "Dispatching {n} followup scan requests with {n_holds} outstanding Salsa db holds", | ||
| n = followups.len(), | ||
| n_holds = self.world.db.outstanding_holds(), | ||
| ); | ||
|
|
||
| dispatch_scan_requests(&self.events_tx, followups); | ||
|
|
||
| // Warm the workspace index once the scan settles. Editor | ||
|
|
@@ -486,13 +545,15 @@ impl GlobalState { | |
| } | ||
| }, | ||
| } | ||
| lsp::log_info!("Finished handling event in {}ms", loop_tick.elapsed().as_millis()); | ||
|
|
||
| // TODO Make this threshold configurable by the client | ||
| // TODO: Make this threshold configurable by the client | ||
| if loop_tick.elapsed() > std::time::Duration::from_millis(50) { | ||
| lsp::log_info!("Handler took {}ms", loop_tick.elapsed().as_millis()); | ||
| lsp::log_info!("Handler took more than 50ms"); | ||
| } | ||
|
|
||
| if salsa::plumbing::current_revision(&self.world.db) != old_revision { | ||
| lsp::log_info!("World state revision advanced"); | ||
| diagnostics_refresh_all(&self.world); | ||
| } | ||
|
|
||
|
|
@@ -937,11 +998,10 @@ async fn process_diagnostics_queue(mut rx: mpsc::UnboundedReceiver<RefreshDiagno | |
| } | ||
| process_diagnostics_batch(batch); | ||
| } | ||
| lsp::log_warn!("process_diagnostics_queue: channel closed, task exiting"); | ||
| } | ||
|
|
||
| fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) { | ||
| tracing::trace!("Processing {n} diagnostic tasks", n = batch.len()); | ||
|
|
||
| // Deduplicate tasks by keeping only the last one for each URI. We use a | ||
| // `HashMap` so only the last insertion is retained. This is effectively a | ||
| // way of cancelling diagnostics tasks for outdated documents. | ||
|
|
@@ -950,6 +1010,9 @@ fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) { | |
| .map(|task| (task.file.wire_url().clone(), task)) | ||
| .collect(); | ||
|
|
||
| tracing::trace!("Processing {n} diagnostic tasks", n = batch.len()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we don't say How do we decide where to log?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I decided to keep it there because diagnostics code logs at kernel level, so we get a section marker. In general, the current split is that eventing and dispatch belong to the LSP, handling belongs to the kernel. This is a remnant from |
||
| lsp::log_info!("Processing {n} diagnostic tasks", n = batch.len()); | ||
|
|
||
| // Each file is its own blocking task. `spawn_blocking()` catches salsa | ||
| // cancellation, so a pass cancelled by a concurrent edit just produces no | ||
| // event. The publish happens via the returned [`AuxiliaryEvent`]. | ||
|
|
@@ -979,8 +1042,16 @@ fn refresh_diagnostics(task: RefreshDiagnosticsTask) -> RefreshDiagnosticsResult | |
| .components() | ||
| .any(|c| c.as_os_str() == "testthat"); | ||
|
|
||
| let now = std::time::Instant::now(); | ||
| lsp::log_info!("Generating diagnostics for file: {uri}"); | ||
|
|
||
| let diagnostics = generate_diagnostics(file.file(), state, testthat); | ||
|
|
||
| lsp::log_info!( | ||
| "Finished diagnostics for file: {uri} in {:.0?}", | ||
| now.elapsed() | ||
| ); | ||
|
|
||
| RefreshDiagnosticsResult { | ||
| uri, | ||
| diagnostics, | ||
|
|
@@ -1027,7 +1098,10 @@ pub(crate) fn diagnostics_refresh_all(state: &WorldState) { | |
| /// passes spawned by that same write force the same memos and finish the job. | ||
| fn warm_workspace_index(db: OakDatabase) { | ||
| spawn_blocking(move || { | ||
| let now = std::time::Instant::now(); | ||
| lsp::log_info!("Starting workspace index warmup"); | ||
| indexer::warm(&db); | ||
| lsp::log_info!("Finished workspace index warmup ({:.0?})", now.elapsed()); | ||
| Ok(None) | ||
| }) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you really need a
`tokio::task::JoinSet` anymore for 1 task? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a nice way of cancelling on abort. Dropping a task handle doesn't cancel it, so we'd need a custom `Drop` method to call the handle's
`abort()` method. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I was not aware of that.