Skip to content

Commit dc5feca

Browse files
authored
server: multi threading via task pool & crossbeam (#1026)
We're only running completions in latency sensitive threads, but we'll always want to run semantic tokens in there once we build that.
1 parent 9778980 commit dc5feca

9 files changed

Lines changed: 111 additions & 44 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/squawk_ide/src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub fn bind(db: &dyn Db, file: File) -> Binder {
3131
}
3232

3333
#[salsa::db]
34-
#[derive(Default)]
34+
#[derive(Clone, Default)]
3535
pub struct Database {
3636
storage: Storage<Self>,
3737
}

crates/squawk_server/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ line-index.workspace = true
3131
insta.workspace = true
3232
etcetera.workspace = true
3333
rustc-hash.workspace = true
34+
squawk-thread.workspace = true
35+
crossbeam-channel.workspace = true
3436

3537
[lints]
3638
workspace = true

crates/squawk_server/src/dispatch.rs

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,18 @@ use anyhow::Result;
44
use log::{error, info};
55
use lsp_server::{Connection, Message, Response};
66
use lsp_types::{notification::Notification as LspNotification, request::Request as LspRequest};
7+
use squawk_thread::ThreadIntent;
78

8-
use crate::system::System;
9+
use crate::system::{GlobalState, MutableSystem, System};
910

1011
pub(crate) struct RequestDispatcher<'a> {
11-
connection: &'a Connection,
1212
req: Option<lsp_server::Request>,
13-
system: &'a dyn System,
13+
system: &'a mut GlobalState,
1414
}
1515

1616
impl<'a> RequestDispatcher<'a> {
17-
pub(crate) fn new(
18-
connection: &'a Connection,
19-
req: lsp_server::Request,
20-
system: &'a dyn System,
21-
) -> Self {
17+
pub(crate) fn new(req: lsp_server::Request, system: &'a mut GlobalState) -> Self {
2218
Self {
23-
connection,
2419
req: Some(req),
2520
system,
2621
}
@@ -41,7 +36,7 @@ impl<'a> RequestDispatcher<'a> {
4136
lsp_server::ErrorCode::ParseError as i32,
4237
err.to_string(),
4338
);
44-
if let Err(err) = self.connection.sender.send(Message::Response(response)) {
39+
if let Err(err) = self.system.sender.send(Message::Response(response)) {
4540
error!("Failed to send parse error response: {err}");
4641
}
4742
None
@@ -50,25 +45,54 @@ impl<'a> RequestDispatcher<'a> {
5045
}
5146

5247
pub(crate) fn on<R>(
48+
self,
49+
handler: fn(&dyn System, R::Params) -> Result<R::Result>,
50+
) -> Result<Self>
51+
where
52+
R: LspRequest,
53+
R::Params: Send + 'static,
54+
{
55+
self.on_with_thread_intent::<R>(ThreadIntent::Worker, handler)
56+
}
57+
58+
pub(crate) fn on_latency_sensitive<R>(
59+
self,
60+
handler: fn(&dyn System, R::Params) -> Result<R::Result>,
61+
) -> Result<Self>
62+
where
63+
R: LspRequest,
64+
R::Params: Send + 'static,
65+
{
66+
self.on_with_thread_intent::<R>(ThreadIntent::LatencySensitive, handler)
67+
}
68+
69+
fn on_with_thread_intent<R>(
5370
mut self,
71+
intent: ThreadIntent,
5472
handler: fn(&dyn System, R::Params) -> Result<R::Result>,
5573
) -> Result<Self>
5674
where
5775
R: LspRequest,
76+
R::Params: Send + 'static,
5877
{
5978
if let Some((id, params)) = self.parse::<R>() {
60-
let resp = match handler(self.system, params) {
61-
Ok(result) => Response::new_ok(id, result),
62-
Err(err) => {
63-
error!("Request handler failed: {err}");
64-
Response::new_err(
65-
id,
66-
lsp_server::ErrorCode::InternalError as i32,
67-
err.to_string(),
68-
)
69-
}
70-
};
71-
self.connection.sender.send(Message::Response(resp))?;
79+
let snapshot = self.system.snapshot();
80+
81+
self.system.task_pool.spawn(intent, move || {
82+
let resp = match handler(&snapshot, params) {
83+
Ok(result) => Response::new_ok(id, result),
84+
Err(err) => {
85+
error!("Request handler failed: {err}");
86+
Response::new_err(
87+
id,
88+
lsp_server::ErrorCode::InternalError as i32,
89+
err.to_string(),
90+
)
91+
}
92+
};
93+
94+
Message::Response(resp)
95+
});
7296
}
7397

7498
Ok(self)
@@ -84,14 +108,14 @@ impl<'a> RequestDispatcher<'a> {
84108
pub(crate) struct NotificationDispatcher<'a> {
85109
connection: &'a Connection,
86110
notif: Option<lsp_server::Notification>,
87-
system: &'a mut dyn System,
111+
system: &'a mut dyn MutableSystem,
88112
}
89113

90114
impl<'a> NotificationDispatcher<'a> {
91115
pub(crate) fn new(
92116
connection: &'a Connection,
93117
notif: lsp_server::Notification,
94-
system: &'a mut dyn System,
118+
system: &'a mut dyn MutableSystem,
95119
) -> Self {
96120
Self {
97121
connection,
@@ -119,7 +143,7 @@ impl<'a> NotificationDispatcher<'a> {
119143

120144
pub(crate) fn on<N>(
121145
mut self,
122-
handler: fn(&Connection, N::Params, &mut dyn System) -> Result<()>,
146+
handler: fn(&Connection, N::Params, &mut dyn MutableSystem) -> Result<()>,
123147
) -> Result<Self>
124148
where
125149
N: LspNotification,

crates/squawk_server/src/handlers/notifications.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ use lsp_types::{
77
};
88

99
use crate::lsp_utils;
10-
use crate::system::System;
10+
use crate::system::MutableSystem;
1111

1212
pub(crate) fn handle_did_open(
1313
_connection: &Connection,
1414
params: DidOpenTextDocumentParams,
15-
system: &mut dyn System,
15+
system: &mut dyn MutableSystem,
1616
) -> Result<()> {
1717
let uri = params.text_document.uri;
1818
let content = params.text_document.text;
@@ -25,7 +25,7 @@ pub(crate) fn handle_did_open(
2525
pub(crate) fn handle_did_change(
2626
_connection: &Connection,
2727
params: DidChangeTextDocumentParams,
28-
system: &mut dyn System,
28+
system: &mut dyn MutableSystem,
2929
) -> Result<()> {
3030
let uri = params.text_document.uri;
3131

@@ -43,7 +43,7 @@ pub(crate) fn handle_did_change(
4343
pub(crate) fn handle_did_close(
4444
connection: &Connection,
4545
params: DidCloseTextDocumentParams,
46-
system: &mut dyn System,
46+
system: &mut dyn MutableSystem,
4747
) -> Result<()> {
4848
let uri = params.text_document.uri;
4949

crates/squawk_server/src/server.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::num::NonZeroUsize;
2+
13
use anyhow::Result;
24
use log::info;
35
use lsp_server::{Connection, Message};
@@ -90,7 +92,8 @@ fn main_loop(connection: Connection, params: serde_json::Value) -> Result<()> {
9092
let client_name = init_params.client_info.map(|x| x.name);
9193
info!("Client name: {client_name:?}");
9294

93-
let mut system = GlobalState::new();
95+
let threads = std::thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
96+
let mut system = GlobalState::new(connection.sender.clone(), threads);
9497

9598
for msg in &connection.receiver {
9699
match msg {
@@ -102,15 +105,15 @@ fn main_loop(connection: Connection, params: serde_json::Value) -> Result<()> {
102105
return Ok(());
103106
}
104107

105-
RequestDispatcher::new(&connection, req, &system)
108+
RequestDispatcher::new(req, &mut system)
106109
.on::<GotoDefinition>(handle_goto_definition)?
107110
.on::<HoverRequest>(handle_hover)?
108111
.on::<CodeActionRequest>(handle_code_action)?
109112
.on::<SelectionRangeRequest>(handle_selection_range)?
110113
.on::<InlayHintRequest>(handle_inlay_hints)?
111114
.on::<DocumentSymbolRequest>(handle_document_symbol)?
112115
.on::<FoldingRangeRequest>(handle_folding_range)?
113-
.on::<Completion>(handle_completion)?
116+
.on_latency_sensitive::<Completion>(handle_completion)?
114117
.on::<DocumentDiagnosticRequest>(handle_document_diagnostic)?
115118
.on::<SyntaxTreeRequest>(handle_syntax_tree)?
116119
.on::<TokensRequest>(handle_tokens)?

crates/squawk_server/src/system.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,84 @@
1+
use std::{num::NonZeroUsize, sync::Arc};
2+
3+
use crossbeam_channel::Sender;
4+
use lsp_server::Message;
15
use lsp_types::Url;
26
use rustc_hash::FxHashMap;
37
use salsa::Setter;
48
use squawk_ide::db::{Database, File};
9+
use squawk_thread::TaskPool;
510

611
pub(crate) trait System {
712
fn db(&self) -> &Database;
813
fn file(&self, uri: &Url) -> Option<File>;
14+
}
15+
16+
pub(crate) trait MutableSystem: System {
917
fn set(&mut self, uri: Url, content: String);
1018
fn remove(&mut self, uri: &Url);
1119
}
1220

21+
pub(crate) struct Snapshot {
22+
db: Database,
23+
files: Arc<FxHashMap<Url, File>>,
24+
}
25+
26+
impl System for Snapshot {
27+
fn db(&self) -> &Database {
28+
&self.db
29+
}
30+
31+
fn file(&self, uri: &Url) -> Option<File> {
32+
self.files.get(uri).copied()
33+
}
34+
}
35+
1336
pub(super) struct GlobalState {
14-
pub db: Database,
15-
files: FxHashMap<Url, File>,
37+
db: Database,
38+
files: Arc<FxHashMap<Url, File>>,
39+
pub(crate) sender: Sender<Message>,
40+
pub(crate) task_pool: TaskPool<Message>,
1641
}
1742

1843
impl GlobalState {
19-
pub(super) fn new() -> Self {
44+
pub(super) fn new(sender: Sender<Message>, threads: NonZeroUsize) -> Self {
2045
Self {
2146
db: Database::default(),
22-
files: FxHashMap::default(),
47+
files: Arc::new(FxHashMap::default()),
48+
task_pool: TaskPool::new_with_threads(sender.clone(), threads),
49+
sender,
50+
}
51+
}
52+
53+
pub(crate) fn snapshot(&self) -> Snapshot {
54+
Snapshot {
55+
db: self.db.clone(),
56+
files: self.files.clone(),
2357
}
2458
}
2559
}
2660

2761
impl System for GlobalState {
2862
fn db(&self) -> &Database {
29-
return &self.db;
63+
&self.db
3064
}
3165

3266
fn file(&self, uri: &Url) -> Option<File> {
3367
self.files.get(uri).copied()
3468
}
69+
}
3570

71+
impl MutableSystem for GlobalState {
3672
fn set(&mut self, uri: Url, content: String) {
3773
if let Some(file) = self.files.get(&uri).copied() {
3874
file.set_content(&mut self.db).to(content.into());
3975
} else {
4076
let file = File::new(&self.db, content.into());
41-
self.files.insert(uri, file);
77+
Arc::make_mut(&mut self.files).insert(uri, file);
4278
}
4379
}
4480

4581
fn remove(&mut self, uri: &Url) {
46-
self.files.remove(uri);
82+
Arc::make_mut(&mut self.files).remove(uri);
4783
}
4884
}

crates/squawk_thread/src/pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl Pool {
5454

5555
let mut handles = Vec::with_capacity(threads.into());
5656
for idx in 0..threads.into() {
57-
let handle = Builder::new(INITIAL_INTENT, format!("squawk:worker:{idx}",))
57+
let handle = Builder::new(INITIAL_INTENT, format!("Worker:{idx}",))
5858
.stack_size(STACK_SIZE)
5959
.allow_leak(true)
6060
.spawn({

crates/squawk_thread/src/taskpool.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// https://github.com/rust-lang/rust-analyzer/blob/2efc80078029894eec0699f62ec8d5c1a56af763/crates/rust-analyzer/src/task_pool.rs#L6C19-L6C19
33
//! A thin wrapper around [`crate::Pool`] which threads a sender through spawned jobs.
44
5-
use std::{num::NonZeroUsize, panic::UnwindSafe};
5+
use std::num::NonZeroUsize;
66

77
use crossbeam_channel::Sender;
88

@@ -23,7 +23,7 @@ impl<T> TaskPool<T> {
2323

2424
pub fn spawn<F>(&mut self, intent: ThreadIntent, task: F)
2525
where
26-
F: FnOnce() -> T + Send + UnwindSafe + 'static,
26+
F: FnOnce() -> T + Send + 'static,
2727
T: Send + 'static,
2828
{
2929
self.pool.spawn(intent, {
@@ -34,7 +34,7 @@ impl<T> TaskPool<T> {
3434

3535
pub fn spawn_with_sender<F>(&mut self, intent: ThreadIntent, task: F)
3636
where
37-
F: FnOnce(Sender<T>) + Send + UnwindSafe + 'static,
37+
F: FnOnce(Sender<T>) + Send + 'static,
3838
T: Send + 'static,
3939
{
4040
self.pool.spawn(intent, {

0 commit comments

Comments
 (0)