Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions codex-rs/core-skills/src/loader/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use codex_utils_path_uri::PathUri;
use codex_utils_plugins::DISCOVERABLE_PLUGIN_MANIFEST_PATHS;
use codex_utils_plugins::plugin_namespace_for_root_uri;
use codex_utils_plugins::plugin_namespace_for_skill_uri;
use futures::StreamExt;
use futures::future::join_all;

use crate::model::SkillDependencies;
Expand All @@ -31,6 +32,7 @@ use super::sanitize_single_line;
use super::validate_len;

const MAX_SKILLS_ENTRIES_PER_ROOT: usize = 20_000;
const MAX_CONCURRENT_SKILL_LOADS: usize = 64;

/// URI-native metadata for one skill owned by an execution environment.
#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -220,24 +222,32 @@ pub async fn load_environment_skills_from_root(
.filter_map(|(plugin_root, namespace)| namespace.map(|namespace| (plugin_root, namespace)))
.collect::<HashMap<_, _>>();

for path in discovery.skill_files {
let mut ancestor = path.parent();
let plugin_namespace = loop {
let Some(current) = ancestor else {
break None;
// Remote executors can multiplex these independent per-skill reads, so polling a bounded
// number together allows the I/O for each skill and its metadata to happen concurrently.
let skill_results = futures::stream::iter(discovery.skill_files)
.map(|path| {
let mut ancestor = path.parent();
let plugin_namespace = loop {
let Some(current) = ancestor else {
break None;
};
if let Some(namespace) = plugin_namespaces.get(&current) {
break Some(namespace.as_str());
}
ancestor = current.parent();
};
if let Some(namespace) = plugin_namespaces.get(&current) {
break Some(namespace.as_str());
async move {
let result =
EnvironmentSkillMetadata::parse(file_system, &path, plugin_namespace).await;
(path, result)
}
ancestor = current.parent();
};
match EnvironmentSkillMetadata::parse(
file_system,
&path,
/*plugin_namespace*/ plugin_namespace,
)
.await
{
})
.buffered(MAX_CONCURRENT_SKILL_LOADS)
Comment thread
anp-oai marked this conversation as resolved.
Comment thread
anp-oai marked this conversation as resolved.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In term of defensive, this will be interesting because it means the executor can make the memory usage of the orchestrator explode through this. To me this is a potential attack surface

.collect::<Vec<_>>()
.await;

for (path, result) in skill_results {
match result {
Ok(skill) if skill.matches_product_restriction(restriction_product) => {
outcome.skills.push(skill);
}
Expand Down
Loading