-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathgit_query.py
More file actions
187 lines (163 loc) · 7.76 KB
/
git_query.py
File metadata and controls
187 lines (163 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Git repository version query tools."""
import datetime
import logging
import pathlib
import typing as t
import git
from .version import Version
_LOG = logging.getLogger(__name__)
def preprocess_git_version_tag(tag: str):
"""Remove a prefix from a version tag."""
if tag.startswith('ver'):
return tag[3:]
if tag.startswith('v'):
return tag[1:]
if tag and tag[0] in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'):
return tag
raise ValueError(f'given tag "{tag}" does not appear to be a version tag')
def _git_version_tags(repo: git.Repo) -> t.Mapping[git.Tag, Version]:
versions = {}
for tag in repo.tags:
try:
tag_str = preprocess_git_version_tag(str(tag))
except ValueError:
_LOG.debug('%s: ignoring non-version tag %s', repo, tag)
continue
try:
versions[tag] = Version.from_str(tag_str)
except ValueError:
# except packaging.version.InvalidVersion:
_LOG.warning('%s: failed to convert %s to version', repo, tag_str)
continue
return versions
def _latest_git_version_tag_on_branches(
repo: git.Repo, assume_if_none: bool, commit: git.objects.Commit, commit_distance: int,
skip_commits: t.Set[git.objects.Commit]) -> t.Union[int, t.Tuple[
t.Optional[git.objects.Commit], t.Optional[git.TagReference], t.Optional[Version],
int]]:
_LOG.log(logging.NOTSET, 'entering %i branches...', len(commit.parents))
results: t.List[t.Tuple[
t.Optional[git.objects.Commit], t.Optional[git.TagReference], Version, int]] = []
main_commit_distance = None
for parent in commit.parents:
try:
result = _latest_git_version_tag(
repo, assume_if_none, parent, commit_distance, skip_commits)
except ValueError:
continue
if main_commit_distance is None:
main_commit_distance = result[3]
if result[2] is not None:
results.append(result) # type: ignore
if not results:
if main_commit_distance is None:
raise ValueError(f'reached max commit distance {MAX_COMMIT_DISTANCE}'
f' with no version tags in repo {repo}')
return main_commit_distance
final_result = sorted(results, key=lambda _: _[2])[-1]
_LOG.log(logging.NOTSET, 'result from %i branches is %s and %s',
len(commit.parents), *final_result[1:3])
return final_result
MAX_COMMIT_DISTANCE = 999
def _latest_git_version_tag_new(
repo: git.Repo, assume_if_none: bool = False, base_commit: git.Commit = None,
commit_distance: int = 0, skip_commits: t.Set[git.Commit] = None) -> t.Tuple[
git.Commit, t.Optional[git.TagReference], Version, int]:
version_tags = _git_version_tags(repo)
version_tag_commits = set()
divergence_points = []
if skip_commits is None:
skip_commits = set()
while True:
commit = None
for commit in repo.iter_commits(rev=base_commit):
if commit in skip_commits:
return None, None, None, -1
_LOG.log(logging.NOTSET, 'iterating over commit %s', commit)
current_version_tags = {tag: version for tag, version in version_tags.items()
if tag.commit == commit}
commit_distance += 1
skip_commits.add(commit)
if len(commit.parents) > 1:
divergence_points.append(commit)
break
base_commit = divergence_points.pop()
return commit, tag, version, commit_distance
def _latest_git_version_tag(
repo: git.Repo, assume_if_none: bool = False,
base_commit: t.Optional[git.objects.Commit] = None, commit_distance: int = 0,
skip_commits: t.Optional[t.Set[git.objects.Commit]] = None) -> t.Tuple[
t.Optional[git.objects.Commit], t.Optional[git.TagReference], t.Optional[Version], int]:
"""Return (commit, tag at that commit if any, latest version, distance from the version)."""
version_tags = _git_version_tags(repo)
version_tag_commits: t.Dict[git.objects.Commit, set] = {}
for tag, version in version_tags.items():
_commit = tag.commit
if _commit not in version_tag_commits:
version_tag_commits[_commit] = set()
version_tag_commits[_commit].add(tag)
current_version_tags = {}
commit = None
if skip_commits is None:
skip_commits = set()
for commit in repo.iter_commits(rev=base_commit):
if commit in skip_commits:
return None, None, None, -1
_LOG.log(logging.NOTSET, 'iterating over commit %s', commit)
skip_commits.add(commit)
if commit in version_tag_commits:
current_tags = version_tag_commits[commit]
current_version_tags = {tag: version for tag, version in version_tags.items()
if tag in current_tags}
_LOG.log(logging.NOTSET, 'found version data %s', current_version_tags)
break
if commit_distance >= MAX_COMMIT_DISTANCE:
raise ValueError(f'reached max commit distance {MAX_COMMIT_DISTANCE}'
f' with no version tags in repo {repo}')
commit_distance += 1
if len(commit.parents) <= 1:
continue
result = _latest_git_version_tag_on_branches(
repo, assume_if_none, commit, commit_distance, skip_commits)
if not isinstance(result, tuple):
commit_distance = result # main_commit_distance
break
return result
if not current_version_tags:
if assume_if_none:
return commit, None, Version.from_str('0.1.0.dev0'), commit_distance
raise ValueError(f'the given repo {repo} has no version tags')
tag, version = sorted(current_version_tags.items(), key=lambda _: _[1])[-1]
_LOG.log(logging.NOTSET, 'result is %s and %s', tag, version)
return commit, tag, version, commit_distance
_latest_git_version_tag = _latest_git_version_tag_new
def _upcoming_git_version_tag(repo: git.Repo, ignore_untracked_files: bool = True) -> t.Tuple[
t.Optional[git.objects.Commit], t.Optional[git.TagReference], t.Optional[Version], int,
bool]:
commit, tag, version, commit_distance = _latest_git_version_tag(repo, True)
is_repo_dirty = repo.is_dirty(untracked_files=not ignore_untracked_files)
return commit, tag, version, commit_distance, is_repo_dirty
def query_git_repo(repo_path: pathlib.Path, search_parent_directories: bool = True) -> Version:
"""Determine version from tags of a git repository."""
_LOG.debug('looking for git repository in "%s"', repo_path)
repo = git.Repo(str(repo_path), search_parent_directories=search_parent_directories)
_LOG.debug('found git repository in "%s"', repo.working_dir)
version = _latest_git_version_tag(repo)[2]
assert isinstance(version, Version), version
return version
def predict_git_repo(repo_path: pathlib.Path, search_parent_directories: bool = True) -> Version:
"""Predict version from tags, commit history and index status of git repository."""
repo = git.Repo(str(repo_path), search_parent_directories=search_parent_directories)
version, commit_distance, is_repo_dirty = _upcoming_git_version_tag(repo)[2:]
assert isinstance(version, Version), version
if commit_distance > 0:
version.devel_increment(commit_distance)
version.local = (f'git{repo.head.commit.hexsha[:8]}',)
if is_repo_dirty:
dt_ = f'dirty{datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d%H%M%S")}'
if version.has_local:
assert version.local is not None # mypy needs this
version.local = (*version.local, '.', dt_)
else:
version.local = (dt_,)
return version