diff --git a/astrbot/core/backup/importer.py b/astrbot/core/backup/importer.py index b51c7d9560..e994242a88 100644 --- a/astrbot/core/backup/importer.py +++ b/astrbot/core/backup/importer.py @@ -59,6 +59,20 @@ def _get_major_version(version_str: str) -> str: return "0.0" +def _validate_path_within(target_path: Path, base_dir: Path) -> bool: + """Validate that target_path is within base_dir after resolving symlinks. + + Prevents path traversal attacks (CWE-22) by ensuring the resolved + target path is relative to the resolved base directory. + """ + try: + resolved = target_path.resolve(strict=False) + base_resolved = base_dir.resolve(strict=False) + return resolved.is_relative_to(base_resolved) + except (OSError, ValueError): + return False + + CMD_CONFIG_FILE_PATH = os.path.join(get_astrbot_data_path(), "cmd_config.json") KB_PATH = get_astrbot_knowledge_base_path() DEFAULT_PLATFORM_STATS_INVALID_COUNT_WARN_LIMIT = 5 @@ -765,6 +779,10 @@ async def _import_knowledge_bases( try: rel_path = name[len(media_prefix) :] target_path = kb_dir / rel_path + # Validate path is within kb directory (CWE-22) + if not _validate_path_within(target_path, kb_dir): + logger.warning(f"媒体文件路径越界,已跳过: {target_path}") + continue target_path.parent.mkdir(parents=True, exist_ok=True) with zf.open(name) as src, open(target_path, "wb") as dst: dst.write(src.read()) @@ -827,6 +845,11 @@ async def _import_attachments( else: target_path = attachments_dir / os.path.basename(name) + # Validate path is within attachments directory (CWE-22) + if not _validate_path_within(target_path, attachments_dir): + logger.warning(f"附件路径越界,已跳过: {target_path}") + continue + target_path.parent.mkdir(parents=True, exist_ok=True) with zf.open(name) as src, open(target_path, "wb") as dst: dst.write(src.read()) @@ -904,6 +927,10 @@ async def _import_directories( continue target_path = target_dir / rel_path + # Validate path is within target directory (CWE-22) + if not _validate_path_within(target_path, target_dir): + result.add_warning(f"文件路径越界,已跳过: {name}") + continue target_path.parent.mkdir(parents=True, exist_ok=True) with zf.open(name) as src, open(target_path, "wb") as dst: