-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathgitlab_api.py
More file actions
195 lines (161 loc) · 7.67 KB
/
gitlab_api.py
File metadata and controls
195 lines (161 loc) · 7.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os
import re
import argparse
import tempfile
import tarfile
import zipfile
import gitlab
class Gitlab(object):
    """Thin wrapper around the ``python-gitlab`` client for common CI tasks:
    resolving project IDs, downloading job artifacts and repository archives,
    and finding job IDs within a pipeline.
    """

    # Matches "<name>" or "<name> <num>/<total>" (GitLab parallel job naming).
    JOB_NAME_PATTERN = re.compile(r"(\w+)(\s+(\d+)/(\d+))?")

    def __init__(self, project_id=None):
        """
        :param project_id: numeric ID of the project to operate on. If None,
            ``self.project`` stays None and only project-independent methods
            can be used.
        """
        config_data_from_env = os.getenv("PYTHON_GITLAB_CONFIG")
        if config_data_from_env:
            # Prefer config from the env variable. python-gitlab only accepts
            # file paths, so materialize it into a temporary file first.
            with tempfile.NamedTemporaryFile("w", delete=False) as temp_file:
                temp_file.write(config_data_from_env)
            config_files = [temp_file.name]
        else:
            # Otherwise fall back to the default config file locations.
            config_files = None
        try:
            self.gitlab_inst = gitlab.Gitlab.from_config(config_files=config_files)
        finally:
            if config_files:
                # The temp config file is parsed by from_config and is no
                # longer needed; remove it so it doesn't leak.
                os.unlink(config_files[0])
        self.gitlab_inst.auth()
        if project_id:
            self.project = self.gitlab_inst.projects.get(project_id)
        else:
            self.project = None

    def get_project_id(self, name, namespace=None):
        """
        Search the project ID by name.

        :param name: project name
        :param namespace: namespace to match when multiple projects share the
            same name; may be None when the name is unambiguous.
        :return: project ID
        :raises ValueError: if no matching project is found.
        """
        projects = self.gitlab_inst.projects.list(search=name)
        for project in projects:
            if namespace is None:
                if len(projects) == 1:
                    project_id = project.id
                    break
            if project.namespace["path"] == namespace:
                project_id = project.id
                break
        else:
            # Loop completed without a break: nothing matched.
            raise ValueError("Can't find project")
        return project_id

    def download_artifacts(self, job_id, destination):
        """
        Download the full artifacts of a job and extract them to destination.

        :param job_id: Gitlab CI job ID
        :param destination: path to extract artifacts to.
        """
        job = self.project.jobs.get(job_id)
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            job.artifacts(streamed=True, action=temp_file.write)
        try:
            # Open the zip only after the temp file handle is closed
            # (re-opening an open NamedTemporaryFile fails on Windows).
            with zipfile.ZipFile(temp_file.name, "r") as archive_file:
                archive_file.extractall(destination)
        finally:
            os.unlink(temp_file.name)  # don't leak the downloaded archive

    def download_artifact(self, job_id, artifact_path, destination=None):
        """
        Download specific paths of job artifacts, optionally saving to disk.

        :param job_id: Gitlab CI job ID
        :param artifact_path: list of paths in artifacts (relative to the
            artifact root path)
        :param destination: destination directory for the artifacts. Files are
            not saved when destination is None.
        :return: a list of artifact file raw data (one entry per path).
        :raises gitlab.GitlabGetError: if a requested path cannot be fetched.
        """
        job = self.project.jobs.get(job_id)
        raw_data_list = []
        for a_path in artifact_path:
            try:
                data = job.artifact(a_path)
            except gitlab.GitlabGetError:
                print("Failed to download '{}' from job {}".format(a_path, job_id))
                raise
            raw_data_list.append(data)
            if destination:
                file_path = os.path.join(destination, a_path)
                dir_name = os.path.dirname(file_path)
                if dir_name:
                    # exist_ok tolerates pre-existing dirs without masking
                    # real failures the way a blanket "except OSError" did.
                    os.makedirs(dir_name, exist_ok=True)
                with open(file_path, "wb") as f:
                    f.write(data)
        return raw_data_list

    def find_job_id(self, job_name, pipeline_id=None, job_status="success"):
        """
        Get job IDs by job name within a specific pipeline.

        :param job_name: job name (parallel suffix "N/M" is ignored for matching)
        :param pipeline_id: pipeline to search; if None, taken from the CI
            pre-defined variable CI_PIPELINE_ID.
        :param job_status: status filter — one pipeline can contain several
            jobs with the same name after retries.
        :return: a list of {"id", "parallel_num"} dicts (parallel jobs yield
            multiple entries; parallel_num is None for non-parallel jobs).
        """
        job_id_list = []
        if pipeline_id is None:
            pipeline_id = os.getenv("CI_PIPELINE_ID")
        pipeline = self.project.pipelines.get(pipeline_id)
        jobs = pipeline.jobs.list(all=True)
        for job in jobs:
            match = self.JOB_NAME_PATTERN.match(job.name)
            if match:
                if match.group(1) == job_name and job.status == job_status:
                    job_id_list.append({"id": job.id, "parallel_num": match.group(3)})
        return job_id_list

    def download_archive(self, ref, destination, project_id=None):
        """
        Download the archive of a certain commit of a repository and extract
        it to the destination path.

        :param ref: commit SHA or branch name
        :param destination: destination path of the extracted archive
        :param project_id: project to archive; uses the current instance's
            project when None.
        :return: root path name of the archive file
        :raises gitlab.GitlabGetError: if the archive cannot be fetched.
        """
        if project_id is None:
            project = self.project
        else:
            project = self.gitlab_inst.projects.get(project_id)
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            try:
                project.repository_archive(sha=ref, streamed=True, action=temp_file.write)
            except gitlab.GitlabGetError:
                print("Failed to archive from project {}".format(project_id))
                raise
        print("archive size: {:.03f}MB".format(float(os.path.getsize(temp_file.name)) / (1024 * 1024)))
        try:
            with tarfile.open(temp_file.name, "r") as archive_file:
                root_name = archive_file.getnames()[0]

                def is_within_directory(directory, target):
                    # True iff target resolves inside directory (no "../" escape).
                    abs_directory = os.path.abspath(directory)
                    abs_target = os.path.abspath(target)
                    prefix = os.path.commonprefix([abs_directory, abs_target])
                    return prefix == abs_directory

                def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
                    # Reject tar members that would extract outside `path`
                    # (CVE-2007-4559 style path traversal).
                    for member in tar.getmembers():
                        member_path = os.path.join(path, member.name)
                        if not is_within_directory(path, member_path):
                            raise Exception("Attempted Path Traversal in Tar File")
                    tar.extractall(path, members, numeric_owner=numeric_owner)

                safe_extract(archive_file, destination)
        finally:
            os.unlink(temp_file.name)  # don't leak the downloaded tarball
        return os.path.join(os.path.realpath(destination), root_name)
if __name__ == '__main__':
    # CLI dispatcher: gitlab_api.py <action> <project_id> [options]
    parser = argparse.ArgumentParser()
    parser.add_argument("action")
    parser.add_argument("project_id", type=int)
    parser.add_argument("--pipeline_id", "-i", type=int, default=None)
    parser.add_argument("--ref", "-r", default="master")
    parser.add_argument("--job_id", "-j", type=int, default=None)
    parser.add_argument("--job_name", "-n", default=None)
    parser.add_argument("--project_name", "-m", default=None)
    parser.add_argument("--destination", "-d", default=None)
    parser.add_argument("--artifact_path", "-a", nargs="*", default=None)
    args = parser.parse_args()

    gitlab_inst = Gitlab(args.project_id)
    if args.action == "download_artifacts":
        gitlab_inst.download_artifacts(args.job_id, args.destination)
    # previously a bare "if" — keep all actions in one mutually-exclusive chain
    elif args.action == "download_artifact":
        gitlab_inst.download_artifact(args.job_id, args.artifact_path, args.destination)
    elif args.action == "find_job_id":
        job_ids = gitlab_inst.find_job_id(args.job_name, args.pipeline_id)
        # parallel_num is None for non-parallel jobs; omit it rather than
        # crash str.join with a TypeError on None.
        print(";".join(
            ",".join(str(v) for v in (j["id"], j["parallel_num"]) if v is not None)
            for j in job_ids))
    elif args.action == "download_archive":
        gitlab_inst.download_archive(args.ref, args.destination)
    elif args.action == "get_project_id":
        ret = gitlab_inst.get_project_id(args.project_name)
        print("project id: {}".format(ret))