-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpr_conflict_detector.py
More file actions
293 lines (251 loc) · 10.8 KB
/
pr_conflict_detector.py
File metadata and controls
293 lines (251 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
A GitHub Action that detects potential merge conflicts between open pull requests
by analyzing overlapping file and line changes across repositories.
"""
import auth
import deduplication
import env
from conflict_detector import detect_conflicts
from issue_writer import create_or_update_issue
from json_writer import write_to_json
from markdown_writer import write_to_markdown
from pr_comment import post_pr_comments
from pr_data import fetch_all_pr_data
from slack_notify import send_slack_notification
def main():
"""Run the PR conflict detector."""
# 1. Get environment variables
env_vars = env.get_env_vars()
# 2. Authenticate to GitHub
github_connection = auth.auth_to_github(
env_vars.token,
env_vars.gh_app_id,
env_vars.gh_app_installation_id,
env_vars.gh_app_private_key_bytes,
env_vars.ghe,
env_vars.gh_app_enterprise_only,
)
# 2b. Resolve FILTER_TEAMS into usernames and merge with FILTER_AUTHORS
combined_filter_authors = set(env_vars.filter_authors)
if env_vars.filter_teams:
print("\nResolving FILTER_TEAMS...")
for team_ref in env_vars.filter_teams:
org, team_slug = team_ref.split("/", 1)
members = auth.get_team_members(github_connection, org, team_slug)
combined_filter_authors.update(members)
if env_vars.filter_authors:
print(
f"Combined {len(env_vars.filter_authors)} FILTER_AUTHORS + "
f"team members = {len(combined_filter_authors)} unique author(s)"
)
else:
print(
f"Resolved {len(combined_filter_authors)} unique author(s) from teams"
)
if not combined_filter_authors and not env_vars.filter_authors:
print(
" ⚠️ No valid teams resolved and no FILTER_AUTHORS set"
" — no author filtering will be applied"
)
# 2c. Remove excluded authors
# Derive from the resolved set, not env var presence. When teams resolve
# to zero members and no FILTER_AUTHORS are set, this stays False so we
# fall back to scanning all PRs (matching the warning above).
filtering_requested = bool(combined_filter_authors)
if env_vars.exclude_authors:
if not env_vars.filter_authors and not env_vars.filter_teams:
print(
" ⚠️ EXCLUDE_AUTHORS has no effect without"
" FILTER_AUTHORS or FILTER_TEAMS"
)
else:
excluded = set(env_vars.exclude_authors)
before_count = len(combined_filter_authors)
combined_filter_authors -= excluded
removed = before_count - len(combined_filter_authors)
if removed:
print(
f"Excluded {removed} author(s) via EXCLUDE_AUTHORS "
f"({len(combined_filter_authors)} remaining)"
)
else:
print("EXCLUDE_AUTHORS set but no matching authors found to exclude")
# 3. Get repositories to scan
repos = get_repos_iterator(github_connection, env_vars)
# 4. For each repo, fetch PRs and detect conflicts
all_conflicts = {} # {repo_full_name: list[ConflictResult]}
for repo in repos:
# Skip exempt repos
if (
repo.full_name in env_vars.exempt_repos
or repo.name in env_vars.exempt_repos
):
print(f"Skipping exempt repo: {repo.full_name}")
continue
if repo.archived:
print(f"Skipping archived repo: {repo.full_name}")
continue
print(f"\nScanning {repo.full_name}...")
# Fetch all open PR data
owner, repo_name = repo.full_name.split("/")
prs = fetch_all_pr_data(
repo,
env_vars.include_drafts,
github_connection,
owner,
repo_name,
filter_authors=combined_filter_authors if filtering_requested else None,
)
# Filter exempt PRs
if env_vars.exempt_prs:
prs = [pr for pr in prs if pr.number not in env_vars.exempt_prs]
if len(prs) < 2:
print(f" {len(prs)} open PR(s) - need at least 2 to detect conflicts")
continue
print(f" Found {len(prs)} open PRs, analyzing for conflicts...")
# Detect conflicts
conflicts = detect_conflicts(
prs,
verify=env_vars.verify_conflicts,
github_connection=github_connection,
owner=owner,
repo_name=repo_name,
)
# Filter out conflicts where both PRs have the same author
if conflicts:
original_count = len(conflicts)
conflicts = [c for c in conflicts if c.pr_a.author != c.pr_b.author]
filtered_count = original_count - len(conflicts)
if filtered_count > 0:
print(
f" Filtered {filtered_count} same-author conflict(s) "
f"({len(conflicts)} remaining)"
)
if conflicts:
all_conflicts[repo.full_name] = conflicts
print(f" ⚠️ Found {len(conflicts)} potential conflict(s)")
else:
print(" ✅ No conflicts detected")
# 5. Apply deduplication
print(f"\n{'='*50}")
total = sum(len(c) for c in all_conflicts.values())
print(f"Total: {total} potential conflict(s) across {len(all_conflicts)} repo(s)")
# Load and prune state
state = deduplication.load_state()
state = deduplication.prune_expired_conflicts(state)
# Compare current vs historical
dedup_result = deduplication.compare_conflicts(all_conflicts, state)
print(
f"Deduplication: {len(dedup_result.new_conflicts)} new, "
f"{len(dedup_result.changed_conflicts)} changed, "
f"{len(dedup_result.unchanged_conflicts)} unchanged, "
f"{len(dedup_result.resolved_fingerprints)} resolved"
)
# Update state with current conflicts (skip saving in dry run)
updated_state = deduplication.update_state_with_current(all_conflicts, state)
if not env_vars.dry_run:
deduplication.save_state(updated_state)
else:
print("DRY RUN: Skipping state file save")
# Detect state rebuild: if state was empty and has no last_run marker,
# this is either a first run or a cache eviction. Suppress notifications
# to avoid a blast of alerts for all existing conflicts.
state_was_empty = len(state.get("conflicts", [])) == 0
has_run_before = "last_run" in state
suppress_notifications = state_was_empty and not has_run_before
if suppress_notifications:
print(
"No prior state found — rebuilding state, "
"skipping notifications this run"
)
# Conflicts to notify about (new + changed)
notify_conflicts: dict[str, list] = {}
if not suppress_notifications:
for conflict in dedup_result.new_conflicts + dedup_result.changed_conflicts:
# Find which repo this conflict belongs to
for repo_name, conflicts in all_conflicts.items():
if conflict in conflicts:
if repo_name not in notify_conflicts:
notify_conflicts[repo_name] = []
notify_conflicts[repo_name].append(conflict)
break
# 6. Generate outputs
# Write markdown report (always generated, full results)
write_to_markdown(
all_conflicts,
output_file=env_vars.output_file,
report_title=env_vars.report_title,
enable_step_summary=env_vars.enable_github_actions_step_summary,
)
# Write JSON report
json_output = env_vars.output_file.replace(".md", ".json")
write_to_json(all_conflicts, output_file=json_output)
# Create/update issues in repos (all conflicts, not just new)
if not env_vars.enable_report_issues:
print("Report issue creation disabled (ENABLE_REPORT_ISSUES=false)")
elif not env_vars.dry_run:
for repo_full_name, conflicts in all_conflicts.items():
owner, rname = repo_full_name.split("/")
repo_obj = github_connection.repository(owner, rname)
issue_url = create_or_update_issue(
repo_obj, conflicts, env_vars.report_title, env_vars.dry_run
)
if issue_url:
print(f" Created/updated issue: {issue_url}")
else:
print("DRY RUN: Skipping issue creation")
# Send Slack notification (only for new + changed conflicts)
if notify_conflicts:
print(
f"Sending Slack notifications for {sum(len(c) for c in notify_conflicts.values())} conflict(s)"
)
send_slack_notification(
env_vars.slack_webhook_url,
notify_conflicts,
channel=env_vars.slack_channel,
dry_run=env_vars.dry_run,
)
else:
print("No new or changed conflicts — skipping Slack notifications")
# Post PR comments (all active conflicts + resolved info)
# Pass all_conflicts so every PR's comment shows its full conflict picture,
# plus resolved entries and new-conflict badges for UX.
has_comment_changes = bool(notify_conflicts) or bool(
dedup_result.resolved_fingerprints
)
if env_vars.enable_pr_comments and has_comment_changes:
new_conflict_keys: set[tuple[int, int]] = set()
for conflict in dedup_result.new_conflicts:
new_conflict_keys.add((conflict.pr_a.number, conflict.pr_b.number))
active_count = sum(len(c) for c in all_conflicts.values())
resolved_count = len(updated_state.get("resolved_conflicts", []))
print(
f"Posting PR comments ({active_count} active conflict(s), "
f"{resolved_count} resolved)"
)
post_pr_comments(
all_conflicts,
github_connection,
new_conflict_keys=new_conflict_keys,
resolved_entries=updated_state.get("resolved_conflicts", []),
dry_run=env_vars.dry_run,
)
elif env_vars.enable_pr_comments:
print("No new, changed, or resolved conflicts — skipping PR comments")
def get_repos_iterator(github_connection, env_vars):
"""Get an iterator of repositories to scan.
Args:
github_connection: Authenticated github3 connection.
env_vars: Environment variables dataclass.
Returns:
Iterator of github3 repository objects.
"""
if env_vars.organization and not env_vars.repository_list:
return github_connection.organization(env_vars.organization).repositories()
repos = []
for repo_full_name in env_vars.repository_list:
owner, repo_name = repo_full_name.split("/")
repos.append(github_connection.repository(owner, repo_name))
return repos
if __name__ == "__main__": # pragma: no cover
main()