From 00d7140f5442f3b6737a2d73100f75ceec1c2c80 Mon Sep 17 00:00:00 2001 From: Werdgames <243669902+vcheckk@users.noreply.github.com> Date: Fri, 12 Jun 2026 19:40:10 -0700 Subject: [PATCH 1/2] fix(bridge): split notification cache lock --- crates/mcpls-core/src/bridge/mod.rs | 3 +- crates/mcpls-core/src/lib.rs | 105 ++++++++++++------ crates/mcpls-core/src/mcp/handlers.rs | 10 +- crates/mcpls-core/src/mcp/server.rs | 146 +++++++++++++++++++++----- crates/mcpls-core/src/transport.rs | 10 +- 5 files changed, 212 insertions(+), 62 deletions(-) diff --git a/crates/mcpls-core/src/bridge/mod.rs b/crates/mcpls-core/src/bridge/mod.rs index e6830bb..505e2fc 100644 --- a/crates/mcpls-core/src/bridge/mod.rs +++ b/crates/mcpls-core/src/bridge/mod.rs @@ -18,5 +18,6 @@ pub use state::{DocumentState, DocumentTracker, path_to_uri, uri_to_path}; pub use translator::{ Completion, CompletionsResult, DefinitionResult, Diagnostic, DiagnosticSeverity, DiagnosticsResult, DocumentChanges, DocumentSymbolsResult, FormatDocumentResult, HoverResult, - Location, Position2D, Range, ReferencesResult, RenameResult, Symbol, TextEdit, Translator, + Location, Position2D, Range, ReferencesResult, RenameResult, ServerLogsResult, + ServerMessagesResult, Symbol, TextEdit, Translator, }; diff --git a/crates/mcpls-core/src/lib.rs b/crates/mcpls-core/src/lib.rs index b225dbf..e9bf387 100644 --- a/crates/mcpls-core/src/lib.rs +++ b/crates/mcpls-core/src/lib.rs @@ -44,7 +44,7 @@ use std::path::PathBuf; use std::sync::Arc; use bridge::resources::make_uri; -use bridge::{ResourceSubscriptions, Translator}; +use bridge::{NotificationCache, ResourceSubscriptions, Translator}; pub use config::ServerConfig; pub use error::Error; use lsp::{LspNotification, LspServer, ServerInitConfig}; @@ -72,14 +72,10 @@ use transport::run_stdio; /// - The cancellation watch fires (or the sender is dropped). /// - `notify_resource_updated` returns an error (peer disconnect / transport closed). /// -/// # Note on lock contention (TODO critic-S4) -/// All cache writes acquire `Arc>`, which is the same lock used -/// by every MCP tool call. Splitting `NotificationCache` into its own `Arc` -/// would eliminate this contention. Tracked as a P2 follow-up. pub(crate) async fn diagnostics_pump( _lang: String, mut rx: tokio::sync::mpsc::Receiver, - translator: Arc>, + notification_cache: Arc>, subs: Arc, peer_cell: Arc>>, mut cancel_rx: tokio::sync::watch::Receiver, @@ -99,9 +95,8 @@ pub(crate) async fn diagnostics_pump( LspNotification::PublishDiagnostics(p) => { // Always cache unconditionally. { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_diagnostics(&p.uri, p.version, p.diagnostics); + let mut cache = notification_cache.lock().await; + cache.store_diagnostics(&p.uri, p.version, p.diagnostics); } // Fast path: skip URI construction when nothing is subscribed. @@ -133,14 +128,12 @@ pub(crate) async fn diagnostics_pump( } } LspNotification::LogMessage(m) => { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_log(m.typ.into(), m.message); + let mut cache = notification_cache.lock().await; + cache.store_log(m.typ.into(), m.message); } LspNotification::ShowMessage(m) => { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_message(m.typ.into(), m.message); + let mut cache = notification_cache.lock().await; + cache.store_message(m.typ.into(), m.message); } LspNotification::Progress { .. } | LspNotification::Other { .. } => {} } @@ -346,6 +339,7 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() } let translator = Arc::new(Mutex::new(translator)); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subscriptions = Arc::new(ResourceSubscriptions::new()); // Peer cell is populated after the MCP transport is established (Phase B). let peer_cell = Arc::new(OnceCell::new()); @@ -360,7 +354,7 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() pumps.spawn(diagnostics_pump( lang, rx, - Arc::clone(&translator), + Arc::clone(¬ification_cache), Arc::clone(&subscriptions), Arc::clone(&peer_cell), cancel_rx.clone(), @@ -368,7 +362,11 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() } info!("Starting MCP server with rmcp..."); - let mcp_server = mcp::McplsServer::new(Arc::clone(&translator), Arc::clone(&subscriptions)); + let mcp_server = mcp::McplsServer::new( + Arc::clone(&translator), + Arc::clone(¬ification_cache), + Arc::clone(&subscriptions), + ); info!("MCPLS server initialized successfully"); let result = match transport { @@ -722,8 +720,8 @@ mod tests { use super::*; - fn make_translator() -> Arc> { - Arc::new(Mutex::new(Translator::new())) + fn make_notification_cache() -> Arc> { + Arc::new(Mutex::new(NotificationCache::new())) } fn make_subs() -> Arc { @@ -739,7 +737,7 @@ mod tests { /// `PublishDiagnostics` is cached even when the peer is not yet connected. #[tokio::test] async fn test_pump_caches_before_peer_set() { - let translator = make_translator(); + let notification_cache = make_notification_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (tx, rx) = mpsc::channel(8); @@ -747,11 +745,10 @@ mod tests { // which makes the pump exit before processing any messages. let (_cancel_tx, cancel_rx) = watch::channel(false); - let t = Arc::clone(&translator); tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - t, + Arc::clone(¬ification_cache), Arc::clone(&subs), Arc::clone(&peer_cell), cancel_rx, @@ -774,11 +771,8 @@ mod tests { loop { tokio::task::yield_now().await; let found = { - let guard = translator.lock().await; - guard - .notification_cache() - .get_diagnostics(uri.as_str()) - .is_some() + let cache = notification_cache.lock().await; + cache.get_diagnostics(uri.as_str()).is_some() }; if found { return true; @@ -791,10 +785,59 @@ mod tests { assert!(cached, "diagnostics should be cached before peer is set"); } + /// Cache writes must not wait behind the translator lock. + #[tokio::test] + async fn test_pump_caches_while_translator_locked() { + let translator = Arc::new(Mutex::new(Translator::new())); + let _translator_guard = translator.lock().await; + let notification_cache = make_notification_cache(); + let subs = make_subs(); + let peer_cell = make_peer_cell(); + let (tx, rx) = mpsc::channel(8); + let (_cancel_tx, cancel_rx) = watch::channel(false); + + tokio::spawn(diagnostics_pump( + "rust".to_string(), + rx, + Arc::clone(¬ification_cache), + Arc::clone(&subs), + Arc::clone(&peer_cell), + cancel_rx, + )); + + let uri: Uri = "file:///test/locked.rs".parse().unwrap(); + tx.send(LspNotification::PublishDiagnostics( + PublishDiagnosticsParams { + uri: uri.clone(), + diagnostics: vec![], + version: None, + }, + )) + .await + .unwrap(); + drop(tx); + + tokio::time::timeout(std::time::Duration::from_secs(1), async { + loop { + tokio::task::yield_now().await; + let found = { + let cache = notification_cache.lock().await; + cache.get_diagnostics(uri.as_str()).is_some() + }; + if found { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + }) + .await + .expect("pump should cache without acquiring translator lock"); + } + /// Pump exits cleanly when the cancel watch sends `true`. #[tokio::test] async fn test_pump_exits_on_cancel() { - let translator = make_translator(); + let notification_cache = make_notification_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (_tx, rx) = mpsc::channel::(8); @@ -803,7 +846,7 @@ mod tests { let handle = tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - translator, + notification_cache, subs, peer_cell, cancel_rx, @@ -820,7 +863,7 @@ mod tests { /// Pump exits when the cancel sender is dropped (Err branch). #[tokio::test] async fn test_pump_exits_when_cancel_sender_dropped() { - let translator = make_translator(); + let notification_cache = make_notification_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (_tx, rx) = mpsc::channel::(8); @@ -829,7 +872,7 @@ mod tests { let handle = tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - translator, + notification_cache, subs, peer_cell, cancel_rx, diff --git a/crates/mcpls-core/src/mcp/handlers.rs b/crates/mcpls-core/src/mcp/handlers.rs index 565ab7f..48b796d 100644 --- a/crates/mcpls-core/src/mcp/handlers.rs +++ b/crates/mcpls-core/src/mcp/handlers.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use tokio::sync::Mutex; -use crate::bridge::{ResourceSubscriptions, Translator}; +use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator}; /// Shared context for all tool handlers. /// @@ -18,6 +18,8 @@ use crate::bridge::{ResourceSubscriptions, Translator}; pub struct HandlerContext { /// Translator for converting MCP calls to LSP requests. pub translator: Arc>, + /// Shared cache for diagnostics, logs, and server messages. + pub notification_cache: Arc>, /// Set of resource URIs the MCP client has subscribed to. pub subscriptions: Arc, } @@ -27,10 +29,12 @@ impl HandlerContext { #[must_use] pub const fn new( translator: Arc>, + notification_cache: Arc>, subscriptions: Arc, ) -> Self { Self { translator, + notification_cache, subscriptions, } } @@ -44,8 +48,10 @@ mod tests { #[test] fn test_handler_context_creation() { let translator = Arc::new(Mutex::new(Translator::new())); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subscriptions = Arc::new(ResourceSubscriptions::new()); - let context = HandlerContext::new(translator, subscriptions); + let context = HandlerContext::new(translator, notification_cache, subscriptions); assert!(Arc::strong_count(&context.translator) == 1); + assert!(Arc::strong_count(&context.notification_cache) == 1); } } diff --git a/crates/mcpls-core/src/mcp/server.rs b/crates/mcpls-core/src/mcp/server.rs index 51d2d65..6d82f07 100644 --- a/crates/mcpls-core/src/mcp/server.rs +++ b/crates/mcpls-core/src/mcp/server.rs @@ -23,7 +23,99 @@ use super::tools::{ ServerLogsParams, ServerMessagesParams, SignatureHelpParams, WorkspaceSymbolParams, }; use crate::bridge::resources::{make_uri, parse_uri}; -use crate::bridge::{ResourceSubscriptions, Translator}; +use crate::bridge::{ + Diagnostic, DiagnosticSeverity, DiagnosticsResult, LogLevel, NotificationCache, Position2D, + Range, ResourceSubscriptions, ServerLogsResult, ServerMessagesResult, Translator, +}; +use crate::error::Result as McplsResult; + +fn normalize_lsp_range(range: lsp_types::Range) -> Range { + Range { + start: Position2D { + line: range.start.line + 1, + character: range.start.character + 1, + }, + end: Position2D { + line: range.end.line + 1, + character: range.end.character + 1, + }, + } +} + +fn cached_diagnostics_from_cache(cache: &NotificationCache, uri: &str) -> DiagnosticsResult { + let diagnostics = cache + .get_diagnostics(uri) + .map_or_else(Vec::new, |diag_info| { + diag_info + .diagnostics + .iter() + .map(|diag| Diagnostic { + range: normalize_lsp_range(diag.range), + severity: match diag.severity { + Some(lsp_types::DiagnosticSeverity::ERROR) => DiagnosticSeverity::Error, + Some(lsp_types::DiagnosticSeverity::WARNING) => DiagnosticSeverity::Warning, + Some(lsp_types::DiagnosticSeverity::INFORMATION) => { + DiagnosticSeverity::Information + } + Some(lsp_types::DiagnosticSeverity::HINT) => DiagnosticSeverity::Hint, + _ => DiagnosticSeverity::Information, + }, + message: diag.message.clone(), + code: diag.code.as_ref().map(|c| match c { + lsp_types::NumberOrString::Number(n) => n.to_string(), + lsp_types::NumberOrString::String(s) => s.clone(), + }), + }) + .collect() + }); + + DiagnosticsResult { diagnostics } +} + +fn server_logs_from_cache( + cache: &NotificationCache, + limit: usize, + min_level: Option, +) -> McplsResult { + let min_level_filter = if let Some(level_str) = min_level { + let level = match level_str.to_lowercase().as_str() { + "error" => LogLevel::Error, + "warning" => LogLevel::Warning, + "info" => LogLevel::Info, + "debug" => LogLevel::Debug, + _ => { + return Err(crate::Error::InvalidToolParams(format!( + "Invalid min_level: '{level_str}'. Valid values: error, warning, info, debug" + ))); + } + }; + Some(level) + } else { + None + }; + + let logs = cache + .get_logs() + .iter() + .filter(|log| { + min_level_filter.is_none_or(|min| match min { + LogLevel::Error => matches!(log.level, LogLevel::Error), + LogLevel::Warning => matches!(log.level, LogLevel::Error | LogLevel::Warning), + LogLevel::Info => !matches!(log.level, LogLevel::Debug), + LogLevel::Debug => true, + }) + }) + .take(limit) + .cloned() + .collect(); + + Ok(ServerLogsResult { logs }) +} + +fn server_messages_from_cache(cache: &NotificationCache, limit: usize) -> ServerMessagesResult { + let messages = cache.get_messages().iter().take(limit).cloned().collect(); + ServerMessagesResult { messages } +} /// MCP server that exposes LSP capabilities as tools. #[derive(Clone)] @@ -37,9 +129,14 @@ impl McplsServer { #[must_use] pub fn new( translator: Arc>, + notification_cache: Arc>, subscriptions: Arc, ) -> Self { - let context = Arc::new(HandlerContext::new(translator, subscriptions)); + let context = Arc::new(HandlerContext::new( + translator, + notification_cache, + subscriptions, + )); Self { context } } @@ -376,16 +473,22 @@ impl McplsServer { &self, Parameters(CachedDiagnosticsParams { file_path }): Parameters, ) -> Result { + let uri = { + let translator = self.context.translator.lock().await; + let path = std::path::PathBuf::from(&file_path); + let validated_path = translator + .validate_path(&path) + .map_err(|e| McpError::invalid_params(e.to_string(), None))?; + crate::bridge::path_to_uri(&validated_path).to_string() + }; + let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_cached_diagnostics(&file_path) + let cache = self.context.notification_cache.lock().await; + cached_diagnostics_from_cache(&cache, &uri) }; - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serde_json::to_string(&result) + .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)) } /// Get recent LSP server log messages. @@ -397,8 +500,8 @@ impl McplsServer { Parameters(ServerLogsParams { limit, min_level }): Parameters, ) -> Result { let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_server_logs(limit, min_level) + let cache = self.context.notification_cache.lock().await; + server_logs_from_cache(&cache, limit, min_level) }; match result { @@ -417,15 +520,12 @@ impl McplsServer { Parameters(ServerMessagesParams { limit }): Parameters, ) -> Result { let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_server_messages(limit) + let cache = self.context.notification_cache.lock().await; + server_messages_from_cache(&cache, limit) }; - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serde_json::to_string(&result) + .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)) } /// Get signature help at a position. @@ -602,11 +702,8 @@ impl ServerHandler for McplsServer { // in the response shape. Currently both return `{"diagnostics":null}` which is // ambiguous for clients that need to know whether analysis has run yet. let diagnostics = { - let translator = self.context.translator.lock().await; - translator - .notification_cache() - .get_diagnostics(lsp_uri.as_str()) - .cloned() + let cache = self.context.notification_cache.lock().await; + cache.get_diagnostics(lsp_uri.as_str()).cloned() }; let json = serde_json::to_string(&diagnostics) @@ -692,8 +789,9 @@ mod tests { fn create_test_server() -> McplsServer { let translator = Arc::new(Mutex::new(Translator::new())); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subscriptions = Arc::new(ResourceSubscriptions::new()); - McplsServer::new(translator, subscriptions) + McplsServer::new(translator, notification_cache, subscriptions) } #[tokio::test] diff --git a/crates/mcpls-core/src/transport.rs b/crates/mcpls-core/src/transport.rs index ad5fff9..35cb610 100644 --- a/crates/mcpls-core/src/transport.rs +++ b/crates/mcpls-core/src/transport.rs @@ -250,12 +250,13 @@ mod tests { use tokio::sync::Mutex; - use crate::bridge::{ResourceSubscriptions, Translator}; + use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator}; use crate::mcp::McplsServer; let translator = Arc::new(Mutex::new(Translator::new())); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subs = Arc::new(ResourceSubscriptions::new()); - let server = McplsServer::new(translator, subs); + let server = McplsServer::new(translator, notification_cache, subs); // Bind port 0 so the OS assigns a free port. let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -287,7 +288,7 @@ mod tests { use tokio::sync::Mutex; - use crate::bridge::{ResourceSubscriptions, Translator}; + use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator}; use crate::mcp::McplsServer; // Hold a listener to make the port unavailable. @@ -295,8 +296,9 @@ mod tests { let addr = occupied.local_addr().unwrap(); let translator = Arc::new(Mutex::new(Translator::new())); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subs = Arc::new(ResourceSubscriptions::new()); - let server = McplsServer::new(translator, subs); + let server = McplsServer::new(translator, notification_cache, subs); let cfg = HttpConfig { bind: addr, From 2dbb930cf902bee96cedad61113818e0681ea6bd Mon Sep 17 00:00:00 2001 From: Werdgames <243669902+vcheckk@users.noreply.github.com> Date: Fri, 12 Jun 2026 20:59:31 -0700 Subject: [PATCH 2/2] chore: satisfy clippy for cache split --- crates/mcpls-core/src/mcp/server.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/mcpls-core/src/mcp/server.rs b/crates/mcpls-core/src/mcp/server.rs index 6d82f07..734b91b 100644 --- a/crates/mcpls-core/src/mcp/server.rs +++ b/crates/mcpls-core/src/mcp/server.rs @@ -29,7 +29,7 @@ use crate::bridge::{ }; use crate::error::Result as McplsResult; -fn normalize_lsp_range(range: lsp_types::Range) -> Range { +const fn normalize_lsp_range(range: lsp_types::Range) -> Range { Range { start: Position2D { line: range.start.line + 1, @@ -473,14 +473,14 @@ impl McplsServer { &self, Parameters(CachedDiagnosticsParams { file_path }): Parameters, ) -> Result { - let uri = { + let validated_path = { let translator = self.context.translator.lock().await; let path = std::path::PathBuf::from(&file_path); - let validated_path = translator + translator .validate_path(&path) - .map_err(|e| McpError::invalid_params(e.to_string(), None))?; - crate::bridge::path_to_uri(&validated_path).to_string() + .map_err(|e| McpError::invalid_params(e.to_string(), None))? }; + let uri = crate::bridge::path_to_uri(&validated_path).to_string(); let result = { let cache = self.context.notification_cache.lock().await;