-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathgitlab_fetcher.py
More file actions
250 lines (212 loc) · 9.08 KB
/
gitlab_fetcher.py
File metadata and controls
250 lines (212 loc) · 9.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import os
import re
import shutil
import subprocess
import time
import requests
from retrying import retry
from config.config import *
from utils.logger import log
from utils.tools import run_command
class GitlabMergeRequestFetcher:
def __init__(self, project_id, merge_request_iid):
self.project_id = project_id
self.iid = merge_request_iid
self._changes_cache = None
self._file_content_cache = {}
self._info_cache = None
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def get_changes(self, force=False):
"""
Get the changes of the merge request
:return: changes
"""
if self._changes_cache and not force:
return self._changes_cache
# URL for the GitLab API endpoint
url = f"{GITLAB_SERVER_URL}/api/v4/projects/{self.project_id}/merge_requests/{self.iid}/changes"
# Headers for the request
headers = {
"PRIVATE-TOKEN": GITLAB_PRIVATE_TOKEN
}
# Make the GET request
response = requests.get(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
self._changes_cache = response.json()["changes"]
return response.json()["changes"]
else:
return None
@retry(stop_max_attempt_number=3, wait_fixed=2000)
# 获取文件内容
def get_file_content(self, file_path, branch_name='main', force=False):
"""
Get the content of the file
:param file_path: The path of the file
:return: The content of the file
"""
# 对file_path中的'/'转换为'%2F'
file_path = file_path.replace('/', '%2F')
if file_path in self._file_content_cache and not force:
return self._file_content_cache[file_path]
# URL for the GitLab API endpoint
url = f"{GITLAB_SERVER_URL}/api/v4/projects/{self.project_id}/repository/files/{file_path}/raw?ref={branch_name}"
# Headers for the request
headers = {
"PRIVATE-TOKEN": GITLAB_PRIVATE_TOKEN
}
# Make the GET request
response = requests.get(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
self._file_content_cache[file_path] = response.text
return response.text
else:
return None
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def get_info(self, force=False):
"""
Get the merge request information
:return: Merge request information
"""
if self._info_cache and not force:
return self._info_cache
# URL for the GitLab API endpoint
url = f"{GITLAB_SERVER_URL}/api/v4/projects/{self.project_id}/merge_requests/{self.iid}"
# Headers for the request
headers = {
"PRIVATE-TOKEN": GITLAB_PRIVATE_TOKEN
}
# Make the GET request
response = requests.get(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
self._info_cache = response.json()
return response.json()
else:
return None
# gitlab仓库clone和管理
class GitlabRepoManager:
def __init__(self, project_id, branch_name = ""):
self.project_id = project_id
self.timestamp = int(time.time() * 1000)
self.repo_path = f"./repo/{self.project_id}_{self.timestamp}"
self.has_cloned = False
def get_info(self):
"""
Fetches project information from GitLab.
Sends an HTTP GET request using the instance's project_id to retrieve project details.
Returns a dictionary of project information on success, or logs an error and returns None
if the request fails.
"""
# URL for the GitLab API endpoint
url = f"{GITLAB_SERVER_URL}/api/v4/projects/{self.project_id}"
# Headers for the request
headers = {
"PRIVATE-TOKEN": GITLAB_PRIVATE_TOKEN
}
# Make the GET request
response = requests.get(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
return response.json()
else:
log.error(f"获取项目信息失败: {response.status_code} {response.text}")
return None
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def shallow_clone(self, branch_name = "main"):
"""
Perform a shallow clone of the repository
param branch_name: The name of the branch to clone
"""
# If the target directory exists, remove it
self.delete_repo()
# Build the authenticated URL
authenticated_url = self._build_authenticated_url(self.get_info()["http_url_to_repo"])
# Build the Git command
command = ["git", "clone", authenticated_url, "--depth", "1"]
if branch_name:
command.extend(["--branch", branch_name])
command.extend([self.repo_path + "/" + str(branch_name)])
else:
command.extend([self.repo_path + "/default"])
# command 添加clone到的位置:
if run_command(command) != 0:
log.error("Failed to clone the repository")
self.has_cloned = True
# 切换分支
def checkout_branch(self, branch_name, force=False):
# Build the Git command
if not self.has_cloned:
self.shallow_clone(branch_name)
else:
# 检查是否已经在目标分支上
if not force and os.path.exists(self.repo_path + "/" + str(branch_name) + "/.git"):
return
else:
self.shallow_clone(branch_name)
# 删除库
def delete_repo(self):
if os.path.exists(self.repo_path):
shutil.rmtree(self.repo_path)
# 查找相关文件列表
def find_files_by_keyword(self, keyword, branch_name="main"):
matching_files = []
regex = re.compile(keyword)
self.checkout_branch(branch_name)
for root, _, files in os.walk(self.repo_path + "/" + str(branch_name)):
for file in files:
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if regex.search(content):
matching_files.append(file_path)
except (UnicodeDecodeError, FileNotFoundError, PermissionError):
# 跳过无法读取的文件
continue
return matching_files
# 构建带有身份验证信息的 URL
def _build_authenticated_url(self, repo_url):
# 如果 URL 使用 https
"""
Build an authenticated GitLab repository URL using a private token.
This method embeds OAuth2 credentials into the provided repository URL by
injecting the GitLab private token. If the URL begins with "http://" or "https://",
the method constructs a new URL with the token included. Otherwise, it raises a
ValueError for unsupported URL schemes.
Args:
repo_url: The original repository URL (must begin with "http://" or "https://").
Returns:
An authenticated URL string with embedded OAuth2 credentials.
Raises:
ValueError: If the URL does not start with a supported scheme.
"""
token = GITLAB_PRIVATE_TOKEN
if repo_url.startswith("https://"):
return f"https://oauth2:{token}@{repo_url[8:]}"
# 如果 URL 使用 http
elif repo_url.startswith("http://"):
return f"http://oauth2:{token}@{repo_url[7:]}"
else:
raise ValueError("Unsupported URL scheme")
def is_merge_request_opened(gitlab_payload) -> bool:
"""
Determines if a merge request is open.
Checks whether the provided GitLab payload indicates an open merge request by verifying that its
"object_attributes" have either a state of "opened" with a merge status of "preparing" or a state
of "merged" with a merge status of "can_be_merged". If any error occurs during the evaluation, the
error is logged and False is returned.
Args:
gitlab_payload: Dictionary containing merge request attributes, including an "object_attributes"
key with state and merge_status information.
Returns:
bool: True if the merge request meets one of the open criteria; otherwise, False.
"""
try:
gitlab_merge_request_old = gitlab_payload.get("object_attributes").get("state") == "opened" and gitlab_payload.get("object_attributes").get("merge_status") == "preparing"
gitlab_merge_request_new = gitlab_payload.get("object_attributes").get("state") == "merged" and gitlab_payload.get("object_attributes").get("merge_status") == "can_be_merged"
return gitlab_merge_request_old or gitlab_merge_request_new
except Exception as e:
log.error(f"判断是否是merge request打开事件失败: {e}")
return False