forked from tox-dev/python-discovery
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_cached_py_info.py
More file actions
263 lines (230 loc) · 8.28 KB
/
_cached_py_info.py
File metadata and controls
263 lines (230 loc) · 8.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""Acquire Python information via subprocess interrogation with multi-level caching."""
from __future__ import annotations
import hashlib
import json
import logging
import os
import pkgutil
import secrets
import subprocess # noqa: S404
import sys
import tempfile
from collections import OrderedDict
from contextlib import contextmanager
from pathlib import Path
from shlex import quote
from subprocess import Popen, TimeoutExpired # noqa: S404
from typing import TYPE_CHECKING, Final
from ._cache import NoOpCache
from ._py_info import PythonInfo
if TYPE_CHECKING:
from collections.abc import Generator, Mapping
from ._cache import ContentStore, PyInfoCache
# In-process memo of interrogation outcomes, keyed by the executable path. A stored exception is a remembered
# failure. The memo starts out holding a default-constructed PythonInfo under the running interpreter's path.
_CACHE: OrderedDict[Path, PythonInfo | Exception] = OrderedDict([(Path(sys.executable), PythonInfo())])
# Logger shared by every helper in this module.
_LOGGER: Final[logging.Logger] = logging.getLogger(__name__)
def from_exe(  # noqa: PLR0913
    cls: type[PythonInfo],
    cache: PyInfoCache | None,
    exe: str,
    env: Mapping[str, str] | None = None,
    *,
    raise_on_error: bool = True,
    ignore_cache: bool = False,
) -> PythonInfo | None:
    """
    Return interpreter information for ``exe``.

    :param cls: class used to build the result.
    :param cache: persistent cache forwarded to the lookup helper; may be ``None``.
    :param exe: the interpreter executable to describe.
    :param env: environment forwarded to the lookup; ``os.environ`` is used when ``None``.
    :param raise_on_error: raise a failed lookup's exception instead of returning ``None``.
    :param ignore_cache: forwarded to the lookup helper to bypass the in-process cache entry.
    :returns: the interpreter information, or ``None`` when the lookup failed and ``raise_on_error`` is false.
    :raises Exception: whatever failure the lookup produced, when ``raise_on_error`` is true.
    """
    if env is None:
        env = os.environ
    outcome = _get_from_cache(cls, cache, exe, env, ignore_cache=ignore_cache)
    if not isinstance(outcome, Exception):
        return outcome
    if raise_on_error:
        raise outcome
    # failures are only reported at info level when the caller asked not to raise
    _LOGGER.info("%s", outcome)
    return None
def _get_from_cache(
    cls: type[PythonInfo],
    cache: PyInfoCache | None,
    exe: str,
    env: Mapping[str, str],
    *,
    ignore_cache: bool = True,
) -> PythonInfo | Exception:
    """
    Look ``exe`` up in the in-process cache, falling back to the file cache / subprocess path.

    The outcome of a fallback lookup (information or exception) is stored in the in-process cache.

    :param ignore_cache: when true, the in-process entry is not consulted and gets overwritten.
    :returns: the interpreter information, or the exception describing why it could not be obtained.
    """
    key = Path(exe)
    if ignore_cache or key not in _CACHE:
        _CACHE[key] = _get_via_file_cache(cls, cache, key, exe, env)
    outcome = _CACHE[key]
    if isinstance(outcome, PythonInfo):
        # whichever layer produced the info, report the executable exactly as the caller spelled it
        outcome.executable = exe
    return outcome
def _get_via_file_cache(
    cls: type[PythonInfo],
    cache: PyInfoCache | None,
    path: Path,
    exe: str,
    env: Mapping[str, str],
) -> PythonInfo | Exception:
    """
    Load interpreter information from the persistent cache, interrogating ``exe`` when the entry is unusable.

    A stored entry is reused only when its recorded path, modification time and ``_py_info.py`` hash all match the
    current values and its content is a dict; any other entry is removed. A fresh interrogation is attempted at
    most twice, and a successful result is written back to the store.

    :param cache: persistent cache; a ``NoOpCache`` is used when ``None``.
    :param path: ``exe`` as a :class:`~pathlib.Path`, used as the store key.
    :returns: the interpreter information, or an exception describing the failure.
    """
    try:
        mtime = path.stat().st_mtime
    except OSError:
        # -1 stands in for "could not stat"; it is compared and stored like any other timestamp
        mtime = -1
    script = Path(Path(__file__).resolve()).parent / "_py_info.py"
    try:
        script_hash: str | None = hashlib.sha256(script.read_bytes()).hexdigest()
    except OSError:
        script_hash = None
    key_text = str(path)
    store = (NoOpCache() if cache is None else cache).py_info(path)
    info: PythonInfo | None = None
    with store.locked():
        cached = store.read() if store.exists() else None
        if cached is not None:
            of_path, of_st_mtime = cached.get("path"), cached.get("st_mtime")
            of_content, of_hash = cached.get("content"), cached.get("hash")
            matches = (of_path, of_st_mtime, of_hash) == (key_text, mtime, script_hash)
            if matches and isinstance(of_content, dict):
                info = _load_cached_py_info(cls, store, of_content)
            else:
                store.remove()
        if info is None:
            failure, info = _run_subprocess(cls, exe, env)
            if failure is not None:
                _LOGGER.debug("first subprocess attempt failed for %s (%s), retrying", exe, failure)
                failure, info = _run_subprocess(cls, exe, env)
                if failure is not None:
                    return failure
            if info is not None:
                store.write({
                    "st_mtime": mtime,
                    "path": key_text,
                    "content": info.to_dict(),
                    "hash": script_hash,
                })
    if info is None:
        return RuntimeError(f"{exe} failed to produce interpreter info")
    return info
def _load_cached_py_info(
    cls: type[PythonInfo],
    py_info_store: ContentStore,
    content: dict,
) -> PythonInfo | None:
    """
    Rebuild interpreter information from a cached payload, evicting the entry when it cannot be used.

    The entry is removed from ``py_info_store`` when ``cls.from_dict`` raises ``KeyError`` or ``TypeError``, or when
    the rebuilt object names a ``system_executable`` that no longer exists on disk.

    :param content: the cached payload; a shallow copy is handed to ``cls.from_dict``.
    :returns: the rebuilt information, or ``None`` when the entry was evicted.
    """
    try:
        restored = cls.from_dict(content.copy())
    except (KeyError, TypeError):
        restored = None
    else:
        system_exe = restored.system_executable
        if system_exe is not None and not Path(system_exe).exists():
            restored = None
    if restored is None:
        py_info_store.remove()
    return restored
# Number of characters in every cookie produced by gen_cookie().
COOKIE_LENGTH: Final[int] = 32


def gen_cookie() -> str:
    """Return a fresh random cookie of ``COOKIE_LENGTH`` hexadecimal characters."""
    # every random byte renders as two hex digits
    byte_count = COOKIE_LENGTH // 2
    return secrets.token_bytes(byte_count).hex()
@contextmanager
def _resolve_py_info_script() -> Generator[Path]:
    """
    Yield a filesystem path to the ``_py_info.py`` interrogation script.

    The copy sitting next to this module is used when it is a regular file. Otherwise the script is fetched with
    :func:`pkgutil.get_data` and written to a temporary ``.py`` file that is deleted when the context exits.

    :raises FileNotFoundError: when ``pkgutil.get_data`` returns no data for the script.
    """
    py_info_script = Path(__file__).resolve().parent / "_py_info.py"
    if py_info_script.is_file():
        yield py_info_script
        return
    data = pkgutil.get_data(__package__ or __name__, "_py_info.py")
    if data is None:
        msg = "cannot locate _py_info.py for subprocess interrogation"
        raise FileNotFoundError(msg)
    fd, tmp = tempfile.mkstemp(suffix=".py")
    tmp_path = Path(tmp)
    try:
        # fdopen takes ownership of the descriptor, so it is closed even when the write fails, and the buffered
        # writer keeps going until every byte is written (a bare os.write may stop short)
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        yield tmp_path
    finally:
        # tolerate a file that is already gone so cleanup never masks an error raised by the consumer
        tmp_path.unlink(missing_ok=True)
def _extract_between_cookies(out: str, start_cookie: str, end_cookie: str) -> tuple[str, str, int, int]:
    """
    Extract the payload found between the reversed cookie markers in ``out``.

    Text before the start marker and text after the end marker is forwarded to ``sys.stdout``. A marker that is
    not found leaves that side of the text untouched.

    :param out: the text to search.
    :param start_cookie: cookie whose reversed form marks where the payload begins.
    :param end_cookie: cookie whose reversed form marks where the payload ends.
    :returns: ``(payload, raw_out, out_starts, out_ends)``: the extracted payload, the unmodified input, and the
        index at which each marker was found (``-1`` when missing). ``out_ends`` is an index into the text that
        remains after the start marker, when found, has been stripped.
    """
    raw_out = out
    start_marker, end_marker = start_cookie[::-1], end_cookie[::-1]
    out_starts = out.find(start_marker)
    if out_starts > -1:
        if pre_cookie := out[:out_starts]:
            sys.stdout.write(pre_cookie)
        # skip by the marker's own length, not a module constant, so cookies of any size are handled
        out = out[out_starts + len(start_marker) :]
    out_ends = out.find(end_marker)
    if out_ends > -1:
        if post_cookie := out[out_ends + len(end_marker) :]:
            sys.stdout.write(post_cookie)
        out = out[:out_ends]
    return out, raw_out, out_starts, out_ends
def _run_subprocess(
    cls: type[PythonInfo],
    exe: str,
    env: Mapping[str, str],
) -> tuple[Exception | None, PythonInfo | None]:
    """
    Interrogate ``exe`` by running the ``_py_info.py`` script with it and parsing what it prints.

    The child receives a copy of ``env`` without ``__PYVENV_LAUNCHER__`` and with ``PYTHONUTF8=1``, and is given
    five seconds to finish. Its stdout is expected to carry a JSON payload between two random cookie markers.

    :returns: ``(failure, None)`` when the child could not be run, exited non-zero, timed out or printed invalid
        JSON; ``(None, info)`` otherwise.
    """
    start_cookie, end_cookie = gen_cookie(), gen_cookie()
    with _resolve_py_info_script() as py_info_script:
        cmd = [exe, str(py_info_script), start_cookie, end_cookie]
        child_env = dict(env)
        child_env.pop("__PYVENV_LAUNCHER__", None)
        child_env["PYTHONUTF8"] = "1"
        _LOGGER.debug("get interpreter info via cmd: %s", LogCmd(cmd))
        try:
            process = Popen(  # noqa: S603
                cmd,
                env=child_env,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                encoding="utf-8",
                errors="backslashreplace",
            )
            out, err = process.communicate(timeout=5)
            code = process.returncode
        except TimeoutExpired:
            # kill and reap the child; whatever it printed so far is discarded
            process.kill()
            process.communicate()
            out, err, code = "", "timed out", -1
        except OSError as os_error:
            out, err, code = "", os_error.strerror, os_error.errno
    if code != 0:
        details = f"{exe} with code {code}"
        if out:
            details += f" out: {out!r}"
        if err:
            details += f" err: {err!r}"
        return RuntimeError(f"failed to query {details}"), None
    payload, raw_out, out_starts, out_ends = _extract_between_cookies(out, start_cookie, end_cookie)
    try:
        result = cls.from_json(payload)
        result.executable = exe
    except json.JSONDecodeError as exc:
        start_state = "found" if out_starts > -1 else "missing"
        end_state = "found" if out_ends > -1 else "missing"
        _LOGGER.warning(
            "subprocess %s returned invalid JSON; raw stdout %d chars, start cookie %s, end cookie %s, "
            "parsed output %d chars: %r",
            exe,
            len(raw_out),
            start_state,
            end_state,
            len(payload),
            payload[:200] if payload else "<empty>",
        )
        msg = f"{exe} returned invalid JSON (exit code {code}){f', stderr: {err!r}' if err else ''}"
        failure = RuntimeError(msg)
        failure.__cause__ = exc
        return failure, None
    return None, result
class LogCmd:
    """
    Render a command, and optionally its environment, as a shell-quoted string.

    The text is built in ``__repr__``, so nothing is formatted until the object is actually rendered.
    """

    def __init__(self, cmd: list[str], env: Mapping[str, str] | None = None) -> None:
        self.cmd = cmd
        self.env = env

    def __repr__(self) -> str:
        rendered = " ".join(map(quote, map(str, self.cmd)))
        if self.env is None:
            return rendered
        return f"{rendered} env of {self.env!r}"
def clear(cache: PyInfoCache) -> None:
    """
    Forget every cached interpreter record.

    :param cache: persistent cache whose ``py_info_clear`` is invoked before the in-process cache is emptied.
    """
    cache.py_info_clear()
    _CACHE.clear()
# Names exported by a star-import of this module; anything not listed here is left out of it.
__all__ = [
    "LogCmd",
    "clear",
    "from_exe",
]