Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions codex-rs/app-server/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub(crate) struct ThreadExtensionDependencies {
pub(crate) thread_manager: Weak<ThreadManager>,
pub(crate) goal_service: Arc<GoalService>,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider>,
/// Process-scoped persistence backend for extensions that need stored thread history.
pub(crate) thread_store: Arc<dyn ThreadStore>,
}
Expand All @@ -56,7 +55,6 @@ where
thread_manager,
goal_service,
environment_manager,
executor_skill_provider,
thread_store: _thread_store,
} = dependencies;
let mut builder = ExtensionRegistryBuilder::<Config>::with_event_sink(event_sink);
Expand All @@ -79,11 +77,9 @@ where
codex_image_generation_extension::install(&mut builder, auth_manager, |config: &Config| {
Some(config.codex_home.clone())
});
let skill_providers = codex_skills_extension::SkillProviders::new()
.with_executor_provider(executor_skill_provider)
.with_orchestrator_provider(Arc::new(
codex_skills_extension::OrchestratorSkillProvider::new(),
));
let skill_providers = codex_skills_extension::SkillProviders::new().with_orchestrator_provider(
Arc::new(codex_skills_extension::OrchestratorSkillProvider::new()),
);
codex_skills_extension::install_with_providers(
&mut builder,
skill_providers,
Expand Down
7 changes: 0 additions & 7 deletions codex-rs/app-server/src/mcp_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,6 @@ mod tests {
.expect("refresh tests require state db");
let thread_store = thread_store_from_config(&good_config, Some(state_db.clone()));
let environment_manager = Arc::new(EnvironmentManager::default_for_tests());
let executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider> = Arc::new(
codex_skills_extension::ExecutorSkillProvider::new_with_restriction_product(
Arc::clone(&environment_manager),
SessionSource::Exec.restriction_product(),
),
);
let thread_manager = Arc::new_cyclic(|thread_manager| {
ThreadManager::new(
&good_config,
Expand All @@ -238,7 +232,6 @@ mod tests {
thread_manager: thread_manager.clone(),
goal_service: Arc::new(codex_goal_extension::GoalService::new()),
environment_manager: Arc::clone(&environment_manager),
executor_skill_provider: Arc::clone(&executor_skill_provider),
thread_store: Arc::clone(&thread_store),
},
),
Expand Down
8 changes: 0 additions & 8 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,6 @@ impl MessageProcessor {
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
let environment_manager_for_requests = Arc::clone(&environment_manager);
let environment_manager_for_extensions = Arc::clone(&environment_manager);
let restriction_product = session_source.restriction_product();
let executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider> = Arc::new(
codex_skills_extension::ExecutorSkillProvider::new_with_restriction_product(
Arc::clone(&environment_manager_for_extensions),
restriction_product,
),
);
let goal_service = Arc::new(GoalService::new());
let thread_manager = Arc::new_cyclic(|thread_manager| {
ThreadManager::new(
Expand All @@ -364,7 +357,6 @@ impl MessageProcessor {
thread_manager: thread_manager.clone(),
goal_service: Arc::clone(&goal_service),
environment_manager: Arc::clone(&environment_manager_for_extensions),
executor_skill_provider: Arc::clone(&executor_skill_provider),
thread_store: Arc::clone(&thread_store),
},
),
Expand Down
37 changes: 29 additions & 8 deletions codex-rs/codex-mcp/src/resource_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ pub struct McpResourceReadResult {
/// snapshot, so calls automatically use replacements installed during startup and refresh.
#[derive(Clone)]
pub struct McpResourceClient {
manager: Arc<ArcSwap<McpConnectionManager>>,
manager: ResourceManager,
}

#[derive(Clone)]
enum ResourceManager {
Live(Arc<ArcSwap<McpConnectionManager>>),
Snapshot(Arc<McpConnectionManager>),
}

/// Opaque identity for the manager currently used by an MCP resource client.
Expand All @@ -59,19 +65,36 @@ impl std::fmt::Debug for McpResourceClient {
impl McpResourceClient {
/// Creates a resource client backed by the session's replaceable MCP manager.
pub fn new(manager: Arc<ArcSwap<McpConnectionManager>>) -> Self {
Self { manager }
Self {
manager: ResourceManager::Live(manager),
}
}

/// Pins resource calls to the manager generation that is current now.
pub fn snapshot(&self) -> Self {
Self {
manager: ResourceManager::Snapshot(self.manager_snapshot()),
}
}

/// Returns the exact manager generation used by this client right now.
pub fn manager_snapshot(&self) -> Arc<McpConnectionManager> {
match &self.manager {
ResourceManager::Live(manager) => manager.load_full(),
ResourceManager::Snapshot(manager) => Arc::clone(manager),
}
}

/// Returns an identity that changes whenever the published manager changes.
pub fn cache_key(&self) -> McpResourceClientCacheKey {
McpResourceClientCacheKey(Arc::downgrade(&self.manager.load_full()))
McpResourceClientCacheKey(Arc::downgrade(&self.manager_snapshot()))
}

/// Returns whether the current manager contains the named server.
///
/// This does not wait for server startup or imply that startup succeeded.
pub async fn has_server(&self, server: &str) -> bool {
self.manager.load_full().contains_server(server)
self.manager_snapshot().contains_server(server)
}

/// Lists one resource page from the named server.
Expand All @@ -83,8 +106,7 @@ impl McpResourceClient {
let params =
cursor.map(|cursor| PaginatedRequestParams::default().with_cursor(Some(cursor)));
let result = self
.manager
.load_full()
.manager_snapshot()
.list_resources(server, params)
.await?;
let resources = result
Expand All @@ -101,8 +123,7 @@ impl McpResourceClient {
/// Reads one resource from the named server.
pub async fn read_resource(&self, server: &str, uri: &str) -> Result<McpResourceReadResult> {
let result = self
.manager
.load_full()
.manager_snapshot()
.read_resource(server, ReadResourceRequestParams::new(uri.to_string()))
.await?;
let contents = result
Expand Down
5 changes: 5 additions & 0 deletions codex-rs/core-skills/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod render;
mod root_loader;
pub mod runtime;
mod runtime_selection;
mod runtime_snapshot;
pub mod service;
mod skill_instructions;
pub mod system;
Expand Down Expand Up @@ -36,6 +37,10 @@ pub use render::default_skill_metadata_budget;
pub use render::render_available_skills_body;
pub use root_loader::PluginSkillSnapshots;
pub use runtime_selection::collect_runtime_skill_mentions;
pub use runtime_snapshot::RuntimeSkillInjection;
pub use runtime_snapshot::RuntimeSkillInjections;
pub use runtime_snapshot::SkillInjectionIdentity;
pub use runtime_snapshot::SkillsSnapshot;
pub use service::SkillsLoadInput;
pub use service::SkillsService;
pub use skill_instructions::SkillInstructions;
4 changes: 4 additions & 0 deletions codex-rs/core-skills/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ impl HostSkillsSnapshot {
self.outcome.as_ref()
}

pub(crate) fn outcome_arc(&self) -> Arc<SkillLoadOutcome> {
Arc::clone(&self.outcome)
}

pub async fn read_skill_text(&self, skill: &SkillMetadata) -> io::Result<String> {
let fs = self
.outcome
Expand Down
58 changes: 48 additions & 10 deletions codex-rs/core-skills/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::hash::Hash;
Expand Down Expand Up @@ -306,7 +307,7 @@ pub trait SkillSource: Send + Sync {
#[derive(Clone)]
struct RegisteredSkillSource {
label: String,
source: Arc<dyn SkillSource>,
source: Arc<dyn Fn() -> Arc<dyn SkillSource> + Send + Sync>,
}

/// Bound skill sources used to build and read one runtime catalog.
Expand All @@ -321,35 +322,72 @@ impl SkillSources {
}

pub fn with_source(mut self, label: impl Into<String>, source: Arc<dyn SkillSource>) -> Self {
self.sources.push(RegisteredSkillSource {
label: label.into(),
source: Arc::new(move || Arc::clone(&source)),
});
self
}

pub fn with_source_factory(
mut self,
label: impl Into<String>,
source: Arc<dyn Fn() -> Arc<dyn SkillSource> + Send + Sync>,
) -> Self {
self.sources.push(RegisteredSkillSource {
label: label.into(),
source,
});
self
}

pub fn extend(&mut self, other: Self) {
self.sources.extend(other.sources);
}

pub async fn list(&self) -> SkillCatalog {
self.list_with_sources().await.0
}

pub(crate) async fn list_with_sources(
&self,
) -> (
SkillCatalog,
HashMap<(SkillAuthority, SkillPackageId), Arc<dyn SkillSource>>,
) {
let mut catalog = SkillCatalog::default();
let mut sources = HashMap::new();
for source in &self.sources {
match source.source.list().await {
Ok(source_catalog) => catalog.extend(source_catalog),
let bound_source = (source.source)();
match bound_source.list().await {
Ok(source_catalog) => {
catalog.warnings.extend(source_catalog.warnings);
for entry in source_catalog.entries {
let key = (entry.authority.clone(), entry.id.clone());
if let std::collections::hash_map::Entry::Vacant(route) = sources.entry(key)
{
route.insert(Arc::clone(&bound_source));
catalog.entries.push(entry);
}
}
}
Err(err) => catalog.warnings.push(format!(
"{} skills unavailable: {}",
source.label, err.message
)),
}
}
catalog
(catalog, sources)
}

pub async fn list_kind(&self, kind: &SkillSourceKind) -> SkillSourceResult<SkillCatalog> {
let mut catalog = SkillCatalog::default();
for source in self
.sources
.iter()
.filter(|source| source.source.kind() == *kind)
{
let source_catalog = source.source.list().await.map_err(|err| {
for source in self.sources.iter() {
let bound_source = (source.source)();
if bound_source.kind() != *kind {
continue;
}
let source_catalog = bound_source.list().await.map_err(|err| {
SkillSourceError::new(format!(
"{} skills unavailable: {}",
source.label, err.message
Expand Down
Loading
Loading