Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions codex-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ codex-code-mode-protocol = { path = "code-mode-protocol" }
codex-home = { path = "codex-home" }
codex-config = { path = "config" }
codex-connectors = { path = "connectors" }
codex-connectors-extension = { path = "ext/connectors" }
codex-context-fragments = { path = "context-fragments" }
codex-core = { path = "core" }
codex-core-api = { path = "core-api" }
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ codex-arg0 = { workspace = true }
codex-cloud-config = { workspace = true }
codex-config = { workspace = true }
codex-connectors = { workspace = true }
codex-connectors-extension = { workspace = true }
codex-core = { workspace = true }
codex-core-plugins = { workspace = true }
codex-home = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,7 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat
} }
```

When `threadId` is provided, app feature gating (`Feature::Apps`) is evaluated using that thread's config snapshot. When omitted, the latest global config is used.
When `threadId` is provided, app feature gating (`Feature::Apps`) is evaluated using that thread's config snapshot, and plugin associations come from its frozen selected-capability snapshot. When omitted, the latest global config is used.

`app/list` returns after both accessible apps and directory apps are loaded. Set `forceRefetch: true` to bypass app caches and fetch fresh data from sources. Cache entries are only replaced when those refetches succeed.

Expand Down
6 changes: 2 additions & 4 deletions codex-rs/app-server/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use codex_core::NewThread;
use codex_core::StartThreadOptions;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::AgentSpawnFuture;
use codex_extension_api::AgentSpawner;
use codex_extension_api::ExtensionEventSink;
Expand All @@ -35,7 +34,6 @@ pub(crate) struct ThreadExtensionDependencies {
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) thread_manager: Weak<ThreadManager>,
pub(crate) goal_service: Arc<GoalService>,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider>,
/// Process-scoped persistence backend for extensions that need stored thread history.
pub(crate) thread_store: Arc<dyn ThreadStore>,
Expand All @@ -55,7 +53,6 @@ where
analytics_events_client,
thread_manager,
goal_service,
environment_manager,
executor_skill_provider,
thread_store: _thread_store,
} = dependencies;
Expand All @@ -73,8 +70,9 @@ where
}
codex_guardian::install(&mut builder, guardian_agent_spawner);
codex_memories_extension::install(&mut builder, codex_otel::global());
codex_connectors_extension::install_selected_executor_connectors(&mut builder);
codex_mcp_extension::install(&mut builder);
codex_mcp_extension::install_executor_plugins(&mut builder, environment_manager);
codex_mcp_extension::install_executor_plugins(&mut builder);
codex_web_search_extension::install(&mut builder, auth_manager.clone());
codex_image_generation_extension::install(&mut builder, auth_manager, |config: &Config| {
Some(config.codex_home.clone())
Expand Down
58 changes: 12 additions & 46 deletions codex-rs/app-server/src/mcp_refresh.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::config_manager::ConfigManager;
use codex_core::CodexThread;
use codex_core::ThreadManager;
use codex_protocol::ThreadId;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_core::config::Config;
use std::io;
use std::sync::Arc;
use tracing::warn;
Expand All @@ -21,11 +19,11 @@ pub(crate) async fn queue_strict_refresh(
.get_thread(thread_id)
.await
.map_err(|err| io::Error::other(format!("failed to load thread {thread_id}: {err}")))?;
let config = build_refresh_config(thread.as_ref(), config_manager).await?;
refreshes.push((thread_id, thread, config));
let config = load_refresh_config(thread.as_ref(), config_manager).await?;
refreshes.push((thread, config));
}
for (thread_id, thread, config) in refreshes {
queue_refresh(thread_id, thread, config).await?;
for (thread, config) in refreshes {
thread.queue_mcp_server_refresh_from_config(config).await;
}
Ok(())
}
Expand All @@ -42,54 +40,25 @@ pub(crate) async fn queue_best_effort_refresh(
continue;
}
};
let config = match build_refresh_config(thread.as_ref(), config_manager).await {
let config = match load_refresh_config(thread.as_ref(), config_manager).await {
Ok(config) => config,
Err(err) => {
warn!("failed to build MCP refresh config for thread {thread_id}: {err}");
continue;
}
};
if let Err(err) = queue_refresh(thread_id, thread, config).await {
warn!("{err}");
}
thread.queue_mcp_server_refresh_from_config(config).await;
}
}

async fn build_refresh_config(
async fn load_refresh_config(
thread: &CodexThread,
config_manager: &ConfigManager,
) -> io::Result<McpServerRefreshConfig> {
) -> io::Result<Config> {
let thread_config = thread.config().await;
let config = config_manager
config_manager
.load_latest_config_for_thread(thread_config.as_ref())
.await?;
let mcp_config = thread.runtime_mcp_config(&config).await;
let mcp_servers = codex_mcp::configured_mcp_servers(&mcp_config);
Ok(McpServerRefreshConfig {
mcp_servers: serde_json::to_value(mcp_servers).map_err(io::Error::other)?,
mcp_oauth_credentials_store_mode: serde_json::to_value(
config.mcp_oauth_credentials_store_mode,
)
.map_err(io::Error::other)?,
auth_keyring_backend_kind: serde_json::to_value(config.auth_keyring_backend_kind())
.map_err(io::Error::other)?,
})
}

async fn queue_refresh(
thread_id: ThreadId,
thread: Arc<CodexThread>,
config: McpServerRefreshConfig,
) -> io::Result<()> {
thread
.submit(Op::RefreshMcpServers { config })
.await
.map(|_| ())
.map_err(|err| {
io::Error::other(format!(
"failed to queue MCP refresh for thread {thread_id}: {err}"
))
})
}

#[cfg(test)]
Expand Down Expand Up @@ -164,10 +133,8 @@ mod tests {
}
let thread = good_thread.expect("good test thread should exist");

let refresh_config = build_refresh_config(thread.as_ref(), &config_manager).await?;
let backend = serde_json::from_value::<AuthKeyringBackendKind>(
refresh_config.auth_keyring_backend_kind,
)?;
let refresh_config = load_refresh_config(thread.as_ref(), &config_manager).await?;
let backend = refresh_config.auth_keyring_backend_kind();

assert_eq!(
thread.config().await.auth_keyring_backend_kind(),
Expand Down Expand Up @@ -237,7 +204,6 @@ mod tests {
analytics_events_client: codex_analytics::AnalyticsEventsClient::disabled(),
thread_manager: thread_manager.clone(),
goal_service: Arc::new(codex_goal_extension::GoalService::new()),
environment_manager: Arc::clone(&environment_manager),
executor_skill_provider: Arc::clone(&executor_skill_provider),
thread_store: Arc::clone(&thread_store),
},
Expand Down
1 change: 0 additions & 1 deletion codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ impl MessageProcessor {
analytics_events_client: analytics_events_client.clone(),
thread_manager: thread_manager.clone(),
goal_service: Arc::clone(&goal_service),
environment_manager: Arc::clone(&environment_manager_for_extensions),
executor_skill_provider: Arc::clone(&executor_skill_provider),
thread_store: Arc::clone(&thread_store),
},
Expand Down
2 changes: 0 additions & 2 deletions codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ use codex_connectors::AppInfo;
use codex_core::CodexThread;
use codex_core::CodexThreadSettingsOverrides;
use codex_core::ForkSnapshot;
use codex_core::McpManager;
use codex_core::NewThread;
#[cfg(test)]
use codex_core::SessionMeta;
Expand Down Expand Up @@ -335,7 +334,6 @@ use codex_core_plugins::PluginInstallError as CorePluginInstallError;
use codex_core_plugins::PluginInstallRequest;
use codex_core_plugins::PluginReadRequest;
use codex_core_plugins::PluginUninstallError as CorePluginUninstallError;
use codex_core_plugins::PluginsManager;
use codex_core_plugins::loader::load_plugin_apps;
use codex_core_plugins::loader::load_plugin_mcp_servers;
use codex_core_plugins::manifest::PluginManifestInterface;
Expand Down
76 changes: 40 additions & 36 deletions codex-rs/app-server/src/request_processors/apps_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl AppsRequestProcessor {
};
let mut config = self.load_latest_config(fallback_cwd).await?;

if let Some(thread) = thread {
if let Some(thread) = thread.as_ref() {
let _ = config
.features
.set_enabled(Feature::Apps, thread.enabled(Feature::Apps));
Expand Down Expand Up @@ -90,7 +90,10 @@ impl AppsRequestProcessor {
let outgoing = Arc::clone(&self.outgoing);
let environment_manager = self.thread_manager.environment_manager();
let mcp_manager = self.thread_manager.mcp_manager();
let plugins_manager = self.thread_manager.plugins_manager();
let mcp_config = match thread.as_ref() {
Some(thread) => thread.runtime_mcp_config(&config).await,
None => mcp_manager.runtime_config(&config).await,
};
let shutdown_token = self.shutdown_token.child_token();
tokio::spawn(async move {
tokio::select! {
Expand All @@ -101,8 +104,7 @@ impl AppsRequestProcessor {
params,
config,
environment_manager,
mcp_manager,
plugins_manager,
mcp_config,
) => {}
}
});
Expand All @@ -119,23 +121,15 @@ impl AppsRequestProcessor {
params: AppsListParams,
config: Config,
environment_manager: Arc<EnvironmentManager>,
mcp_manager: Arc<McpManager>,
plugins_manager: Arc<PluginsManager>,
mcp_config: codex_mcp::McpConfig,
) {
let retry_params = params.clone();
let retry_config = config.clone();
let retry_environment_manager = Arc::clone(&environment_manager);
let retry_mcp_manager = Arc::clone(&mcp_manager);
let retry_plugins_manager = Arc::clone(&plugins_manager);
let result = Self::apps_list_response(
&outgoing,
params,
config,
environment_manager,
mcp_manager,
plugins_manager,
)
.await;
let retry_mcp_config = mcp_config.clone();
let result =
Self::apps_list_response(&outgoing, params, config, environment_manager, mcp_config)
.await;
let should_retry = result
.as_ref()
.is_ok_and(|(_, codex_apps_ready)| !codex_apps_ready);
Expand All @@ -151,8 +145,7 @@ impl AppsRequestProcessor {
retry_params,
retry_config,
retry_environment_manager,
retry_mcp_manager,
retry_plugins_manager,
retry_mcp_config,
)
.await
{
Expand All @@ -166,8 +159,7 @@ impl AppsRequestProcessor {
params: AppsListParams,
config: Config,
environment_manager: Arc<EnvironmentManager>,
mcp_manager: Arc<McpManager>,
plugins_manager: Arc<PluginsManager>,
mcp_config: codex_mcp::McpConfig,
) -> Result<(AppsListResponse, bool), JSONRPCErrorError> {
let AppsListParams {
cursor,
Expand All @@ -183,13 +175,7 @@ impl AppsRequestProcessor {
None => 0,
};

let loaded_plugins = plugins_manager
.plugins_for_config(&config.plugins_config_input())
.await;
let connector_snapshot =
codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries(
loaded_plugins.capability_summaries(),
);
let connector_snapshot = mcp_config.connector_snapshot.clone();
let plugin_apps = connector_snapshot.connector_ids().to_vec();
let (mut accessible_connectors, mut all_connectors) = tokio::join!(
connectors::list_cached_accessible_connectors_from_mcp_tools(&config),
Expand All @@ -202,11 +188,11 @@ impl AppsRequestProcessor {
let accessible_config = config.clone();
let accessible_tx = tx.clone();
tokio::spawn(async move {
let result = connectors::list_accessible_connectors_from_mcp_tools_with_mcp_manager(
let result = connectors::list_accessible_connectors_from_mcp_tools_with_mcp_config(
&accessible_config,
force_refetch,
Arc::clone(&environment_manager),
mcp_manager,
mcp_config,
)
.await
.map_err(|err| format!("failed to load accessible apps: {err}"));
Expand All @@ -233,9 +219,12 @@ impl AppsRequestProcessor {
let mut last_notified_apps = None;

if accessible_connectors.is_some() || all_connectors.is_some() {
let merged = connectors::with_app_enabled_state(
merge_loaded_apps(all_connectors.as_deref(), accessible_connectors.as_deref()),
&config,
let merged = with_connector_plugin_sources(
connectors::with_app_enabled_state(
merge_loaded_apps(all_connectors.as_deref(), accessible_connectors.as_deref()),
&config,
),
&connector_snapshot,
);
if should_send_app_list_updated_notification(
merged.as_slice(),
Expand Down Expand Up @@ -292,9 +281,12 @@ impl AppsRequestProcessor {
} else {
accessible_connectors.as_deref()
};
let merged = connectors::with_app_enabled_state(
merge_loaded_apps(all_connectors_for_update, accessible_connectors_for_update),
&config,
let merged = with_connector_plugin_sources(
connectors::with_app_enabled_state(
merge_loaded_apps(all_connectors_for_update, accessible_connectors_for_update),
&config,
),
&connector_snapshot,
);
if should_send_app_list_updated_notification(
merged.as_slice(),
Expand Down Expand Up @@ -379,6 +371,18 @@ fn merge_loaded_apps(
connectors::merge_connectors_with_accessible(all, accessible, all_connectors_loaded)
}

fn with_connector_plugin_sources(
mut connectors: Vec<AppInfo>,
connector_snapshot: &codex_connectors::ConnectorSnapshot,
) -> Vec<AppInfo> {
for connector in &mut connectors {
connector.plugin_display_names = connector_snapshot
.plugin_display_names_for_connector_id(&connector.id)
.to_vec();
}
connectors
}

fn should_send_app_list_updated_notification(
connectors: &[AppInfo],
accessible_loaded: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl ExternalAgentSessionImporter {
.unwrap_or_else(|| model_info.get_model_instructions(config.personality)),
},
dynamic_tools: Vec::new(),
selected_capability_roots: Vec::new(),
multi_agent_version: Some(MultiAgentVersion::V1),
initial_window_id: uuid::Uuid::now_v7().to_string(),
metadata: ThreadPersistenceMetadata {
Expand Down
Loading
Loading