-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathsubproject.py
More file actions
477 lines (387 loc) · 16.8 KB
/
subproject.py
File metadata and controls
477 lines (387 loc) · 16.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
"""SubProject."""
import os
import pathlib
from abc import ABC, abstractmethod
from collections.abc import Callable, Sequence
from dfetch.log import get_logger
from dfetch.manifest.project import ProjectEntry
from dfetch.manifest.version import Version
from dfetch.project.abstract_check_reporter import AbstractCheckReporter
from dfetch.project.metadata import Dependency, Metadata
from dfetch.util.util import hash_directory, safe_rm
from dfetch.util.versions import latest_tag_from_list
from dfetch.vcs.patch import Patch
logger = get_logger(__name__)
class SubProject(ABC): # pylint: disable=too-many-public-methods
"""Abstract SubProject object.
This object represents one Project entry in the Manifest.
It can be updated.
"""
NAME = ""
def __init__(self, project: ProjectEntry) -> None:
"""Create the subproject."""
self.__project = project
self.__metadata = Metadata.from_project_entry(self.__project)
self._show_animations = not self._running_in_ci()
@staticmethod
def _running_in_ci() -> bool:
"""Are we running in CI."""
ci_env_var = os.getenv("CI", "")
return bool(ci_env_var) and ci_env_var[0].lower() in ("t", "1", "y")
def check_wanted_with_local(self) -> tuple[Version | None, Version | None]:
"""Given the project entry in the manifest, get the relevant version from disk.
Returns:
Tuple[Optional[Version], Optional[Version]]: Wanted, Have
"""
on_disk = self.on_disk_version()
if not on_disk:
return (self.wanted_version, None)
if self.wanted_version.tag:
return (Version(tag=self.wanted_version.tag), Version(tag=on_disk.tag))
wanted_branch, on_disk_branch = "", ""
if not (self.wanted_version.revision and self.revision_is_enough()):
wanted_branch = self.wanted_version.branch or self.get_default_branch()
on_disk_branch = on_disk.branch
wanted_revision = (
self.wanted_version.revision
or self._latest_revision_on_branch(wanted_branch)
)
return (
Version(
revision=wanted_revision,
branch=wanted_branch,
),
Version(revision=on_disk.revision, branch=on_disk_branch),
)
def update_is_required(self, force: bool = False) -> Version | None:
"""Check if this project should be upgraded.
Args:
force (bool, optional): Ignore if versions match.
Defaults to False.
"""
wanted, current = self.check_wanted_with_local()
if not force and wanted == current:
self._log_project(f"up-to-date ({current})")
return None
logger.debug(f"{self.__project.name} Current ({current}), Available ({wanted})")
return wanted
def update(
self,
force: bool = False,
ignored_files_callback: Callable[[], Sequence[str]] | None = None,
patch_count: int = -1,
) -> None:
"""Update this subproject if required.
Args:
force (bool, optional): Ignore if version is ok or any local changes were done.
Defaults to False.
ignored_files_callback (Callable, optional): Called to obtain the set of files
to ignore. Invoked twice: once before clearing the destination (to detect
pre-existing local changes) and once after extraction (to compute the stored
hash). Calling it at both points ensures the stored hash and the check-time
hash use the same skiplist, preventing false "local changes" reports.
patch_count (int, optional): Number of patches to apply (-1 means all).
"""
to_fetch = self.update_is_required(force)
if not to_fetch:
return
pre_fetch_ignored = (
list(ignored_files_callback()) if ignored_files_callback else []
)
if not force and self._are_there_local_changes(pre_fetch_ignored):
self._log_project(
"skipped - local changes after last update (use --force to overwrite)"
)
return
if os.path.exists(self.local_path):
logger.debug(f"Clearing destination {self.local_path}")
safe_rm(self.local_path)
with logger.status(
self.__project.name,
f"Fetching {to_fetch}",
enabled=self._show_animations,
):
actually_fetched, dependency = self._fetch_impl(to_fetch)
self._log_project(f"Fetched {actually_fetched}")
applied_patches = self._apply_patches(patch_count)
post_fetch_ignored = (
list(ignored_files_callback()) if ignored_files_callback else []
)
self.__metadata.fetched(
actually_fetched,
hash_=hash_directory(
self.local_path,
skiplist=[self.__metadata.FILENAME] + post_fetch_ignored,
),
patch_=applied_patches,
dependencies=list(dependency),
)
logger.debug(f"Writing repo metadata to: {self.__metadata.path}")
self.__metadata.dump()
def _apply_patches(self, count: int = -1) -> list[str]:
"""Apply the patches."""
cwd = pathlib.Path(".").resolve()
applied_patches = []
count = len(self.__project.patch) if count == -1 else count
for patch in self.__project.patch[:count]:
patch_path = (cwd / patch).resolve()
try:
relative_patch_path = patch_path.relative_to(cwd)
except ValueError:
self._log_project(f'Skipping patch "{patch}" which is outside {cwd}.')
continue
if not patch_path.exists():
self._log_project(f"Skipping non-existent patch {patch}")
continue
normalized_patch_path = str(relative_patch_path.as_posix())
self._log_project(f'Applying patch "{normalized_patch_path}"')
result = Patch.from_file(normalized_patch_path).apply(root=self.local_path)
if result.encoding_warning:
self._log_project(
f'After retrying found that patch-file "{normalized_patch_path}" '
"is not UTF-8 encoded, consider saving it with UTF-8 encoding."
)
applied_patches.append(normalized_patch_path)
return applied_patches
def _report_unavailable_version(
self, reporters: Sequence[AbstractCheckReporter]
) -> None:
for reporter in reporters:
reporter.unavailable_project_version(self.__project, self.wanted_version)
def _report_unfetched_project(
self, reporters: Sequence[AbstractCheckReporter], latest_version: Version
) -> None:
for reporter in reporters:
reporter.unfetched_project(
self.__project, self.wanted_version, latest_version
)
def _report_local_changes(self, reporters: Sequence[AbstractCheckReporter]) -> None:
for reporter in reporters:
reporter.local_changes(self.__project)
def check_for_update(
self, reporters: Sequence[AbstractCheckReporter], files_to_ignore: Sequence[str]
) -> None:
"""Check if there is an update available."""
on_disk_version = self.on_disk_version()
with logger.status(
self.__project.name, "Checking", enabled=self._show_animations
):
latest_version = self._check_for_newer_version()
if not latest_version:
self._report_unavailable_version(reporters)
return
if not on_disk_version:
self._report_unfetched_project(reporters, latest_version)
return
if self._are_there_local_changes(files_to_ignore):
self._report_local_changes(reporters)
self._check_latest_with_on_disk_version(
latest_version, on_disk_version, reporters
)
def _versions_match(
self, latest_version: Version, on_disk_version: Version
) -> bool:
"""Return True when latest and on-disk versions are considered equal."""
return (latest_version == on_disk_version) or (
self.revision_is_enough()
and bool(latest_version.revision)
and latest_version.revision == on_disk_version.revision
)
def _select_check_action(
self, latest_version: Version, on_disk_version: Version
) -> Callable[[AbstractCheckReporter], None]:
"""Return the single reporter callback that matches the version comparison."""
if self._versions_match(latest_version, on_disk_version):
return lambda r: r.up_to_date_project(self.__project, latest_version)
if on_disk_version == self.wanted_version:
return lambda r: r.pinned_but_out_of_date_project(
self.__project, self.wanted_version, latest_version
)
return lambda r: r.out_of_date_project(
self.__project, self.wanted_version, on_disk_version, latest_version
)
def _check_latest_with_on_disk_version(
self,
latest_version: Version,
on_disk_version: Version,
reporters: Sequence[AbstractCheckReporter],
) -> None:
report = self._select_check_action(latest_version, on_disk_version)
for reporter in reporters:
report(reporter)
def _log_project(self, msg: str) -> None:
logger.print_info_line(self.__project.name, msg)
@staticmethod
def _log_tool(name: str, msg: str) -> None:
logger.print_report_line(name, msg.strip())
@property
def name(self) -> str:
"""Get the name of this project."""
return self.__project.name
@property
def local_path(self) -> str:
"""Get the local destination of this project."""
return self.__project.destination
@property
def wanted_version(self) -> Version:
"""Get the wanted version of this subproject."""
return self.__metadata.version
@property
def metadata_path(self) -> str:
"""Get the path of the metadata."""
return self.__metadata.path
@property
def remote(self) -> str:
"""Get the remote URL of this subproject."""
return self.__metadata.remote_url
@property
def source(self) -> str:
"""Get the source folder of this subproject."""
return self.__project.source
@property
def ignore(self) -> Sequence[str]:
"""Get the files/folders to ignore of this subproject."""
return self.__project.ignore
@property
def patch(self) -> Sequence[str]:
"""Get the patches of this project."""
return self.__project.patch
@abstractmethod
def check(self) -> bool:
"""Check if it can handle the type."""
@staticmethod
@abstractmethod
def revision_is_enough() -> bool:
"""See if this VCS can uniquely distinguish branch with revision only."""
@abstractmethod
def _latest_revision_on_branch(self, branch: str) -> str:
"""Get the latest revision on a branch."""
@abstractmethod
def _does_revision_exist(self, revision: str) -> bool:
"""Check if the given revision exists."""
@abstractmethod
def _list_of_tags(self) -> list[str]:
"""Get list of all available tags."""
@staticmethod
@abstractmethod
def list_tool_info() -> None:
"""Print out version information."""
def on_disk_version(self) -> Version | None:
"""Get the version of the project on disk.
Returns:
Version: Could be None of no on disk version
"""
if not os.path.exists(self.__metadata.path):
return None
try:
return Metadata.from_file(self.__metadata.path).version
except TypeError:
logger.print_warning_line(
self.__project.name,
f"{pathlib.Path(self.__metadata.path).relative_to(os.getcwd()).as_posix()}"
" is an invalid metadata file, not checking on disk version!",
)
return None
def _on_disk_hash(self) -> str | None:
"""Get the hash of the project on disk.
Returns:
Str: Could be None if no on disk version
"""
if not os.path.exists(self.__metadata.path):
return None
try:
return Metadata.from_file(self.__metadata.path).hash
except TypeError:
logger.print_warning_line(
self.__project.name,
f"{pathlib.Path(self.__metadata.path).relative_to(os.getcwd()).as_posix()}"
" is an invalid metadata file, not checking local hash!",
)
return None
def _revision_only_mode(self) -> bool:
"""Return True when the wanted version should be resolved by revision alone."""
return (
not self.wanted_version.branch
and bool(self.wanted_version.revision)
and self.revision_is_enough()
)
def _check_for_newer_version(self) -> Version | None:
"""Check if a newer version is available on the given branch.
In case wanted_version does not exist (anymore) on the remote return None.
"""
if self.wanted_version.tag:
available_tags = self._list_of_tags()
if self.wanted_version.tag not in available_tags:
return None
return Version(
tag=latest_tag_from_list(self.wanted_version.tag, available_tags)
)
if self.wanted_version.branch == " ":
branch = ""
else:
branch = self.wanted_version.branch or self.get_default_branch()
if self._revision_only_mode():
return (
Version(revision=self.wanted_version.revision)
if self._does_revision_exist(self.wanted_version.revision)
else None
)
revision = self._latest_revision_on_branch(branch)
return Version(revision=revision, branch=branch) if revision else None
def _are_there_local_changes(self, files_to_ignore: Sequence[str]) -> bool:
"""Check if there are local changes.
Returns:
Bool: True if there are local changes, false if no were detected or no hash was found.
"""
logger.debug(f"Checking if there were local changes in {self.local_path}")
on_disk_hash = self._on_disk_hash()
return bool(on_disk_hash) and on_disk_hash != hash_directory(
self.local_path,
skiplist=[self.__metadata.FILENAME] + list(files_to_ignore),
)
@abstractmethod
def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]:
"""Fetch the given version of the subproject, should be implemented by the child class."""
@abstractmethod
def get_default_branch(self) -> str:
"""Get the default branch of this repository."""
def list_of_branches(self) -> list[str]:
"""Get list of all available branches. Override in VCS-specific subclasses."""
return []
def list_of_tags(self) -> list[str]:
"""Get list of all available tags (public wrapper around ``_list_of_tags``)."""
return self._list_of_tags()
def freeze_project(self, project: ProjectEntry) -> str | None:
"""Freeze *project* to its current on-disk version.
Subclasses may override this to apply VCS-specific freeze logic (e.g.
:class:`~dfetch.project.archivesubproject.ArchiveSubProject` stores
the hash under ``integrity.hash`` rather than ``revision:``).
Returns:
The version string that was written to *project* when a change was
made, or *None* if the entry was already pinned to the on-disk
version or no on-disk version could be determined.
Raises:
RuntimeError: When VCS-specific freeze logic fails (e.g. archive
download error). Callers should catch and report these.
"""
on_disk_version = self.on_disk_version()
if on_disk_version and self._is_already_pinned(project, on_disk_version):
return None
if on_disk_version:
project.version = on_disk_version
return (
on_disk_version.revision or on_disk_version.tag or str(on_disk_version)
)
return None
def _is_already_pinned(
self, project: ProjectEntry, on_disk_version: Version
) -> bool:
"""Return True if *project* is already pinned to *on_disk_version*."""
if project.version.tag:
return project.version.tag == on_disk_version.tag
if not project.version.revision or not on_disk_version.revision:
return False
return (
project.version.revision == on_disk_version.revision
and self.revision_is_enough()
)