Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 108 additions & 59 deletions astrbot/core/skills/skill_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ def install_skill_from_zip(
if not zipfile.is_zipfile(zip_path):
raise ValueError("Uploaded file is not a valid zip archive.")

installed_skills = []

with zipfile.ZipFile(zip_path) as zf:
names = [
name
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
Expand All @@ -573,34 +575,6 @@ def install_skill_from_zip(
):
raise ValueError("Invalid skill name.")

if root_mode:
archive_hint = _normalize_skill_name(
archive_skill_name or zip_path_obj.stem
)
if not archive_hint or not _SKILL_NAME_RE.fullmatch(archive_hint):
raise ValueError("Invalid skill name.")
skill_name = archive_hint
else:
top_dirs = {
PurePosixPath(name).parts[0] for name in file_names if name.strip()
}
if len(top_dirs) != 1:
raise ValueError(
"Zip archive must contain a single top-level folder."
)
archive_root_name = next(iter(top_dirs))
archive_root_name_normalized = _normalize_skill_name(archive_root_name)
if archive_root_name in {".", "..", ""} or not _SKILL_NAME_RE.fullmatch(
archive_root_name_normalized
):
raise ValueError("Invalid skill folder name.")
if archive_skill_name:
if not _SKILL_NAME_RE.fullmatch(archive_skill_name):
raise ValueError("Invalid skill name.")
skill_name = archive_skill_name
else:
skill_name = archive_root_name_normalized

for name in names:
if not name:
continue
Expand All @@ -609,42 +583,117 @@ def install_skill_from_zip(
parts = PurePosixPath(name).parts
if ".." in parts:
raise ValueError("Zip archive contains invalid relative paths.")
if (not root_mode) and parts and parts[0] != archive_root_name:
raise ValueError(
"Zip archive contains unexpected top-level entries."
)

if root_mode:
if "SKILL.md" not in file_names and "skill.md" not in file_names:
raise ValueError("SKILL.md not found in the skill folder.")
else:
if (
f"{archive_root_name}/SKILL.md" not in file_names
and f"{archive_root_name}/skill.md" not in file_names
):
raise ValueError("SKILL.md not found in the skill folder.")
if not root_mode and not overwrite:
top_dirs = {PurePosixPath(n).parts[0] for n in file_names if n.strip()}
conflict_dirs: list[str] = []
for src_dir_name in top_dirs:
if (
f"{src_dir_name}/SKILL.md" not in file_names
and f"{src_dir_name}/skill.md" not in file_names
):
continue

candidate_name = _normalize_skill_name(src_dir_name)
if not candidate_name or not _SKILL_NAME_RE.fullmatch(
candidate_name
):
continue

if archive_skill_name and len(top_dirs) == 1:
target_name = archive_skill_name
else:
target_name = candidate_name

dest_dir = Path(self.skills_root) / target_name
if dest_dir.exists():
conflict_dirs.append(str(dest_dir))

if conflict_dirs:
raise FileExistsError(
"One or more skills from the archive already exist and "
"overwrite=False. No skills were installed. Conflicting "
f"paths: {', '.join(conflict_dirs)}"
)

with tempfile.TemporaryDirectory(dir=get_astrbot_temp_path()) as tmp_dir:
for member in zf.infolist():
member_name = member.filename.replace("\\", "/")
if not member_name or _is_ignored_zip_entry(member_name):
continue
zf.extract(member, tmp_dir)
src_dir = (
Path(tmp_dir) if root_mode else Path(tmp_dir) / archive_root_name
)
normalized_path = _normalize_skill_markdown_path(src_dir)
if normalized_path is None:
raise ValueError("SKILL.md not found in the skill folder.")
_normalize_skill_markdown_path(src_dir)
if not src_dir.exists():
raise ValueError("Skill folder not found after extraction.")
dest_dir = Path(self.skills_root) / skill_name
if dest_dir.exists():
if not overwrite:
raise FileExistsError("Skill already exists.")
shutil.rmtree(dest_dir)
shutil.move(str(src_dir), str(dest_dir))

self.set_skill_active(skill_name, True)
return skill_name

if root_mode:
archive_hint = _normalize_skill_name(
archive_skill_name or zip_path_obj.stem
)
if not archive_hint or not _SKILL_NAME_RE.fullmatch(archive_hint):
raise ValueError("Invalid skill name.")
skill_name = archive_hint

src_dir = Path(tmp_dir)
normalized_path = _normalize_skill_markdown_path(src_dir)
if normalized_path is None:
raise ValueError(
"SKILL.md not found in the root of the zip archive."
)

dest_dir = Path(self.skills_root) / skill_name
if dest_dir.exists() and overwrite:
shutil.rmtree(dest_dir)
elif dest_dir.exists() and not overwrite:
raise FileExistsError(f"Skill {skill_name} already exists.")

shutil.move(str(src_dir), str(dest_dir))
self.set_skill_active(skill_name, True)
installed_skills.append(skill_name)

else:
top_dirs = {
PurePosixPath(n).parts[0] for n in file_names if n.strip()
}

for archive_root_name in top_dirs:
archive_root_name_normalized = _normalize_skill_name(
archive_root_name
)

if (
f"{archive_root_name}/SKILL.md" not in file_names
and f"{archive_root_name}/skill.md" not in file_names
):
continue

if archive_root_name in {".", "..", ""} or not (
_SKILL_NAME_RE.fullmatch(archive_root_name_normalized)
):
continue

if archive_skill_name and len(top_dirs) == 1:
skill_name = archive_skill_name
else:
skill_name = archive_root_name_normalized

src_dir = Path(tmp_dir) / archive_root_name
normalized_path = _normalize_skill_markdown_path(src_dir)
if normalized_path is None:
continue

dest_dir = Path(self.skills_root) / skill_name
if dest_dir.exists():
if not overwrite:
raise FileExistsError(
f"Skill {skill_name} already exists."
)
shutil.rmtree(dest_dir)

shutil.move(str(src_dir), str(dest_dir))
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
self.set_skill_active(skill_name, True)
installed_skills.append(skill_name)

if not installed_skills:
raise ValueError(
"No valid SKILL.md found in any folder of the zip archive."
)

return ", ".join(installed_skills)
Loading