diff --git a/crates/ark/src/lsp/main_loop.rs b/crates/ark/src/lsp/main_loop.rs index e9dbcda9e..c5a77be78 100644 --- a/crates/ark/src/lsp/main_loop.rs +++ b/crates/ark/src/lsp/main_loop.rs @@ -6,6 +6,7 @@ // use std::collections::HashMap; +use std::collections::HashSet; use std::future; use std::path::Path; use std::path::PathBuf; @@ -15,12 +16,17 @@ use std::sync::atomic::Ordering; use std::sync::LazyLock; use std::sync::RwLock; +use aether_url::UrlId; use anyhow::anyhow; use futures::stream::FuturesUnordered; use futures::StreamExt; use oak_db::OakDatabase; use oak_scan::DbScan; +use oak_scan::ScanCompleted; +use oak_scan::ScanRequest; +use oak_scan::ScanScheduler; use oak_semantic::library::Library; +use stdext::result::ResultExt; use tokio::sync::mpsc; use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel; use tokio::task; @@ -88,6 +94,7 @@ type TaskList = futures::stream::FuturesUnordered, } -/// Unlike `WorldState`, `ParserState` cannot be cloned and is only accessed by -/// exclusive handlers. +/// Non-cloneable, per-session state mutated only by exclusive handlers. +/// Sits alongside [`WorldState`] (which is cloneable for snapshot +/// handlers); state that can't be cloned lives here instead. pub(crate) struct LspState { /// The set of tree-sitter document parsers managed by the `GlobalState`. pub(crate) parsers: HashMap, @@ -162,6 +170,11 @@ pub(crate) struct LspState { /// Channel for sending notifications to Console (e.g., document changes for DAP) pub(crate) console_notification_tx: TokioUnboundedSender, + + /// Coordinator for asynchronous workspace scans. Mutated only from + /// main-loop handlers. Must be out of [`WorldState`] because the scheduler + /// is not clonable. + pub(crate) oak_scheduler: ScanScheduler, } /// State for the auxiliary loop @@ -191,16 +204,6 @@ impl GlobalState { _r_home: PathBuf, console_notification_tx: TokioUnboundedSender, ) -> Self { - // Transmission channel for the main loop events. Shared with the - // tower-lsp backend and the Jupyter kernel. - let (events_tx, events_rx) = tokio_unbounded_channel::(); - - let lsp_state = LspState { - parsers: HashMap::new(), - capabilities: Capabilities::default(), - console_notification_tx, - }; - // FIXME: We shouldn't call R code in the kernel to figure this out let library_paths = crate::r_task(|| -> anyhow::Result> { Ok(harp::RFunction::new("base", ".libPaths") @@ -223,8 +226,34 @@ impl GlobalState { let library = Library::new(library_paths); + Self::from_parts( + client, + console_notification_tx, + WorldState::new(db, library), + ) + } + + /// Assemble the state around an already-built `WorldState`. Splitting this + /// out from [`GlobalState::new`] lets tests construct a state without the + /// R `.libPaths()` lookup that `new` does. + fn from_parts( + client: Client, + console_notification_tx: TokioUnboundedSender, + world: WorldState, + ) -> Self { + // Transmission channel for the main loop events. Shared with the + // tower-lsp backend and the Jupyter kernel. + let (events_tx, events_rx) = tokio_unbounded_channel::(); + + let lsp_state = LspState { + parsers: HashMap::new(), + capabilities: Capabilities::default(), + console_notification_tx, + oak_scheduler: ScanScheduler::new(), + }; + Self { - world: WorldState::new(db, library), + world, lsp_state, client, events_tx, @@ -299,13 +328,13 @@ impl GlobalState { handlers::handle_initialized(&self.client, &self.lsp_state).await?; }, LspNotification::DidChangeWorkspaceFolders(params) => { - state_handlers::did_change_workspace_folders(params, &mut self.world)?; + state_handlers::did_change_workspace_folders(params, &mut self.world, &mut self.lsp_state, &self.events_tx)?; }, LspNotification::DidChangeConfiguration(params) => { state_handlers::did_change_configuration(params, &self.client, &mut self.world).await?; }, LspNotification::DidChangeWatchedFiles(params) => { - state_handlers::did_change_watched_files(params, &mut self.world)?; + state_handlers::did_change_watched_files(params, &mut self.world, &mut self.lsp_state, &self.events_tx)?; }, LspNotification::DidOpenTextDocument(params) => { state_handlers::did_open(params, &mut self.lsp_state, &mut self.world)?; @@ -336,7 +365,7 @@ impl GlobalState { match request { LspRequest::Initialize(params) => { - respond(tx, || state_handlers::initialize(params, &mut self.lsp_state, &mut self.world), LspResponse::Initialize)?; + respond(tx, || state_handlers::initialize(params, &mut self.lsp_state, &mut self.world, &self.events_tx), LspResponse::Initialize)?; }, LspRequest::WorkspaceSymbol(params) => { respond(tx, || handlers::handle_symbol(params, &self.world), LspResponse::WorkspaceSymbol)?; @@ -420,6 +449,25 @@ impl GlobalState { } } }, + + Event::OakScanCompleted(scan) => { + // Recompute editor-owned files at apply time, not at spawn + // time: a buffer may have opened or closed since the scan + // kicked off. The buffer-drain inside `apply_scan_completed` uses + // this set as its watcher-event `skip` argument. + let editor_owned: HashSet = self.world + .documents + .keys() + .map(|url| UrlId::from_url(url.clone())) + .collect(); + + let followups = self.lsp_state.oak_scheduler.apply_scan_completed( + &mut self.world.db, + scan, + &editor_owned, + ); + dispatch_scan_requests(&self.events_tx, followups); + }, } // TODO Make this threshold configurable by the client @@ -452,6 +500,54 @@ impl GlobalState { } } +/// Test-only methods for driving the main loop without R or a live LSP +/// connection. Kept here, next to the loop they exercise, so the pump uses the +/// real `handle_event()` and the private channels rather than a reconstruction. +#[cfg(test)] +impl GlobalState { + /// Build a state with an empty db and no R library paths. Takes a `client` + /// because the struct holds one, but the event paths exercised in tests + /// never touch it. + pub(crate) fn new_test(client: Client) -> Self { + let (console_notification_tx, _) = tokio_unbounded_channel::(); + let world = WorldState::new(OakDatabase::new(), Library::new(vec![])); + Self::from_parts(client, console_notification_tx, world) + } + + /// Run `event` through the real `handle_event`, then pump the scan + /// completions it spawns until the scheduler goes idle. This is what + /// `main_loop()` does, minus the surrounding `loop`. + pub(crate) async fn handle_event_to_quiescence(&mut self, event: Event) { + self.handle_event(event).await.unwrap(); + while self.lsp_state.oak_scheduler.has_pending_scans() { + let event = self.next_event().await; + self.handle_event(event).await.unwrap(); + } + } + + pub(crate) fn world(&self) -> &WorldState { + &self.world + } +} + +/// Spawn each [`ScanRequest`] on a blocking task. Each task runs the +/// pure-I/O [`ScanRequest::run`] and ships the [`ScanCompleted`] back +/// to the main loop as [`Event::OakScanCompleted`], where the scheduler +/// then applies it. +pub(super) fn dispatch_scan_requests( + events_tx: &TokioUnboundedSender, + requests: Vec, +) { + for req in requests { + let tx = events_tx.clone(); + spawn_blocking(move || { + let scan = req.run(); + tx.send(Event::OakScanCompleted(scan)).log_err(); + Ok(None) + }); + } +} + /// Respond to a request from the LSP /// /// We receive requests from the LSP client with a response channel. Once we @@ -476,10 +572,8 @@ fn respond( into_lsp_response: impl FnOnce(T) -> LspResponse, ) -> anyhow::Result<()> { let response = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(response)) { - Ok(response) => { - let response = response.map(into_lsp_response); - RequestResponse::Result(response) - }, + Ok(Ok(t)) => RequestResponse::Result(Ok(into_lsp_response(t))), + Ok(Err(e)) => RequestResponse::Result(Err(e)), Err(err) => { // Set global crash flag to disable the LSP LSP_HAS_CRASHED.store(true, Ordering::Release); @@ -1047,10 +1141,14 @@ pub(crate) fn diagnostics_refresh_all(state: WorldState) { continue; } + // The task sits in the indexer queue off the main loop. + // `legacy_snapshot()` hands it a detached oak db so it can't pin the + // live one against the main loop's next `set_*` (diagnostics read only + // non-oak state). INDEXER_QUEUE .send(IndexerQueueTask::Diagnostics(RefreshDiagnosticsTask { uri: uri.clone(), - state: state.clone(), + state: state.legacy_snapshot(), })) .unwrap_or_else(|err| lsp::log_error!("Failed to queue diagnostics refresh: {err}")); } diff --git a/crates/ark/src/lsp/state.rs b/crates/ark/src/lsp/state.rs index 4566d7b06..74a5d9e34 100644 --- a/crates/ark/src/lsp/state.rs +++ b/crates/ark/src/lsp/state.rs @@ -93,6 +93,28 @@ impl WorldState { Err(anyhow!("Can't find document for URI {uri}")) } } + + /// Copy the world state for a background handler that does not query oak. + /// + /// The copy gets a fresh, empty `OakDatabase` instead of a handle to the + /// live one. A salsa db handle held off the main loop blocks the next + /// `set_*` on the owner: the setter waits for `clones == 1`, and an idle + /// handle (parked in the indexer queue, or held by a handler blocked in + /// `r_task`) never drops on its own. + /// + /// This is the snapshot for the non-salsa handlers (diagnostics, + /// indexing) that read only the plain `WorldState` fields. A salsa-based + /// handler that queries oak off the main loop needs a different snapshot, + /// one that keeps the live db handle and runs its queries under + /// cancellation (catch `Cancelled`, don't span `r_task`), so it sees real + /// oak data. That's what the `legacy_` prefix warns: don't reach for this + /// from oak-querying code. + pub(crate) fn legacy_snapshot(&self) -> WorldState { + WorldState { + db: OakDatabase::new(), + ..self.clone() + } + } } pub(crate) fn with_document( @@ -132,3 +154,61 @@ pub(crate) fn workspace_uris(state: &WorldState) -> Vec { let uris: Vec = state.documents.iter().map(|elt| elt.0.clone()).collect(); uris } + +#[cfg(test)] +mod tests { + use std::sync::mpsc; + use std::sync::Arc; + use std::sync::Barrier; + use std::time::Duration; + + use oak_db::OakDatabase; + use oak_scan::DbScan; + use oak_semantic::library::Library; + + use super::WorldState; + + /// A legacy background snapshot must not pin the oak db against a + /// main-loop mutation. + /// + /// salsa reclaims `&mut` access for a setter by raising the cancellation + /// flag and then blocking on `clones == 1`. That flag only frees a clone + /// whose thread is inside a running query and notices it. A snapshot that + /// sits idle (parked in the indexer queue, or held by a `spawn_blocking` + /// handler blocked in `r_task`) never notices, so the next setter on the + /// owner blocks until the snapshot drops. This test parks a snapshot with + /// no query running and asserts a setter on the owner still completes. + #[test] + fn legacy_snapshot_does_not_pin_oak_against_mutation() { + let mut state = WorldState::new(OakDatabase::new(), Library::new(vec![])); + + let snapshot = state.legacy_snapshot(); + + // Park the snapshot with no salsa query running, then hold it until + // the main thread has finished timing the mutation. + let release = Arc::new(Barrier::new(2)); + let held = { + let release = Arc::clone(&release); + std::thread::spawn(move || { + let _snapshot = snapshot; + release.wait(); + }) + }; + + let (tx, rx) = mpsc::channel(); + let mutator = std::thread::spawn(move || { + state.db.set_library_paths(&[]); + let _ = tx.send(()); + }); + + let completed = rx.recv_timeout(Duration::from_secs(2)).is_ok(); + + // Release the parked snapshot so a blocked mutator can finish and both + // threads join, regardless of the outcome. + release.wait(); + held.join().unwrap(); + mutator.join().unwrap(); + + assert!(completed); + } +} diff --git a/crates/ark/src/lsp/state_handlers.rs b/crates/ark/src/lsp/state_handlers.rs index f75e37f99..92eee1a03 100644 --- a/crates/ark/src/lsp/state_handlers.rs +++ b/crates/ark/src/lsp/state_handlers.rs @@ -64,9 +64,12 @@ use crate::lsp::config::DOCUMENT_SETTINGS; use crate::lsp::config::GLOBAL_SETTINGS; use crate::lsp::document::Document; use crate::lsp::inputs::source_root::SourceRoot; +use crate::lsp::main_loop::dispatch_scan_requests; use crate::lsp::main_loop::DidCloseVirtualDocumentParams; use crate::lsp::main_loop::DidOpenVirtualDocumentParams; +use crate::lsp::main_loop::Event; use crate::lsp::main_loop::LspState; +use crate::lsp::main_loop::TokioUnboundedSender; use crate::lsp::state::workspace_uris; use crate::lsp::state::WorldState; @@ -94,6 +97,7 @@ pub(crate) fn initialize( params: InitializeParams, lsp_state: &mut LspState, state: &mut WorldState, + events_tx: &TokioUnboundedSender, ) -> LspResult { let workspace_uris = effective_workspace_uris(¶ms); lsp_state.capabilities = Capabilities::new(params.capabilities); @@ -139,14 +143,22 @@ pub(crate) fn initialize( } } - // Start first round of indexing. We are initializing, so no documents have - // been opened yet and nothing is editor-owned. - state - .db - .set_workspace_paths(&workspace_paths, &HashSet::new()); + // Start first round of indexing. `state.documents` is empty at init since + // no `didOpen` has fired yet, but build the set through the same shape we + // use elsewhere so the call site reads consistently. + let editor_owned: HashSet = state + .documents + .keys() + .map(|url| UrlId::from_url(url.clone())) + .collect(); + let requests = + lsp_state + .oak_scheduler + .set_workspace_paths(&mut state.db, &workspace_paths, &editor_owned); + dispatch_scan_requests(events_tx, requests); lsp::main_loop::index_start(folders, state.clone()); - Ok(InitializeResult { + let result = InitializeResult { server_info: Some(ServerInfo { name: "Ark R Kernel".to_string(), version: Some(crate::BUILD_VERSION.to_string()), @@ -228,7 +240,9 @@ pub(crate) fn initialize( }), ..ServerCapabilities::default() }, - }) + }; + + Ok(result) } /// Resolve the effective workspace folders from `InitializeParams`. @@ -393,6 +407,8 @@ pub(crate) fn did_rename_files( pub(crate) fn did_change_watched_files( params: DidChangeWatchedFilesParams, state: &mut WorldState, + lsp_state: &mut LspState, + events_tx: &TokioUnboundedSender, ) -> anyhow::Result<()> { // Editor owns the contents of files it has open: Oak should ignore // disk-side events for those URLs. @@ -419,7 +435,11 @@ pub(crate) fn did_change_watched_files( }) .collect(); - state.db.apply_watcher_events(events, &editor_owned); + let requests = + lsp_state + .oak_scheduler + .apply_watcher_events(&mut state.db, events, &editor_owned); + dispatch_scan_requests(events_tx, requests); Ok(()) } @@ -427,6 +447,8 @@ pub(crate) fn did_change_watched_files( pub(crate) fn did_change_workspace_folders( params: DidChangeWorkspaceFoldersParams, state: &mut WorldState, + lsp_state: &mut LspState, + events_tx: &TokioUnboundedSender, ) -> anyhow::Result<()> { let removed: HashSet = params.event.removed.iter().map(|f| f.uri.clone()).collect(); state.workspace.folders.retain(|uri| !removed.contains(uri)); @@ -453,9 +475,11 @@ pub(crate) fn did_change_workspace_folders( .map(|url| UrlId::from_url(url.clone())) .collect(); - state - .db - .set_workspace_paths(&workspace_paths, &editor_owned); + let requests = + lsp_state + .oak_scheduler + .set_workspace_paths(&mut state.db, &workspace_paths, &editor_owned); + dispatch_scan_requests(events_tx, requests); Ok(()) } diff --git a/crates/ark/src/lsp/tests.rs b/crates/ark/src/lsp/tests.rs index 91f5a9996..0bbcd3e82 100644 --- a/crates/ark/src/lsp/tests.rs +++ b/crates/ark/src/lsp/tests.rs @@ -1,5 +1,6 @@ mod find_references; mod goto_definition; +mod main_loop; mod rename; mod state_handlers; mod utils; diff --git a/crates/ark/src/lsp/tests/main_loop.rs b/crates/ark/src/lsp/tests/main_loop.rs new file mode 100644 index 000000000..2f00dabfc --- /dev/null +++ b/crates/ark/src/lsp/tests/main_loop.rs @@ -0,0 +1,105 @@ +//! Integration test that drives the real [`GlobalState`] event loop. +//! +//! Where the handler tests in [`super::state_handlers`] reconstruct the scan +//! pump by hand, this one feeds an event through the production `handle_event` +//! and lets the loop dispatch the scan, run it on a blocking task, route the +//! [`Event::OakScanCompleted`] back, and apply it. So it pins the main loop's +//! own wiring: which arm calls which handler, and the apply-and-redispatch +//! step. The scheduler's policy is unit tested without tokio in `oak_scan`. + +use std::path::Path; + +use oak_db::DbInputs; +use tower_lsp::lsp_types::DidChangeWorkspaceFoldersParams; +use tower_lsp::lsp_types::InitializeParams; +use tower_lsp::lsp_types::InitializeResult; +use tower_lsp::lsp_types::WorkspaceFolder; +use tower_lsp::lsp_types::WorkspaceFoldersChangeEvent; +use tower_lsp::Client; +use tower_lsp::LanguageServer; +use tower_lsp::LspService; +use url::Url; + +use crate::lsp::backend::LspMessage; +use crate::lsp::backend::LspNotification; +use crate::lsp::main_loop::init_aux_for_test; +use crate::lsp::main_loop::Event; +use crate::lsp::main_loop::GlobalState; + +/// Get a real `Client` without a live connection. `LspService::new` hands a +/// `Client` to its init closure; we capture it and drop the service. The +/// client's sends go nowhere, which is fine since the event paths under test +/// never use it. +fn test_client() -> Client { + struct Dummy; + + #[tower_lsp::async_trait] + impl LanguageServer for Dummy { + async fn initialize( + &self, + _: InitializeParams, + ) -> tower_lsp::jsonrpc::Result { + Ok(InitializeResult::default()) + } + async fn shutdown(&self) -> tower_lsp::jsonrpc::Result<()> { + Ok(()) + } + } + + let captured = std::sync::Arc::new(std::sync::Mutex::new(None)); + let sink = std::sync::Arc::clone(&captured); + let (_service, _socket) = LspService::new(move |client| { + *sink.lock().unwrap() = Some(client); + Dummy + }); + + // Bind first so the `MutexGuard` temporary drops at the `;`, not at the + // end of the block. + let client = captured.lock().unwrap().take(); + client.unwrap() +} + +fn write_package(dir: &Path, name: &str, basename: &str, contents: &str) { + std::fs::create_dir_all(dir.join("R")).unwrap(); + std::fs::write( + dir.join("DESCRIPTION"), + format!("Package: {name}\nVersion: 0.0.0\n"), + ) + .unwrap(); + std::fs::write(dir.join("R").join(basename), contents).unwrap(); +} + +/// Drive `didChangeWorkspaceFolders` through the real `handle_event`, including +/// the real `OakScanCompleted` arm, to check that the main loop wires scan +/// dispatch and completion-apply together. +#[tokio::test] +async fn test_workspace_folder_scan_drives_through_main_loop() { + let _aux = init_aux_for_test(); + let mut state = GlobalState::new_test(test_client()); + + let tmp = tempfile::tempdir().unwrap(); + write_package(&tmp.path().join("pkg"), "pkg", "a.R", "x <- 1\n"); + + let params = DidChangeWorkspaceFoldersParams { + event: WorkspaceFoldersChangeEvent { + added: vec![WorkspaceFolder { + uri: Url::from_file_path(tmp.path()).unwrap(), + name: String::new(), + }], + removed: vec![], + }, + }; + state + .handle_event_to_quiescence(Event::Lsp(LspMessage::Notification( + LspNotification::DidChangeWorkspaceFolders(params), + ))) + .await; + + let db = &state.world().db; + let roots = db.workspace_roots().roots(db).clone(); + assert_eq!(roots.len(), 1); + let packages = roots[0].packages(db); + assert_eq!(packages.len(), 1); + assert_eq!(packages[0].name(db), "pkg"); + assert_eq!(packages[0].files(db).len(), 1); +} diff --git a/crates/ark/src/lsp/tests/state_handlers.rs b/crates/ark/src/lsp/tests/state_handlers.rs index dcfdbaeaa..ed9de60a9 100644 --- a/crates/ark/src/lsp/tests/state_handlers.rs +++ b/crates/ark/src/lsp/tests/state_handlers.rs @@ -8,11 +8,14 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fs; use std::path::Path; +use std::path::PathBuf; use aether_url::UrlId; use oak_db::Db; use oak_db::DbInputs; use oak_scan::DbScan; +use oak_scan::ScanRequest; +use oak_scan::ScanScheduler; use tower_lsp::lsp_types::DidChangeWatchedFilesParams; use tower_lsp::lsp_types::DidChangeWorkspaceFoldersParams; use tower_lsp::lsp_types::DidCloseTextDocumentParams; @@ -26,15 +29,128 @@ use url::Url; use crate::lsp::capabilities::Capabilities; use crate::lsp::document::Document; +use crate::lsp::main_loop::dispatch_scan_requests; use crate::lsp::main_loop::init_aux_for_test; use crate::lsp::main_loop::AuxiliaryEvent; +use crate::lsp::main_loop::Event; use crate::lsp::main_loop::LspState; +use crate::lsp::main_loop::TokioUnboundedSender; use crate::lsp::state::WorldState; -use crate::lsp::state_handlers::did_change_watched_files; -use crate::lsp::state_handlers::did_change_workspace_folders; use crate::lsp::state_handlers::did_close; use crate::lsp::state_handlers::effective_workspace_uris; +/// Local sync wrappers around the async-shaped scheduler API. Tests +/// don't need the timing flexibility, so each operation kicks off +/// any scans, drains them on the current thread, and returns. Each +/// call constructs a fresh `LspState` (which owns the scheduler) +/// because tests assert post-quiescent state; carrying scheduler +/// state across calls only matters for mid-flight timing assertions, +/// which live in `oak_scan`'s scheduler tests. +fn set_workspace_paths(state: &mut WorldState, paths: &[PathBuf], editor_owned: &HashSet) { + let mut lsp_state = test_lsp_state(); + let reqs = lsp_state + .oak_scheduler + .set_workspace_paths(&mut state.db, paths, editor_owned); + drain( + &mut state.db, + &mut lsp_state.oak_scheduler, + reqs, + editor_owned, + ); +} + +fn editor_owned_of(state: &WorldState) -> HashSet { + state + .documents + .keys() + .map(|u| UrlId::from_url(u.clone())) + .collect() +} + +fn did_change_watched_files( + params: DidChangeWatchedFilesParams, + state: &mut WorldState, +) -> anyhow::Result<()> { + let mut lsp_state = test_lsp_state(); + run_handler_to_quiescence(state, &mut lsp_state, |state, lsp_state, events_tx| { + crate::lsp::state_handlers::did_change_watched_files(params, state, lsp_state, events_tx) + }) +} + +fn did_change_workspace_folders( + params: DidChangeWorkspaceFoldersParams, + state: &mut WorldState, +) -> anyhow::Result<()> { + let mut lsp_state = test_lsp_state(); + run_handler_to_quiescence(state, &mut lsp_state, |state, lsp_state, events_tx| { + crate::lsp::state_handlers::did_change_workspace_folders( + params, state, lsp_state, events_tx, + ) + }) +} + +/// Drive a production handler that dispatches its scans through `events_tx`, +/// then pump the resulting `OakScanCompleted` events to quiescence on a local +/// runtime. Production does this pumping in the main loop's event handler; +/// the tests have to stand up the same machinery (tokio runtime so +/// `spawn_blocking` works, aux channel so `send_auxiliary` doesn't panic, an +/// events channel to receive completions). +fn run_handler_to_quiescence( + state: &mut WorldState, + lsp_state: &mut LspState, + handler: F, +) -> anyhow::Result<()> +where + F: FnOnce(&mut WorldState, &mut LspState, &TokioUnboundedSender) -> anyhow::Result<()>, +{ + let _aux = init_aux_for_test(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel::(); + + rt.block_on(async { + handler(state, lsp_state, &events_tx)?; + let editor_owned = editor_owned_of(state); + while lsp_state.oak_scheduler.has_pending_scans() { + let Some(Event::OakScanCompleted(scan)) = events_rx.recv().await else { + break; + }; + let followups = + lsp_state + .oak_scheduler + .apply_scan_completed(&mut state.db, scan, &editor_owned); + dispatch_scan_requests(&events_tx, followups); + } + Ok(()) + }) +} + +fn test_lsp_state() -> LspState { + LspState { + parsers: HashMap::new(), + capabilities: Capabilities::default(), + console_notification_tx: tokio::sync::mpsc::unbounded_channel().0, + oak_scheduler: ScanScheduler::new(), + } +} + +/// Inline drain loop: oak_scan keeps its equivalent crate-private so +/// it can't leak into production callers. The implementation here is +/// just `ScanRequest::run` + `apply_scan_completed` until the request queue +/// empties. +fn drain( + db: &mut oak_db::OakDatabase, + scheduler: &mut ScanScheduler, + mut requests: Vec, + editor_owned: &HashSet, +) { + while let Some(req) = requests.pop() { + let result = req.run(); + requests.extend(scheduler.apply_scan_completed(db, result, editor_owned)); + } +} + fn write_package(dir: &Path, name: &str, r_files: &[(&str, &str)]) { fs::create_dir_all(dir.join("R")).unwrap(); fs::write( @@ -60,9 +176,7 @@ fn workspace_state(workspace: &Path) -> WorldState { .workspace .folders .push(Url::from_file_path(workspace).unwrap()); - state - .db - .set_workspace_paths(&[workspace.to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut state, &[workspace.to_path_buf()], &HashSet::new()); state } @@ -407,7 +521,8 @@ fn test_did_change_workspace_folders_removes_folder() { .workspace .folders .push(Url::from_file_path(second.path()).unwrap()); - state.db.set_workspace_paths( + set_workspace_paths( + &mut state, &[first.path().to_path_buf(), second.path().to_path_buf()], &HashSet::new(), ); @@ -523,16 +638,10 @@ fn test_did_close_releases_orphan_file_to_stale() { // to orphan, editor-owned) → close → file leaves orphan, lands in // stale. Without the `close_editor` hook in `did_close`, the file // would zombie in orphan with the editor's last content. - let mut aux_rx = init_aux_for_test(); - let tmp = tempfile::tempdir().unwrap(); write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); let mut state = workspace_state(tmp.path()); - let mut lsp_state = LspState { - parsers: HashMap::new(), - capabilities: Capabilities::default(), - console_notification_tx: tokio::sync::mpsc::unbounded_channel().0, - }; + let mut lsp_state = test_lsp_state(); let r_path = tmp.path().join("pkg/R/a.R"); let url = Url::from_file_path(&r_path).unwrap(); @@ -559,6 +668,11 @@ fn test_did_close_releases_orphan_file_to_stale() { let file = state.db.file_by_url(&url_id).unwrap(); assert!(state.db.orphan_root().files(&state.db).contains(&file)); + // Init the aux channel here, after the workspace-folders churn: the + // handler wrapper resets the channel each call (it stands up its own to + // satisfy `spawn_blocking`), so grab the receiver only once that's done. + let mut aux_rx = init_aux_for_test(); + // Now close the buffer. File should move from orphan to stale. let params = DidCloseTextDocumentParams { text_document: TextDocumentIdentifier { uri: url.clone() }, diff --git a/crates/oak_ide/tests/integration/support.rs b/crates/oak_ide/tests/integration/support.rs new file mode 100644 index 000000000..795d126d1 --- /dev/null +++ b/crates/oak_ide/tests/integration/support.rs @@ -0,0 +1,135 @@ +//! Shared test helpers for the oak_ide integration suite. + +use aether_path::FilePath; +use biome_rowan::TextRange; +use biome_rowan::TextSize; +use oak_db::DbInputs; +use oak_db::File; +use oak_db::OakDatabase; +use oak_db::Package; +use oak_db::Root; +use oak_db::RootKind; +use oak_ide::FileRange; +use oak_package_metadata::namespace::Namespace; +use oak_scan::DbScan; +use salsa::Setter; +use stdext::SortedVec; +use url::Url; + +pub fn file_url(name: &str) -> Url { + // `Url::to_file_path` on Windows requires a drive-letter prefix, so + // synthesize one for tests. Linux is happy with rootless paths. + if cfg!(windows) { + Url::parse(&format!("file:///C:/project/R/{name}")).unwrap() + } else { + Url::parse(&format!("file:///project/R/{name}")).unwrap() + } +} + +pub fn lib_url(name: &str) -> Url { + Url::parse(&format!("file:///library/{name}")).unwrap() +} + +pub fn workspace_url(name: &str) -> Url { + Url::parse(&format!("file:///workspace/{name}")).unwrap() +} + +pub fn upsert(db: &mut OakDatabase, name: &str, contents: &str) -> File { + db.upsert_editor(FilePath::from_url(&file_url(name)), contents.to_string()) +} + +pub fn offset(n: u32) -> TextSize { + TextSize::from(n) +} + +pub fn range(start: u32, end: u32) -> TextRange { + TextRange::new(TextSize::from(start), TextSize::from(end)) +} + +/// Project results to in-file ranges (single-file tests). +pub fn ranges(refs: &[FileRange]) -> Vec { + refs.iter().map(|r| r.range).collect() +} + +/// Project results to `(file, range)` pairs (cross-file tests). +pub fn pairs(refs: &[FileRange]) -> Vec<(File, TextRange)> { + refs.iter().map(|r| (r.file, r.range)).collect() +} + +/// Install `name` as a library package exporting `exports`, with one file at +/// `R/{file_name}`. Returns the package file. +pub fn install_library_package( + db: &mut OakDatabase, + name: &str, + exports: &[&str], + file_name: &str, + contents: &str, +) -> File { + install_pkg(db, RootKind::Library, name, exports, file_name, contents) +} + +/// Install `name` as a workspace package exporting `exports`, with one file at +/// `R/{file_name}`. Returns the package file. +pub fn install_workspace_package( + db: &mut OakDatabase, + name: &str, + exports: &[&str], + file_name: &str, + contents: &str, +) -> File { + install_pkg(db, RootKind::Workspace, name, exports, file_name, contents) +} + +fn install_pkg( + db: &mut OakDatabase, + kind: RootKind, + name: &str, + exports: &[&str], + file_name: &str, + contents: &str, +) -> File { + let (pkg_url, file_url, root_url, version) = match kind { + RootKind::Library => ( + lib_url(&format!("{name}/DESCRIPTION")), + lib_url(&format!("{name}/R/{file_name}")), + lib_url(name), + Some("1.0.0".to_string()), + ), + RootKind::Workspace => ( + workspace_url(&format!("{name}/DESCRIPTION")), + workspace_url(&format!("{name}/R/{file_name}")), + workspace_url(name), + None, + ), + }; + let namespace = Namespace { + exports: SortedVec::from_vec(exports.iter().map(|s| s.to_string()).collect()), + ..Default::default() + }; + let pkg = Package::new( + db, + FilePath::from_url(&pkg_url), + name.to_string(), + version, + namespace, + Vec::new(), + Vec::new(), + None, + ); + let file = File::new( + db, + FilePath::from_url(&file_url), + oak_db::FileRevision::zero(), + Some(contents.to_string()), + Some(pkg), + ); + pkg.set_files(db).to(vec![file]); + let root = Root::new(db, FilePath::from_url(&root_url), kind, Vec::new(), vec![ + pkg, + ]); + match kind { + RootKind::Library => db.library_roots().set_roots(db).to(vec![root]), + RootKind::Workspace => db.workspace_roots().set_roots(db).to(vec![root]), + }; + file +} diff --git a/crates/oak_scan/src/inputs.rs b/crates/oak_scan/src/inputs.rs index 2610e8a29..41da0b491 100644 --- a/crates/oak_scan/src/inputs.rs +++ b/crates/oak_scan/src/inputs.rs @@ -33,9 +33,6 @@ use crate::lookup::package_by_url; use crate::stale::remove_from_stale_files; use crate::stale::remove_from_stale_packages; use crate::stale::stale_file_by_url; -use crate::watch; -use crate::watch::FileEvent; -use crate::workspace; /// Description of one R file the scanner wants to register. /// @@ -56,6 +53,10 @@ pub struct FileEntry { /// Extension methods on the database for scanner orchestration and /// placement-aware updates that don't have a natural `Root` receiver. +/// +/// Workspace-level orchestration (path diff, watcher dispatch, rescan +/// coalescing) lives on [`crate::ScanScheduler`] instead, since it +/// needs scheduler state that can't be kept in salsa inputs. pub trait DbScan: Db + DbInputs { /// Reconcile `LibraryRoots` to exactly `paths`. /// @@ -71,25 +72,18 @@ pub trait DbScan: Db + DbInputs { /// /// Order in `LibraryRoots.roots` follows `paths`, matching R's /// `.libPaths()` precedence. - fn set_library_paths(&mut self, paths: &[PathBuf]); - - /// Reconcile `WorkspaceRoots` to exactly `paths`. - /// - /// - Paths already present as a `Root`: untouched. No fs walk, no salsa - /// churn. The file watcher handles in-folder changes. - /// - /// - New paths: scanned (`DESCRIPTION` files at any depth, honouring - /// `.gitignore`, plus top-level R scripts) and added. /// - /// - Removed paths: their `Root` is evicted. Files whose URLs are in - /// `editor_owned` move to [`oak_db::OrphanRoot`] (analysis-visible: the - /// buffer is still open). Everything else moves to [`oak_db::StaleRoot`] - /// for entity reuse if the path comes back. - fn set_workspace_paths(&mut self, paths: &[PathBuf], editor_owned: &HashSet); - - /// Rescan one workspace root. Used as the coarse fallback when - /// `DESCRIPTION` events change the package classification of a directory. - fn rescan_workspace_root(&mut self, root: Root); + /// **Why this is sync while workspaces go through + /// [`crate::ScanScheduler`].** Libraries are scanned exactly + /// once at LSP init today. Workspaces churn (folders open and + /// close at any time) and have a file watcher pushing events + /// mid-scan, so the workspace path needs the buffering / + /// stale-result machinery the scheduler exists for. Libraries + /// have neither, so the extra plumbing buys nothing. If + /// `.libPaths()` ever becomes mutable mid-session (e.g. user + /// runs `.libPaths(...)` in the console), this should join + /// the scheduler. + fn set_library_paths(&mut self, paths: &[PathBuf]); /// Upsert the editor's view of a file. Used by the LSP layer to apply /// `didOpen` / `didChange` content for any URL the editor touches. @@ -112,31 +106,14 @@ pub trait DbScan: Db + DbInputs { /// /// If the file lives in [`OrphanRoot`] (placed there by /// [`Self::upsert_editor`] because the URL didn't belong to a live root, or - /// by `set_workspace_paths()` eviction routing for an open buffer in a - /// removed workspace), it gets moved to [`StaleRoot`]. Future + /// by workspace eviction routing for an open buffer in a removed + /// workspace), it gets moved to [`StaleRoot`]. Future /// [`Self::upsert_editor`] for the same URL resurrects the entity from /// stale instead of minting a fresh one. /// /// If the file is in a live workspace / library container, the call is a /// no-op. fn close_editor(&mut self, url: &UrlId); - - /// React to a Created or Changed watcher event on an R file. Classifies the - /// URL against the current workspace tree and either creates a new `File` - /// or updates an existing one's content. Files outside every workspace, or - /// inside a package's non-`R/` subdir, are skipped. - fn add_watched_file(&mut self, url: UrlId, contents: String); - - /// React to a Deleted watcher event. Unlinks the file from whichever - /// container holds it (package files, root scripts, or orphan). - fn remove_watched_file(&mut self, url: UrlId); - - /// Apply a batch of file-watcher events. Routes DESCRIPTION events to a - /// coarse rescan of the containing workspace root (deduped within the - /// batch), and R-file events to per-file add / remove. URLs in `editor_owned` are - /// left alone, so callers can defer to an in-memory source of truth (e.g. - /// the editor's open buffers). - fn apply_watcher_events(&mut self, events: Vec, editor_owned: &HashSet); } impl DbScan for DB { @@ -144,14 +121,6 @@ impl DbScan for DB { crate::library::set_library_paths(self, paths); } - fn set_workspace_paths(&mut self, paths: &[PathBuf], editor_owned: &HashSet) { - crate::workspace::set_workspace_paths(self, paths, editor_owned); - } - - fn rescan_workspace_root(&mut self, root: Root) { - workspace::rescan_workspace_root(self, root); - } - fn upsert_editor(&mut self, url: UrlId, contents: String) -> File { if let Some(existing) = self.file_by_url(&url) { existing.set_contents(self).to(contents); @@ -192,18 +161,6 @@ impl DbScan for DB { stale.set_files(self).to(stale_files); } } - - fn add_watched_file(&mut self, url: UrlId, contents: String) { - watch::add_watched_file(self, url, contents); - } - - fn remove_watched_file(&mut self, url: UrlId) { - watch::remove_watched_file(self, url); - } - - fn apply_watcher_events(&mut self, events: Vec, editor_owned: &HashSet) { - watch::apply_watcher_events(self, events, editor_owned); - } } /// Extension methods on [`Root`] for placement-aware updates. diff --git a/crates/oak_scan/src/lib.rs b/crates/oak_scan/src/lib.rs index 82413b506..6c2347e50 100644 --- a/crates/oak_scan/src/lib.rs +++ b/crates/oak_scan/src/lib.rs @@ -26,9 +26,9 @@ mod inputs; mod library; mod lookup; mod packages; +mod scheduler; mod stale; mod watch; -mod workspace; #[cfg(test)] mod tests; @@ -36,5 +36,8 @@ mod tests; pub use inputs::DbScan; pub use inputs::FileEntry; pub use inputs::RootExt; +pub use scheduler::ScanCompleted; +pub use scheduler::ScanRequest; +pub use scheduler::ScanScheduler; pub use watch::FileEvent; pub use watch::FileEventKind; diff --git a/crates/oak_scan/src/scheduler.rs b/crates/oak_scan/src/scheduler.rs new file mode 100644 index 000000000..706f91c4f --- /dev/null +++ b/crates/oak_scan/src/scheduler.rs @@ -0,0 +1,483 @@ +//! Async-friendly coordinator for workspace scanning. +//! +//! Architecture: serial main loop, off-loop async scanners. Scheduler +//! state and salsa inputs are mutated only on the main loop, one event +//! at a time. The expensive scan work (filesystem walk, DESCRIPTION +//! parsing, R file reads) runs on a task pool. The two never touch the +//! same data, so the race-handling below is about event *ordering*, +//! not concurrent access. Same shape `rust-analyzer` and `ty` use to +//! keep `initialize` and `didChangeWorkspaceFolders` from blocking the +//! editor. +//! +//! [`ScanScheduler`] owns the *policy* (when to scan, how to handle +//! events arriving mid-scan), not the runtime. Drivers take a +//! [`ScanRequest`] from the scheduler, run it via [`ScanRequest::run`] +//! on whatever task pool they like, and hand the [`ScanCompleted`] to +//! [`ScanScheduler::apply_scan_completed`]. Tests do the same on the +//! current thread via `drain_scheduler`. +//! +//! # Race surface +//! +//! Three things race once scans are async: the in-flight scan, watcher +//! events for files inside that scan's root, and editor events. They're +//! handled like this: +//! +//! - **didOpen / didChange during scan.** No special handling. The file +//! lands in `OrphanRoot` via [`crate::DbScan::upsert_editor`]. When the +//! scan applies, `upsert_root_file()` finds the existing entity, +//! promotes it into the right container, and leaves its contents +//! alone (the buffer wins over the disk read). +//! +//! - **didClose during scan.** The orphan entity moves to stale. The +//! scan's `upsert_root_file` then resurrects it from stale, restoring +//! the disk contents the scanner read. +//! +//! - **Watcher events during scan.** R-file events for a pending root +//! get buffered here and replayed after the scan applies. DESCRIPTION +//! events flip the root into [`ScanState::ScanningWithRescanQueued`] +//! so a follow-up scan kicks off after the current one finishes, the +//! buffered events ride along until the root is finally idle, then +//! drain in one batch. +//! +//! - **Stale results.** If the workspace folder is removed while its +//! scan is in flight, the result arrives carrying a `Root` that's no +//! longer in `workspace_roots`. [`ScanScheduler::apply_scan_completed`] +//! silently drops it. The `Root` salsa entity stays as a leak (no +//! GC), but nothing references it. +//! +//! - **Folder added back during in-flight scan.** Each add mints a +//! fresh `Root`, not a revival of the staled one. That way the +//! first scan's stale result drops on identity check instead of +//! clobbering the second scan's empty state. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::path::PathBuf; + +use aether_url::UrlId; +use oak_db::Db; +use oak_db::DbInputs; +use oak_db::Package; +use oak_db::Root; +use oak_db::RootKind; +use salsa::Setter; +use stdext::result::ResultExt; + +use crate::inputs::FileEntry; +use crate::inputs::RootExt; +use crate::packages::scan_workspace_packages; +use crate::packages::scan_workspace_scripts; +use crate::packages::PackageEntry; +use crate::watch::add_watched_file; +use crate::watch::remove_watched_file; +use crate::watch::FileEvent; +use crate::watch::FileEventKind; + +/// One scan unit the caller should dispatch. +/// +/// Returned from every [`ScanScheduler`] method that can kick off a scan. The +/// caller calls [`ScanRequest::run`] on a worker thread and ships the +/// [`ScanCompleted`] back to [`ScanScheduler::apply_scan_completed`]. +#[derive(Clone, Debug)] +#[must_use = "scan requests are dispatched by the caller"] +pub struct ScanRequest { + pub root: Root, + pub path: PathBuf, +} + +impl ScanRequest { + /// Run the scan synchronously. No db access, safe to call from any + /// thread. Production drivers run this on a task pool; tests call + /// it directly. + pub fn run(self) -> ScanCompleted { + let packages = scan_workspace_packages(&self.path); + let scripts = scan_workspace_scripts(&self.path); + ScanCompleted { + root: self.root, + packages, + scripts, + } + } +} + +/// Output of [`ScanRequest::run`]. Opaque payload carried from the scan back +/// to [`ScanScheduler::apply_scan_completed`]. +#[derive(Debug)] +pub struct ScanCompleted { + root: Root, + packages: Vec, + scripts: Vec, +} + +impl ScanCompleted { + /// Push the scan's output into salsa inputs for `self.root`. + /// + /// Atomic full-replacement of the root's packages and scripts. + /// Existing `File` and `Package` entities are reused by URL where + /// possible (see [`RootExt::set_package`] / + /// [`RootExt::set_workspace_scripts`]), so a rescan that doesn't + /// actually change anything is a no-op as far as downstream salsa + /// caches are concerned. + fn apply(self, db: &mut DB) { + let ScanCompleted { + root, + packages, + scripts, + } = self; + + let package_entities: Vec = packages + .into_iter() + .map(|pkg| { + root.set_package( + db, + pkg.description_url, + pkg.name, + pkg.version, + pkg.namespace, + pkg.files, + pkg.scripts, + pkg.collation, + ) + }) + .collect(); + + root.set_packages(db).to(package_entities); + root.set_workspace_scripts(db, scripts); + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum ScanState { + Scanning, + ScanningWithRescanQueued, +} + +/// Coordinator for asynchronous workspace scanning. +/// +/// Tracks which roots have a scan in flight, buffers R-file watcher +/// events for those roots, and coalesces follow-up scan requests. See +/// the module docs for the race-handling design. +#[derive(Debug, Default)] +pub struct ScanScheduler { + state: HashMap, + buffered: HashMap>, +} + +impl ScanScheduler { + pub fn new() -> Self { + Self::default() + } + + /// Whether any scan is currently in flight. + pub fn has_pending_scans(&self) -> bool { + !self.state.is_empty() + } + + /// Reconcile `WorkspaceRoots` to exactly `paths`. Returns one + /// [`ScanRequest`] per newly-added path; unchanged paths skip the + /// rescan, removed paths are evicted via + /// [`RootExt::set_stale`]. + /// + /// New roots are inserted into `workspace_roots` empty so watcher + /// events can find them while their scan is in flight (the events + /// land in [`ScanScheduler`]'s buffer until the scan applies). + pub fn set_workspace_paths( + &mut self, + db: &mut DB, + paths: &[PathBuf], + editor_owned: &HashSet, + ) -> Vec { + let new: Vec<(PathBuf, UrlId)> = paths + .iter() + .filter_map(|p| Some((p.clone(), UrlId::from_file_path(p).ok()?))) + .collect(); + let new_urls: HashSet = new.iter().map(|(_, u)| u.clone()).collect(); + + let old: HashMap = db + .workspace_roots() + .roots(db) + .iter() + .map(|r| (r.path(db).clone(), *r)) + .collect(); + + for (old_url, &old_root) in &old { + if !new_urls.contains(old_url) { + old_root.set_stale(db, Some(editor_owned)); + self.state.remove(&old_root); + self.buffered.remove(&old_root); + } + } + + let mut new_roots = Vec::with_capacity(new.len()); + let mut requests = Vec::new(); + for (path, url) in new { + let root = match old.get(&url) { + Some(&r) => r, + None => { + let root = Root::new(db, url, RootKind::Workspace, Vec::new(), Vec::new()); + self.state.insert(root, ScanState::Scanning); + requests.push(ScanRequest { root, path }); + root + }, + }; + new_roots.push(root); + } + db.workspace_roots().set_roots(db).to(new_roots); + + requests + } + + /// Apply a batch of file-watcher events. + /// + /// Per-event routing: + /// - `DESCRIPTION` events trigger a rescan of the containing root. + /// If the root is idle, it transitions to `Scanning` and a + /// [`ScanRequest`] is returned. If it's already pending, it + /// transitions to `ScanningWithRescanQueued` and the queued + /// scan kicks off when `apply_scan_completed()` runs. + /// - R-file events for an idle root apply surgically (the watcher's + /// single-file fast path). + /// - R-file events for a pending root are buffered and replayed + /// after the scan applies, so they don't get dropped against an + /// empty `Root`. + /// - URLs in `skip` are left alone, letting drivers defer to an + /// in-memory source of truth (e.g. the editor's open buffers). + pub fn apply_watcher_events( + &mut self, + db: &mut DB, + events: Vec, + skip: &HashSet, + ) -> Vec { + let roots = workspace_root_paths(db); + let mut requests = Vec::new(); + + // Pass 1: DESCRIPTION events. Mark each affected root as needing a + // rescan before any R-file event runs, so an R-file event in the + // same batch correctly sees the root as pending and buffers + // instead of applying surgically against a transient world. + let mut description_roots: HashSet = HashSet::new(); + for event in &events { + let Ok(path) = event.url.to_file_path() else { + continue; + }; + if path.file_name().is_some_and(|name| name == "DESCRIPTION") { + if let Some(root) = roots + .iter() + .find(|(root_path, _)| path.starts_with(root_path)) + .map(|(_, root)| *root) + { + description_roots.insert(root); + } + } + } + for root in description_roots { + if let Some(req) = self.request_rescan(db, root) { + requests.push(req); + } + } + + // Pass 2: R-file events. + for event in events { + let Ok(path) = event.url.to_file_path() else { + continue; + }; + if path.file_name().is_some_and(|name| name == "DESCRIPTION") { + continue; + } + if skip.contains(&event.url) { + continue; + } + + let root = roots + .iter() + .find(|(root_path, _)| path.starts_with(root_path)) + .map(|(_, root)| *root); + + match root { + Some(root) if self.state.contains_key(&root) => { + self.buffered.entry(root).or_default().push(event); + }, + // No in-flight scan for this root: apply the event directly, + // the watcher's single-file fast path. + _ => match event.kind { + FileEventKind::Created | FileEventKind::Changed => { + match std::fs::read_to_string(&path) { + Ok(contents) => add_watched_file(db, event.url, contents), + Err(err) => { + log::warn!("Skipped watched file {}: {err:?}", path.display()) + }, + } + }, + FileEventKind::Deleted => remove_watched_file(db, event.url), + }, + } + } + + requests + } + + /// Apply a [`ScanCompleted`] produced by [`ScanRequest::run`]. + /// + /// Drops the result silently if `result.root` is no longer in + /// `workspace_roots` (the user removed the folder while its scan was in + /// flight). Otherwise updates the root's packages and scripts, then handles + /// the post-apply state: + /// + /// - `Scanning`: state cleared, buffered events drained through + /// `apply_watcher_events()` (which may itself return new requests). + /// - `ScanningWithRescanQueued`: fresh `ScanRequest` returned, state stays + /// `Scanning`, buffer preserved for the next round. + /// - Untracked (`None`): unexpected, since dispatch always marks the root + /// `Scanning`. Logged as a warning, the result is still applied. + pub fn apply_scan_completed( + &mut self, + db: &mut DB, + result: ScanCompleted, + editor_owned: &HashSet, + ) -> Vec { + let root = result.root; + + let live = db.workspace_roots().roots(db).contains(&root); + if !live { + // Workspace folder removed while we were scanning. Drop the + // result and any buffered events for this root. + self.state.remove(&root); + self.buffered.remove(&root); + return Vec::new(); + } + + result.apply(db); + + let prior = self.state.remove(&root); + match prior { + Some(ScanState::ScanningWithRescanQueued) => { + // A rescan was queued mid-scan. Resolve its path before + // re-marking the root `Scanning`: a path we can't resolve must + // not leave the root `Scanning` with no scan in flight, or it + // would stay pending forever. On success the buffer rides along + // and replays when the requeued scan finishes. On failure we + // fall back to the idle drain. + match root.path(db).to_file_path().warn_on_err() { + Some(path) => { + self.state.insert(root, ScanState::Scanning); + vec![ScanRequest { root, path }] + }, + None => self.drain_buffered(db, root, editor_owned), + } + }, + // We're now idle. Drain any buffered events through the normal path. + Some(ScanState::Scanning) => self.drain_buffered(db, root, editor_owned), + None => { + // A completion for a root we weren't tracking as scanning. + // Every dispatched scan marks its root `Scanning`, so reaching + // here means our state diverged from the in-flight work. The + // result is already applied. Since buffered events only + // accumulate against a tracked root, there's nothing to drain. + log::warn!( + "Applied a `ScanCompleted` for an untracked root: {path:?}", + path = root.path(db) + ); + Vec::new() + }, + } + } + + /// Replay the watcher events buffered for `root` while its scan was in + /// flight, now that the root is idle. Routes them through + /// [`Self::apply_watcher_events`], which may itself return fresh requests. + fn drain_buffered( + &mut self, + db: &mut DB, + root: Root, + editor_owned: &HashSet, + ) -> Vec { + match self.buffered.remove(&root) { + Some(buffered) => self.apply_watcher_events(db, buffered, editor_owned), + None => Vec::new(), + } + } + + fn request_rescan( + &mut self, + db: &mut DB, + root: Root, + ) -> Option { + match self.state.get(&root) { + Some(ScanState::Scanning) => { + self.state.insert(root, ScanState::ScanningWithRescanQueued); + None + }, + Some(ScanState::ScanningWithRescanQueued) => None, + None => { + let path = root.path(db).to_file_path().warn_on_err()?; + self.state.insert(root, ScanState::Scanning); + Some(ScanRequest { root, path }) + }, + } + } +} + +/// Run every request on the current thread, feeding results back +/// until no further rescans are queued. +/// +/// Test-only: production drivers spawn each request on a task pool +/// so the LSP handler doesn't block. Crate-private and `cfg(test)` +/// so this can't leak into a production caller; out-of-crate tests +/// that need a synchronous drainer write their own 3-line loop over +/// [`ScanRequest::run`] + [`ScanScheduler::apply_scan_completed`]. +#[cfg(test)] +pub(crate) fn drain_scheduler( + db: &mut DB, + scheduler: &mut ScanScheduler, + mut requests: Vec, + editor_owned: &HashSet, +) { + while let Some(req) = requests.pop() { + let result = req.run(); + requests.extend(scheduler.apply_scan_completed(db, result, editor_owned)); + } +} + +fn workspace_root_paths(db: &DB) -> Vec<(PathBuf, Root)> { + db.workspace_roots() + .roots(db) + .iter() + .filter_map(|root| Some((root.path(db).to_file_path().warn_on_err()?, *root))) + .collect() +} + +#[cfg(test)] +mod tests { + use oak_db::OakDatabase; + + use super::*; + + /// A root whose URL has no filesystem path (e.g. an `untitled:` buffer) + /// can't produce a `ScanRequest`. When such a root is + /// `ScanningWithRescanQueued` and its scan completes, the scheduler must + /// not leave it `Scanning` with nothing in flight, or it would stay pending + /// forever. This state is unreachable through the public API (workspace + /// roots always come from `from_file_path`), so we build it by hand. + #[test] + fn test_unresolvable_rescan_path_does_not_strand_root_scanning() { + let mut db = OakDatabase::new(); + let url = UrlId::parse("untitled:Untitled-1").unwrap(); + let root = Root::new(&db, url, RootKind::Workspace, Vec::new(), Vec::new()); + db.workspace_roots().set_roots(&mut db).to(vec![root]); + + let mut scheduler = ScanScheduler::new(); + scheduler + .state + .insert(root, ScanState::ScanningWithRescanQueued); + + let result = ScanCompleted { + root, + packages: Vec::new(), + scripts: Vec::new(), + }; + let requests = scheduler.apply_scan_completed(&mut db, result, &HashSet::new()); + + assert!(requests.is_empty()); + assert!(!scheduler.has_pending_scans()); + } +} diff --git a/crates/oak_scan/src/tests.rs b/crates/oak_scan/src/tests.rs index 52df9fa02..06aeca412 100644 --- a/crates/oak_scan/src/tests.rs +++ b/crates/oak_scan/src/tests.rs @@ -1 +1,4 @@ +mod scheduler; mod stale; +mod watch; +mod workspace; diff --git a/crates/oak_scan/src/tests/scheduler.rs b/crates/oak_scan/src/tests/scheduler.rs new file mode 100644 index 000000000..7f75c47b5 --- /dev/null +++ b/crates/oak_scan/src/tests/scheduler.rs @@ -0,0 +1,273 @@ +//! Tests for the async-shape behavior of [`crate::ScanScheduler`]. +//! +//! Unlike the workspace/watch tests (which drain the scheduler in a +//! single shot), these tests pause between stages: spawn the scan but +//! don't run it yet, fire other events in the middle, then run the +//! scan, then assert. That's the only way to exercise the buffering +//! and stale-result drop paths that exist precisely to handle work +//! arriving mid-scan. + +use std::collections::HashSet; +use std::fs; +use std::path::Path; + +use aether_url::UrlId; +use oak_db::Db; +use oak_db::DbInputs; +use oak_db::OakDatabase; + +use crate::lookup::package_by_url; +use crate::scheduler::drain_scheduler; +use crate::FileEvent; +use crate::FileEventKind; +use crate::ScanScheduler; + +fn write_package(dir: &Path, name: &str, r_files: &[(&str, &str)]) { + fs::create_dir_all(dir.join("R")).unwrap(); + fs::write( + dir.join("DESCRIPTION"), + format!("Package: {name}\nVersion: 0.0.0\n"), + ) + .unwrap(); + for (basename, contents) in r_files { + fs::write(dir.join("R").join(basename), contents).unwrap(); + } +} + +#[test] +fn test_stale_result_dropped_when_root_removed_mid_scan() { + // Spawn a scan, remove the workspace folder before the scan + // applies, then apply the result. The scan output should be + // silently discarded since `result.root` is no longer in + // `workspace_roots`. + let tmp = tempfile::tempdir().unwrap(); + write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); + let mut db = OakDatabase::new(); + let mut scheduler = ScanScheduler::new(); + + let mut requests = + scheduler.set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); + assert_eq!(requests.len(), 1); + let req = requests.pop().unwrap(); + let dead_root = req.root; + + // User removes the folder while the scan is still in flight. + let evict = scheduler.set_workspace_paths(&mut db, &[], &HashSet::new()); + assert!(evict.is_empty()); + assert!(!db.workspace_roots().roots(&db).contains(&dead_root)); + + // Scan finally completes. Result should drop. + let result = req.run(); + let followups = scheduler.apply_scan_completed(&mut db, result, &HashSet::new()); + assert!(followups.is_empty()); + + // The package the scan would have created shouldn't surface. + let pkg_url = UrlId::from_file_path(tmp.path().join("pkg/DESCRIPTION")).unwrap(); + assert!(package_by_url(&db, &pkg_url).is_none()); +} + +#[test] +fn test_remove_then_readd_during_scan_uses_distinct_root_entities() { + // The stale-result drop hinges on `Root` entity identity, not path. + // After remove + re-add of the same path, the second add mints a + // fresh `Root`; the first scan's result keys off the now-dead + // first `Root` and gets dropped. + let tmp = tempfile::tempdir().unwrap(); + write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); + let mut db = OakDatabase::new(); + let mut scheduler = ScanScheduler::new(); + + let first = scheduler + .set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()) + .pop() + .unwrap(); + let root_a = first.root; + + // Folder removed. + scheduler.set_workspace_paths(&mut db, &[], &HashSet::new()); + + // Folder re-added: distinct `Root` entity. + let second = scheduler + .set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()) + .pop() + .unwrap(); + let root_b = second.root; + assert_ne!(root_a, root_b); + + // First scan's result lands on the dead `Root` and gets dropped. + let result_a = first.run(); + let followups_a = scheduler.apply_scan_completed(&mut db, result_a, &HashSet::new()); + assert!(followups_a.is_empty()); + + // Second scan applies normally. + let result_b = second.run(); + let followups_b = scheduler.apply_scan_completed(&mut db, result_b, &HashSet::new()); + assert!(followups_b.is_empty()); + let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; + assert_eq!(pkg.name(&db), "pkg"); +} + +#[test] +fn test_watcher_event_buffered_during_scan_and_replayed() { + // An R-file watcher event for a pending root should be buffered, + // not lost. After the scan applies, the buffered event is replayed + // and the new file appears in the right container. + let tmp = tempfile::tempdir().unwrap(); + write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); + let mut db = OakDatabase::new(); + let mut scheduler = ScanScheduler::new(); + + let request = scheduler + .set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()) + .pop() + .unwrap(); + + // Mid-scan: a new file appears under pkg/R/, the watcher fires. + let new_path = tmp.path().join("pkg/R/b.R"); + fs::write(&new_path, "y <- 2\n").unwrap(); + let new_url = UrlId::from_file_path(&new_path).unwrap(); + let event_followups = scheduler.apply_watcher_events( + &mut db, + vec![FileEvent { + kind: FileEventKind::Created, + url: new_url.clone(), + }], + &HashSet::new(), + ); + // Event was buffered, not dispatched as a scan. + assert!(event_followups.is_empty()); + // And not yet visible to the db: the scan that would create the + // root's `Package` hasn't run yet. + assert!(db.file_by_url(&new_url).is_none()); + + // Scan completes. Buffered event replays automatically. + let result = request.run(); + let followups = scheduler.apply_scan_completed(&mut db, result, &HashSet::new()); + assert!(followups.is_empty()); + + // Both files are now present in pkg.files. + let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; + assert_eq!(pkg.files(&db).len(), 2); + assert!(db.file_by_url(&new_url).is_some()); +} + +#[test] +fn test_description_event_during_scan_queues_rescan() { + // A DESCRIPTION event hitting a pending root should flip the root + // to `ScanningWithRescanQueued`. When the first scan applies, a + // fresh `ScanRequest` for the same root comes back. + let tmp = tempfile::tempdir().unwrap(); + fs::create_dir_all(tmp.path().join("pkg/R")).unwrap(); + fs::write(tmp.path().join("pkg/R/a.R"), "x <- 1\n").unwrap(); + let mut db = OakDatabase::new(); + let mut scheduler = ScanScheduler::new(); + + let request = scheduler + .set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()) + .pop() + .unwrap(); + let root = request.root; + + // Mid-scan: DESCRIPTION appears, watcher fires. + fs::write( + tmp.path().join("pkg/DESCRIPTION"), + "Package: pkg\nVersion: 0.0.0\n", + ) + .unwrap(); + let desc_url = UrlId::from_file_path(tmp.path().join("pkg/DESCRIPTION")).unwrap(); + let watcher_followups = scheduler.apply_watcher_events( + &mut db, + vec![FileEvent { + kind: FileEventKind::Created, + url: desc_url, + }], + &HashSet::new(), + ); + assert!(watcher_followups.is_empty()); + + // First scan applies. It saw no DESCRIPTION yet (was written after + // walk started in this test, but our fake `ScanRequest::run` will pick it + // up). The queued rescan should still kick off. + let result = request.run(); + let mut followups = scheduler.apply_scan_completed(&mut db, result, &HashSet::new()); + assert_eq!(followups.len(), 1); + assert_eq!(followups[0].root, root); + + // Drive the queued rescan to completion. + let req2 = followups.pop().unwrap(); + let result2 = req2.run(); + let final_followups = scheduler.apply_scan_completed(&mut db, result2, &HashSet::new()); + assert!(final_followups.is_empty()); + + // Package is now classified. + let root = db.workspace_roots().roots(&db)[0]; + assert_eq!(root.packages(&db).len(), 1); +} + +#[test] +fn test_description_event_on_idle_root_returns_scan_request() { + // A DESCRIPTION event on an idle root should kick off a fresh + // scan, not silently no-op. The previous (sync) implementation + // called rescan_workspace_root inline; the new contract returns a + // `ScanRequest` for the caller to dispatch. + let tmp = tempfile::tempdir().unwrap(); + fs::create_dir_all(tmp.path().join("pkg/R")).unwrap(); + fs::write(tmp.path().join("pkg/R/a.R"), "x <- 1\n").unwrap(); + let mut db = OakDatabase::new(); + let mut scheduler = ScanScheduler::new(); + + // Initial scan: no DESCRIPTION yet, so root has no packages. + let init = scheduler.set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); + drain_scheduler(&mut db, &mut scheduler, init, &HashSet::new()); + let root = db.workspace_roots().roots(&db)[0]; + assert!(root.packages(&db).is_empty()); + + // DESCRIPTION appears. Watcher fires while root is idle. + fs::write( + tmp.path().join("pkg/DESCRIPTION"), + "Package: pkg\nVersion: 0.0.0\n", + ) + .unwrap(); + let desc_url = UrlId::from_file_path(tmp.path().join("pkg/DESCRIPTION")).unwrap(); + let followups = scheduler.apply_watcher_events( + &mut db, + vec![FileEvent { + kind: FileEventKind::Created, + url: desc_url, + }], + &HashSet::new(), + ); + assert_eq!(followups.len(), 1); + assert_eq!(followups[0].root, root); + + drain_scheduler(&mut db, &mut scheduler, followups, &HashSet::new()); + assert_eq!(root.packages(&db).len(), 1); +} + +#[test] +fn test_set_workspace_paths_inserts_empty_root_immediately() { + // While the scan is in flight, the new `Root` is already in + // `workspace_roots` (empty). This is what lets the watcher + // scheduler classify events for files in the pending root and + // buffer them, instead of dropping them as "no workspace + // contains this URL". + let tmp = tempfile::tempdir().unwrap(); + write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); + let mut db = OakDatabase::new(); + let mut scheduler = ScanScheduler::new(); + + let _requests = + scheduler.set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); + + // Before any scan runs: + let roots = db.workspace_roots().roots(&db).clone(); + assert_eq!(roots.len(), 1); + assert!(roots[0].packages(&db).is_empty()); + // Path matches. Compare via the URL conversion the scheduler uses + // internally, so this stays cross-platform: macOS canonicalization + // resolves symlinks (`/var` -> `/private/var`), Windows adds a `\\?\` + // UNC prefix, and going through `UrlId::from_file_path` on both sides + // is the only round-trip that matches. + let expected = UrlId::from_file_path(tmp.path()).unwrap(); + assert_eq!(roots[0].path(&db), &expected); +} diff --git a/crates/oak_scan/tests/watch.rs b/crates/oak_scan/src/tests/watch.rs similarity index 77% rename from crates/oak_scan/tests/watch.rs rename to crates/oak_scan/src/tests/watch.rs index 1f0153220..9bc41a55c 100644 --- a/crates/oak_scan/tests/watch.rs +++ b/crates/oak_scan/src/tests/watch.rs @@ -1,14 +1,20 @@ use std::collections::HashSet; use std::fs; use std::path::Path; +use std::path::PathBuf; use aether_url::UrlId; use oak_db::Db; use oak_db::DbInputs; use oak_db::OakDatabase; -use oak_scan::DbScan; -use oak_scan::FileEvent; -use oak_scan::FileEventKind; +use oak_db::Root; + +use crate::scheduler::drain_scheduler; +use crate::DbScan; +use crate::FileEvent; +use crate::FileEventKind; +use crate::ScanRequest; +use crate::ScanScheduler; fn write_package(dir: &Path, name: &str, r_files: &[(&str, &str)]) { fs::create_dir_all(dir.join("R")).unwrap(); @@ -22,19 +28,67 @@ fn write_package(dir: &Path, name: &str, r_files: &[(&str, &str)]) { } } +/// Sync helper: set workspace paths and drive the scheduler to +/// quiescence on the current thread. +fn set_workspace_paths(db: &mut OakDatabase, paths: &[PathBuf], editor_owned: &HashSet) { + let mut scheduler = ScanScheduler::new(); + let reqs = scheduler.set_workspace_paths(db, paths, editor_owned); + drain_scheduler(db, &mut scheduler, reqs, editor_owned); +} + +/// Sync helper: dispatch watcher events through the scheduler and +/// drive it to quiescence. +fn apply_watcher_events(db: &mut OakDatabase, events: Vec, skip: &HashSet) { + let mut scheduler = ScanScheduler::new(); + let reqs = scheduler.apply_watcher_events(db, events, skip); + drain_scheduler(db, &mut scheduler, reqs, skip); +} + +/// Sync helper: synthesize a single Changed watcher event. The +/// scheduler reads contents from disk, so callers must write the +/// expected content to disk before calling. +fn add_watched_file(db: &mut OakDatabase, url: UrlId) { + apply_watcher_events( + db, + vec![FileEvent { + kind: FileEventKind::Changed, + url, + }], + &HashSet::new(), + ); +} + +fn remove_watched_file(db: &mut OakDatabase, url: UrlId) { + apply_watcher_events( + db, + vec![FileEvent { + kind: FileEventKind::Deleted, + url, + }], + &HashSet::new(), + ); +} + +/// Sync helper: force a fresh full rescan of `root`. Equivalent to the +/// production trigger of a DESCRIPTION watcher event hitting the root. +fn rescan_workspace_root(db: &mut OakDatabase, root: Root) { + let path = root.path(db).to_file_path().unwrap(); + let result = ScanRequest { root, path }.run(); + let mut scheduler = ScanScheduler::new(); + let reqs = scheduler.apply_scan_completed(db, result, &HashSet::new()); + drain_scheduler(db, &mut scheduler, reqs, &HashSet::new()); +} + #[test] fn test_add_watched_file_new_top_level_script() { let tmp = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("new.R"); fs::write(&path, "x <- 1\n").unwrap(); let url = UrlId::from_file_path(&path).unwrap(); - db.add_watched_file(url.clone(), "x <- 1\n".to_string()); + add_watched_file(&mut db, url.clone()); let scripts = db.workspace_roots().roots(&db)[0].scripts(&db).clone(); assert_eq!(scripts.len(), 1); @@ -46,15 +100,12 @@ fn test_add_watched_file_into_existing_package() { let tmp = tempfile::tempdir().unwrap(); write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/R/b.R"); fs::write(&path, "y <- 2\n").unwrap(); let url = UrlId::from_file_path(&path).unwrap(); - db.add_watched_file(url.clone(), "y <- 2\n".to_string()); + add_watched_file(&mut db, url.clone()); let packages = db.workspace_roots().roots(&db)[0].packages(&db).clone(); assert_eq!(packages.len(), 1); @@ -73,15 +124,12 @@ fn test_add_watched_file_routes_package_subdir_to_pkg_scripts() { write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); fs::create_dir_all(tmp.path().join("pkg/tests")).unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/tests/test-foo.R"); fs::write(&path, "test code\n").unwrap(); let url = UrlId::from_file_path(&path).unwrap(); - db.add_watched_file(url.clone(), "test code\n".to_string()); + add_watched_file(&mut db, url.clone()); let root = db.workspace_roots().roots(&db)[0]; let pkg = root.packages(&db)[0]; @@ -102,15 +150,12 @@ fn test_add_watched_file_skips_nested_r_subdir() { write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); fs::create_dir_all(tmp.path().join("pkg/R/nested")).unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/R/nested/deep.R"); fs::write(&path, "z <- 3\n").unwrap(); let url = UrlId::from_file_path(&path).unwrap(); - db.add_watched_file(url.clone(), "z <- 3\n".to_string()); + add_watched_file(&mut db, url.clone()); let root = db.workspace_roots().roots(&db)[0]; let pkg = root.packages(&db)[0]; @@ -128,17 +173,15 @@ fn test_add_watched_file_updates_pkg_scripts_content_preserves_placement() { fs::create_dir_all(tmp.path().join("pkg/tests")).unwrap(); fs::write(tmp.path().join("pkg/tests/test-foo.R"), "v1\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/tests/test-foo.R"); let url = UrlId::from_file_path(&path).unwrap(); let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; let file_before = pkg.scripts(&db)[0]; - db.add_watched_file(url.clone(), "v2\n".to_string()); + fs::write(&path, "v2\n").unwrap(); + add_watched_file(&mut db, url.clone()); let file_after = db.file_by_url(&url).unwrap(); assert_eq!(file_before, file_after); @@ -155,14 +198,11 @@ fn test_remove_watched_file_from_pkg_scripts() { fs::create_dir_all(tmp.path().join("pkg/tests")).unwrap(); fs::write(tmp.path().join("pkg/tests/test-foo.R"), "t\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/tests/test-foo.R"); let url = UrlId::from_file_path(&path).unwrap(); - db.remove_watched_file(url.clone()); + remove_watched_file(&mut db, url.clone()); let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; assert!(pkg.scripts(&db).is_empty()); @@ -175,7 +215,8 @@ fn test_add_watched_file_outside_workspace_is_skipped() { let workspace = tempfile::tempdir().unwrap(); let outside = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( + set_workspace_paths( + &mut db, &[workspace.path().to_path_buf()], &std::collections::HashSet::new(), ); @@ -183,7 +224,7 @@ fn test_add_watched_file_outside_workspace_is_skipped() { let path = outside.path().join("stray.R"); fs::write(&path, "z <- 3\n").unwrap(); let url = UrlId::from_file_path(&path).unwrap(); - db.add_watched_file(url.clone(), "z <- 3\n".to_string()); + add_watched_file(&mut db, url.clone()); assert!(db.file_by_url(&url).is_none()); let root = db.workspace_roots().roots(&db)[0]; @@ -195,17 +236,15 @@ fn test_add_watched_file_updates_existing_content_preserves_placement() { let tmp = tempfile::tempdir().unwrap(); write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "v1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/R/a.R"); let url = UrlId::from_file_path(&path).unwrap(); let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; let file_before = pkg.files(&db)[0]; - db.add_watched_file(url.clone(), "v2\n".to_string()); + fs::write(&path, "v2\n").unwrap(); + add_watched_file(&mut db, url.clone()); let file_after = db.file_by_url(&url).unwrap(); assert_eq!(file_before, file_after); @@ -222,14 +261,11 @@ fn test_remove_watched_file_from_package() { ("b.R", "y <- 2\n"), ]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("pkg/R/a.R"); let url = UrlId::from_file_path(&path).unwrap(); - db.remove_watched_file(url.clone()); + remove_watched_file(&mut db, url.clone()); let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; assert_eq!(pkg.files(&db).len(), 1); @@ -242,14 +278,11 @@ fn test_remove_watched_file_from_workspace_scripts() { fs::write(tmp.path().join("a.R"), "x <- 1\n").unwrap(); fs::write(tmp.path().join("b.R"), "y <- 2\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("a.R"); let url = UrlId::from_file_path(&path).unwrap(); - db.remove_watched_file(url.clone()); + remove_watched_file(&mut db, url.clone()); let scripts = db.workspace_roots().roots(&db)[0].scripts(&db).clone(); assert_eq!(scripts.len(), 1); @@ -260,13 +293,10 @@ fn test_remove_watched_file_from_workspace_scripts() { fn test_remove_watched_file_unknown_url_is_noop() { let tmp = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let url = UrlId::from_file_path(tmp.path().join("ghost.R")).unwrap(); - db.remove_watched_file(url); + remove_watched_file(&mut db, url); } #[test] @@ -278,10 +308,7 @@ fn test_rescan_workspace_root_picks_up_new_description() { fs::create_dir_all(tmp.path().join("pkg/R")).unwrap(); fs::write(tmp.path().join("pkg/R/a.R"), "x <- 1\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); // No DESCRIPTION yet, so the R file came in as a script. let root = db.workspace_roots().roots(&db)[0]; @@ -293,7 +320,7 @@ fn test_rescan_workspace_root_picks_up_new_description() { "Package: pkg\nVersion: 0.0.0\n", ) .unwrap(); - db.rescan_workspace_root(root); + rescan_workspace_root(&mut db, root); assert_eq!(root.packages(&db).len(), 1); assert_eq!(root.packages(&db)[0].files(&db).len(), 1); @@ -307,17 +334,14 @@ fn test_rescan_workspace_root_drops_removed_pkg_scripts() { fs::create_dir_all(tmp.path().join("pkg/tests")).unwrap(); fs::write(tmp.path().join("pkg/tests/test-foo.R"), "t\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; let pkg = root.packages(&db)[0]; assert_eq!(pkg.scripts(&db).len(), 1); fs::remove_file(tmp.path().join("pkg/tests/test-foo.R")).unwrap(); - db.rescan_workspace_root(root); + rescan_workspace_root(&mut db, root); assert!(pkg.scripts(&db).is_empty()); assert_eq!(pkg.files(&db).len(), 1); @@ -333,16 +357,13 @@ fn test_rescan_workspace_root_preserves_pkg_scripts_identity() { fs::create_dir_all(tmp.path().join("pkg/tests")).unwrap(); fs::write(tmp.path().join("pkg/tests/test-foo.R"), "t\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; let pkg = root.packages(&db)[0]; let file_before = pkg.scripts(&db)[0]; - db.rescan_workspace_root(root); + rescan_workspace_root(&mut db, root); let pkg = db.workspace_roots().roots(&db)[0].packages(&db)[0]; assert_eq!(pkg.scripts(&db).len(), 1); @@ -354,16 +375,13 @@ fn test_rescan_workspace_root_demotes_removed_description() { let tmp = tempfile::tempdir().unwrap(); write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; assert_eq!(root.packages(&db).len(), 1); fs::remove_file(tmp.path().join("pkg/DESCRIPTION")).unwrap(); - db.rescan_workspace_root(root); + rescan_workspace_root(&mut db, root); assert!(root.packages(&db).is_empty()); // The R file under pkg/R/ is no longer in a recognised package, so @@ -384,17 +402,15 @@ fn test_apply_watcher_events_routes_description_to_rescan() { fs::create_dir_all(tmp.path().join("pkg/R")).unwrap(); fs::write(tmp.path().join("pkg/R/a.R"), "x <- 1\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); fs::write( tmp.path().join("pkg/DESCRIPTION"), "Package: pkg\nVersion: 0.0.0\n", ) .unwrap(); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![file_event( &tmp.path().join("pkg/DESCRIPTION"), FileEventKind::Created, @@ -415,12 +431,10 @@ fn test_apply_watcher_events_dedupes_descriptions_per_root() { write_package(&tmp.path().join("pkg1"), "pkg1", &[]); write_package(&tmp.path().join("pkg2"), "pkg2", &[]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![ file_event(&tmp.path().join("pkg1/DESCRIPTION"), FileEventKind::Changed), file_event(&tmp.path().join("pkg2/DESCRIPTION"), FileEventKind::Changed), @@ -436,14 +450,12 @@ fn test_apply_watcher_events_dedupes_descriptions_per_root() { fn test_apply_watcher_events_routes_r_file_to_add() { let tmp = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("new.R"); fs::write(&path, "x <- 1\n").unwrap(); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![file_event(&path, FileEventKind::Created)], &HashSet::new(), ); @@ -457,14 +469,12 @@ fn test_apply_watcher_events_routes_r_file_to_remove() { let tmp = tempfile::tempdir().unwrap(); fs::write(tmp.path().join("a.R"), "x <- 1\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("a.R"); let url = UrlId::from_file_path(&path).unwrap(); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![file_event(&path, FileEventKind::Deleted)], &HashSet::new(), ); @@ -480,10 +490,7 @@ fn test_apply_watcher_events_skip_set_blocks_r_file_event() { let path = tmp.path().join("a.R"); fs::write(&path, "disk_v1\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); // Driver "owns" this URL (the editor has it open). let url = UrlId::from_file_path(&path).unwrap(); @@ -493,7 +500,11 @@ fn test_apply_watcher_events_skip_set_blocks_r_file_event() { skip.insert(url.clone()); fs::write(&path, "disk_v3\n").unwrap(); - db.apply_watcher_events(vec![file_event(&path, FileEventKind::Changed)], &skip); + apply_watcher_events( + &mut db, + vec![file_event(&path, FileEventKind::Changed)], + &skip, + ); let file = db.file_by_url(&url).unwrap(); assert_eq!(file.contents(&db), "editor_v2\n"); @@ -507,10 +518,7 @@ fn test_apply_watcher_events_skip_set_does_not_block_description() { let tmp = tempfile::tempdir().unwrap(); fs::create_dir_all(tmp.path().join("pkg/R")).unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); fs::write( tmp.path().join("pkg/DESCRIPTION"), @@ -523,7 +531,11 @@ fn test_apply_watcher_events_skip_set_does_not_block_description() { let mut skip = HashSet::new(); skip.insert(desc_url); - db.apply_watcher_events(vec![file_event(&desc_path, FileEventKind::Created)], &skip); + apply_watcher_events( + &mut db, + vec![file_event(&desc_path, FileEventKind::Created)], + &skip, + ); let root = db.workspace_roots().roots(&db)[0]; assert_eq!(root.packages(&db).len(), 1); @@ -542,12 +554,14 @@ fn test_apply_watcher_events_description_outside_any_workspace_is_noop() { ) .unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( + set_workspace_paths( + &mut db, &[workspace.path().to_path_buf()], &std::collections::HashSet::new(), ); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![file_event( &outside.path().join("DESCRIPTION"), FileEventKind::Created, @@ -567,15 +581,13 @@ fn test_apply_watcher_events_ignores_non_r_files() { // landing them in the orphan bucket or some root container. let tmp = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let path = tmp.path().join("notes.txt"); fs::write(&path, "not R\n").unwrap(); let url = UrlId::from_file_path(&path).unwrap(); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![file_event(&path, FileEventKind::Created)], &HashSet::new(), ); @@ -594,10 +606,7 @@ fn test_apply_watcher_events_tolerates_non_package_description() { // workspace unclassified rather than panicking or erroring. let tmp = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); fs::create_dir_all(tmp.path().join("not-a-pkg")).unwrap(); fs::write( @@ -605,7 +614,8 @@ fn test_apply_watcher_events_tolerates_non_package_description() { "Title: Some other project\nVersion: 1.0\n", ) .unwrap(); - db.apply_watcher_events( + apply_watcher_events( + &mut db, vec![file_event( &tmp.path().join("not-a-pkg/DESCRIPTION"), FileEventKind::Created, diff --git a/crates/oak_scan/tests/workspace.rs b/crates/oak_scan/src/tests/workspace.rs similarity index 88% rename from crates/oak_scan/tests/workspace.rs rename to crates/oak_scan/src/tests/workspace.rs index bdef762fa..50bcefe82 100644 --- a/crates/oak_scan/tests/workspace.rs +++ b/crates/oak_scan/src/tests/workspace.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::fs; use std::path::Path; use std::path::PathBuf; @@ -8,7 +9,18 @@ use oak_db::DbInputs; use oak_db::File; use oak_db::OakDatabase; use oak_db::RootKind; -use oak_scan::DbScan; + +use crate::scheduler::drain_scheduler; +use crate::DbScan; +use crate::ScanScheduler; + +/// Sync helper: scan to quiescence on the current thread. Production +/// drivers spawn each request on a task pool. +fn set_workspace_paths(db: &mut OakDatabase, paths: &[PathBuf], editor_owned: &HashSet) { + let mut scheduler = ScanScheduler::new(); + let reqs = scheduler.set_workspace_paths(db, paths, editor_owned); + drain_scheduler(db, &mut scheduler, reqs, editor_owned); +} fn basenames(db: &OakDatabase, files: &[File]) -> Vec { files @@ -42,10 +54,7 @@ fn test_scan_empty_workspace_registers_empty_root() { let tmp = tempfile::tempdir().unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let roots = db.workspace_roots().roots(&db).clone(); assert_eq!(roots.len(), 1); @@ -61,10 +70,7 @@ fn test_scan_workspace_discovers_package_at_root() { write_package(tmp.path(), "myproj", &[("a.R", "x <- 1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let packages = db.workspace_roots().roots(&db)[0].packages(&db).clone(); assert_eq!(packages.len(), 1); @@ -78,10 +84,7 @@ fn test_scan_workspace_discovers_multiple_nested_packages() { write_package(&tmp.path().join("pkg2"), "pkg2", &[("b.R", "y <- 2\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let packages = db.workspace_roots().roots(&db)[0].packages(&db).clone(); assert_eq!(packages.len(), 2); @@ -97,10 +100,7 @@ fn test_scan_workspace_collects_top_level_scripts() { fs::write(tmp.path().join("helpers.R"), "y <- 2\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let scripts = db.workspace_roots().roots(&db)[0].scripts(&db).clone(); assert_eq!(scripts.len(), 2); @@ -127,10 +127,7 @@ fn test_scan_workspace_excludes_package_r_files_from_scripts() { fs::write(tmp.path().join("outside.R"), "x <- 1\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; let scripts = root.scripts(&db).clone(); @@ -155,10 +152,7 @@ fn test_scan_workspace_routes_package_subdir_r_files_to_pkg_scripts() { fs::write(tmp.path().join("pkg/inst/helper.R"), "y <- 2\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; assert!(root.scripts(&db).is_empty()); @@ -193,10 +187,7 @@ fn test_scan_workspace_pkg_scripts_findable_via_file_by_url() { ) .unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let url = UrlId::from_file_path(tmp.path().join("pkg/tests/testthat/test-x.R")).unwrap(); let file = db.file_by_url(&url).expect("script must be findable"); @@ -222,10 +213,7 @@ fn test_scan_workspace_honors_gitignore() { fs::create_dir_all(tmp.path().join(".git")).unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let scripts = db.workspace_roots().roots(&db)[0].scripts(&db).clone(); let basenames: Vec = scripts @@ -258,7 +246,8 @@ fn test_scan_workspace_honors_gitignore_for_package_files_and_scripts() { fs::write(tmp.path().join("pkg/tests/ignored.R"), "skip me\n").unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( + set_workspace_paths( + &mut db, &[tmp.path().to_path_buf()], &std::collections::HashSet::new(), ); @@ -282,10 +271,7 @@ fn test_scan_workspace_preserves_orphan_content_on_promotion() { let url = UrlId::from_file_path(&r_path).unwrap(); db.upsert_editor(url.clone(), "edited_version <- 2\n".to_string()); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let file = db .file_by_url(&url) @@ -308,10 +294,7 @@ fn test_scan_workspace_preserves_package_file_content_on_promotion() { let url = UrlId::from_file_path(&r_path).unwrap(); db.upsert_editor(url.clone(), "edited <- 2\n".to_string()); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let file = db.file_by_url(&url).expect("package file findable"); assert_eq!(file.contents(&db), "edited <- 2\n"); @@ -326,7 +309,7 @@ fn test_scan_multiple_workspace_paths_preserve_order() { let mut db = OakDatabase::new(); let paths: Vec = vec![tmp1.path().to_path_buf(), tmp2.path().to_path_buf()]; - db.set_workspace_paths(&paths, &std::collections::HashSet::new()); + set_workspace_paths(&mut db, &paths, &HashSet::new()); let roots = db.workspace_roots().roots(&db).clone(); assert_eq!(roots.len(), 2); @@ -349,10 +332,7 @@ fn test_scan_workspace_tolerates_non_package_description() { .unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; assert!(root.packages(&db).is_empty()); @@ -373,10 +353,7 @@ fn test_scan_workspace_dedup_keys_on_description_name_not_folder_name() { )]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let packages = db.workspace_roots().roots(&db)[0].packages(&db).clone(); assert_eq!(packages.len(), 2); @@ -404,10 +381,7 @@ fn test_scan_workspace_drops_duplicate_package_names() { )]); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root = db.workspace_roots().roots(&db)[0]; let packages = root.packages(&db).clone(); @@ -448,10 +422,7 @@ fn test_scan_workspace_excludes_renv_library() { fs::create_dir_all(tmp.path().join(".git")).unwrap(); let mut db = OakDatabase::new(); - db.set_workspace_paths( - &[tmp.path().to_path_buf()], - &std::collections::HashSet::new(), - ); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let packages = db.workspace_roots().roots(&db)[0].packages(&db).clone(); assert_eq!(packages.len(), 1); @@ -471,7 +442,7 @@ fn test_set_workspace_paths_preserves_editor_owned_file_across_churn() { write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let url = UrlId::from_file_path(tmp.path().join("pkg/R/a.R")).unwrap(); let file = db.file_by_url(&url).unwrap(); assert!(file.package(&db).is_some()); @@ -482,7 +453,7 @@ fn test_set_workspace_paths_preserves_editor_owned_file_across_churn() { let editor_owned: HashSet = [url.clone()].into_iter().collect(); // Workspace folder removed. File routes to orphan, package goes to stale. - db.set_workspace_paths(&[], &editor_owned); + set_workspace_paths(&mut db, &[], &editor_owned); let after_remove = db.file_by_url(&url).unwrap(); assert_eq!(file, after_remove); assert_eq!(after_remove.package(&db), None); @@ -492,7 +463,7 @@ fn test_set_workspace_paths_preserves_editor_owned_file_across_churn() { // Workspace folder re-added. File snaps back into pkg.files, same // entity, editor content preserved (the scan's disk snapshot // doesn't overwrite). - db.set_workspace_paths(&[tmp.path().to_path_buf()], &editor_owned); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &editor_owned); let after_readd = db.file_by_url(&url).unwrap(); assert_eq!(file, after_readd); assert!(after_readd.package(&db).is_some()); @@ -512,15 +483,15 @@ fn test_set_workspace_paths_non_editor_owned_file_goes_to_stale() { write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let url = UrlId::from_file_path(tmp.path().join("pkg/R/a.R")).unwrap(); let file = db.file_by_url(&url).unwrap(); - db.set_workspace_paths(&[], &HashSet::new()); + set_workspace_paths(&mut db, &[], &HashSet::new()); assert!(db.file_by_url(&url).is_none()); assert!(db.stale_root().files(&db).contains(&file)); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let resurrected = db.file_by_url(&url).unwrap(); assert_eq!(file, resurrected); } @@ -539,12 +510,12 @@ fn test_set_workspace_paths_unchanged_path_preserves_root_and_package_identity() write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "x <- 1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root_id_before = db.workspace_roots().roots(&db)[0].as_id(); let pkg_id_before = db.workspace_roots().roots(&db)[0].packages(&db)[0].as_id(); let file_id_before = db.workspace_roots().roots(&db)[0].packages(&db)[0].files(&db)[0].as_id(); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let root_id_after = db.workspace_roots().roots(&db)[0].as_id(); let pkg_id_after = db.workspace_roots().roots(&db)[0].packages(&db)[0].as_id(); let file_id_after = db.workspace_roots().roots(&db)[0].packages(&db)[0].files(&db)[0].as_id(); @@ -565,7 +536,8 @@ fn test_scan_workspace_package_files_sorted_by_basename() { ]); let mut db = OakDatabase::new(); - db.set_workspace_paths( + set_workspace_paths( + &mut db, &[tmp.path().to_path_buf()], &std::collections::HashSet::new(), ); @@ -588,13 +560,13 @@ fn test_set_workspace_paths_resurrected_file_picks_up_disk_contents() { write_package(&tmp.path().join("pkg"), "pkg", &[("a.R", "v1\n")]); let mut db = OakDatabase::new(); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); - db.set_workspace_paths(&[], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[], &HashSet::new()); let r_path = tmp.path().join("pkg/R/a.R"); fs::write(&r_path, "v2\n").unwrap(); - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); let url = UrlId::from_file_path(&r_path).unwrap(); let file = db.file_by_url(&url).unwrap(); assert_eq!(file.contents(&db), "v2\n"); @@ -612,8 +584,8 @@ fn test_set_workspace_paths_stale_no_duplicates_across_cycles() { let mut db = OakDatabase::new(); for _ in 0..3 { - db.set_workspace_paths(&[tmp.path().to_path_buf()], &HashSet::new()); - db.set_workspace_paths(&[], &HashSet::new()); + set_workspace_paths(&mut db, &[tmp.path().to_path_buf()], &HashSet::new()); + set_workspace_paths(&mut db, &[], &HashSet::new()); } let stale = db.stale_root(); diff --git a/crates/oak_scan/src/watch.rs b/crates/oak_scan/src/watch.rs index 310d02d9d..e5b4b01ee 100644 --- a/crates/oak_scan/src/watch.rs +++ b/crates/oak_scan/src/watch.rs @@ -1,29 +1,13 @@ -//! Surgical updates from file-watcher events. +//! Surgical single-file updates from file-watcher events. //! -//! The workspace scanner ([`crate::workspace`]) is the bulk path: it -//! walks an entire root and rebuilds packages and scripts. The helpers -//! here handle one file event at a time, so a burst of file watcher -//! notifications doesn't trigger a full rescan per event. //! -//! [`apply_watcher_events`] is the entry point used by drivers (the LSP, -//! tests, eventually anything else that gets a stream of file events). -//! Drivers translate their native event type into [`FileEvent`] and -//! call in, and `apply_watcher_events` does the routing: -//! -//! - DESCRIPTION events fall back to -//! [`crate::workspace::rescan_workspace_root`] on the containing -//! workspace root, deduped within the batch. A `DESCRIPTION` add or -//! removal can promote or demote a whole directory, which is too -//! tangled for a one-file update. -//! -//! - R file events route through [`add_watched_file`] (Created / Changed) or -//! [`remove_watched_file`] (Deleted). The `editor_owned` set lets the driver hold -//! back URLs whose contents it owns (the LSP uses this for files -//! the editor has open). +//! Dispatch (DESCRIPTION rescan vs surgical add/remove, plus mid-scan +//! buffering) lives on [`crate::ScanScheduler`]. This module just exposes +//! [`add_watched_file`] / [`remove_watched_file`] for the scheduler to call +//! after it has decided a single event can apply surgically against the live +//! root. -use std::collections::HashSet; use std::path::Path; -use std::path::PathBuf; use aether_url::UrlId; use oak_db::Db; @@ -60,70 +44,6 @@ pub enum FileEventKind { Deleted, } -/// Apply a batch of file events to the oak input tree. -/// -/// DESCRIPTION events are deduped to one rescan per containing root. -/// R-file events route through [`add_watched_file`] / [`remove_watched_file`]. URLs -/// in `editor_owned` are not touched even if their event is for an R file; -/// callers use this to defer to an in-memory source of truth (e.g. -/// the LSP's editor buffers). -pub(crate) fn apply_watcher_events( - db: &mut DB, - events: Vec, - editor_owned: &HashSet, -) { - let roots = workspace_root_paths(db); - let mut stale_roots: HashSet = HashSet::new(); - - for event in events { - let Ok(path) = event.url.to_file_path() else { - continue; - }; - - if path.file_name().is_some_and(|name| name == "DESCRIPTION") { - if let Some(root) = roots - .iter() - .find(|(root_path, _)| path.starts_with(root_path)) - .map(|(_, root)| *root) - { - stale_roots.insert(root); - } - continue; - } - - if editor_owned.contains(&event.url) { - continue; - } - - match event.kind { - FileEventKind::Created | FileEventKind::Changed => match std::fs::read_to_string(&path) - { - Ok(contents) => add_watched_file(db, event.url, contents), - Err(err) => log::warn!("Skipped watched file {}: {err:?}", path.display()), - }, - FileEventKind::Deleted => remove_watched_file(db, event.url), - } - } - - for root in stale_roots { - crate::workspace::rescan_workspace_root(db, root); - } -} - -fn workspace_root_paths(db: &DB) -> Vec<(PathBuf, Root)> { - db.workspace_roots() - .roots(db) - .iter() - .filter_map(|root| match root.path(db).to_file_path() { - Ok(path) => Some((path, *root)), - Err(err) => { - log::warn!("Can't get file path of workspace root, skipping: {err:?}"); - None - }, - }) - .collect() -} - /// React to a Created or Changed event on an R file. Idempotent: if a `File` /// already exists at this URL, its contents are updated and its placement is /// left alone. If not, the URL is classified against the current workspace diff --git a/crates/oak_scan/src/workspace.rs b/crates/oak_scan/src/workspace.rs deleted file mode 100644 index dfc297265..000000000 --- a/crates/oak_scan/src/workspace.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! Workspace scanner. Drives `WorkspaceRoots` from the editor's open folders. -//! -//! [`set_workspace_paths`] is declarative: it reconciles the live set of -//! workspace roots to exactly the paths it's given. Unchanged paths are left -//! alone (the watcher handles in-folder changes via -//! [`rescan_workspace_root`]). Removed paths are evicted; their files route -//! to `OrphanRoot` if editor-owned, otherwise to `StaleRoot` for entity reuse -//! on re-add. New paths are scanned: `DESCRIPTION` files at any depth -//! (honouring `.gitignore`), plus top-level R scripts. - -use std::collections::HashMap; -use std::collections::HashSet; -use std::path::Path; -use std::path::PathBuf; - -use aether_url::UrlId; -use oak_db::Db; -use oak_db::DbInputs; -use oak_db::Package; -use oak_db::Root; -use oak_db::RootKind; -use salsa::Setter; - -use crate::inputs::RootExt; -use crate::packages::scan_workspace_packages; -use crate::packages::scan_workspace_scripts; - -/// Reconcile `WorkspaceRoots` to exactly `paths`. Called through -/// [`crate::DbScan::set_workspace_paths`]. -pub(crate) fn set_workspace_paths( - db: &mut DB, - paths: &[PathBuf], - editor_owned: &HashSet, -) { - let new: Vec<(PathBuf, UrlId)> = paths - .iter() - .filter_map(|p| { - let url = UrlId::from_file_path(p).ok()?; - Some((p.clone(), url)) - }) - .collect(); - let new_urls: HashSet = new.iter().map(|(_, u)| u.clone()).collect(); - - let old: HashMap = db - .workspace_roots() - .roots(db) - .iter() - .map(|r| (r.path(db).clone(), *r)) - .collect(); - - // Evict roots not in the new set. Editor-owned files survive in - // `OrphanRoot` so their buffers stay analysable. Everything else goes - // to `StaleRoot` for entity reuse on re-add. - for (old_url, &old_root) in &old { - if !new_urls.contains(old_url) { - old_root.set_stale(db, Some(editor_owned)); - } - } - - // Build the new roots list: reuse the existing `Root` for unchanged paths - // (no rescan, the watcher is the path for in-folder changes), scan the - // rest. - let mut new_roots = Vec::with_capacity(new.len()); - for (path, url) in new { - let root = match old.get(&url) { - Some(&r) => r, - None => scan_new_workspace_path(db, &path, url), - }; - new_roots.push(root); - } - db.workspace_roots().set_roots(db).to(new_roots); -} - -/// Initial scan of a path that wasn't previously a workspace root. Walks the -/// directory tree, calls `set_package` per discovered package, sets scripts. -fn scan_new_workspace_path(db: &mut DB, path: &Path, url: UrlId) -> Root { - let root = Root::new(db, url, RootKind::Workspace, Vec::new(), Vec::new()); - rescan_into(db, root, path); - root -} - -/// Re-run the workspace scan against an existing root. Used as the -/// fallback for events (DESCRIPTION add / remove / edit) that can -/// change the set of packages under the root. -pub(crate) fn rescan_workspace_root(db: &mut DB, root: Root) { - let Ok(path) = root.path(db).to_file_path() else { - log::warn!("Skipped rescan: root URL is not a file path"); - return; - }; - rescan_into(db, root, &path); -} - -fn rescan_into(db: &mut DB, root: Root, path: &Path) { - let packages = scan_workspace_packages(path); - let scripts = scan_workspace_scripts(path); - - let package_entities: Vec = packages - .into_iter() - .map(|pkg| { - root.set_package( - db, - pkg.description_url, - pkg.name, - pkg.version, - pkg.namespace, - pkg.files, - pkg.scripts, - pkg.collation, - ) - }) - .collect(); - - root.set_packages(db).to(package_entities); - root.set_workspace_scripts(db, scripts); -}