diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index bc5953803ed7..78c3787ed9a1 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2596,18 +2596,6 @@ dependencies = [ "urlencoding", ] -[[package]] -name = "codex-connectors-extension" -version = "0.0.0" -dependencies = [ - "codex-connectors", - "codex-core-plugins", - "codex-plugin", - "codex-utils-path-uri", - "serde_json", - "thiserror 2.0.18", -] - [[package]] name = "codex-context-fragments" version = "0.0.0" @@ -3363,14 +3351,10 @@ dependencies = [ "codex-features", "codex-login", "codex-mcp", - "codex-plugin", "codex-protocol", - "codex-utils-absolute-path", "codex-utils-path-uri", "pretty_assertions", - "serde_json", "tempfile", - "thiserror 2.0.18", "tokio", "tracing", ] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index ef17cb9ce5f1..abf5e0411a11 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -48,7 +48,6 @@ members = [ "exec-server", "execpolicy", "execpolicy-legacy", - "ext/connectors", "ext/extension-api", "ext/goal", "ext/guardian", diff --git a/codex-rs/codex-mcp/src/connection_manager_tests.rs b/codex-rs/codex-mcp/src/connection_manager_tests.rs index d66474c7bb30..750938708ff6 100644 --- a/codex-rs/codex-mcp/src/connection_manager_tests.rs +++ b/codex-rs/codex-mcp/src/connection_manager_tests.rs @@ -310,6 +310,101 @@ async fn disabled_permissions_do_not_auto_accept_elicitation_with_requested_fiel ); } +#[tokio::test] +async fn elicitation_route_ids_are_unique_across_managers() { + let first_manager = ElicitationRequestManager::new( + AskForApproval::OnRequest, + PermissionProfile::default(), + /*reviewer*/ None, + ); + let second_manager = ElicitationRequestManager::new( + AskForApproval::OnRequest, + PermissionProfile::default(), + /*reviewer*/ None, + ); + let (first_tx, first_rx) = async_channel::bounded(1); + let (second_tx, second_rx) = async_channel::bounded(1); + let first_sender = first_manager.make_sender("server".to_string(), first_tx); + let second_sender = second_manager.make_sender("server".to_string(), second_tx); + + let first_task = tokio::spawn(first_sender( + NumberOrString::Number(1), + codex_rmcp_client::Elicitation::OpenAiForm { + meta: None, + message: "First?".to_string(), + requested_schema: serde_json::json!({}), + }, + )); + let second_task = tokio::spawn(second_sender( + NumberOrString::Number(1), + codex_rmcp_client::Elicitation::OpenAiForm { + meta: None, + message: "Second?".to_string(), + requested_schema: serde_json::json!({}), + }, + )); + + let EventMsg::ElicitationRequest(first_request) = first_rx + .recv() + .await + .expect("first elicitation should be emitted") + .msg + else { + panic!("expected first elicitation request"); + }; + let EventMsg::ElicitationRequest(second_request) = second_rx + .recv() + .await + .expect("second elicitation should be emitted") + .msg + else { + panic!("expected second elicitation request"); + }; + assert_ne!(first_request.id, second_request.id); + + let first_response = ElicitationResponse { + action: ElicitationAction::Accept, + content: Some(serde_json::json!({"manager": "first"})), + meta: None, + }; + let second_response = ElicitationResponse { + action: ElicitationAction::Decline, + content: Some(serde_json::json!({"manager": "second"})), + meta: None, + }; + let first_id = match first_request.id { + codex_protocol::mcp::RequestId::String(value) => NumberOrString::String(Arc::from(value)), + codex_protocol::mcp::RequestId::Integer(value) => NumberOrString::Number(value), + }; + let second_id = match second_request.id { + codex_protocol::mcp::RequestId::String(value) => NumberOrString::String(Arc::from(value)), + codex_protocol::mcp::RequestId::Integer(value) => NumberOrString::Number(value), + }; + first_manager + .resolve("server".to_string(), first_id, first_response.clone()) + .await + .expect("first manager should resolve its routed request"); + second_manager + .resolve("server".to_string(), second_id, second_response.clone()) + .await + .expect("second manager should resolve its routed request"); + + assert_eq!( + first_task + .await + .expect("first elicitation task should join") + .expect("first elicitation should resolve"), + first_response + ); + assert_eq!( + second_task + .await + .expect("second elicitation task should join") + .expect("second elicitation should resolve"), + second_response + ); +} + #[test] fn test_normalize_tools_short_non_duplicated_names() { let tools = vec![ diff --git a/codex-rs/codex-mcp/src/elicitation.rs b/codex-rs/codex-mcp/src/elicitation.rs index d6a685b50203..799852437eec 100644 --- a/codex-rs/codex-mcp/src/elicitation.rs +++ b/codex-rs/codex-mcp/src/elicitation.rs @@ -8,6 +8,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::Mutex as StdMutex; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use crate::mcp::McpPermissionPromptAutoApproveContext; use crate::mcp::mcp_permission_prompt_is_auto_approved; @@ -48,6 +50,8 @@ pub trait ElicitationReviewer: Send + Sync { pub type ElicitationReviewerHandle = Arc; +static NEXT_ELICITATION_ROUTE_ID: AtomicU64 = AtomicU64::new(1); + #[derive(Clone)] pub(crate) struct ElicitationRequestManager { requests: Arc>, @@ -55,6 +59,7 @@ pub(crate) struct ElicitationRequestManager { pub(crate) permission_profile: Arc>, auto_deny: Arc>, reviewer: Option, + route_id: u64, } impl ElicitationRequestManager { @@ -69,6 +74,7 @@ impl ElicitationRequestManager { permission_profile: Arc::new(StdMutex::new(permission_profile)), auto_deny: Arc::new(StdMutex::new(false)), reviewer, + route_id: NEXT_ELICITATION_ROUTE_ID.fetch_add(1, Ordering::Relaxed), } } @@ -110,6 +116,7 @@ impl ElicitationRequestManager { let permission_profile = self.permission_profile.clone(); let auto_deny = self.auto_deny.clone(); let reviewer = self.reviewer.clone(); + let route_id = self.route_id; Box::new(move |id, elicitation| { let elicitation_requests = elicitation_requests.clone(); let tx_event = tx_event.clone(); @@ -214,9 +221,10 @@ impl ElicitationRequestManager { }, }; let (tx, rx) = oneshot::channel(); + let response_id = RequestId::String(Arc::from(format!("{route_id}:{id}"))); { let mut lock = elicitation_requests.lock().await; - lock.insert((server_name.clone(), id.clone()), tx); + lock.insert((server_name.clone(), response_id.clone()), tx); } let _ = tx_event .send(Event { @@ -224,7 +232,7 @@ impl ElicitationRequestManager { msg: EventMsg::ElicitationRequest(ElicitationRequestEvent { turn_id: None, server_name, - id: match id.clone() { + id: match response_id { rmcp::model::NumberOrString::String(value) => { ProtocolRequestId::String(value.to_string()) } diff --git a/codex-rs/core-plugins/src/executor_runtime.rs b/codex-rs/core-plugins/src/executor_runtime.rs new file mode 100644 index 000000000000..c70750805643 --- /dev/null +++ b/codex-rs/core-plugins/src/executor_runtime.rs @@ -0,0 +1,224 @@ +use codex_config::McpServerConfig; +use codex_connectors::parse_plugin_app_config; +use codex_exec_server::ExecutorFileSystem; +use codex_exec_server::ResolvedSelectedCapabilityRoot; +use codex_mcp::parse_executor_plugin_mcp_config; +use codex_plugin::AppDeclaration; +use codex_plugin::PluginResourceLocator; +use codex_plugin::ResolvedPlugin; +use codex_plugin::ResolvedPluginLocation; +use codex_plugin::manifest::PluginManifestMcpServers; +use codex_utils_path_uri::PathUri; +use codex_utils_path_uri::PathUriParseError; +use std::io; +use thiserror::Error; + +use crate::ExecutorPluginProvider; +use crate::ExecutorPluginProviderError; +use crate::ResolvedExecutorPlugin; + +const DEFAULT_MCP_CONFIG_FILE: &str = ".mcp.json"; + +/// MCP and connector declarations read from one exact executor binding. +#[derive(Clone, Debug)] +pub struct ExecutorPluginRuntime { + plugin: ResolvedPlugin, + mcp_servers: Vec<(String, McpServerConfig)>, + apps: Vec, +} + +/// Failure to project runtime capabilities from an executor plugin. +#[derive(Debug, Error)] +pub enum ExecutorPluginRuntimeError { + #[error(transparent)] + Resolve(#[from] ExecutorPluginProviderError), + #[error("failed to read MCP config for selected plugin `{plugin_id}` at `{path}`: {source}")] + ReadConfig { + plugin_id: String, + path: PathUri, + #[source] + source: io::Error, + }, + #[error( + "failed to resolve MCP config path `{relative_path}` below selected plugin `{plugin_id}` at `{root}`: {source}" + )] + InvalidConfigPath { + plugin_id: String, + root: PathUri, + relative_path: &'static str, + #[source] + source: PathUriParseError, + }, + #[error("failed to parse MCP config for selected plugin `{plugin_id}` at `{path}`: {source}")] + ParseConfig { + plugin_id: String, + path: PathUri, + #[source] + source: serde_json::Error, + }, + #[error("failed to read app config for selected plugin `{plugin_id}` at `{path}`: {source}")] + ReadAppConfig { + plugin_id: String, + path: PathUri, + #[source] + source: io::Error, + }, + #[error("failed to parse app config for selected plugin `{plugin_id}` at `{path}`: {source}")] + ParseAppConfig { + plugin_id: String, + path: PathUri, + #[source] + source: serde_json::Error, + }, +} + +impl ExecutorPluginRuntime { + /// Reads both runtime declaration files through the root's pinned filesystem. + /// + /// `Ok(None)` is intentionally not cacheable: the plugin manifest may appear + /// once a deferred executor finishes starting. + pub async fn project( + root: &ResolvedSelectedCapabilityRoot, + ) -> Result, ExecutorPluginRuntimeError> { + let Some(plugin) = ExecutorPluginProvider::resolve_pinned(root).await? else { + return Ok(None); + }; + let ResolvedPluginLocation::Environment { + root: plugin_root, .. + } = plugin.plugin().location(); + let mcp_servers = + load_from_file_system(plugin.plugin(), plugin_root, plugin.file_system()).await?; + let apps = match load_apps(&plugin).await { + Ok(apps) => apps, + Err(err) => { + tracing::warn!( + plugin = plugin.plugin().selected_root_id(), + error = %err, + "ignoring invalid executor plugin app declarations" + ); + Vec::new() + } + }; + Ok(Some(Self { + plugin: plugin.plugin().clone(), + mcp_servers, + apps, + })) + } + + pub fn plugin(&self) -> &ResolvedPlugin { + &self.plugin + } + + pub fn mcp_servers(&self) -> &[(String, McpServerConfig)] { + &self.mcp_servers + } + + pub fn apps(&self) -> &[AppDeclaration] { + &self.apps + } +} + +async fn load_from_file_system( + plugin: &ResolvedPlugin, + plugin_root: &PathUri, + file_system: &dyn ExecutorFileSystem, +) -> Result, ExecutorPluginRuntimeError> { + let ResolvedPluginLocation::Environment { environment_id, .. } = plugin.location(); + let plugin_id = plugin.selected_root_id(); + let (contents, config_path) = match plugin.manifest().paths.mcp_servers.as_ref() { + Some(PluginManifestMcpServers::Path(PluginResourceLocator::Environment { + path, .. + })) => { + ( + file_system + .read_file_text(path, /*sandbox*/ None) + .await + .map_err(|source| ExecutorPluginRuntimeError::ReadConfig { + plugin_id: plugin_id.to_string(), + path: path.clone(), + source, + })?, + path.clone(), + ) + } + Some(PluginManifestMcpServers::Object(object_config)) => { + let PluginResourceLocator::Environment { path, .. } = plugin.manifest_path(); + (object_config.clone(), path.clone()) + } + None => { + let config_path = plugin_root + .join(DEFAULT_MCP_CONFIG_FILE) + .map_err(|source| ExecutorPluginRuntimeError::InvalidConfigPath { + plugin_id: plugin_id.to_string(), + root: plugin_root.clone(), + relative_path: DEFAULT_MCP_CONFIG_FILE, + source, + })?; + let contents = match file_system + .read_file_text(&config_path, /*sandbox*/ None) + .await + { + Ok(contents) => contents, + Err(source) if source.kind() == io::ErrorKind::NotFound => { + return Ok(Vec::new()); + } + Err(source) => { + return Err(ExecutorPluginRuntimeError::ReadConfig { + plugin_id: plugin_id.to_string(), + path: config_path.clone(), + source, + }); + } + }; + (contents, config_path) + } + }; + let parsed = parse_executor_plugin_mcp_config(plugin_root, &contents, environment_id).map_err( + |source| ExecutorPluginRuntimeError::ParseConfig { + plugin_id: plugin_id.to_string(), + path: config_path, + source, + }, + )?; + + for error in parsed.errors { + tracing::warn!( + plugin = plugin_id, + server = error.name, + error = error.message, + "ignoring invalid executor plugin MCP server" + ); + } + + Ok(parsed.servers.into_iter().collect()) +} + +async fn load_apps( + plugin: &ResolvedExecutorPlugin, +) -> Result, ExecutorPluginRuntimeError> { + let resolved_plugin = plugin.plugin(); + let plugin_id = resolved_plugin.selected_root_id(); + let Some(PluginResourceLocator::Environment { + path: config_path, .. + }) = resolved_plugin.manifest().paths.apps.as_ref() + else { + return Ok(Vec::new()); + }; + let contents = plugin + .file_system() + .read_file_text(config_path, /*sandbox*/ None) + .await + .map_err(|source| ExecutorPluginRuntimeError::ReadAppConfig { + plugin_id: plugin_id.to_string(), + path: config_path.clone(), + source, + })?; + parse_plugin_app_config(&contents).map_err(|source| { + ExecutorPluginRuntimeError::ParseAppConfig { + plugin_id: plugin_id.to_string(), + path: config_path.clone(), + source, + } + }) +} diff --git a/codex-rs/core-plugins/src/lib.rs b/codex-rs/core-plugins/src/lib.rs index 7ee5ddd117ca..ad6d7ba352c4 100644 --- a/codex-rs/core-plugins/src/lib.rs +++ b/codex-rs/core-plugins/src/lib.rs @@ -1,5 +1,6 @@ mod app_mcp_routing; mod discoverable; +mod executor_runtime; pub mod installed_marketplaces; pub mod loader; mod manager; @@ -38,6 +39,8 @@ pub type PluginLoadOutcome = codex_plugin::PluginLoadOutcome Result, ExecutorPluginProviderError> { + let selected_root = resolved_root.selected_root(); + let plugin_root = selected_plugin_root(selected_root); + let file_system = resolved_root.file_system(); + let plugin = resolve_plugin_root(selected_root, plugin_root, file_system.as_ref()).await?; + Ok(plugin.map(|plugin| ResolvedExecutorPlugin { + plugin, + file_system, + })) + } } impl PluginProvider for ExecutorPluginProvider { diff --git a/codex-rs/ext/connectors/BUILD.bazel b/codex-rs/ext/connectors/BUILD.bazel deleted file mode 100644 index 304349b8a17c..000000000000 --- a/codex-rs/ext/connectors/BUILD.bazel +++ /dev/null @@ -1,6 +0,0 @@ -load("//:defs.bzl", "codex_rust_crate") - -codex_rust_crate( - name = "connectors", - crate_name = "codex_connectors_extension", -) diff --git a/codex-rs/ext/connectors/Cargo.toml b/codex-rs/ext/connectors/Cargo.toml deleted file mode 100644 index 1103ae4e2b34..000000000000 --- a/codex-rs/ext/connectors/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -edition.workspace = true -license.workspace = true -name = "codex-connectors-extension" -version.workspace = true - -[lib] -name = "codex_connectors_extension" -path = "src/lib.rs" -doctest = false -test = false - -[lints] -workspace = true - -[dependencies] -codex-connectors = { workspace = true } -codex-core-plugins = { workspace = true } -codex-plugin = { workspace = true } -codex-utils-path-uri = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } diff --git a/codex-rs/ext/connectors/src/executor_plugin.rs b/codex-rs/ext/connectors/src/executor_plugin.rs deleted file mode 100644 index c27ae3ac6221..000000000000 --- a/codex-rs/ext/connectors/src/executor_plugin.rs +++ /dev/null @@ -1,64 +0,0 @@ -use codex_connectors::parse_plugin_app_config; -use codex_core_plugins::ResolvedExecutorPlugin; -use codex_plugin::AppDeclaration; -use codex_plugin::PluginResourceLocator; -use codex_utils_path_uri::PathUri; -use std::io; -use thiserror::Error; - -/// Loads connector declarations from a resolved plugin through its owning executor. -#[derive(Clone, Copy, Debug, Default)] -pub struct ExecutorPluginConnectorProvider; - -/// Failure to load connector declarations from an executor plugin. -#[derive(Debug, Error)] -pub enum ExecutorPluginConnectorProviderError { - #[error("failed to read app config for selected plugin `{plugin_id}` at `{path}`: {source}")] - ReadConfig { - plugin_id: String, - path: PathUri, - #[source] - source: io::Error, - }, - #[error("failed to parse app config for selected plugin `{plugin_id}` at `{path}`: {source}")] - ParseConfig { - plugin_id: String, - path: PathUri, - #[source] - source: serde_json::Error, - }, -} - -impl ExecutorPluginConnectorProvider { - /// Returns the connector declarations contributed by `plugin`. - pub async fn load( - &self, - plugin: &ResolvedExecutorPlugin, - ) -> Result, ExecutorPluginConnectorProviderError> { - let resolved_plugin = plugin.plugin(); - let plugin_id = resolved_plugin.selected_root_id(); - let Some(PluginResourceLocator::Environment { - path: config_path, .. - }) = resolved_plugin.manifest().paths.apps.as_ref() - else { - return Ok(Vec::new()); - }; - let contents = plugin - .file_system() - .read_file_text(config_path, /*sandbox*/ None) - .await - .map_err(|source| ExecutorPluginConnectorProviderError::ReadConfig { - plugin_id: plugin_id.to_string(), - path: config_path.clone(), - source, - })?; - - parse_plugin_app_config(&contents).map_err(|source| { - ExecutorPluginConnectorProviderError::ParseConfig { - plugin_id: plugin_id.to_string(), - path: config_path.clone(), - source, - } - }) - } -} diff --git a/codex-rs/ext/connectors/src/lib.rs b/codex-rs/ext/connectors/src/lib.rs deleted file mode 100644 index f60e5f916a39..000000000000 --- a/codex-rs/ext/connectors/src/lib.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Executor-backed connector declaration loading. - -mod executor_plugin; - -pub use executor_plugin::ExecutorPluginConnectorProvider; -pub use executor_plugin::ExecutorPluginConnectorProviderError; diff --git a/codex-rs/ext/mcp/Cargo.toml b/codex-rs/ext/mcp/Cargo.toml index b98c0905dbc2..13c219edb9f7 100644 --- a/codex-rs/ext/mcp/Cargo.toml +++ b/codex-rs/ext/mcp/Cargo.toml @@ -15,22 +15,18 @@ workspace = true [dependencies] codex-core = { workspace = true } codex-core-plugins = { workspace = true } -codex-config = { workspace = true } codex-exec-server = { workspace = true } codex-extension-api = { workspace = true } codex-features = { workspace = true } codex-mcp = { workspace = true } -codex-plugin = { workspace = true } codex-protocol = { workspace = true } -codex-utils-absolute-path = { workspace = true } -codex-utils-path-uri = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } tracing = { workspace = true } tokio = { workspace = true, features = ["sync"] } [dev-dependencies] +codex-config = { workspace = true } codex-login = { workspace = true } +codex-utils-path-uri = { workspace = true } pretty_assertions = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/codex-rs/ext/mcp/src/executor_plugin.rs b/codex-rs/ext/mcp/src/executor_plugin.rs index 69d1a47b31b3..e8eb7aacba1d 100644 --- a/codex-rs/ext/mcp/src/executor_plugin.rs +++ b/codex-rs/ext/mcp/src/executor_plugin.rs @@ -1,5 +1,5 @@ use codex_core::config::Config; -use codex_core_plugins::ExecutorPluginProvider; +use codex_core_plugins::ExecutorPluginRuntime; use codex_exec_server::EnvironmentManager; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionFuture; @@ -11,25 +11,16 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::OnceCell; -use self::provider::ExecutorPluginMcpProvider; - -mod provider; - -/// Frozen MCP declarations for one selected package. -/// -/// Each server config retains the stable logical environment ID. Reconnection may replace the -/// concrete environment instance without changing that authority. +/// The runtime declarations frozen for one selected package at thread start. #[derive(Clone)] -struct SelectedPluginMcpServers { - plugin_id: String, - plugin_display_name: String, +struct SelectedPluginRuntime { selection_order: usize, - servers: Vec<(String, codex_config::McpServerConfig)>, + runtime: ExecutorPluginRuntime, } #[derive(Default)] pub(crate) struct SelectedExecutorPluginMcpState { - snapshot: OnceCell>, + snapshot: OnceCell>, } pub(crate) fn seed_thread_state(thread_init: &mut ExtensionDataInit) { @@ -37,49 +28,38 @@ pub(crate) fn seed_thread_state(thread_init: &mut ExtensionDataInit) { } pub(crate) struct SelectedExecutorPluginMcpContributor { - plugin_provider: ExecutorPluginProvider, - mcp_provider: ExecutorPluginMcpProvider, + environment_manager: Arc, } impl SelectedExecutorPluginMcpContributor { pub(crate) fn new(environment_manager: Arc) -> Self { Self { - plugin_provider: ExecutorPluginProvider::new(Arc::clone(&environment_manager)), - mcp_provider: ExecutorPluginMcpProvider, + environment_manager, } } async fn resolve_snapshot( &self, selected_roots: &[SelectedCapabilityRoot], - ) -> Vec { + ) -> Vec { let mut snapshot = Vec::new(); + let resolved_roots = self + .environment_manager + .bind_selected_capability_roots(selected_roots); - for (selection_order, selected_root) in selected_roots.iter().enumerate() { - let plugin = match self.plugin_provider.resolve_bound(selected_root).await { - Ok(Some(plugin)) => plugin, - Ok(None) => continue, - Err(err) => { - tracing::warn!( - selected_root = selected_root.id, - error = %err, - "failed to resolve selected executor plugin for MCP discovery" - ); - continue; - } - }; - match self.mcp_provider.load(&plugin).await { - Ok(servers) => snapshot.push(SelectedPluginMcpServers { - plugin_id: plugin.plugin().selected_root_id().to_string(), - plugin_display_name: plugin.plugin().manifest().display_name().to_string(), + for (selection_order, resolved_root) in resolved_roots.iter().enumerate() { + let selected_root = resolved_root.selected_root(); + match ExecutorPluginRuntime::project(resolved_root).await { + Ok(Some(runtime)) => snapshot.push(SelectedPluginRuntime { selection_order, - servers, + runtime, }), + Ok(None) => {} Err(err) => { tracing::warn!( selected_root = selected_root.id, error = %err, - "failed to load selected executor plugin MCP servers" + "failed to project selected executor plugin runtime" ); } } @@ -115,19 +95,26 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { .await; let mut contributions = Vec::new(); - for plugin in snapshot { - let mut servers = plugin.servers.iter().cloned().collect::>(); + for selected in snapshot { + let plugin = selected.runtime.plugin(); + let plugin_id = plugin.selected_root_id(); + let mut servers = selected + .runtime + .mcp_servers() + .iter() + .cloned() + .collect::>(); context .config() - .apply_plugin_mcp_server_requirements(&plugin.plugin_id, &mut servers); + .apply_plugin_mcp_server_requirements(plugin_id, &mut servers); let mut servers = servers.into_iter().collect::>(); servers.sort_unstable_by(|left, right| left.0.cmp(&right.0)); contributions.extend(servers.into_iter().map(|(name, config)| { McpServerContribution::SelectedPlugin { name, - plugin_id: plugin.plugin_id.clone(), - plugin_display_name: plugin.plugin_display_name.clone(), - selection_order: plugin.selection_order, + plugin_id: plugin_id.to_string(), + plugin_display_name: plugin.manifest().display_name().to_string(), + selection_order: selected.selection_order, config: Box::new(config), } })); diff --git a/codex-rs/ext/mcp/src/executor_plugin/provider.rs b/codex-rs/ext/mcp/src/executor_plugin/provider.rs deleted file mode 100644 index a6972df0969e..000000000000 --- a/codex-rs/ext/mcp/src/executor_plugin/provider.rs +++ /dev/null @@ -1,138 +0,0 @@ -use codex_config::McpServerConfig; -use codex_core_plugins::ResolvedExecutorPlugin; -use codex_exec_server::ExecutorFileSystem; -use codex_mcp::parse_executor_plugin_mcp_config; -use codex_plugin::PluginResourceLocator; -use codex_plugin::ResolvedPlugin; -use codex_plugin::ResolvedPluginLocation; -use codex_plugin::manifest::PluginManifestMcpServers; -use codex_utils_path_uri::PathUri; -use codex_utils_path_uri::PathUriParseError; -use std::io; -use thiserror::Error; - -const DEFAULT_MCP_CONFIG_FILE: &str = ".mcp.json"; - -/// Loads MCP declarations from resolved plugins through their owning executor. -#[derive(Clone, Copy, Debug, Default)] -pub(super) struct ExecutorPluginMcpProvider; - -/// Failure to load an executor plugin's MCP declarations. -#[derive(Debug, Error)] -pub(super) enum ExecutorPluginMcpProviderError { - #[error("failed to read MCP config for selected plugin `{plugin_id}` at `{path}`: {source}")] - ReadConfig { - plugin_id: String, - path: PathUri, - #[source] - source: io::Error, - }, - #[error( - "failed to resolve MCP config path `{relative_path}` below selected plugin `{plugin_id}` at `{root}`: {source}" - )] - InvalidConfigPath { - plugin_id: String, - root: PathUri, - relative_path: &'static str, - #[source] - source: PathUriParseError, - }, - #[error("failed to parse MCP config for selected plugin `{plugin_id}` at `{path}`: {source}")] - ParseConfig { - plugin_id: String, - path: PathUri, - #[source] - source: serde_json::Error, - }, -} - -impl ExecutorPluginMcpProvider { - /// Returns MCP servers declared by `plugin`, bound to its environment. - pub(super) async fn load( - &self, - plugin: &ResolvedExecutorPlugin, - ) -> Result, ExecutorPluginMcpProviderError> { - let ResolvedPluginLocation::Environment { root, .. } = plugin.plugin().location(); - - load_from_file_system(plugin.plugin(), root, plugin.file_system()).await - } -} - -async fn load_from_file_system( - plugin: &ResolvedPlugin, - plugin_root: &PathUri, - file_system: &dyn ExecutorFileSystem, -) -> Result, ExecutorPluginMcpProviderError> { - let ResolvedPluginLocation::Environment { environment_id, .. } = plugin.location(); - let plugin_id = plugin.selected_root_id(); - let (contents, config_path) = match plugin.manifest().paths.mcp_servers.as_ref() { - Some(PluginManifestMcpServers::Path(PluginResourceLocator::Environment { - path, .. - })) => { - ( - file_system - .read_file_text(path, /*sandbox*/ None) - .await - .map_err(|source| ExecutorPluginMcpProviderError::ReadConfig { - plugin_id: plugin_id.to_string(), - path: path.clone(), - source, - })?, - path.clone(), - ) - } - Some(PluginManifestMcpServers::Object(object_config)) => { - let PluginResourceLocator::Environment { path, .. } = plugin.manifest_path(); - (object_config.clone(), path.clone()) - } - None => { - let config_path = plugin_root - .join(DEFAULT_MCP_CONFIG_FILE) - .map_err(|source| ExecutorPluginMcpProviderError::InvalidConfigPath { - plugin_id: plugin_id.to_string(), - root: plugin_root.clone(), - relative_path: DEFAULT_MCP_CONFIG_FILE, - source, - })?; - let contents = match file_system - .read_file_text(&config_path, /*sandbox*/ None) - .await - { - Ok(contents) => contents, - Err(source) if source.kind() == io::ErrorKind::NotFound => { - return Ok(Vec::new()); - } - Err(source) => { - return Err(ExecutorPluginMcpProviderError::ReadConfig { - plugin_id: plugin_id.to_string(), - path: config_path.clone(), - source, - }); - } - }; - (contents, config_path) - } - }; - let parsed = parse_executor_plugin_mcp_config(plugin_root, &contents, environment_id).map_err( - |source| ExecutorPluginMcpProviderError::ParseConfig { - plugin_id: plugin_id.to_string(), - path: config_path, - source, - }, - )?; - - for error in parsed.errors { - tracing::warn!( - plugin = plugin_id, - server = error.name, - error = error.message, - "ignoring invalid executor plugin MCP server" - ); - } - - Ok(parsed.servers.into_iter().collect()) -} - -#[cfg(test)] -#[path = "provider_tests.rs"] -mod tests; diff --git a/codex-rs/ext/mcp/src/executor_plugin/provider_tests.rs b/codex-rs/ext/mcp/src/executor_plugin/provider_tests.rs deleted file mode 100644 index bc59e104a01f..000000000000 --- a/codex-rs/ext/mcp/src/executor_plugin/provider_tests.rs +++ /dev/null @@ -1,421 +0,0 @@ -use super::DEFAULT_MCP_CONFIG_FILE; -use super::ExecutorPluginMcpProviderError; -use super::load_from_file_system; -use codex_config::McpServerConfig; -use codex_config::McpServerTransportConfig; -use codex_exec_server::CopyOptions; -use codex_exec_server::CreateDirectoryOptions; -use codex_exec_server::ExecutorFileSystem; -use codex_exec_server::ExecutorFileSystemFuture; -use codex_exec_server::FileMetadata; -use codex_exec_server::FileSystemReadStream; -use codex_exec_server::FileSystemResult; -use codex_exec_server::FileSystemSandboxContext; -use codex_exec_server::ReadDirectoryEntry; -use codex_exec_server::RemoveOptions; -use codex_plugin::ResolvedPlugin; -use codex_plugin::manifest::PluginManifest; -use codex_plugin::manifest::PluginManifestMcpServers; -use codex_plugin::manifest::PluginManifestPaths; -use codex_utils_absolute_path::AbsolutePathBuf; -use codex_utils_path_uri::LegacyAppPathString; -use codex_utils_path_uri::PathUri; -use pretty_assertions::assert_eq; -use std::collections::HashMap; -use std::io; -use std::sync::Mutex; - -const MCP_CONFIG_CONTENTS: &str = r#"{ - "mcpServers": { - "demo": {"command": "demo-mcp", "environment_id": "local"}, - "hosted": {"url": "https://example.com/mcp"} - } -}"#; - -struct SyntheticExecutorFileSystem { - config_path: AbsolutePathBuf, - config_contents: Option<&'static str>, - reads: Mutex>, -} - -impl SyntheticExecutorFileSystem { - fn unsupported() -> FileSystemResult { - Err(io::Error::new( - io::ErrorKind::Unsupported, - "operation is not used by executor MCP provider tests", - )) - } -} - -impl ExecutorFileSystem for SyntheticExecutorFileSystem { - fn canonicalize<'a>( - &'a self, - _path: &'a PathUri, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, PathUri> { - Box::pin(async { Self::unsupported() }) - } - - fn read_file<'a>( - &'a self, - path: &'a PathUri, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, Vec> { - Box::pin(async move { - let path = path.to_abs_path()?; - self.reads - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .push(path.clone()); - if path != self.config_path { - return Err(io::Error::new(io::ErrorKind::NotFound, "not found")); - } - self.config_contents - .map(|contents| contents.as_bytes().to_vec()) - .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "not found")) - }) - } - - fn read_file_stream<'a>( - &'a self, - _path: &'a PathUri, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, FileSystemReadStream> { - Box::pin(async { Self::unsupported() }) - } - - fn write_file<'a>( - &'a self, - _path: &'a PathUri, - _contents: Vec, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, ()> { - Box::pin(async { Self::unsupported() }) - } - - fn create_directory<'a>( - &'a self, - _path: &'a PathUri, - _options: CreateDirectoryOptions, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, ()> { - Box::pin(async { Self::unsupported() }) - } - - fn get_metadata<'a>( - &'a self, - _path: &'a PathUri, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, FileMetadata> { - Box::pin(async { Self::unsupported() }) - } - - fn read_directory<'a>( - &'a self, - _path: &'a PathUri, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, Vec> { - Box::pin(async { Self::unsupported() }) - } - - fn remove<'a>( - &'a self, - _path: &'a PathUri, - _options: RemoveOptions, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, ()> { - Box::pin(async { Self::unsupported() }) - } - - fn copy<'a>( - &'a self, - _source_path: &'a PathUri, - _destination_path: &'a PathUri, - _options: CopyOptions, - _sandbox: Option<&'a FileSystemSandboxContext>, - ) -> ExecutorFileSystemFuture<'a, ()> { - Box::pin(async { Self::unsupported() }) - } -} - -#[tokio::test] -async fn reads_declared_config_only_through_executor_file_system() { - let temp_dir = tempfile::tempdir().expect("tempdir"); - let plugin_root = - AbsolutePathBuf::from_absolute_path_checked(temp_dir.path().join("executor-only-plugin")) - .expect("absolute plugin root"); - assert!(!plugin_root.as_path().exists()); - let config_path = plugin_root.join("config/mcp.json"); - let plugin = resolved_plugin( - &plugin_root, - Some(PluginManifestMcpServers::Path(config_path.clone())), - ); - let file_system = SyntheticExecutorFileSystem { - config_path: config_path.clone(), - config_contents: Some(MCP_CONFIG_CONTENTS), - reads: Mutex::new(Vec::new()), - }; - - let plugin_root_uri = PathUri::from_abs_path(&plugin_root); - let servers = load_from_file_system(&plugin, &plugin_root_uri, &file_system) - .await - .expect("load executor MCP config"); - - assert_eq!( - servers, - vec![ - ( - "demo".to_string(), - McpServerConfig { - auth: Default::default(), - transport: McpServerTransportConfig::Stdio { - command: "demo-mcp".to_string(), - args: Vec::new(), - env: None, - env_vars: Vec::new(), - cwd: Some(LegacyAppPathString::from_path(plugin_root.as_path())), - }, - environment_id: "executor-test".to_string(), - enabled: true, - required: false, - supports_parallel_tool_calls: false, - disabled_reason: None, - startup_timeout_sec: None, - tool_timeout_sec: None, - default_tools_approval_mode: None, - enabled_tools: None, - disabled_tools: None, - scopes: None, - oauth: None, - oauth_resource: None, - tools: HashMap::new(), - }, - ), - ( - "hosted".to_string(), - McpServerConfig { - auth: Default::default(), - transport: McpServerTransportConfig::StreamableHttp { - url: "https://example.com/mcp".to_string(), - bearer_token_env_var: None, - http_headers: None, - env_http_headers: None, - }, - environment_id: "executor-test".to_string(), - enabled: true, - required: false, - supports_parallel_tool_calls: false, - disabled_reason: None, - startup_timeout_sec: None, - tool_timeout_sec: None, - default_tools_approval_mode: None, - enabled_tools: None, - disabled_tools: None, - scopes: None, - oauth: None, - oauth_resource: None, - tools: HashMap::new(), - }, - ), - ] - ); - assert_eq!(reads(&file_system), vec![config_path]); -} - -#[tokio::test] -async fn reads_manifest_object_config_without_executor_file_system_access() { - let temp_dir = tempfile::tempdir().expect("tempdir"); - let plugin_root = AbsolutePathBuf::from_absolute_path_checked(temp_dir.path().join("plugin")) - .expect("absolute plugin root"); - let config_path = plugin_root.join(DEFAULT_MCP_CONFIG_FILE); - let plugin = resolved_plugin( - &plugin_root, - Some(PluginManifestMcpServers::Object( - r#"{"counter":{"command":"counter-mcp","environment_id":"local"}}"#.to_string(), - )), - ); - let file_system = SyntheticExecutorFileSystem { - config_path, - config_contents: None, - reads: Mutex::new(Vec::new()), - }; - - let plugin_root_uri = PathUri::from_abs_path(&plugin_root); - let servers = load_from_file_system(&plugin, &plugin_root_uri, &file_system) - .await - .expect("load manifest object executor MCP config"); - - assert_eq!( - servers, - vec![( - "counter".to_string(), - McpServerConfig { - auth: Default::default(), - transport: McpServerTransportConfig::Stdio { - command: "counter-mcp".to_string(), - args: Vec::new(), - env: None, - env_vars: Vec::new(), - cwd: Some(LegacyAppPathString::from_path(plugin_root.as_path())), - }, - environment_id: "executor-test".to_string(), - enabled: true, - required: false, - supports_parallel_tool_calls: false, - disabled_reason: None, - startup_timeout_sec: None, - tool_timeout_sec: None, - default_tools_approval_mode: None, - enabled_tools: None, - disabled_tools: None, - scopes: None, - oauth: None, - oauth_resource: None, - tools: HashMap::new(), - }, - )] - ); - assert_eq!(reads(&file_system), Vec::new()); -} - -#[tokio::test] -async fn missing_default_config_is_empty() { - let temp_dir = tempfile::tempdir().expect("tempdir"); - let plugin_root = AbsolutePathBuf::from_absolute_path_checked(temp_dir.path().join("plugin")) - .expect("absolute plugin root"); - let config_path = plugin_root.join(DEFAULT_MCP_CONFIG_FILE); - let plugin = resolved_plugin(&plugin_root, /*mcp_servers*/ None); - let file_system = SyntheticExecutorFileSystem { - config_path: config_path.clone(), - config_contents: None, - reads: Mutex::new(Vec::new()), - }; - - let plugin_root_uri = PathUri::from_abs_path(&plugin_root); - let servers = load_from_file_system(&plugin, &plugin_root_uri, &file_system) - .await - .expect("missing default config should be ignored"); - - assert_eq!(servers, Vec::new()); - assert_eq!(reads(&file_system), vec![config_path]); -} - -#[tokio::test] -async fn malformed_declared_config_is_an_error() { - let temp_dir = tempfile::tempdir().expect("tempdir"); - let plugin_root = AbsolutePathBuf::from_absolute_path_checked(temp_dir.path().join("plugin")) - .expect("absolute plugin root"); - let config_path = plugin_root.join("mcp.json"); - let plugin = resolved_plugin( - &plugin_root, - Some(PluginManifestMcpServers::Path(config_path.clone())), - ); - let file_system = SyntheticExecutorFileSystem { - config_path: config_path.clone(), - config_contents: Some("{not-json"), - reads: Mutex::new(Vec::new()), - }; - - let plugin_root_uri = PathUri::from_abs_path(&plugin_root); - let err = load_from_file_system(&plugin, &plugin_root_uri, &file_system) - .await - .expect_err("malformed declared config should fail"); - - let ExecutorPluginMcpProviderError::ParseConfig { - plugin_id, - path, - source: _, - } = err - else { - panic!("expected parse error"); - }; - assert_eq!( - (plugin_id, path), - ( - "selected-root".to_string(), - PathUri::from_abs_path(&config_path) - ) - ); - assert_eq!(reads(&file_system), vec![config_path]); -} - -#[tokio::test] -async fn malformed_manifest_object_config_reports_actual_manifest_path() { - let temp_dir = tempfile::tempdir().expect("tempdir"); - let plugin_root = AbsolutePathBuf::from_absolute_path_checked(temp_dir.path().join("plugin")) - .expect("absolute plugin root"); - let plugin = resolved_plugin( - &plugin_root, - Some(PluginManifestMcpServers::Object("{not-json".to_string())), - ); - let file_system = SyntheticExecutorFileSystem { - config_path: plugin_root.join(DEFAULT_MCP_CONFIG_FILE), - config_contents: None, - reads: Mutex::new(Vec::new()), - }; - - let plugin_root_uri = PathUri::from_abs_path(&plugin_root); - let err = load_from_file_system(&plugin, &plugin_root_uri, &file_system) - .await - .expect_err("malformed manifest object config should fail"); - - let ExecutorPluginMcpProviderError::ParseConfig { - plugin_id, - path, - source: _, - } = err - else { - panic!("expected parse error"); - }; - assert_eq!( - (plugin_id, path), - ( - "selected-root".to_string(), - PathUri::from_abs_path(&plugin_root.join(".claude-plugin/plugin.json")) - ) - ); - assert_eq!(reads(&file_system), Vec::new()); -} - -fn resolved_plugin( - plugin_root: &AbsolutePathBuf, - mcp_servers: Option>, -) -> ResolvedPlugin { - let plugin_root_uri = PathUri::from_abs_path(plugin_root); - let mcp_servers = mcp_servers.map(|mcp_servers| match mcp_servers { - PluginManifestMcpServers::Path(path) => { - PluginManifestMcpServers::Path(PathUri::from_abs_path(&path)) - } - PluginManifestMcpServers::Object(config) => PluginManifestMcpServers::Object(config), - }); - ResolvedPlugin::from_environment( - "selected-root".to_string(), - "executor-test".to_string(), - plugin_root_uri.clone(), - plugin_root_uri - .join(".claude-plugin/plugin.json") - .expect("manifest URI"), - PluginManifest { - name: "demo-plugin".to_string(), - version: None, - description: None, - keywords: Vec::new(), - paths: PluginManifestPaths { - skills: Vec::new(), - mcp_servers, - apps: None, - hooks: None, - }, - interface: None, - }, - ) - .expect("valid plugin descriptor") -} - -fn reads(file_system: &SyntheticExecutorFileSystem) -> Vec { - file_system - .reads - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .clone() -}