Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 117 additions & 4 deletions src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3329,6 +3329,47 @@ def _resolve_installed_extension(
raise typer.Exit(1)


def _safe_open_download_zip(project_root: Path, download_dir: Path, zip_filename: str) -> int:
"""Open ``download_dir / zip_filename`` for writing without following any symlink.

Closes the TOCTOU window between cache-ancestor validation and the actual
file write. On POSIX, walks each component of ``download_dir`` using
``dir_fd`` + ``O_NOFOLLOW`` so a symlink swap of *any* component (including
ancestors) after validation is rejected. The leaf is created with
``O_EXCL`` so the unique UUID filename cannot collide with anything an
attacker pre-staged.

On platforms without ``dir_fd``/``O_NOFOLLOW`` support (Windows), falls
back to a plain ``O_EXCL`` open. Symlink creation on Windows requires
elevated privileges, and the per-component ancestor walk performed by the
caller already covers the realistic threat surface there.

Returns an open file descriptor; the caller owns and must close it.
Raises ``OSError`` (e.g. ``ELOOP``, ``EEXIST``, ``EACCES``) if any
component is a symlink or the leaf already exists.
"""
o_nofollow = getattr(os, "O_NOFOLLOW", 0)
o_directory = getattr(os, "O_DIRECTORY", 0)
leaf_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | o_nofollow

# Windows / no openat support: best-effort O_EXCL open.
if os.open not in os.supports_dir_fd:
return os.open(download_dir / zip_filename, leaf_flags, 0o600)
Comment thread
mnriem marked this conversation as resolved.
Outdated

Comment on lines +3412 to +3434
rel_parts = download_dir.relative_to(project_root).parts
parent_fd = os.open(project_root, os.O_RDONLY | o_directory)
try:
for part in rel_parts:
new_fd = os.open(
part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd
)
os.close(parent_fd)
parent_fd = new_fd
return os.open(zip_filename, leaf_flags, 0o600, dir_fd=parent_fd)
finally:
os.close(parent_fd)


def _resolve_catalog_extension(
argument: str,
catalog,
Expand Down Expand Up @@ -3641,23 +3682,95 @@ def extension_add(
console.print("Only install extensions from sources you trust.\n")
console.print(f"Downloading from {from_url}...")

# Download ZIP to temp location
# Download ZIP to temp location.
# Validate and create each ancestor without following symlinks
# (consistent with shared_infra._ensure_safe_shared_directory):
# `mkdir(parents=True)` would silently traverse a symlinked ancestor,
# so each component is checked and created individually instead.
# Each existing component is also re-resolved and required to land
# under project_root, which catches non-symlink directory aliases
# (e.g. Windows junctions / mount points) that resolve outside.
import uuid as _uuid
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
download_dir.mkdir(parents=True, exist_ok=True)
zip_path = download_dir / f"{extension}-url-download.zip"
project_root_resolved = project_root.resolve()
Comment thread
mnriem marked this conversation as resolved.
Outdated
_current = project_root
for _part in download_dir.relative_to(project_root).parts:
_current = _current / _part
if _current.is_symlink():
console.print("[red]Error:[/red] Refusing to use symlinked download cache directory")
raise typer.Exit(1)
if _current.exists():
if not _current.is_dir():
console.print(
f"[red]Error:[/red] Download cache path is not a directory: {_current}"
)
raise typer.Exit(1)
try:
_current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)
Comment thread
mnriem marked this conversation as resolved.
Outdated
continue
_current.mkdir()
if _current.is_symlink():
console.print("[red]Error:[/red] Refusing to use symlinked download cache directory")
raise typer.Exit(1)
Comment thread
mnriem marked this conversation as resolved.
Outdated
Comment thread
mnriem marked this conversation as resolved.
Outdated

safe_name = Path(extension).name.replace("/", "_").replace("\\", "_") or "download"
safe_name = safe_name[:64] # cap length to avoid filesystem errors
zip_filename = f"{safe_name}-{_uuid.uuid4().hex[:8]}.zip"
zip_path = download_dir / zip_filename
Comment thread
mnriem marked this conversation as resolved.

# Guard: resolved path must stay inside download_dir (CWE-22)
try:
zip_path.resolve().relative_to(download_dir.resolve())
Comment thread
mnriem marked this conversation as resolved.
Outdated
except ValueError:
Comment thread
mnriem marked this conversation as resolved.
Outdated
console.print("[red]Error:[/red] Invalid extension name (path traversal detected)")
raise typer.Exit(1)

try:
from specify_cli.authentication.http import open_url as _open_url

with _open_url(from_url, timeout=60) as response:
zip_data = response.read()
zip_path.write_bytes(zip_data)

# Open the ZIP file for writing without following any symlink
# in the path. On POSIX, walk each ancestor of `download_dir`
# using dir_fd + O_NOFOLLOW so a symlink swap of *any*
# component (including ancestors) between validation above
# and this write is rejected. On Windows there is no
# `dir_fd` / `O_NOFOLLOW` support, so we fall back to a
# plain O_EXCL open which still requires the unique UUID
# filename to be freshly created.
try:
_fd = _safe_open_download_zip(
project_root, download_dir, zip_filename
)
except OSError as _open_err:
console.print(
f"[red]Error:[/red] Could not safely create download file: {_open_err}"
)
raise typer.Exit(1)
try:
with os.fdopen(_fd, "wb") as _zf:
_zf.write(zip_data)
except BaseException:
# os.fdopen took ownership only on success; on failure
# before the with-block, the fd may still be open.
try:
os.close(_fd)
except OSError:
pass
raise

# Install from downloaded ZIP
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority)
except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
raise typer.Exit(1)
except OSError as e:
console.print(f"[red]Error:[/red] Failed to write download cache: {e}")
raise typer.Exit(1)
Comment thread
mnriem marked this conversation as resolved.
Outdated
finally:
# Clean up downloaded ZIP
if zip_path.exists():
Expand Down
Loading
Loading