Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::lsp::input_boundaries::InputBoundariesParams;
use crate::lsp::input_boundaries::InputBoundariesResponse;
use crate::lsp::main_loop::Event;
use crate::lsp::main_loop::GlobalState;
use crate::lsp::main_loop::LoopHandles;
use crate::lsp::main_loop::TokioUnboundedSender;
use crate::lsp::statement_range;
use crate::lsp::statement_range::StatementRangeParams;
Expand Down Expand Up @@ -232,9 +233,9 @@ struct Backend {
/// Channel for communication with the main loop.
events_tx: TokioUnboundedSender<Event>,

/// Handle to main loop. Drop it to cancel the loop, all associated tasks,
/// and drop all owned state.
_main_loop: tokio::task::JoinSet<()>,
/// Handle to the LSP loops. Drop it to shut the loops down and drop all
/// owned state.
_main_loop: LoopHandles,
}

impl Backend {
Expand Down
3 changes: 2 additions & 1 deletion crates/ark/src/lsp/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ impl Lsp {
) -> Self {
let rt = Builder::new_multi_thread()
.enable_all()
// One for the main loop and one spare
// Workers serve tower-lsp, the auxiliary loop, and the diagnostics
// queue. The main loop runs on its own thread.
.worker_threads(2)
// Used for diagnostics
.max_blocking_threads(2)
Expand Down
118 changes: 96 additions & 22 deletions crates/ark/src/lsp/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ use oak_scan::ScanRequest;
use oak_scan::ScanScheduler;
use oak_semantic::library::Library;
use stdext::result::ResultExt;
use stdext::spawn;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tower_lsp::jsonrpc;
use tower_lsp::lsp_types;
Expand Down Expand Up @@ -157,6 +160,21 @@ pub(crate) struct GlobalState {
events_rx: TokioUnboundedReceiver<Event>,
}

/// Owns the running LSP loops. Dropping it shuts them down.
///
/// - The auxiliary loop is a runtime task, aborted when `_aux_loop` drops.
/// - The main loop runs on its own thread. Dropping `_main_shutdown_tx` closes
///   the channel the loop selects on, so it breaks, drops the owned
///   `GlobalState`, and the thread exits. We hold the thread's handle but don't
///   join it on drop (it winds down on its own), so dropping never blocks the
///   caller.
///
/// Rust drops struct fields in declaration order, so keep the field order
/// below in mind before rearranging it.
#[derive(Debug)]
pub(crate) struct LoopHandles {
/// Join handle of the OS thread that runs the main loop. It is only held,
/// never joined; dropping a `std::thread::JoinHandle` detaches the thread.
_main_loop: std::thread::JoinHandle<()>,
/// Sender half of the shutdown channel. Its only role here is to be
/// dropped: that resolves the receiver the main loop selects on.
_main_shutdown_tx: oneshot::Sender<()>,
/// Task set holding the auxiliary loop task. A `JoinSet` aborts its tasks
/// when it is dropped.
_aux_loop: tokio::task::JoinSet<()>,
}

/// Non-cloneable, per-session state mutated only by exclusive handlers.
/// Sits alongside [`WorldState`] (which is cloneable for snapshot
/// handlers); state that can't be cloned lives here instead.
Expand Down Expand Up @@ -265,37 +283,66 @@ impl GlobalState {
self.events_tx.clone()
}

/// Start the main and auxiliary loops
/// Start the main and auxiliary loops.
///
/// Returns a `JoinSet` that holds onto all tasks and state owned by the
/// event loop. Drop it to cancel everything and shut down the service.
pub(crate) fn start(self) -> tokio::task::JoinSet<()> {
let mut set = tokio::task::JoinSet::<()>::new();

// Spawn latency-sensitive auxiliary loop. Must be first to initialise
// global transmission channel.
let aux = AuxiliaryState::new(self.client.clone());
set.spawn(async move { aux.start().await });

// Spawn main loop
set.spawn(async move { self.main_loop().await });
/// The returned [`LoopHandles`] owns everything the loops need. Drop it to
/// shut the loops down and release the owned state.
pub(crate) fn start(self) -> LoopHandles {
let mut aux = tokio::task::JoinSet::<()>::new();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need a tokio::task::JoinSet anymore for 1 task?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a nice way of cancelling the task on drop. Dropping a plain task handle doesn't cancel it, so we'd need a custom `Drop` impl that calls the handle's `abort()` method.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping a task handle doesn't cancel it

Oh, I was not aware of that.


// The auxiliary loop is fully async and never blocks. Must be started
// first to initialise the global transmission channel.
let aux_state = AuxiliaryState::new(self.client.clone());
aux.spawn(async move { aux_state.start().await });

// Since the main loop owns the Salsa DB and writes to it, we run it on its
// own thread instead of a Tokio worker. Salsa writes are potentially
// blocking until the writer gains exclusive access. If background tasks
// holding clones of the DB are stuck on the same thread as the main
// loop, the LSP deadlocks. This can be avoided by wrapping writes in
// `block_in_place()` but the safer structure is to have it run on an OS
// thread that we're in control of.
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let handle = Handle::current();
let main_loop = spawn!("oak-main-loop", move || {
handle.block_on(self.main_loop(shutdown_rx));
});

set
LoopHandles {
_main_shutdown_tx: shutdown_tx,
_aux_loop: aux,
_main_loop: main_loop,
}
}

/// Run main loop
///
/// This takes ownership of all global state and handles one by one LSP
/// requests, notifications, and other internal events.
async fn main_loop(mut self) {
async fn main_loop(mut self, mut shutdown_rx: oneshot::Receiver<()>) {
loop {
let event = self.next_event().await;
if let Err(err) = self.handle_event(event).await {
lsp::log_error!("Failure while handling event:\n{err:?}")
tokio::select! {
_ = &mut shutdown_rx => {
lsp::log_info!("Main loop stopping: handle dropped");
break;
},
event = self.events_rx.recv() => {
let Some(event) = event else {
lsp::log_info!("Main loop stopping: event channel closed");
break;
};
if let Err(err) = self.handle_event(event).await {
lsp::log_error!("Failure while handling event:\n{err:?}")
}
}
}
}
}

/// Pull the next event off the channel. Only the test pump uses this; the
/// running loop selects on the channel directly so it can also watch for
/// shutdown.
#[cfg(test)]
Comment on lines +342 to +345

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just remove this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it as a token of the nicer loop structure

async fn next_event(&mut self) -> Event {
    // `recv()` yields `None` only once the channel is closed. `GlobalState`
    // appears to keep its own `events_tx` sender for this channel, so that is
    // not expected while `self` is alive; treat it as a broken invariant and
    // say so instead of panicking with a bare `unwrap()`.
    self.events_rx
        .recv()
        .await
        .expect("event channel closed while waiting for the next event")
}
Expand Down Expand Up @@ -329,6 +376,10 @@ impl GlobalState {
Event::Lsp(msg) => match msg {
LspMessage::Notification(notif) => {
lsp::log_info!("{notif:#?}");
lsp::log_info!(
"Entering notification handler with {n} outstanding Salsa db holds",
n = self.world.db.outstanding_holds()
);
Comment on lines +379 to +382

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to know this on every iteration? Why do we want to know this at all? It feels like it'll be expected for the db to have outstanding holds in background threads (e.g. it's working on diagnostics when we get a did-change update).

@lionel- lionel- Jun 19, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not every iteration, this is only when we're about to write to the DB.

This is meant to help debugging hangs from user logs, we can remove this later once proven stable.


match notif {
LspNotification::Initialized(_params) => {
Expand Down Expand Up @@ -449,6 +500,8 @@ impl GlobalState {
},

Event::OakScanCompleted(scan) => {
lsp::log_info!("Received `OakScanCompleted`");

// This scan ran on a background task, but it sends its result
// back here so the write happens on the main loop. Keep it that
// way: Only the main loop should write to the oak DB (not
Expand All @@ -475,6 +528,12 @@ impl GlobalState {
scan,
&editor_owned,
);
lsp::log_info!(
"Dispatching {n} followup scan requests with {n_holds} outstanding Salsa db holds",
n = followups.len(),
n_holds = self.world.db.outstanding_holds(),
);

dispatch_scan_requests(&self.events_tx, followups);

// Warm the workspace index once the scan settles. Editor
Expand All @@ -486,13 +545,15 @@ impl GlobalState {
}
},
}
lsp::log_info!("Finished handling event in {}ms", loop_tick.elapsed().as_millis());

// TODO Make this threshold configurable by the client
// TODO: Make this threshold configurable by the client
if loop_tick.elapsed() > std::time::Duration::from_millis(50) {
lsp::log_info!("Handler took {}ms", loop_tick.elapsed().as_millis());
lsp::log_info!("Handler took more than 50ms");
}

if salsa::plumbing::current_revision(&self.world.db) != old_revision {
lsp::log_info!("World state revision advanced");
diagnostics_refresh_all(&self.world);
}

Expand Down Expand Up @@ -937,11 +998,10 @@ async fn process_diagnostics_queue(mut rx: mpsc::UnboundedReceiver<RefreshDiagno
}
process_diagnostics_batch(batch);
}
lsp::log_warn!("process_diagnostics_queue: channel closed, task exiting");
}

fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) {
tracing::trace!("Processing {n} diagnostic tasks", n = batch.len());

// Deduplicate tasks by keeping only the last one for each URI. We use a
// `HashMap` so only the last insertion is retained. This is effectively a
// way of cancelling diagnostics tasks for outdated documents.
Expand All @@ -950,6 +1010,9 @@ fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) {
.map(|task| (task.file.wire_url().clone(), task))
.collect();

tracing::trace!("Processing {n} diagnostic tasks", n = batch.len());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't say `Generating diagnostics` or `Finished diagnostics` via `tracing::trace!` (we only do that via `lsp::log_info!`), I feel like this isn't actually that useful on the kernel side (I know it's not new from you, but maybe we can just remove it).

How do we decide where to log?

@lionel- lionel- Jun 19, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to keep it there because diagnostics code logs at kernel level, so we get a section marker.

In general, the current split is that eventing and dispatch belong to the LSP, handling belongs to the kernel. This is a remnant from r_task()-heavy handling and we have nicer logging facilities on the kernel side (context, profiling).
But: when the LSP becomes standalone, everything moves to the LSP.

lsp::log_info!("Processing {n} diagnostic tasks", n = batch.len());

// Each file is its own blocking task. `spawn_blocking()` catches salsa
// cancellation, so a pass cancelled by a concurrent edit just produces no
// event. The publish happens via the returned [`AuxiliaryEvent`].
Expand Down Expand Up @@ -979,8 +1042,16 @@ fn refresh_diagnostics(task: RefreshDiagnosticsTask) -> RefreshDiagnosticsResult
.components()
.any(|c| c.as_os_str() == "testthat");

let now = std::time::Instant::now();
lsp::log_info!("Generating diagnostics for file: {uri}");

let diagnostics = generate_diagnostics(file.file(), state, testthat);

lsp::log_info!(
"Finished diagnostics for file: {uri} in {:.0?}",
now.elapsed()
);

RefreshDiagnosticsResult {
uri,
diagnostics,
Expand Down Expand Up @@ -1027,7 +1098,10 @@ pub(crate) fn diagnostics_refresh_all(state: &WorldState) {
/// passes spawned by that same write force the same memos and finish the job.
fn warm_workspace_index(db: OakDatabase) {
    // The closure takes ownership of `db` and is handed to `spawn_blocking`,
    // so the warmup does not run inline in the caller.
    spawn_blocking(move || {
        lsp::log_info!("Starting workspace index warmup");

        // Time only the warmup itself so the log line reports its cost.
        let started = std::time::Instant::now();
        indexer::warm(&db);
        let elapsed = started.elapsed();

        lsp::log_info!("Finished workspace index warmup ({:.0?})", elapsed);
        Ok(None)
    })
}
Expand Down
9 changes: 9 additions & 0 deletions crates/oak_db/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@ pub struct OakDatabase {
library_roots: Arc<OnceLock<LibraryRoots>>,
orphan_root: Arc<OnceLock<OrphanRoot>>,
stale_root: Arc<OnceLock<StaleRoot>>,
// Clone counter that represents how many background readers have cloned the database
holds: Arc<()>,
}

impl OakDatabase {
/// Create a database through the type's `Default` implementation.
pub fn new() -> Self {
Self::default()
}

/// Number of live clones of this db (always >= 1, the caller itself).
///
/// A write through `&mut db` parks until this reaches 1, so a value `n > 1`
/// here means a write right now would block on the other `n - 1`
/// outstanding handles.
///
/// The value is the strong count of the `holds` `Arc`, so it is only a
/// snapshot: other threads can clone or drop handles at any time. Use it
/// for logging and debugging, not for synchronisation.
pub fn outstanding_holds(&self) -> usize {
Arc::strong_count(&self.holds)
}
}

#[salsa::db]
Expand Down
Loading