diff --git a/codex-rs/core-skills/src/loader/environment.rs b/codex-rs/core-skills/src/loader/environment.rs index 5dc29ebfb263..93ab14e9aa58 100644 --- a/codex-rs/core-skills/src/loader/environment.rs +++ b/codex-rs/core-skills/src/loader/environment.rs @@ -10,6 +10,7 @@ use codex_utils_path_uri::PathUri; use codex_utils_plugins::DISCOVERABLE_PLUGIN_MANIFEST_PATHS; use codex_utils_plugins::plugin_namespace_for_root_uri; use codex_utils_plugins::plugin_namespace_for_skill_uri; +use futures::StreamExt; use futures::future::join_all; use crate::model::SkillDependencies; @@ -31,6 +32,7 @@ use super::sanitize_single_line; use super::validate_len; const MAX_SKILLS_ENTRIES_PER_ROOT: usize = 20_000; +const MAX_CONCURRENT_SKILL_LOADS: usize = 64; /// URI-native metadata for one skill owned by an execution environment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -220,24 +222,32 @@ pub async fn load_environment_skills_from_root( .filter_map(|(plugin_root, namespace)| namespace.map(|namespace| (plugin_root, namespace))) .collect::>(); - for path in discovery.skill_files { - let mut ancestor = path.parent(); - let plugin_namespace = loop { - let Some(current) = ancestor else { - break None; + // Remote executors can multiplex these independent per-skill reads, so polling a bounded + // number together allows the I/O for each skill and its metadata to happen concurrently. + let skill_results = futures::stream::iter(discovery.skill_files) + .map(|path| { + let mut ancestor = path.parent(); + let plugin_namespace = loop { + let Some(current) = ancestor else { + break None; + }; + if let Some(namespace) = plugin_namespaces.get(¤t) { + break Some(namespace.as_str()); + } + ancestor = current.parent(); }; - if let Some(namespace) = plugin_namespaces.get(¤t) { - break Some(namespace.as_str()); + async move { + let result = + EnvironmentSkillMetadata::parse(file_system, &path, plugin_namespace).await; + (path, result) } - ancestor = current.parent(); - }; - match EnvironmentSkillMetadata::parse( - file_system, - &path, - /*plugin_namespace*/ plugin_namespace, - ) - .await - { + }) + .buffered(MAX_CONCURRENT_SKILL_LOADS) + .collect::>() + .await; + + for (path, result) in skill_results { + match result { Ok(skill) if skill.matches_product_restriction(restriction_product) => { outcome.skills.push(skill); }