|
1 | | -"""Git worktree identity resolution for shared daemon layers.""" |
2 | | - |
3 | 1 | from __future__ import annotations |
4 | 2 |
|
5 | | -import hashlib |
6 | | -import os |
7 | | -import subprocess |
8 | | -from dataclasses import dataclass |
9 | | -from pathlib import Path |
10 | | -from urllib.parse import urlparse |
11 | | - |
12 | | - |
class GitContextError(RuntimeError):
    """Raised when a directory cannot be resolved as a usable Git worktree.

    The helpers in this module raise it when a ``git`` invocation fails and
    when no usable base ref can be found.
    """
15 | | - |
16 | | - |
@dataclass(frozen=True)
class GitStatusEntry:
    """One record parsed from ``git status --porcelain=v1 -z`` output."""

    # First character of the two-character status code (index column).
    index_status: str
    # Second character of the two-character status code (worktree column).
    worktree_status: str
    # Path text following the status code, as reported by git.
    path: str
    # Extra path field that follows rename/copy ("R"/"C") records; None otherwise.
    original_path: str | None = None
23 | | - |
24 | | - |
@dataclass(frozen=True)
class WorktreeContext:
    """Immutable snapshot of a Git worktree's identity and dirty state."""

    # 24 hex chars of a SHA-256 over the normalized remote URL, the logical
    # root "." and the index config hash.
    repo_id: str
    # 24 hex chars of a SHA-256 over the worktree root path string.
    worktree_id: str
    # Resolved output of `git rev-parse --show-toplevel`.
    repo_root: Path
    # Output of `git rev-parse --git-common-dir`, made absolute against repo_root.
    git_common_dir: Path
    # Raw output of `git remote get-url origin`.
    remote_url: str
    # remote_url passed through normalize_remote_url().
    normalized_remote_url: str
    # Output of `git branch --show-current`, or "HEAD" when that is empty.
    branch_name: str
    # Output of `git rev-parse HEAD`.
    head_commit: str
    # Name of the base ref that was successfully verified.
    base_ref: str
    # Output of `git rev-parse <base_ref>`.
    base_commit: str
    # Output of `git merge-base HEAD <base_ref>`.
    merge_base: str
    # Short hash over the status entries and changed file contents; None when
    # there are no status entries.
    dirty_snapshot_hash: str | None
    # Parsed `git status --porcelain=v1 -z` records.
    status_entries: tuple[GitStatusEntry, ...]
40 | | - |
41 | | - |
def _git(cwd: Path, *args: str) -> str:
    """Run ``git -C <cwd> <args...>`` and return its stdout as stripped text.

    Args:
        cwd: Directory handed to git via ``-C``.
        *args: Git sub-command and its arguments.

    Returns:
        The command's standard output with surrounding whitespace removed.

    Raises:
        GitContextError: If git exits with a non-zero status, or if the git
            executable cannot be started at all (for example when it is not
            installed), so callers only need to handle one exception type.
    """
    command = ["git", "-C", str(cwd), *args]
    try:
        output = subprocess.check_output(
            command,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable git binary, which
        # check_output reports before any exit status exists.
        raise GitContextError(f"git {' '.join(args)} failed in {cwd}") from e
    return output.strip()
51 | | - |
52 | | - |
def _git_bytes(cwd: Path, *args: str) -> bytes:
    """Run ``git -C <cwd> <args...>`` and return its raw, undecoded stdout.

    Args:
        cwd: Directory handed to git via ``-C``.
        *args: Git sub-command and its arguments.

    Returns:
        The command's standard output exactly as emitted (no stripping).

    Raises:
        GitContextError: If git exits with a non-zero status, or if the git
            executable cannot be started at all (for example when it is not
            installed).
    """
    command = ["git", "-C", str(cwd), *args]
    try:
        return subprocess.check_output(command, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable git binary, which
        # check_output reports before any exit status exists.
        raise GitContextError(f"git {' '.join(args)} failed in {cwd}") from e
61 | | - |
62 | | - |
def normalize_remote_url(url: str) -> str:
    """Normalize common Git remote URL forms into a stable lowercase identity.

    The scp-like form (``user@host:path``), URLs with a scheme
    (``https://``, ``ssh://``, ...) and plain paths are all reduced to
    ``host/path`` (or just the path), lowercased, without a trailing ``.git``
    or surrounding slashes. Any ``user[:password]@`` prefix is dropped so that
    different transports to the same repository agree and credentials never
    end up in the identity. An explicit port is kept.

    Args:
        url: Remote URL as reported by git.

    Returns:
        The normalized identity string.
    """
    # Drop trailing slashes first so "repo.git/" still loses its suffix.
    raw = url.strip().rstrip("/")
    if raw.endswith(".git"):
        raw = raw[:-4]

    if "://" not in raw:
        # scp-like syntax: "user@host:path" (no scheme, no slash before ':').
        user_host, sep, path = raw.partition(":")
        if sep and "@" in user_host and "/" not in user_host:
            host = user_host.rpartition("@")[2]
            return f"{host.lower()}/{path.strip('/').lower()}"

    parsed = urlparse(raw)
    if parsed.scheme and parsed.netloc:
        # Strip userinfo ("git@", "user:token@") but keep host[:port].
        host = parsed.netloc.rpartition("@")[2]
        path = parsed.path.strip("/")
        return f"{host.lower()}/{path.lower()}"
    return raw.strip("/").lower()
76 | | - |
77 | | - |
78 | | -def _parse_status(raw: bytes) -> tuple[GitStatusEntry, ...]: |
79 | | - if not raw: |
80 | | - return () |
81 | | - parts = raw.decode("utf-8", errors="surrogateescape").split("\0") |
82 | | - entries: list[GitStatusEntry] = [] |
83 | | - i = 0 |
84 | | - while i < len(parts): |
85 | | - item = parts[i] |
86 | | - i += 1 |
87 | | - if not item: |
88 | | - continue |
89 | | - status = item[:2] |
90 | | - path = item[3:] |
91 | | - original_path: str | None = None |
92 | | - if status[0] in {"R", "C"} or status[1] in {"R", "C"}: |
93 | | - if i < len(parts): |
94 | | - original_path = parts[i] or None |
95 | | - i += 1 |
96 | | - entries.append( |
97 | | - GitStatusEntry( |
98 | | - index_status=status[0], |
99 | | - worktree_status=status[1], |
100 | | - path=path, |
101 | | - original_path=original_path, |
102 | | - ) |
103 | | - ) |
104 | | - return tuple(entries) |
105 | | - |
106 | | - |
107 | | -def _dirty_snapshot_hash(repo_root: Path, entries: tuple[GitStatusEntry, ...]) -> str | None: |
108 | | - if not entries: |
109 | | - return None |
110 | | - digest = hashlib.sha256() |
111 | | - for entry in sorted(entries, key=lambda e: (e.path, e.original_path or "")): |
112 | | - digest.update(entry.index_status.encode()) |
113 | | - digest.update(entry.worktree_status.encode()) |
114 | | - digest.update(entry.path.encode()) |
115 | | - if entry.original_path is not None: |
116 | | - digest.update(entry.original_path.encode()) |
117 | | - path = repo_root / entry.path |
118 | | - if path.is_file(): |
119 | | - digest.update(hashlib.sha256(path.read_bytes()).digest()) |
120 | | - return digest.hexdigest()[:24] |
121 | | - |
122 | | - |
def _resolve_base_ref(repo_root: Path, requested: str | None) -> str:
    """Return the first base ref that ``git rev-parse --verify`` accepts.

    When *requested* is given (and non-empty) it is the only ref tried;
    otherwise ``origin/main``, ``main``, ``master`` and ``HEAD`` are tried in
    that order.

    Raises:
        GitContextError: If none of the refs tried can be verified.
    """
    fallbacks = ("origin/main", "main", "master", "HEAD")
    to_try = (requested,) if requested else fallbacks
    for ref in to_try:
        try:
            _git(repo_root, "rev-parse", "--verify", ref)
        except GitContextError:
            continue
        return ref
    raise GitContextError("No usable base ref found")
134 | | - |
135 | | - |
def resolve_worktree_context(
    cwd: str | os.PathLike[str] | Path,
    *,
    base_ref: str | None = None,
    index_config_hash: str,
) -> WorktreeContext:
    """Resolve Git identity and dirty state for *cwd*.

    Args:
        cwd: Any directory inside the worktree to inspect.
        base_ref: Optional base ref to compare against; when omitted a
            default ref is selected by ``_resolve_base_ref``.
        index_config_hash: Value mixed into the repository identity.

    Returns:
        A ``WorktreeContext`` describing the worktree.

    Raises:
        GitContextError: If any of the underlying git queries fails.
    """

    def _short_id(seed: str) -> str:
        # Identities are the first 24 hex characters of a SHA-256.
        return hashlib.sha256(seed.encode()).hexdigest()[:24]

    root = Path(_git(Path(cwd).resolve(), "rev-parse", "--show-toplevel")).resolve()

    common_dir = Path(_git(root, "rev-parse", "--git-common-dir"))
    if not common_dir.is_absolute():
        # git may report the common dir relative to where it was run.
        common_dir = (root / common_dir).resolve()

    origin_url = _git(root, "remote", "get-url", "origin")
    origin_identity = normalize_remote_url(origin_url)
    # An empty branch name is reported as "HEAD".
    current_branch = _git(root, "branch", "--show-current") or "HEAD"
    head_sha = _git(root, "rev-parse", "HEAD")
    chosen_base = _resolve_base_ref(root, base_ref)
    base_sha = _git(root, "rev-parse", chosen_base)
    fork_point = _git(root, "merge-base", "HEAD", chosen_base)
    entries = _parse_status(_git_bytes(root, "status", "--porcelain=v1", "-z"))
    snapshot = _dirty_snapshot_hash(root, entries)

    # The current project model indexes the Git worktree root. Keep this as a
    # stable logical root instead of deriving it from the physical worktree
    # path, otherwise linked worktrees would get different repo IDs.
    logical_root = "."
    return WorktreeContext(
        repo_id=_short_id(f"{origin_identity}\0{logical_root}\0{index_config_hash}"),
        worktree_id=_short_id(str(root)),
        repo_root=root,
        git_common_dir=common_dir,
        remote_url=origin_url,
        normalized_remote_url=origin_identity,
        branch_name=current_branch,
        head_commit=head_sha,
        base_ref=chosen_base,
        base_commit=base_sha,
        merge_base=fork_point,
        dirty_snapshot_hash=snapshot,
        status_entries=entries,
    )
| 3 | +from .version_control import GitContextError, GitStatusEntry, normalize_remote_url |
| 4 | +from .version_control import Worktree as WorktreeContext |
| 5 | +from .version_control import resolve_worktree as resolve_worktree_context |
| 6 | + |
# Public names of this module. All of them are imported from .version_control;
# WorktreeContext and resolve_worktree_context are aliases for that module's
# Worktree and resolve_worktree.
# NOTE(review): presumably kept so existing importers of this module keep
# working -- confirm against callers.
__all__ = [
    "GitContextError",
    "GitStatusEntry",
    "WorktreeContext",
    "normalize_remote_url",
    "resolve_worktree_context",
]
0 commit comments