Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/mcpls-core/src/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ pub use state::{DocumentState, DocumentTracker, path_to_uri, uri_to_path};
pub use translator::{
Completion, CompletionsResult, DefinitionResult, Diagnostic, DiagnosticSeverity,
DiagnosticsResult, DocumentChanges, DocumentSymbolsResult, FormatDocumentResult, HoverResult,
Location, Position2D, Range, ReferencesResult, RenameResult, Symbol, TextEdit, Translator,
Location, Position2D, Range, ReferencesResult, RenameResult, ServerLogsResult,
ServerMessagesResult, Symbol, TextEdit, Translator,
};
105 changes: 74 additions & 31 deletions crates/mcpls-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use bridge::resources::make_uri;
use bridge::{ResourceSubscriptions, Translator};
use bridge::{NotificationCache, ResourceSubscriptions, Translator};
pub use config::ServerConfig;
pub use error::Error;
use lsp::{LspNotification, LspServer, ServerInitConfig};
Expand Down Expand Up @@ -72,14 +72,10 @@ use transport::run_stdio;
/// - The cancellation watch fires (or the sender is dropped).
/// - `notify_resource_updated` returns an error (peer disconnect / transport closed).
///
/// # Note on lock contention (TODO critic-S4)
/// All cache writes acquire `Arc<Mutex<Translator>>`, which is the same lock used
/// by every MCP tool call. Splitting `NotificationCache` into its own `Arc<RwLock>`
/// would eliminate this contention. Tracked as a P2 follow-up.
pub(crate) async fn diagnostics_pump(
_lang: String,
mut rx: tokio::sync::mpsc::Receiver<LspNotification>,
translator: Arc<Mutex<Translator>>,
notification_cache: Arc<Mutex<NotificationCache>>,
subs: Arc<ResourceSubscriptions>,
peer_cell: Arc<OnceCell<rmcp::Peer<rmcp::RoleServer>>>,
mut cancel_rx: tokio::sync::watch::Receiver<bool>,
Expand All @@ -99,9 +95,8 @@ pub(crate) async fn diagnostics_pump(
LspNotification::PublishDiagnostics(p) => {
// Always cache unconditionally.
{
let mut t = translator.lock().await;
t.notification_cache_mut()
.store_diagnostics(&p.uri, p.version, p.diagnostics);
let mut cache = notification_cache.lock().await;
cache.store_diagnostics(&p.uri, p.version, p.diagnostics);
}

// Fast path: skip URI construction when nothing is subscribed.
Expand Down Expand Up @@ -133,14 +128,12 @@ pub(crate) async fn diagnostics_pump(
}
}
LspNotification::LogMessage(m) => {
let mut t = translator.lock().await;
t.notification_cache_mut()
.store_log(m.typ.into(), m.message);
let mut cache = notification_cache.lock().await;
cache.store_log(m.typ.into(), m.message);
}
LspNotification::ShowMessage(m) => {
let mut t = translator.lock().await;
t.notification_cache_mut()
.store_message(m.typ.into(), m.message);
let mut cache = notification_cache.lock().await;
cache.store_message(m.typ.into(), m.message);
}
LspNotification::Progress { .. } | LspNotification::Other { .. } => {}
}
Expand Down Expand Up @@ -346,6 +339,7 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<()
}

let translator = Arc::new(Mutex::new(translator));
let notification_cache = Arc::new(Mutex::new(NotificationCache::new()));
let subscriptions = Arc::new(ResourceSubscriptions::new());
// Peer cell is populated after the MCP transport is established (Phase B).
let peer_cell = Arc::new(OnceCell::new());
Expand All @@ -360,15 +354,19 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<()
pumps.spawn(diagnostics_pump(
lang,
rx,
Arc::clone(&translator),
Arc::clone(&notification_cache),
Arc::clone(&subscriptions),
Arc::clone(&peer_cell),
cancel_rx.clone(),
));
}

info!("Starting MCP server with rmcp...");
let mcp_server = mcp::McplsServer::new(Arc::clone(&translator), Arc::clone(&subscriptions));
let mcp_server = mcp::McplsServer::new(
Arc::clone(&translator),
Arc::clone(&notification_cache),
Arc::clone(&subscriptions),
);
info!("MCPLS server initialized successfully");

let result = match transport {
Expand Down Expand Up @@ -722,8 +720,8 @@ mod tests {

use super::*;

fn make_translator() -> Arc<Mutex<Translator>> {
Arc::new(Mutex::new(Translator::new()))
fn make_notification_cache() -> Arc<Mutex<NotificationCache>> {
Arc::new(Mutex::new(NotificationCache::new()))
}

fn make_subs() -> Arc<ResourceSubscriptions> {
Expand All @@ -739,19 +737,18 @@ mod tests {
/// `PublishDiagnostics` is cached even when the peer is not yet connected.
#[tokio::test]
async fn test_pump_caches_before_peer_set() {
let translator = make_translator();
let notification_cache = make_notification_cache();
let subs = make_subs();
let peer_cell = make_peer_cell();
let (tx, rx) = mpsc::channel(8);
// Keep _cancel_tx alive: dropping it causes cancel_rx.changed() to return Err,
// which makes the pump exit before processing any messages.
let (_cancel_tx, cancel_rx) = watch::channel(false);

let t = Arc::clone(&translator);
tokio::spawn(diagnostics_pump(
"rust".to_string(),
rx,
t,
Arc::clone(&notification_cache),
Arc::clone(&subs),
Arc::clone(&peer_cell),
cancel_rx,
Expand All @@ -774,11 +771,8 @@ mod tests {
loop {
tokio::task::yield_now().await;
let found = {
let guard = translator.lock().await;
guard
.notification_cache()
.get_diagnostics(uri.as_str())
.is_some()
let cache = notification_cache.lock().await;
cache.get_diagnostics(uri.as_str()).is_some()
};
if found {
return true;
Expand All @@ -791,10 +785,59 @@ mod tests {
assert!(cached, "diagnostics should be cached before peer is set");
}

/// Cache writes must not wait behind the translator lock.
#[tokio::test]
async fn test_pump_caches_while_translator_locked() {
let translator = Arc::new(Mutex::new(Translator::new()));
let _translator_guard = translator.lock().await;
let notification_cache = make_notification_cache();
let subs = make_subs();
let peer_cell = make_peer_cell();
let (tx, rx) = mpsc::channel(8);
let (_cancel_tx, cancel_rx) = watch::channel(false);

tokio::spawn(diagnostics_pump(
"rust".to_string(),
rx,
Arc::clone(&notification_cache),
Arc::clone(&subs),
Arc::clone(&peer_cell),
cancel_rx,
));

let uri: Uri = "file:///test/locked.rs".parse().unwrap();
tx.send(LspNotification::PublishDiagnostics(
PublishDiagnosticsParams {
uri: uri.clone(),
diagnostics: vec![],
version: None,
},
))
.await
.unwrap();
drop(tx);

tokio::time::timeout(std::time::Duration::from_secs(1), async {
loop {
tokio::task::yield_now().await;
let found = {
let cache = notification_cache.lock().await;
cache.get_diagnostics(uri.as_str()).is_some()
};
if found {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
})
.await
.expect("pump should cache without acquiring translator lock");
}

/// Pump exits cleanly when the cancel watch sends `true`.
#[tokio::test]
async fn test_pump_exits_on_cancel() {
let translator = make_translator();
let notification_cache = make_notification_cache();
let subs = make_subs();
let peer_cell = make_peer_cell();
let (_tx, rx) = mpsc::channel::<LspNotification>(8);
Expand All @@ -803,7 +846,7 @@ mod tests {
let handle = tokio::spawn(diagnostics_pump(
"rust".to_string(),
rx,
translator,
notification_cache,
subs,
peer_cell,
cancel_rx,
Expand All @@ -820,7 +863,7 @@ mod tests {
/// Pump exits when the cancel sender is dropped (Err branch).
#[tokio::test]
async fn test_pump_exits_when_cancel_sender_dropped() {
let translator = make_translator();
let notification_cache = make_notification_cache();
let subs = make_subs();
let peer_cell = make_peer_cell();
let (_tx, rx) = mpsc::channel::<LspNotification>(8);
Expand All @@ -829,7 +872,7 @@ mod tests {
let handle = tokio::spawn(diagnostics_pump(
"rust".to_string(),
rx,
translator,
notification_cache,
subs,
peer_cell,
cancel_rx,
Expand Down
10 changes: 8 additions & 2 deletions crates/mcpls-core/src/mcp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;

use tokio::sync::Mutex;

use crate::bridge::{ResourceSubscriptions, Translator};
use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator};

/// Shared context for all tool handlers.
///
Expand All @@ -18,6 +18,8 @@ use crate::bridge::{ResourceSubscriptions, Translator};
pub struct HandlerContext {
/// Translator for converting MCP calls to LSP requests.
pub translator: Arc<Mutex<Translator>>,
/// Shared cache for diagnostics, logs, and server messages.
pub notification_cache: Arc<Mutex<NotificationCache>>,
/// Set of resource URIs the MCP client has subscribed to.
pub subscriptions: Arc<ResourceSubscriptions>,
}
Expand All @@ -27,10 +29,12 @@ impl HandlerContext {
#[must_use]
pub const fn new(
translator: Arc<Mutex<Translator>>,
notification_cache: Arc<Mutex<NotificationCache>>,
subscriptions: Arc<ResourceSubscriptions>,
) -> Self {
Self {
translator,
notification_cache,
subscriptions,
}
}
Expand All @@ -44,8 +48,10 @@ mod tests {
#[test]
fn test_handler_context_creation() {
let translator = Arc::new(Mutex::new(Translator::new()));
let notification_cache = Arc::new(Mutex::new(NotificationCache::new()));
let subscriptions = Arc::new(ResourceSubscriptions::new());
let context = HandlerContext::new(translator, subscriptions);
let context = HandlerContext::new(translator, notification_cache, subscriptions);
assert!(Arc::strong_count(&context.translator) == 1);
assert!(Arc::strong_count(&context.notification_cache) == 1);
}
}
Loading