diff --git a/astrbot/core/skills/skill_manager.py b/astrbot/core/skills/skill_manager.py index 1b71e5371f..a8121c42a4 100644 --- a/astrbot/core/skills/skill_manager.py +++ b/astrbot/core/skills/skill_manager.py @@ -548,6 +548,8 @@ def install_skill_from_zip( if not zipfile.is_zipfile(zip_path): raise ValueError("Uploaded file is not a valid zip archive.") + installed_skills = [] + with zipfile.ZipFile(zip_path) as zf: names = [ name @@ -573,34 +575,6 @@ def install_skill_from_zip( ): raise ValueError("Invalid skill name.") - if root_mode: - archive_hint = _normalize_skill_name( - archive_skill_name or zip_path_obj.stem - ) - if not archive_hint or not _SKILL_NAME_RE.fullmatch(archive_hint): - raise ValueError("Invalid skill name.") - skill_name = archive_hint - else: - top_dirs = { - PurePosixPath(name).parts[0] for name in file_names if name.strip() - } - if len(top_dirs) != 1: - raise ValueError( - "Zip archive must contain a single top-level folder." - ) - archive_root_name = next(iter(top_dirs)) - archive_root_name_normalized = _normalize_skill_name(archive_root_name) - if archive_root_name in {".", "..", ""} or not _SKILL_NAME_RE.fullmatch( - archive_root_name_normalized - ): - raise ValueError("Invalid skill folder name.") - if archive_skill_name: - if not _SKILL_NAME_RE.fullmatch(archive_skill_name): - raise ValueError("Invalid skill name.") - skill_name = archive_skill_name - else: - skill_name = archive_root_name_normalized - for name in names: if not name: continue @@ -609,20 +583,38 @@ def install_skill_from_zip( parts = PurePosixPath(name).parts if ".." in parts: raise ValueError("Zip archive contains invalid relative paths.") - if (not root_mode) and parts and parts[0] != archive_root_name: - raise ValueError( - "Zip archive contains unexpected top-level entries." - ) - if root_mode: - if "SKILL.md" not in file_names and "skill.md" not in file_names: - raise ValueError("SKILL.md not found in the skill folder.") - else: - if ( - f"{archive_root_name}/SKILL.md" not in file_names - and f"{archive_root_name}/skill.md" not in file_names - ): - raise ValueError("SKILL.md not found in the skill folder.") + if not root_mode and not overwrite: + top_dirs = {PurePosixPath(n).parts[0] for n in file_names if n.strip()} + conflict_dirs: list[str] = [] + for src_dir_name in top_dirs: + if ( + f"{src_dir_name}/SKILL.md" not in file_names + and f"{src_dir_name}/skill.md" not in file_names + ): + continue + + candidate_name = _normalize_skill_name(src_dir_name) + if not candidate_name or not _SKILL_NAME_RE.fullmatch( + candidate_name + ): + continue + + if archive_skill_name and len(top_dirs) == 1: + target_name = archive_skill_name + else: + target_name = candidate_name + + dest_dir = Path(self.skills_root) / target_name + if dest_dir.exists(): + conflict_dirs.append(str(dest_dir)) + + if conflict_dirs: + raise FileExistsError( + "One or more skills from the archive already exist and " + "overwrite=False. No skills were installed. Conflicting " + f"paths: {', '.join(conflict_dirs)}" + ) with tempfile.TemporaryDirectory(dir=get_astrbot_temp_path()) as tmp_dir: for member in zf.infolist(): @@ -630,21 +622,78 @@ def install_skill_from_zip( if not member_name or _is_ignored_zip_entry(member_name): continue zf.extract(member, tmp_dir) - src_dir = ( - Path(tmp_dir) if root_mode else Path(tmp_dir) / archive_root_name - ) - normalized_path = _normalize_skill_markdown_path(src_dir) - if normalized_path is None: - raise ValueError("SKILL.md not found in the skill folder.") - _normalize_skill_markdown_path(src_dir) - if not src_dir.exists(): - raise ValueError("Skill folder not found after extraction.") - dest_dir = Path(self.skills_root) / skill_name - if dest_dir.exists(): - if not overwrite: - raise FileExistsError("Skill already exists.") - shutil.rmtree(dest_dir) - shutil.move(str(src_dir), str(dest_dir)) - - self.set_skill_active(skill_name, True) - return skill_name + + if root_mode: + archive_hint = _normalize_skill_name( + archive_skill_name or zip_path_obj.stem + ) + if not archive_hint or not _SKILL_NAME_RE.fullmatch(archive_hint): + raise ValueError("Invalid skill name.") + skill_name = archive_hint + + src_dir = Path(tmp_dir) + normalized_path = _normalize_skill_markdown_path(src_dir) + if normalized_path is None: + raise ValueError( + "SKILL.md not found in the root of the zip archive." + ) + + dest_dir = Path(self.skills_root) / skill_name + if dest_dir.exists() and overwrite: + shutil.rmtree(dest_dir) + elif dest_dir.exists() and not overwrite: + raise FileExistsError(f"Skill {skill_name} already exists.") + + shutil.move(str(src_dir), str(dest_dir)) + self.set_skill_active(skill_name, True) + installed_skills.append(skill_name) + + else: + top_dirs = { + PurePosixPath(n).parts[0] for n in file_names if n.strip() + } + + for archive_root_name in top_dirs: + archive_root_name_normalized = _normalize_skill_name( + archive_root_name + ) + + if ( + f"{archive_root_name}/SKILL.md" not in file_names + and f"{archive_root_name}/skill.md" not in file_names + ): + continue + + if archive_root_name in {".", "..", ""} or not ( + _SKILL_NAME_RE.fullmatch(archive_root_name_normalized) + ): + continue + + if archive_skill_name and len(top_dirs) == 1: + skill_name = archive_skill_name + else: + skill_name = archive_root_name_normalized + + src_dir = Path(tmp_dir) / archive_root_name + normalized_path = _normalize_skill_markdown_path(src_dir) + if normalized_path is None: + continue + + dest_dir = Path(self.skills_root) / skill_name + if dest_dir.exists(): + if not overwrite: + raise FileExistsError( + f"Skill {skill_name} already exists." + ) + shutil.rmtree(dest_dir) + + shutil.move(str(src_dir), str(dest_dir)) + self.set_skill_active(skill_name, True) + installed_skills.append(skill_name) + + if not installed_skills: + raise ValueError( + "No valid SKILL.md found in any folder of the zip archive." + ) + + return ", ".join(installed_skills)