Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/apm_cli/install/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class InstallContext:
package_deployed_files: dict[str, list[str]] = field(default_factory=dict)
package_types: dict[str, str] = field(default_factory=dict)
package_hashes: dict[str, str] = field(default_factory=dict)
content_hash_verified_deps: set[str] = field(default_factory=set)
# Deps whose content hash is expected to change legitimately:
# populated by _resolve_download_strategy in phases/integrate.py
# (branch-ref `remote_drifted` guard and v<=0.12.2 self-heal block),
Expand Down
17 changes: 17 additions & 0 deletions src/apm_cli/install/phases/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,24 @@ def run(ctx: InstallContext) -> None:
from apm_cli.install.phases._redownload import _should_skip_redownload

if _should_skip_redownload(_pd_locked_chk, _pd_path):
ctx.content_hash_verified_deps.add(_pd_key)
continue
elif (
_pd_path.exists()
and _pd_locked_chk
and _pd_locked_chk.content_hash
and not update_refs
and not _pd_ref_changed
):
# Content-hash-only lockfile entries have no commit anchor.
# The hash is the sole trust signal: skip only after verifying
# on-disk bytes still match it, otherwise fall through to the
# fresh-download path and its supply-chain mismatch check.
from apm_cli.install.phases._redownload import _should_skip_redownload

if _should_skip_redownload(_pd_locked_chk, _pd_path):
ctx.content_hash_verified_deps.add(_pd_key)
continue
# Build download ref (use locked commit for reproducibility).
# build_download_ref() uses the manifest ref when ref_changed is True.
_pd_dlref = build_download_ref(
Expand Down
59 changes: 47 additions & 12 deletions src/apm_cli/install/phases/integrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def _resolve_download_strategy(
update_refs = ctx.update_refs
diagnostics = ctx.diagnostics
logger = ctx.logger
dep_key = dep_ref.get_unique_key()

# npm-like behavior: Branches always fetch latest, only tags/commits use cache
# Resolve git reference to determine type
Expand All @@ -64,14 +65,14 @@ def _resolve_download_strategy(
# entirely for non-git sources.
_source = getattr(dep_ref, "source", None)
is_git_source = not isinstance(_source, str) or _source in (None, "git")
if is_git_source and dep_ref.get_unique_key() not in ctx.pre_downloaded_keys:
if is_git_source and dep_key not in ctx.pre_downloaded_keys:
# Resolve when there is an explicit ref, OR when update_refs
# is True AND we have a non-cached lockfile entry to compare
# against (otherwise resolution is wasted work -- the package
# will be downloaded regardless).
_has_lockfile_sha = False
if update_refs and existing_lockfile:
_lck = existing_lockfile.get_dependency(dep_ref.get_unique_key())
_lck = existing_lockfile.get_dependency(dep_key)
_has_lockfile_sha = bool(
_lck and _lck.resolved_commit and _lck.resolved_commit != "cached"
)
Expand All @@ -87,31 +88,30 @@ def _resolve_download_strategy(
GitReferenceType.COMMIT,
]
# Skip download if: already fetched by resolver callback, or cached tag/commit
already_resolved = dep_ref.get_unique_key() in ctx.callback_downloaded
already_resolved = dep_key in ctx.callback_downloaded
# Detect if manifest ref changed vs what the lockfile recorded.
# detect_ref_change() handles all transitions including None->ref.
_dep_locked_chk = (
existing_lockfile.get_dependency(dep_ref.get_unique_key()) if existing_lockfile else None
)
_dep_locked_chk = existing_lockfile.get_dependency(dep_key) if existing_lockfile else None
ref_changed = detect_ref_change(dep_ref, _dep_locked_chk, update_refs=update_refs)
# When the manifest ref drifted from the lockfile, the content hash
# will legitimately change after re-download. Mark the dep so the
# supply-chain check in sources.py doesn't treat it as an attack.
if ref_changed:
# resolve.py's BFS callback may have already added this;
# set semantics make double-add safe.
ctx.expected_hash_change_deps.add(dep_ref.get_unique_key())
ctx.expected_hash_change_deps.add(dep_key)
# Phase 5 (#171): Also skip when lockfile SHA matches local HEAD
# -- but not when the manifest ref has changed (user wants different version).
lockfile_match = False
content_hash_already_verified = dep_key in ctx.content_hash_verified_deps
# Track whether lockfile_match was satisfied via content-hash fallback only
# (no git HEAD verification possible -- typical for virtual packages, where
# install_path is a carved-out subdirectory rather than a git repo).
# The self-heal logic below uses this to recover from the v<=0.12.2
# branch-ref drift bug for upgrading users.
lockfile_match_via_content_hash_only = False
if install_path.exists() and existing_lockfile:
locked_dep = existing_lockfile.get_dependency(dep_ref.get_unique_key())
locked_dep = existing_lockfile.get_dependency(dep_key)
if locked_dep and locked_dep.resolved_commit and locked_dep.resolved_commit != "cached":
if update_refs:
# Update mode: compare resolved remote SHA with lockfile SHA.
Expand All @@ -130,9 +130,13 @@ def _resolve_download_strategy(
# Git check failed (e.g. .git removed, or virtual
# package install_path is not a git repo). Fall back
# to content-hash verification (#763).
if _should_skip_redownload(locked_dep, install_path):
if content_hash_already_verified or _should_skip_redownload(
locked_dep, install_path
):
lockfile_match = True
lockfile_match_via_content_hash_only = True
content_hash_already_verified = True
ctx.content_hash_verified_deps.add(dep_key)
elif not ref_changed:
# Normal mode: compare local HEAD with lockfile SHA.
try:
Expand All @@ -145,9 +149,13 @@ def _resolve_download_strategy(
# Git check failed (e.g. .git removed, or virtual package
# install_path is not a git repo). Fall back to
# content-hash verification (#763).
if _should_skip_redownload(locked_dep, install_path):
if content_hash_already_verified or _should_skip_redownload(
locked_dep, install_path
):
lockfile_match = True
lockfile_match_via_content_hash_only = True
content_hash_already_verified = True
ctx.content_hash_verified_deps.add(dep_key)
elif (
locked_dep
and getattr(locked_dep, "source", None) == "registry"
Expand All @@ -156,8 +164,30 @@ def _resolve_download_strategy(
):
# Registry deps have no resolved_commit; use content_hash as the
# skip-download signal (mirrors the git content-hash fallback above).
if _should_skip_redownload(locked_dep, install_path):
if content_hash_already_verified or _should_skip_redownload(locked_dep, install_path):
lockfile_match = True
content_hash_already_verified = True
ctx.content_hash_verified_deps.add(dep_key)
elif locked_dep and locked_dep.content_hash and not ref_changed and not update_refs:
# Unpinned git/virtual deps (#1548): the lockfile recorded a
# content_hash but no resolved_commit (e.g. ADO partial-clone
# fallback could not pin a SHA, or a virtual-file dep was carved
# out without a commit anchor). Without this branch the second
# install would re-download every time, and a non-deterministic
# fresh hash trips the supply-chain mismatch check at
# sources.py with a false-positive attack alert.
#
# Cache-skip parity with the resolved_commit branches: when the
# on-disk content still hashes to the lockfile-recorded value,
# the package is intact -- skip re-download. This branch has no
# commit anchor; the content hash is the only trust signal, so any
# divergence must fall through to the fresh-download path and its
# supply-chain mismatch check.
if content_hash_already_verified or _should_skip_redownload(locked_dep, install_path):
lockfile_match = True
lockfile_match_via_content_hash_only = True
content_hash_already_verified = True
ctx.content_hash_verified_deps.add(dep_key)

# Self-heal pipeline (PR #1158).
#
Expand Down Expand Up @@ -190,7 +220,12 @@ def _resolve_download_strategy(
)

# Verify content integrity when lockfile has a hash
if skip_download and _dep_locked_chk and _dep_locked_chk.content_hash:
if (
skip_download
and _dep_locked_chk
and _dep_locked_chk.content_hash
and not content_hash_already_verified
):
from apm_cli.utils.content_hash import verify_package_hash

if not verify_package_hash(install_path, _dep_locked_chk.content_hash):
Expand Down
139 changes: 139 additions & 0 deletions tests/integration/test_install_content_hash_roundtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Integration coverage for content-hash-only lockfile replay.

The no-``resolved_commit`` case happens for unpinned git dependencies whose
clone path cannot provide a stable commit anchor. The lockfile still records a
package ``content_hash``; a second install must trust only that hash and must not
re-download unchanged on-disk bytes.
"""

from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import Any
from unittest.mock import patch

import pytest
import yaml
from click.testing import CliRunner

from apm_cli.cli import cli
from apm_cli.models.apm_package import (
APMPackage,
GitReferenceType,
PackageInfo,
ResolvedReference,
)
from apm_cli.models.dependency.reference import DependencyReference

_PATCH_UPDATES = "apm_cli.commands._helpers.check_for_updates"


class _ContentHashOnlyDownloader:
"""Downloader stub that changes bytes if a second download occurs."""

def __init__(self) -> None:
self.calls = 0

def download_package(
self, repo_ref: object, target_path: Path, *args: Any, **kwargs: Any
) -> PackageInfo:
self.calls += 1
dep_ref = (
repo_ref
if isinstance(repo_ref, DependencyReference)
else DependencyReference.parse(str(repo_ref))
)
target_path = Path(target_path)
target_path.mkdir(parents=True, exist_ok=True)
(target_path / "apm.yml").write_text(
yaml.safe_dump(
{
"name": "fixture-pkg",
"version": "1.0.0",
"description": "content hash replay fixture",
}
),
encoding="utf-8",
)
(target_path / ".apm" / "instructions").mkdir(parents=True, exist_ok=True)
(target_path / ".apm" / "instructions" / "fixture.instructions.md").write_text(
f"---\napplyTo: '**'\n---\n# Fixture\ndownload-call: {self.calls}\n",
encoding="utf-8",
)
package = APMPackage.from_apm_yml(target_path / "apm.yml")
return PackageInfo(
package=package,
install_path=target_path,
installed_at=datetime.now().isoformat(),
dependency_ref=dep_ref,
resolved_reference=ResolvedReference(
original_ref="default",
ref_type=GitReferenceType.BRANCH,
resolved_commit=None,
ref_name="default",
),
)


def _write_project(project: Path) -> None:
project.mkdir(parents=True, exist_ok=True)
(project / ".github").mkdir()
(project / ".github" / "copilot-instructions.md").write_text("# Project\n", encoding="utf-8")
(project / "apm.yml").write_text(
yaml.safe_dump(
{
"name": "content-hash-roundtrip",
"version": "1.0.0",
"target": "copilot",
"dependencies": {"apm": ["acme/fixture-pkg"], "mcp": []},
}
),
encoding="utf-8",
)


def _run_install(runner: CliRunner, project: Path, monkeypatch: pytest.MonkeyPatch) -> object:
monkeypatch.chdir(project)
with patch(_PATCH_UPDATES, return_value=None):
return runner.invoke(cli, ["install"], catch_exceptions=False)


def _locked_dep(project: Path) -> dict:
lockfile = yaml.safe_load((project / "apm.lock.yaml").read_text(encoding="utf-8"))
deps = lockfile.get("dependencies") or []
return next(dep for dep in deps if dep.get("repo_url") == "acme/fixture-pkg")


def test_no_resolved_commit_content_hash_reuses_on_disk_package(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Second install reuses content-hash-verified bytes without re-downloading.

The downloader deliberately mutates fixture content on every call. If the
second install re-downloads instead of reusing the content-hash-verified
install path, the fresh-download supply-chain check sees a different hash
from the lockfile and the install fails.
"""
project = tmp_path / "project"
_write_project(project)
downloader = _ContentHashOnlyDownloader()

from apm_cli.deps import github_downloader as _ghd

monkeypatch.setattr(
_ghd.GitHubPackageDownloader, "download_package", downloader.download_package
)
runner = CliRunner()

first = _run_install(runner, project, monkeypatch)
assert first.exit_code == 0, first.output

locked = _locked_dep(project)
assert locked.get("content_hash"), locked
assert not locked.get("resolved_commit"), locked
assert downloader.calls == 1

second = _run_install(runner, project, monkeypatch)
assert second.exit_code == 0, second.output
assert downloader.calls == 1
Loading
Loading