Skip to content

Commit 1fc65dd

Browse files
authored
(Sources-4) Implement Event::SourceCompleted for installed package sources (#1289)
Progress towards #1234 Branched from #1288 This PR sets up the `main_loop.rs` handling for package sources. Here is the set up: - The database changes via a `revision` bump - We `dispatch()` a series of `SourceRequest`s via `lsp::spawn_blocking()`. One `SourceRequest` is made per package currently used by the workspace (removing any packages we've already requested sources for). - On the tokio task, an implementer of `trait SourceProvider` is called to `provide()` sources for a package. On success, it sends the `directory` of the sources back to the main loop via a new `Event::SourceCompleted` event. - Note that this all happens on a task, not blocking the main loop while we are creating these sources. - The main loop picks this `Event` up and calls `db.set_package_sources(package, &directory)` for that package, which actually loads the files from the directory. - The next `revision` of the `db` can now use these `package.files()` in LSP requests I purposefully have not hooked up a live production level implementer of `trait SourceProvider`. Instead we have a `TestProvider` that is used to verify that all of this infrastructure is working, but for production this is all set to `None` and no `SourceRequest`s are ever sent out, so it is safe to merge. If we agree on the shape of all of this, then in the next PRs I can hook up the real source provider.
1 parent 01f5d51 commit 1fc65dd

8 files changed

Lines changed: 534 additions & 86 deletions

File tree

crates/ark/src/lsp.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub mod inputs;
3232
pub mod main_loop;
3333
pub mod markdown;
3434
pub(crate) mod open_file;
35+
pub mod sources;
3536

3637
pub mod find_references;
3738
pub mod rename;

crates/ark/src/lsp/main_loop.rs

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ use crate::lsp::diagnostics::generate_diagnostics;
5353
use crate::lsp::handlers;
5454
use crate::lsp::indexer;
5555
use crate::lsp::open_file::OpenFile;
56+
use crate::lsp::sources::SourceCompleted;
57+
use crate::lsp::sources::SourceScheduler;
5658
use crate::lsp::state::WorldState;
5759
use crate::lsp::state_handlers;
5860
use crate::lsp::state_handlers::ConsoleInputs;
@@ -98,6 +100,7 @@ pub(crate) enum Event {
98100
Lsp(LspMessage),
99101
Kernel(KernelNotification),
100102
OakScanCompleted(ScanCompleted),
103+
SourceCompleted(SourceCompleted),
101104
}
102105

103106
#[derive(Debug)]
@@ -189,6 +192,24 @@ pub(crate) struct LspState {
189192
/// main-loop handlers. Must be out of [`WorldState`] because the scheduler
190193
/// is not clonable.
191194
pub(crate) oak_scheduler: ScanScheduler,
195+
196+
/// Scheduler of [crate::lsp::sources::SourceRequest]s. Scheduling and source
197+
/// consumption all happen from the main loop.
198+
pub(crate) source_scheduler: SourceScheduler,
199+
}
200+
201+
impl LspState {
202+
pub(crate) fn new(
203+
console_notification_tx: TokioUnboundedSender<ConsoleNotification>,
204+
source_scheduler: SourceScheduler,
205+
) -> Self {
206+
Self {
207+
capabilities: Capabilities::default(),
208+
console_notification_tx,
209+
oak_scheduler: ScanScheduler::new(),
210+
source_scheduler,
211+
}
212+
}
192213
}
193214

194215
/// State for the auxiliary loop
@@ -246,29 +267,20 @@ impl GlobalState {
246267

247268
Self::from_parts(
248269
client,
249-
console_notification_tx,
250270
WorldState::new(db, library),
271+
LspState::new(console_notification_tx, SourceScheduler::new(None)),
251272
)
252273
}
253274

254-
/// Assemble the state around an already-built `WorldState`. Splitting this
255-
/// out from [`GlobalState::new`] lets tests construct a state without the
256-
/// R `.libPaths()` lookup that `new` does.
257-
fn from_parts(
258-
client: Client,
259-
console_notification_tx: TokioUnboundedSender<ConsoleNotification>,
260-
world: WorldState,
261-
) -> Self {
275+
/// Assemble the state around an already-built [`WorldState`] and
276+
/// [`LspState`]. Splitting this out from [`GlobalState::new`] lets tests
277+
/// construct a state without the R `.libPaths()` lookup that `new` does, and
278+
/// with a db / provider configured up front.
279+
pub(crate) fn from_parts(client: Client, world: WorldState, lsp_state: LspState) -> Self {
262280
// Transmission channel for the main loop events. Shared with the
263281
// tower-lsp backend and the Jupyter kernel.
264282
let (events_tx, events_rx) = tokio_unbounded_channel::<Event>();
265283

266-
let lsp_state = LspState {
267-
capabilities: Capabilities::default(),
268-
console_notification_tx,
269-
oak_scheduler: ScanScheduler::new(),
270-
};
271-
272284
Self {
273285
world,
274286
lsp_state,
@@ -544,6 +556,12 @@ impl GlobalState {
544556
warm_workspace_index(self.world.db.clone());
545557
}
546558
},
559+
560+
Event::SourceCompleted(SourceCompleted { package, response }) => {
561+
if let Some(directory) = self.lsp_state.source_scheduler.finish(package, response) {
562+
self.world.db.set_package_sources(package, &directory);
563+
}
564+
},
547565
}
548566
lsp::log_info!("Finished handling event in {}ms", loop_tick.elapsed().as_millis());
549567

@@ -555,6 +573,9 @@ impl GlobalState {
555573
if salsa::plumbing::current_revision(&self.world.db) != old_revision {
556574
lsp::log_info!("World state revision advanced");
557575
diagnostics_refresh_all(&self.world);
576+
self.lsp_state
577+
.source_scheduler
578+
.schedule(&self.world.db, &self.events_tx);
558579
}
559580

560581
Ok(())
@@ -587,21 +608,15 @@ impl GlobalState {
587608
/// real `handle_event()` and the private channels rather than a reconstruction.
588609
#[cfg(test)]
589610
impl GlobalState {
590-
/// Build a state with an empty db and no R library paths. Takes a `client`
591-
/// because the struct holds one, but the event paths exercised in tests
592-
/// never touch it.
593-
pub(crate) fn new_test(client: Client) -> Self {
594-
let (console_notification_tx, _) = tokio_unbounded_channel::<ConsoleNotification>();
595-
let world = WorldState::new(OakDatabase::new(), Library::new(vec![]));
596-
Self::from_parts(client, console_notification_tx, world)
597-
}
598-
599-
/// Run `event` through the real `handle_event`, then pump the scan
600-
/// completions it spawns until the scheduler goes idle. This is what
601-
/// `main_loop()` does, minus the surrounding `loop`.
611+
/// Run `event` through the real `handle_event`, then pump any pending
612+
/// events until we reach quiescence. This includes:
613+
/// - Pending oak scans
614+
/// - Pending source requests
602615
pub(crate) async fn handle_event_to_quiescence(&mut self, event: Event) {
603616
self.handle_event(event).await.unwrap();
604-
while self.lsp_state.oak_scheduler.has_pending_scans() {
617+
while self.lsp_state.oak_scheduler.has_pending_scans() ||
618+
self.lsp_state.source_scheduler.has_pending()
619+
{
605620
let event = self.next_event().await;
606621
self.handle_event(event).await.unwrap();
607622
}

crates/ark/src/lsp/sources.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
use std::collections::HashMap;
2+
use std::path::Path;
3+
use std::path::PathBuf;
4+
use std::sync::Arc;
5+
6+
use aether_path::FilePath;
7+
use oak_db::Db;
8+
use oak_db::Package;
9+
use stdext::result::ResultExt;
10+
11+
use crate::lsp::main_loop::Event;
12+
use crate::lsp::main_loop::TokioUnboundedSender;
13+
14+
/// Handle source requests to provide R source files for an installed package
15+
///
16+
/// Implementations live outside of oak, oak is only in charge of ingesting the
17+
/// returned directory.
18+
pub(crate) trait SourceHandler: Send + Sync {
19+
fn handle(&self, request: &SourceRequest) -> SourceResponse;
20+
}
21+
22+
#[derive(Debug, Clone)]
23+
pub(crate) struct SourceRequest {
24+
name: String,
25+
version: String,
26+
library_path: PathBuf,
27+
}
28+
29+
// TODO!: Remove when we have a production `SourceHandler`
30+
#[cfg_attr(not(test), expect(dead_code))]
31+
#[derive(Debug)]
32+
pub(crate) enum SourceResponse {
33+
Success(PathBuf),
34+
Failure,
35+
}
36+
37+
#[derive(Debug)]
38+
pub(crate) struct SourceCompleted {
39+
pub(crate) package: Package,
40+
pub(crate) response: SourceResponse,
41+
}
42+
43+
/// State of a particular [Package]'s [SourceRequest]
44+
///
45+
/// There is also a 3rd implied state of "we've never seen this package before" if it
46+
/// isn't in the `state` hash map.
47+
enum SourceState {
48+
/// The [SourceRequest] is in flight
49+
Pending,
50+
51+
/// We have received a [SourceResponse]. Regardless of [SourceResponse::Success] or
52+
/// [SourceResponse::Failure], we mark the package as `Finished` so we never request
53+
/// it again.
54+
Finished,
55+
}
56+
57+
pub(crate) struct SourceScheduler {
58+
// TODO!: Remove the `Option<>` when we implement a production `SourceHandler`
59+
handler: Option<Arc<dyn SourceHandler>>,
60+
state: HashMap<Package, SourceState>,
61+
}
62+
63+
impl SourceScheduler {
64+
pub(crate) fn new(handler: Option<Arc<dyn SourceHandler>>) -> Self {
65+
Self {
66+
handler,
67+
state: HashMap::new(),
68+
}
69+
}
70+
71+
pub(crate) fn schedule(&mut self, db: &dyn Db, events_tx: &TokioUnboundedSender<Event>) {
72+
let Some(handler) = &self.handler else {
73+
return;
74+
};
75+
76+
// For each package used by the workspace, request its sources if we have never
77+
// seen it before
78+
for package in oak_db::workspace_dependencies(db) {
79+
if self.state.contains_key(package) {
80+
// If we've seen this package before, don't request sources again!
81+
continue;
82+
}
83+
84+
let package = *package;
85+
86+
let Some(request) = SourceRequest::from_package(db, &package).log_err() else {
87+
// Go straight to `Finished` if we can't generate the source request,
88+
// something is structurally wrong
89+
self.state.insert(package, SourceState::Finished);
90+
continue;
91+
};
92+
93+
let handler = Arc::clone(handler);
94+
let tx = events_tx.clone();
95+
96+
// Mark as `Pending` just before launching the tokio task
97+
self.state.insert(package, SourceState::Pending);
98+
99+
crate::lsp::spawn_blocking(move || {
100+
let response = handler.handle(&request);
101+
102+
tx.send(Event::SourceCompleted(SourceCompleted {
103+
package,
104+
response,
105+
}))
106+
.log_err();
107+
108+
Ok(None)
109+
});
110+
}
111+
}
112+
113+
#[must_use]
114+
pub(crate) fn finish(&mut self, package: Package, response: SourceResponse) -> Option<PathBuf> {
115+
self.state.insert(package, SourceState::Finished);
116+
match response {
117+
SourceResponse::Success(directory) => Some(directory),
118+
SourceResponse::Failure => None,
119+
}
120+
}
121+
122+
/// Whether any source request is in flight. Allows tests to deterministically "wait"
123+
/// for pending source requests to finish.
124+
#[cfg(test)]
125+
pub(crate) fn has_pending(&self) -> bool {
126+
self.state
127+
.values()
128+
.any(|state| matches!(state, SourceState::Pending))
129+
}
130+
}
131+
132+
impl SourceRequest {
133+
fn from_package(db: &dyn Db, package: &Package) -> anyhow::Result<Self> {
134+
let name = package.name(db).clone();
135+
136+
let Some(version) = package.version(db).to_owned() else {
137+
return Err(anyhow::anyhow!(
138+
"Package {name} is missing a version to provide sources for."
139+
));
140+
};
141+
142+
let library_path = match package.description_path(db) {
143+
FilePath::File(path) => {
144+
match path.as_path().as_std_path().parent().and_then(Path::parent) {
145+
Some(library_path) => library_path.to_path_buf(),
146+
None => {
147+
return Err(anyhow::anyhow!(
148+
"Package {name} does not have an associated library path."
149+
))
150+
},
151+
}
152+
},
153+
FilePath::Virtual(uri) => {
154+
return Err(anyhow::anyhow!(
155+
"Package {name} is unexpectedly a virtual uri {uri}."
156+
))
157+
},
158+
};
159+
160+
Ok(Self {
161+
name,
162+
version,
163+
library_path,
164+
})
165+
}
166+
167+
// TODO!: Remove when we have a production `SourceHandler`
168+
#[cfg_attr(not(test), expect(dead_code))]
169+
pub(crate) fn name(&self) -> &str {
170+
&self.name
171+
}
172+
173+
// TODO!: Remove when we have a production `SourceHandler`
174+
#[cfg_attr(not(test), expect(dead_code))]
175+
pub(crate) fn version(&self) -> &str {
176+
&self.version
177+
}
178+
179+
// TODO!: Remove when we have a production `SourceHandler`
180+
#[cfg_attr(not(test), expect(dead_code))]
181+
pub(crate) fn library_path(&self) -> &Path {
182+
&self.library_path
183+
}
184+
}

crates/ark/src/lsp/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod find_references;
44
mod goto_definition;
55
mod main_loop;
66
mod rename;
7+
mod sources;
78
mod state;
89
mod state_handlers;
910
mod utils;

0 commit comments

Comments
 (0)