Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 278 additions & 11 deletions src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3329,6 +3329,184 @@ def _resolve_installed_extension(
raise typer.Exit(1)


def _validate_safe_cache_dir(project_root: Path) -> Path:
"""Validate (and create) the per-project download cache directory safely.

Walks ``.specify/extensions/.cache/downloads`` one component at a time,
refusing symlinked components and re-resolving each existing/new component
under ``project_root`` to catch non-symlink directory aliases (Windows
junctions / mount points) and mid-creation parent swaps. ``mkdir(parents=
True)`` would silently traverse a symlinked ancestor, so each component is
checked and created individually instead.

Returns the validated download directory. Raises ``typer.Exit`` on any
safety violation (after printing a user-facing error).
"""
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
project_root_resolved = project_root.resolve()
try:
current = project_root
for part in download_dir.relative_to(project_root).parts:
current = current / part
if current.is_symlink():
console.print("[red]Error:[/red] Refusing to use symlinked download cache directory")
raise typer.Exit(1)
if current.exists():
if not current.is_dir():
console.print(
f"[red]Error:[/red] Download cache path is not a directory: {current}"
)
raise typer.Exit(1)
try:
current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)
continue
current.mkdir()
Comment thread
mnriem marked this conversation as resolved.
Outdated
if current.is_symlink():
console.print("[red]Error:[/red] Refusing to use symlinked download cache directory")
raise typer.Exit(1)
try:
current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)
Comment on lines +3347 to +3387
except OSError as exc:
console.print(
f"[red]Error:[/red] Could not prepare download cache directory: {exc}"
)
raise typer.Exit(1)
return download_dir


def _safe_open_download_zip(project_root: Path, download_dir: Path, zip_filename: str) -> int:
"""Open ``download_dir / zip_filename`` for writing without following any symlink.

Closes the TOCTOU window between cache-ancestor validation and the actual
file write. On POSIX, walks each component of ``download_dir`` using
``dir_fd`` + ``O_NOFOLLOW`` so a symlink swap of *any* component (including
ancestors) after validation is rejected. The leaf is created with
``O_EXCL`` so the unique UUID filename cannot collide with anything an
attacker pre-staged.

On platforms without ``dir_fd``/``O_NOFOLLOW`` support (Windows), the
function performs an immediate per-component ancestor walk (re-checking
``is_symlink`` and re-resolving each component under ``project_root``)
*just before* the leaf open. This narrows the TOCTOU window between the
earlier caller-side validation and the actual write so a swap performed in
that window is still caught. Symlink creation on Windows additionally
requires elevated privileges.

Returns an open file descriptor; the caller owns and must close it.
Raises ``OSError`` (e.g. ``ELOOP``, ``EEXIST``, ``EACCES``) if any
component is a symlink or the leaf already exists.
"""
o_nofollow = getattr(os, "O_NOFOLLOW", 0)
o_directory = getattr(os, "O_DIRECTORY", 0)
leaf_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | o_nofollow

# Windows / no openat support: re-validate ancestors immediately before
# opening so a swap in the (caller-validation -> open) window is caught.
if os.open not in os.supports_dir_fd:
project_root_resolved = project_root.resolve()
current = project_root
for part in download_dir.relative_to(project_root).parts:
current = current / part
if current.is_symlink():
raise OSError(
f"Refusing to follow symlinked cache component: {current}"
)
try:
current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
raise OSError(
f"Cache component escapes project root: {current}"
)
return os.open(download_dir / zip_filename, leaf_flags, 0o600)
Comment thread
mnriem marked this conversation as resolved.
Outdated

Comment on lines +3412 to +3434
rel_parts = download_dir.relative_to(project_root).parts
parent_fd = os.open(project_root, os.O_RDONLY | o_directory)
try:
for part in rel_parts:
new_fd = os.open(
part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd
)
os.close(parent_fd)
parent_fd = new_fd
return os.open(zip_filename, leaf_flags, 0o600, dir_fd=parent_fd)
finally:
os.close(parent_fd)


def _safe_unlink_download_zip(
project_root: Path, download_dir: Path, zip_filename: str
) -> None:
"""Best-effort unlink of the downloaded ZIP without following symlinks.

Mirrors ``_safe_open_download_zip``: on POSIX walks each cache ancestor
with ``dir_fd`` + ``O_NOFOLLOW`` and unlinks via
``os.unlink(zip_filename, dir_fd=parent_fd)`` so an attacker who swaps a
cache ancestor between the write and the cleanup cannot redirect the
unlink onto a same-named file outside the project. On Windows (no
``dir_fd``/``O_NOFOLLOW``), re-validates the cache ancestor chain via
:func:`_validate_safe_cache_dir`-equivalent checks before unlinking.

Errors are swallowed; this is best-effort cleanup.
"""
o_nofollow = getattr(os, "O_NOFOLLOW", 0)
o_directory = getattr(os, "O_DIRECTORY", 0)

if os.open not in os.supports_dir_fd:
Comment on lines +3472 to +3475
# Windows / no openat support: re-validate immediately before unlink.
project_root_resolved = project_root.resolve()
current = project_root
for part in download_dir.relative_to(project_root).parts:
current = current / part
if current.is_symlink():
return
if not current.exists():
return
try:
current.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
return
zip_path = download_dir / zip_filename
try:
if zip_path.is_symlink():
return
if zip_path.exists():
zip_path.unlink()
Comment thread
mnriem marked this conversation as resolved.
Outdated
except OSError:
pass
return

rel_parts = download_dir.relative_to(project_root).parts
try:
parent_fd = os.open(project_root, os.O_RDONLY | o_directory)
except OSError:
return
try:
for part in rel_parts:
try:
new_fd = os.open(
part, os.O_RDONLY | o_directory | o_nofollow, dir_fd=parent_fd
)
except OSError:
return
os.close(parent_fd)
parent_fd = new_fd
try:
os.unlink(zip_filename, dir_fd=parent_fd)
except OSError:
pass
finally:
try:
os.close(parent_fd)
except OSError:
pass


def _resolve_catalog_extension(
argument: str,
catalog,
Expand Down Expand Up @@ -3603,6 +3781,16 @@ def extension_add(
console.print("[red]Error:[/red] Priority must be a positive integer (1 or higher)")
raise typer.Exit(1)

# For URL installs, validate (and create) the download cache *before*
# constructing ExtensionManager. The manager's constructor reads
# ``.specify/extensions/.registry`` via ``ExtensionRegistry``; if a
# ``.specify/extensions`` ancestor is symlinked, that read would follow
# the symlink before any guard ran. Performing the safe per-component
# walk first ensures the manager only ever opens registry files inside a
# validated, project-contained tree.
if from_url:
_download_dir = _validate_safe_cache_dir(project_root)

manager = ExtensionManager(project_root)
speckit_version = get_speckit_version()

Expand Down Expand Up @@ -3641,27 +3829,106 @@ def extension_add(
console.print("Only install extensions from sources you trust.\n")
console.print(f"Downloading from {from_url}...")

# Download ZIP to temp location
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
download_dir.mkdir(parents=True, exist_ok=True)
zip_path = download_dir / f"{extension}-url-download.zip"
# The download cache directory has already been safely
# validated/created above (before ExtensionManager construction)
# via `_validate_safe_cache_dir(project_root)`. Reuse the same
# validated `download_dir` here.
import uuid as _uuid
download_dir = _download_dir
project_root_resolved = project_root.resolve()

safe_name = Path(extension).name.replace("/", "_").replace("\\", "_") or "download"
safe_name = safe_name[:64] # cap length to avoid filesystem errors
zip_filename = f"{safe_name}-{_uuid.uuid4().hex[:8]}.zip"
zip_path = download_dir / zip_filename
Comment thread
mnriem marked this conversation as resolved.

# Containment guard: the resolved zip_path must stay inside the
# validated cache directory.
try:
zip_path.resolve().relative_to(download_dir.resolve())
except ValueError:
console.print("[red]Error:[/red] Invalid extension name (path traversal detected)")
raise typer.Exit(1)

try:
from specify_cli.authentication.http import open_url as _open_url

with _open_url(from_url, timeout=60) as response:
zip_data = response.read()
zip_path.write_bytes(zip_data)

# Install from downloaded ZIP
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority)
except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
raise typer.Exit(1)

# Re-validate cache containment immediately before opening the
# write fd. This narrows the (validation -> write) TOCTOU
# window for the Windows fallback path of
# `_safe_open_download_zip` (which has no `dir_fd`/
# `O_NOFOLLOW`); on POSIX, the helper also re-walks ancestors
# with `dir_fd + O_NOFOLLOW` so any swap is rejected at open
# time as well.
try:
download_dir.resolve().relative_to(project_root_resolved)
except (OSError, ValueError):
console.print("[red]Error:[/red] Download cache directory escapes project root")
raise typer.Exit(1)

try:
# Open the ZIP file for writing without following any
# symlink in the path. On POSIX, the helper walks each
# ancestor of `download_dir` using dir_fd + O_NOFOLLOW so
# a symlink swap of *any* component (including ancestors)
# between validation and this write is rejected. On
# Windows the helper re-walks the ancestor chain
# immediately before opening to narrow the same window.
try:
_fd = _safe_open_download_zip(
project_root, download_dir, zip_filename
)
except OSError as _open_err:
console.print(
f"[red]Error:[/red] Could not safely create download file: {_open_err}"
)
raise typer.Exit(1)
try:
with os.fdopen(_fd, "wb") as _zf:
_zf.write(zip_data)
except BaseException:
# os.fdopen took ownership only on success; on failure
# before the with-block, the fd may still be open.
try:
os.close(_fd)
except OSError:
pass
raise
except OSError as e:
# Narrow handler: only covers the cache-write fd
# acquisition + write. Errors from `install_from_zip`
# (e.g. corrupted archive, manifest validation) are
# surfaced separately below so they aren't misreported as
# "Failed to write download cache".
console.print(
f"[red]Error:[/red] Failed to write download cache: {e}"
)
raise typer.Exit(1)
Comment thread
mnriem marked this conversation as resolved.
Outdated

try:
# Install from downloaded ZIP. Errors from the manager
# (ValidationError / CompatibilityError / ExtensionError)
# are caught by the outer handler so users see the
# extension-specific message rather than a generic
# "Failed to write download cache".
manifest = manager.install_from_zip(
zip_path, speckit_version, priority=priority
)
Comment on lines +3933 to +3935
finally:
# Clean up downloaded ZIP
if zip_path.exists():
zip_path.unlink()
# Best-effort cleanup of the downloaded ZIP. Uses a
# symlink-safe walk so an attacker who swaps a cache
# ancestor between the write and the unlink cannot
# redirect the delete onto a same-named file outside the
# project root.
_safe_unlink_download_zip(
project_root, download_dir, zip_filename
)

else:
# Try bundled extensions first (shipped with spec-kit)
Expand Down
Loading
Loading