diff --git a/crates/mcpls-core/src/bridge/mod.rs b/crates/mcpls-core/src/bridge/mod.rs index e6830bb..505e2fc 100644 --- a/crates/mcpls-core/src/bridge/mod.rs +++ b/crates/mcpls-core/src/bridge/mod.rs @@ -18,5 +18,6 @@ pub use state::{DocumentState, DocumentTracker, path_to_uri, uri_to_path}; pub use translator::{ Completion, CompletionsResult, DefinitionResult, Diagnostic, DiagnosticSeverity, DiagnosticsResult, DocumentChanges, DocumentSymbolsResult, FormatDocumentResult, HoverResult, - Location, Position2D, Range, ReferencesResult, RenameResult, Symbol, TextEdit, Translator, + Location, Position2D, Range, ReferencesResult, RenameResult, ServerLogsResult, + ServerMessagesResult, Symbol, TextEdit, Translator, }; diff --git a/crates/mcpls-core/src/bridge/translator.rs b/crates/mcpls-core/src/bridge/translator.rs index e0a6de7..93875c0 100644 --- a/crates/mcpls-core/src/bridge/translator.rs +++ b/crates/mcpls-core/src/bridge/translator.rs @@ -16,6 +16,7 @@ use lsp_types::{ WorkspaceSymbolParams as LspWorkspaceSymbolParams, }; use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; use tokio::time::Duration; use super::state::{ResourceLimits, detect_language, path_to_uri}; @@ -32,7 +33,7 @@ pub struct Translator { /// LSP servers indexed by language ID (held for lifetime management). lsp_servers: HashMap, /// Document state tracker. - document_tracker: DocumentTracker, + document_tracker: Mutex, /// Notification cache for LSP server notifications. notification_cache: NotificationCache, /// Allowed workspace roots for path validation. @@ -48,7 +49,10 @@ impl Translator { Self { lsp_clients: HashMap::new(), lsp_servers: HashMap::new(), - document_tracker: DocumentTracker::new(ResourceLimits::default(), HashMap::new()), + document_tracker: Mutex::new(DocumentTracker::new( + ResourceLimits::default(), + HashMap::new(), + )), notification_cache: NotificationCache::new(), workspace_roots: vec![], extension_map: HashMap::new(), @@ -66,7 +70,7 @@ impl Translator { /// to use the same mappings for language detection. #[must_use] pub fn with_extensions(mut self, extension_map: HashMap) -> Self { - self.document_tracker = + *self.document_tracker.get_mut() = DocumentTracker::new(ResourceLimits::default(), extension_map.clone()); self.extension_map = extension_map; self @@ -84,13 +88,13 @@ impl Translator { /// Get the document tracker. #[must_use] - pub const fn document_tracker(&self) -> &DocumentTracker { + pub const fn document_tracker(&self) -> &Mutex { &self.document_tracker } /// Get a mutable reference to the document tracker. - pub const fn document_tracker_mut(&mut self) -> &mut DocumentTracker { - &mut self.document_tracker + pub fn document_tracker_mut(&mut self) -> &mut DocumentTracker { + self.document_tracker.get_mut() } /// Get the notification cache. @@ -605,7 +609,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_hover( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -615,6 +619,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -653,7 +659,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_definition( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -663,6 +669,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -713,7 +721,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_references( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -724,6 +732,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -765,12 +775,14 @@ impl Translator { /// # Errors /// /// Returns an error if the LSP request fails or the file cannot be opened. - pub async fn handle_diagnostics(&mut self, file_path: String) -> Result { + pub async fn handle_diagnostics(&self, file_path: String) -> Result { let path = PathBuf::from(&file_path); let validated_path = self.validate_path(&path)?; let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; @@ -823,7 +835,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_rename( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -834,6 +846,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -919,7 +933,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_completions( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -930,6 +944,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -984,7 +1000,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_document_symbols( - &mut self, + &self, file_path: String, ) -> Result { let path = PathBuf::from(&file_path); @@ -992,6 +1008,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; @@ -1032,7 +1050,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_format_document( - &mut self, + &self, file_path: String, tab_size: u32, insert_spaces: bool, @@ -1042,6 +1060,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; @@ -1081,7 +1101,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or no server is configured. pub async fn handle_workspace_symbol( - &mut self, + &self, query: String, kind_filter: Option, limit: u32, @@ -1185,7 +1205,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_code_actions( - &mut self, + &self, file_path: String, start_line: u32, start_character: u32, @@ -1206,6 +1226,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; @@ -1273,7 +1295,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_call_hierarchy_prepare( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -1296,6 +1318,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -1333,7 +1357,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the item is invalid. pub async fn handle_incoming_calls( - &mut self, + &self, item: serde_json::Value, ) -> Result { // Deserialize as our own type (1-based coords) then convert to LSP (0-based). @@ -1382,7 +1406,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the item is invalid. pub async fn handle_outgoing_calls( - &mut self, + &self, item: serde_json::Value, ) -> Result { // Deserialize as our own type (1-based coords) then convert to LSP (0-based). @@ -1542,7 +1566,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_signature_help( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -1552,6 +1576,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -1615,7 +1641,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_implementation( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -1625,6 +1651,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -1657,7 +1685,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_type_definition( - &mut self, + &self, file_path: String, line: u32, character: u32, @@ -1667,6 +1695,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; let lsp_position = mcp_to_lsp_position(line, character); @@ -1699,7 +1729,7 @@ impl Translator { /// /// Returns an error if the LSP request fails or the file cannot be opened. pub async fn handle_inlay_hints( - &mut self, + &self, file_path: String, start_line: u32, start_character: u32, @@ -1713,6 +1743,8 @@ impl Translator { let client = self.get_client_for_file(&validated_path)?; let uri = self .document_tracker + .lock() + .await .ensure_open(&validated_path, &client) .await?; @@ -2198,7 +2230,7 @@ mod tests { #[tokio::test] async fn test_handle_workspace_symbol_no_server() { - let mut translator = Translator::new(); + let translator = Translator::new(); let result = translator .handle_workspace_symbol("test".to_string(), None, 100) .await; @@ -2207,7 +2239,7 @@ mod tests { #[tokio::test] async fn test_handle_code_actions_invalid_kind() { - let mut translator = Translator::new(); + let translator = Translator::new(); let result = translator .handle_code_actions( "/tmp/test.rs".to_string(), @@ -2225,7 +2257,7 @@ mod tests { async fn test_handle_code_actions_valid_kind_quickfix() { use tempfile::TempDir; - let mut translator = Translator::new(); + let translator = Translator::new(); let temp_dir = TempDir::new().unwrap(); let test_file = temp_dir.path().join("test.rs"); fs::write(&test_file, "fn main() {}").unwrap(); @@ -2249,7 +2281,7 @@ mod tests { async fn test_handle_code_actions_valid_kind_refactor() { use tempfile::TempDir; - let mut translator = Translator::new(); + let translator = Translator::new(); let temp_dir = TempDir::new().unwrap(); let test_file = temp_dir.path().join("test.rs"); fs::write(&test_file, "fn main() {}").unwrap(); @@ -2272,7 +2304,7 @@ mod tests { async fn test_handle_code_actions_valid_kind_refactor_extract() { use tempfile::TempDir; - let mut translator = Translator::new(); + let translator = Translator::new(); let temp_dir = TempDir::new().unwrap(); let test_file = temp_dir.path().join("test.rs"); fs::write(&test_file, "fn main() {}").unwrap(); @@ -2295,7 +2327,7 @@ mod tests { async fn test_handle_code_actions_valid_kind_source() { use tempfile::TempDir; - let mut translator = Translator::new(); + let translator = Translator::new(); let temp_dir = TempDir::new().unwrap(); let test_file = temp_dir.path().join("test.rs"); fs::write(&test_file, "fn main() {}").unwrap(); @@ -2316,7 +2348,7 @@ mod tests { #[tokio::test] async fn test_handle_code_actions_invalid_range_zero() { - let mut translator = Translator::new(); + let translator = Translator::new(); let result = translator .handle_code_actions("/tmp/test.rs".to_string(), 0, 1, 1, 10, None) .await; @@ -2325,7 +2357,7 @@ mod tests { #[tokio::test] async fn test_handle_code_actions_invalid_range_order() { - let mut translator = Translator::new(); + let translator = Translator::new(); let result = translator .handle_code_actions("/tmp/test.rs".to_string(), 10, 5, 5, 1, None) .await; @@ -2336,7 +2368,7 @@ mod tests { async fn test_handle_code_actions_empty_range() { use tempfile::TempDir; - let mut translator = Translator::new(); + let translator = Translator::new(); let temp_dir = TempDir::new().unwrap(); let test_file = temp_dir.path().join("test.rs"); fs::write(&test_file, "fn main() {}").unwrap(); @@ -2568,7 +2600,7 @@ mod tests { #[tokio::test] async fn test_handle_call_hierarchy_prepare_invalid_position_zero() { - let mut translator = Translator::new(); + let translator = Translator::new(); let result = translator .handle_call_hierarchy_prepare("/tmp/test.rs".to_string(), 0, 1) .await; @@ -2582,7 +2614,7 @@ mod tests { #[tokio::test] async fn test_handle_call_hierarchy_prepare_invalid_position_too_large() { - let mut translator = Translator::new(); + let translator = Translator::new(); let result = translator .handle_call_hierarchy_prepare("/tmp/test.rs".to_string(), 1_000_001, 1) .await; @@ -2596,7 +2628,7 @@ mod tests { #[tokio::test] async fn test_handle_incoming_calls_invalid_json() { - let mut translator = Translator::new(); + let translator = Translator::new(); let invalid_item = serde_json::json!({"invalid": "structure"}); let result = translator.handle_incoming_calls(invalid_item).await; assert!(matches!(result, Err(Error::InvalidToolParams(_)))); @@ -2604,7 +2636,7 @@ mod tests { #[tokio::test] async fn test_handle_outgoing_calls_invalid_json() { - let mut translator = Translator::new(); + let translator = Translator::new(); let invalid_item = serde_json::json!({"invalid": "structure"}); let result = translator.handle_outgoing_calls(invalid_item).await; assert!(matches!(result, Err(Error::InvalidToolParams(_)))); diff --git a/crates/mcpls-core/src/lib.rs b/crates/mcpls-core/src/lib.rs index b225dbf..7d29dff 100644 --- a/crates/mcpls-core/src/lib.rs +++ b/crates/mcpls-core/src/lib.rs @@ -44,7 +44,7 @@ use std::path::PathBuf; use std::sync::Arc; use bridge::resources::make_uri; -use bridge::{ResourceSubscriptions, Translator}; +use bridge::{NotificationCache, ResourceSubscriptions, Translator}; pub use config::ServerConfig; pub use error::Error; use lsp::{LspNotification, LspServer, ServerInitConfig}; @@ -72,14 +72,10 @@ use transport::run_stdio; /// - The cancellation watch fires (or the sender is dropped). /// - `notify_resource_updated` returns an error (peer disconnect / transport closed). /// -/// # Note on lock contention (TODO critic-S4) -/// All cache writes acquire `Arc>`, which is the same lock used -/// by every MCP tool call. Splitting `NotificationCache` into its own `Arc` -/// would eliminate this contention. Tracked as a P2 follow-up. pub(crate) async fn diagnostics_pump( _lang: String, mut rx: tokio::sync::mpsc::Receiver, - translator: Arc>, + notification_cache: Arc>, subs: Arc, peer_cell: Arc>>, mut cancel_rx: tokio::sync::watch::Receiver, @@ -99,9 +95,8 @@ pub(crate) async fn diagnostics_pump( LspNotification::PublishDiagnostics(p) => { // Always cache unconditionally. { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_diagnostics(&p.uri, p.version, p.diagnostics); + let mut cache = notification_cache.lock().await; + cache.store_diagnostics(&p.uri, p.version, p.diagnostics); } // Fast path: skip URI construction when nothing is subscribed. @@ -133,14 +128,12 @@ pub(crate) async fn diagnostics_pump( } } LspNotification::LogMessage(m) => { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_log(m.typ.into(), m.message); + let mut cache = notification_cache.lock().await; + cache.store_log(m.typ.into(), m.message); } LspNotification::ShowMessage(m) => { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_message(m.typ.into(), m.message); + let mut cache = notification_cache.lock().await; + cache.store_message(m.typ.into(), m.message); } LspNotification::Progress { .. } | LspNotification::Other { .. } => {} } @@ -345,7 +338,8 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() info!("Proceeding with {} LSP server(s)", server_count); } - let translator = Arc::new(Mutex::new(translator)); + let translator = Arc::new(translator); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subscriptions = Arc::new(ResourceSubscriptions::new()); // Peer cell is populated after the MCP transport is established (Phase B). let peer_cell = Arc::new(OnceCell::new()); @@ -360,7 +354,7 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() pumps.spawn(diagnostics_pump( lang, rx, - Arc::clone(&translator), + Arc::clone(¬ification_cache), Arc::clone(&subscriptions), Arc::clone(&peer_cell), cancel_rx.clone(), @@ -368,7 +362,11 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() } info!("Starting MCP server with rmcp..."); - let mcp_server = mcp::McplsServer::new(Arc::clone(&translator), Arc::clone(&subscriptions)); + let mcp_server = mcp::McplsServer::new( + Arc::clone(&translator), + Arc::clone(¬ification_cache), + Arc::clone(&subscriptions), + ); info!("MCPLS server initialized successfully"); let result = match transport { @@ -722,8 +720,8 @@ mod tests { use super::*; - fn make_translator() -> Arc> { - Arc::new(Mutex::new(Translator::new())) + fn make_notification_cache() -> Arc> { + Arc::new(Mutex::new(NotificationCache::new())) } fn make_subs() -> Arc { @@ -739,7 +737,7 @@ mod tests { /// `PublishDiagnostics` is cached even when the peer is not yet connected. #[tokio::test] async fn test_pump_caches_before_peer_set() { - let translator = make_translator(); + let notification_cache = make_notification_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (tx, rx) = mpsc::channel(8); @@ -747,11 +745,10 @@ mod tests { // which makes the pump exit before processing any messages. let (_cancel_tx, cancel_rx) = watch::channel(false); - let t = Arc::clone(&translator); tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - t, + Arc::clone(¬ification_cache), Arc::clone(&subs), Arc::clone(&peer_cell), cancel_rx, @@ -774,11 +771,8 @@ mod tests { loop { tokio::task::yield_now().await; let found = { - let guard = translator.lock().await; - guard - .notification_cache() - .get_diagnostics(uri.as_str()) - .is_some() + let cache = notification_cache.lock().await; + cache.get_diagnostics(uri.as_str()).is_some() }; if found { return true; @@ -791,10 +785,59 @@ mod tests { assert!(cached, "diagnostics should be cached before peer is set"); } + /// Cache writes must not wait behind the translator lock. + #[tokio::test] + async fn test_pump_caches_while_translator_locked() { + let translator = Arc::new(Mutex::new(Translator::new())); + let _translator_guard = translator.lock().await; + let notification_cache = make_notification_cache(); + let subs = make_subs(); + let peer_cell = make_peer_cell(); + let (tx, rx) = mpsc::channel(8); + let (_cancel_tx, cancel_rx) = watch::channel(false); + + tokio::spawn(diagnostics_pump( + "rust".to_string(), + rx, + Arc::clone(¬ification_cache), + Arc::clone(&subs), + Arc::clone(&peer_cell), + cancel_rx, + )); + + let uri: Uri = "file:///test/locked.rs".parse().unwrap(); + tx.send(LspNotification::PublishDiagnostics( + PublishDiagnosticsParams { + uri: uri.clone(), + diagnostics: vec![], + version: None, + }, + )) + .await + .unwrap(); + drop(tx); + + tokio::time::timeout(std::time::Duration::from_secs(1), async { + loop { + tokio::task::yield_now().await; + let found = { + let cache = notification_cache.lock().await; + cache.get_diagnostics(uri.as_str()).is_some() + }; + if found { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + }) + .await + .expect("pump should cache without acquiring translator lock"); + } + /// Pump exits cleanly when the cancel watch sends `true`. #[tokio::test] async fn test_pump_exits_on_cancel() { - let translator = make_translator(); + let notification_cache = make_notification_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (_tx, rx) = mpsc::channel::(8); @@ -803,7 +846,7 @@ mod tests { let handle = tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - translator, + notification_cache, subs, peer_cell, cancel_rx, @@ -820,7 +863,7 @@ mod tests { /// Pump exits when the cancel sender is dropped (Err branch). #[tokio::test] async fn test_pump_exits_when_cancel_sender_dropped() { - let translator = make_translator(); + let notification_cache = make_notification_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (_tx, rx) = mpsc::channel::(8); @@ -829,7 +872,7 @@ mod tests { let handle = tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - translator, + notification_cache, subs, peer_cell, cancel_rx, diff --git a/crates/mcpls-core/src/mcp/handlers.rs b/crates/mcpls-core/src/mcp/handlers.rs index 565ab7f..165b677 100644 --- a/crates/mcpls-core/src/mcp/handlers.rs +++ b/crates/mcpls-core/src/mcp/handlers.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use tokio::sync::Mutex; -use crate::bridge::{ResourceSubscriptions, Translator}; +use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator}; /// Shared context for all tool handlers. /// @@ -17,7 +17,9 @@ use crate::bridge::{ResourceSubscriptions, Translator}; /// tasks in `lib.rs`, which own their own `Arc>>`. pub struct HandlerContext { /// Translator for converting MCP calls to LSP requests. - pub translator: Arc>, + pub translator: Arc, + /// Shared cache for diagnostics, logs, and server messages. + pub notification_cache: Arc>, /// Set of resource URIs the MCP client has subscribed to. pub subscriptions: Arc, } @@ -26,11 +28,13 @@ impl HandlerContext { /// Create a new handler context. #[must_use] pub const fn new( - translator: Arc>, + translator: Arc, + notification_cache: Arc>, subscriptions: Arc, ) -> Self { Self { translator, + notification_cache, subscriptions, } } @@ -43,9 +47,11 @@ mod tests { #[test] fn test_handler_context_creation() { - let translator = Arc::new(Mutex::new(Translator::new())); + let translator = Arc::new(Translator::new()); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subscriptions = Arc::new(ResourceSubscriptions::new()); - let context = HandlerContext::new(translator, subscriptions); + let context = HandlerContext::new(translator, notification_cache, subscriptions); assert!(Arc::strong_count(&context.translator) == 1); + assert!(Arc::strong_count(&context.notification_cache) == 1); } } diff --git a/crates/mcpls-core/src/mcp/server.rs b/crates/mcpls-core/src/mcp/server.rs index 51d2d65..04d4352 100644 --- a/crates/mcpls-core/src/mcp/server.rs +++ b/crates/mcpls-core/src/mcp/server.rs @@ -12,6 +12,7 @@ use rmcp::model::{ UnsubscribeRequestParams, }; use rmcp::{ErrorData as McpError, RoleServer, ServerHandler, tool, tool_handler, tool_router}; +use serde::Serialize; use tokio::sync::Mutex; use super::handlers::HandlerContext; @@ -23,7 +24,107 @@ use super::tools::{ ServerLogsParams, ServerMessagesParams, SignatureHelpParams, WorkspaceSymbolParams, }; use crate::bridge::resources::{make_uri, parse_uri}; -use crate::bridge::{ResourceSubscriptions, Translator}; +use crate::bridge::{ + Diagnostic, DiagnosticSeverity, DiagnosticsResult, LogLevel, NotificationCache, Position2D, + Range, ResourceSubscriptions, ServerLogsResult, ServerMessagesResult, Translator, +}; +use crate::error::Result as McplsResult; + +const fn normalize_lsp_range(range: lsp_types::Range) -> Range { + Range { + start: Position2D { + line: range.start.line + 1, + character: range.start.character + 1, + }, + end: Position2D { + line: range.end.line + 1, + character: range.end.character + 1, + }, + } +} + +fn cached_diagnostics_from_cache(cache: &NotificationCache, uri: &str) -> DiagnosticsResult { + let diagnostics = cache + .get_diagnostics(uri) + .map_or_else(Vec::new, |diag_info| { + diag_info + .diagnostics + .iter() + .map(|diag| Diagnostic { + range: normalize_lsp_range(diag.range), + severity: match diag.severity { + Some(lsp_types::DiagnosticSeverity::ERROR) => DiagnosticSeverity::Error, + Some(lsp_types::DiagnosticSeverity::WARNING) => DiagnosticSeverity::Warning, + Some(lsp_types::DiagnosticSeverity::INFORMATION) => { + DiagnosticSeverity::Information + } + Some(lsp_types::DiagnosticSeverity::HINT) => DiagnosticSeverity::Hint, + _ => DiagnosticSeverity::Information, + }, + message: diag.message.clone(), + code: diag.code.as_ref().map(|c| match c { + lsp_types::NumberOrString::Number(n) => n.to_string(), + lsp_types::NumberOrString::String(s) => s.clone(), + }), + }) + .collect() + }); + + DiagnosticsResult { diagnostics } +} + +fn server_logs_from_cache( + cache: &NotificationCache, + limit: usize, + min_level: Option, +) -> McplsResult { + let min_level_filter = if let Some(level_str) = min_level { + let level = match level_str.to_lowercase().as_str() { + "error" => LogLevel::Error, + "warning" => LogLevel::Warning, + "info" => LogLevel::Info, + "debug" => LogLevel::Debug, + _ => { + return Err(crate::Error::InvalidToolParams(format!( + "Invalid min_level: '{level_str}'. Valid values: error, warning, info, debug" + ))); + } + }; + Some(level) + } else { + None + }; + + let logs = cache + .get_logs() + .iter() + .filter(|log| { + min_level_filter.is_none_or(|min| match min { + LogLevel::Error => matches!(log.level, LogLevel::Error), + LogLevel::Warning => matches!(log.level, LogLevel::Error | LogLevel::Warning), + LogLevel::Info => !matches!(log.level, LogLevel::Debug), + LogLevel::Debug => true, + }) + }) + .take(limit) + .cloned() + .collect(); + + Ok(ServerLogsResult { logs }) +} + +fn server_messages_from_cache(cache: &NotificationCache, limit: usize) -> ServerMessagesResult { + let messages = cache.get_messages().iter().take(limit).cloned().collect(); + ServerMessagesResult { messages } +} + +fn serialize_tool_result(result: McplsResult) -> Result { + match result { + Ok(value) => serde_json::to_string(&value) + .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), + Err(e) => Err(McpError::internal_error(e.to_string(), None)), + } +} /// MCP server that exposes LSP capabilities as tools. #[derive(Clone)] @@ -36,10 +137,15 @@ impl McplsServer { /// Create a new MCP server with the given translator and subscriptions. #[must_use] pub fn new( - translator: Arc>, + translator: Arc, + notification_cache: Arc>, subscriptions: Arc, ) -> Self { - let context = Arc::new(HandlerContext::new(translator, subscriptions)); + let context = Arc::new(HandlerContext::new( + translator, + notification_cache, + subscriptions, + )); Self { context } } @@ -55,16 +161,12 @@ impl McplsServer { character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_hover(file_path, line, character).await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serialize_tool_result( + self.context + .translator + .handle_hover(file_path, line, character) + .await, + ) } /// Get the definition location of a symbol. @@ -79,18 +181,12 @@ impl McplsServer { character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_definition(file_path, line, character) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Find all references to a symbol. @@ -106,18 +202,12 @@ impl McplsServer { include_declaration, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_references(file_path, line, character, include_declaration) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Get diagnostics for a file. @@ -128,16 +218,7 @@ impl McplsServer { &self, Parameters(DiagnosticsParams { file_path }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_diagnostics(file_path).await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serialize_tool_result(self.context.translator.handle_diagnostics(file_path).await) } /// Rename a symbol across the workspace. @@ -153,18 +234,12 @@ impl McplsServer { new_name, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_rename(file_path, line, character, new_name) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Get code completion suggestions. @@ -180,18 +255,12 @@ impl McplsServer { trigger, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_completions(file_path, line, character, trigger) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Get all symbols in a document. @@ -202,16 +271,12 @@ impl McplsServer { &self, Parameters(DocumentSymbolsParams { file_path }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_document_symbols(file_path).await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serialize_tool_result( + self.context + .translator + .handle_document_symbols(file_path) + .await, + ) } /// Format a document according to language server rules. @@ -226,18 +291,12 @@ impl McplsServer { insert_spaces, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_format_document(file_path, tab_size, insert_spaces) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Search for symbols across the workspace. @@ -252,18 +311,12 @@ impl McplsServer { limit, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_workspace_symbol(query, kind_filter, limit) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Get code actions for a range. @@ -281,9 +334,9 @@ impl McplsServer { kind_filter, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_code_actions( file_path, start_line, @@ -292,14 +345,8 @@ impl McplsServer { end_character, kind_filter, ) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Prepare call hierarchy at a position. @@ -314,18 +361,12 @@ impl McplsServer { character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_call_hierarchy_prepare(file_path, line, character) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Get incoming calls (callers). @@ -336,16 +377,7 @@ impl McplsServer { &self, Parameters(CallHierarchyCallsParams { item }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_incoming_calls(item).await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serialize_tool_result(self.context.translator.handle_incoming_calls(item).await) } /// Get outgoing calls (callees). @@ -356,16 +388,7 @@ impl McplsServer { &self, Parameters(CallHierarchyCallsParams { item }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_outgoing_calls(item).await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serialize_tool_result(self.context.translator.handle_outgoing_calls(item).await) } /// Get cached diagnostics for a file. @@ -376,16 +399,23 @@ impl McplsServer { &self, Parameters(CachedDiagnosticsParams { file_path }): Parameters, ) -> Result { + let uri = { + let path = std::path::PathBuf::from(&file_path); + let validated_path = self + .context + .translator + .validate_path(&path) + .map_err(|e| McpError::invalid_params(e.to_string(), None))?; + crate::bridge::path_to_uri(&validated_path).to_string() + }; + let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_cached_diagnostics(&file_path) + let cache = self.context.notification_cache.lock().await; + cached_diagnostics_from_cache(&cache, &uri) }; - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serde_json::to_string(&result) + .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)) } /// Get recent LSP server log messages. @@ -397,8 +427,8 @@ impl McplsServer { Parameters(ServerLogsParams { limit, min_level }): Parameters, ) -> Result { let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_server_logs(limit, min_level) + let cache = self.context.notification_cache.lock().await; + server_logs_from_cache(&cache, limit, min_level) }; match result { @@ -417,15 +447,12 @@ impl McplsServer { Parameters(ServerMessagesParams { limit }): Parameters, ) -> Result { let result = { - let mut translator = self.context.translator.lock().await; - translator.handle_server_messages(limit) + let cache = self.context.notification_cache.lock().await; + server_messages_from_cache(&cache, limit) }; - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + serde_json::to_string(&result) + .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)) } /// Get signature help at a position. @@ -440,18 +467,12 @@ impl McplsServer { character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_signature_help(file_path, line, character) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Go to implementation locations. @@ -466,18 +487,12 @@ impl McplsServer { character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_implementation(file_path, line, character) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Go to type definition location. @@ -492,18 +507,12 @@ impl McplsServer { character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_type_definition(file_path, line, character) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } /// Get inlay hints for a range. @@ -520,9 +529,9 @@ impl McplsServer { end_character, }): Parameters, ) -> Result { - let result = { - let mut translator = self.context.translator.lock().await; - translator + serialize_tool_result( + self.context + .translator .handle_inlay_hints( file_path, start_line, @@ -530,14 +539,8 @@ impl McplsServer { end_line, end_character, ) - .await - }; - - match result { - Ok(value) => serde_json::to_string(&value) - .map_err(|e| McpError::internal_error(format!("Serialization error: {e}"), None)), - Err(e) => Err(McpError::internal_error(e.to_string(), None)), - } + .await, + ) } } @@ -551,9 +554,8 @@ impl ServerHandler for McplsServer { // TODO(critic-S5): paginate when max_documents == 0 (unlimited mode can produce // very large single-page responses that may exceed transport buffers). let resources: Vec<_> = { - let translator = self.context.translator.lock().await; - translator - .document_tracker() + let document_tracker = self.context.translator.document_tracker().lock().await; + document_tracker .open_paths() .filter_map(|path| { let uri = make_uri(path) @@ -589,12 +591,10 @@ impl ServerHandler for McplsServer { parse_uri(&request.uri).map_err(|e| McpError::invalid_params(e.to_string(), None))?; // Enforce workspace-root containment — mirrors the guard in every LSP tool. - { - let translator = self.context.translator.lock().await; - translator - .validate_path(&path) - .map_err(|e| McpError::invalid_params(e.to_string(), None))?; - } + self.context + .translator + .validate_path(&path) + .map_err(|e| McpError::invalid_params(e.to_string(), None))?; let lsp_uri = crate::bridge::path_to_uri(&path); @@ -602,11 +602,8 @@ impl ServerHandler for McplsServer { // in the response shape. Currently both return `{"diagnostics":null}` which is // ambiguous for clients that need to know whether analysis has run yet. let diagnostics = { - let translator = self.context.translator.lock().await; - translator - .notification_cache() - .get_diagnostics(lsp_uri.as_str()) - .cloned() + let cache = self.context.notification_cache.lock().await; + cache.get_diagnostics(lsp_uri.as_str()).cloned() }; let json = serde_json::to_string(&diagnostics) @@ -627,12 +624,10 @@ impl ServerHandler for McplsServer { parse_uri(&request.uri).map_err(|e| McpError::invalid_params(e.to_string(), None))?; // Enforce workspace-root containment (same invariant as every LSP tool). - { - let translator = self.context.translator.lock().await; - translator - .validate_path(&path) - .map_err(|e| McpError::invalid_params(e.to_string(), None))?; - } + self.context + .translator + .validate_path(&path) + .map_err(|e| McpError::invalid_params(e.to_string(), None))?; // TODO(S3): If diagnostics are already cached for this URI, emit a synthetic // notify_resource_updated so clients subscribing after initial workspace indexing @@ -691,9 +686,10 @@ mod tests { use super::*; fn create_test_server() -> McplsServer { - let translator = Arc::new(Mutex::new(Translator::new())); + let translator = Arc::new(Translator::new()); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subscriptions = Arc::new(ResourceSubscriptions::new()); - McplsServer::new(translator, subscriptions) + McplsServer::new(translator, notification_cache, subscriptions) } #[tokio::test] @@ -1138,8 +1134,8 @@ mod tests { async fn test_list_resources_returns_empty_when_no_open_documents() { let server = create_test_server(); let empty = { - let translator = server.context.translator.lock().await; - translator.document_tracker().open_paths().count() == 0 + let document_tracker = server.context.translator.document_tracker().lock().await; + document_tracker.open_paths().count() == 0 }; assert!(empty); } @@ -1163,11 +1159,8 @@ mod tests { async fn test_validate_path_rejects_nonexistent_path() { use std::path::Path; - let translator = Arc::new(Mutex::new(Translator::new())); - let result = { - let t = translator.lock().await; - t.validate_path(Path::new("/this/path/does/not/exist/at/all.rs")) - }; + let translator = Translator::new(); + let result = translator.validate_path(Path::new("/this/path/does/not/exist/at/all.rs")); assert!(result.is_err()); } diff --git a/crates/mcpls-core/src/transport.rs b/crates/mcpls-core/src/transport.rs index ad5fff9..9b8a23b 100644 --- a/crates/mcpls-core/src/transport.rs +++ b/crates/mcpls-core/src/transport.rs @@ -107,8 +107,8 @@ pub(crate) async fn run_stdio( /// serves until `Ctrl-C` or `SIGTERM` is received. /// /// Each HTTP session receives its own `McplsServer` clone. The shared -/// `Arc>` inside is the same across all sessions, so LSP -/// state is still global per process. +/// `Arc` inside is the same across all sessions, so LSP state is +/// still global per process. /// /// # Note /// @@ -250,12 +250,13 @@ mod tests { use tokio::sync::Mutex; - use crate::bridge::{ResourceSubscriptions, Translator}; + use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator}; use crate::mcp::McplsServer; - let translator = Arc::new(Mutex::new(Translator::new())); + let translator = Arc::new(Translator::new()); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subs = Arc::new(ResourceSubscriptions::new()); - let server = McplsServer::new(translator, subs); + let server = McplsServer::new(translator, notification_cache, subs); // Bind port 0 so the OS assigns a free port. let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -287,16 +288,17 @@ mod tests { use tokio::sync::Mutex; - use crate::bridge::{ResourceSubscriptions, Translator}; + use crate::bridge::{NotificationCache, ResourceSubscriptions, Translator}; use crate::mcp::McplsServer; // Hold a listener to make the port unavailable. let occupied = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = occupied.local_addr().unwrap(); - let translator = Arc::new(Mutex::new(Translator::new())); + let translator = Arc::new(Translator::new()); + let notification_cache = Arc::new(Mutex::new(NotificationCache::new())); let subs = Arc::new(ResourceSubscriptions::new()); - let server = McplsServer::new(translator, subs); + let server = McplsServer::new(translator, notification_cache, subs); let cfg = HttpConfig { bind: addr, diff --git a/crates/mcpls-core/tests/integration/basic_tests.rs b/crates/mcpls-core/tests/integration/basic_tests.rs index 3f5e8b6..48b9c2a 100644 --- a/crates/mcpls-core/tests/integration/basic_tests.rs +++ b/crates/mcpls-core/tests/integration/basic_tests.rs @@ -8,10 +8,10 @@ use crate::common::test_utils::{ config_fixture_path, rust_analyzer_available, rust_workspace_path, }; -#[test] -fn test_translator_creation() { +#[tokio::test] +async fn test_translator_creation() { let translator = Translator::new(); - assert!(translator.document_tracker().is_empty()); + assert!(translator.document_tracker().lock().await.is_empty()); } #[test]