-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathgithub_pr_notifier.py
More file actions
552 lines (446 loc) · 22.8 KB
/
github_pr_notifier.py
File metadata and controls
552 lines (446 loc) · 22.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
import logging
import re
from typing import Any, Dict, List, Optional
from urllib.parse import quote

from socket_basics.core.notification.base import BaseNotifier
from socket_basics.core.config import get_github_token, get_github_repository, get_github_pr_number
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# GitHub API comment character limit; comment bodies longer than this are
# truncated by GithubPRNotifier before being posted or updated.
GITHUB_COMMENT_MAX_LENGTH = 65536
class GithubPRNotifier(BaseNotifier):
    """GitHub PR notifier: posts security findings as PR comments.

    Simplified version that works with pre-formatted content from connectors.

    Each notification is a dict with ``title`` and ``content``. When the
    content is wrapped in ``<!-- TYPE start -->`` / ``<!-- TYPE end -->``
    markers and an existing PR comment already carries the same start marker,
    that comment is updated in place; otherwise a new comment is posted.
    """

    name = "github_pr"

    # Timeout, in seconds, applied to every GitHub API request.
    _API_TIMEOUT = 10
    # Page size used when listing PR comments, and a hard cap on the number of
    # pages fetched so a misbehaving API cannot make the loop run forever.
    _COMMENTS_PER_PAGE = 100
    _MAX_COMMENT_PAGES = 50

    # <!-- TYPE start --> ... <!-- TYPE end -->
    _SECTION_PATTERN = re.compile(
        r'<!-- ([a-zA-Z0-9\-_]+) start -->(.*?)<!-- \1 end -->', re.DOTALL
    )

    # Severity counters looked for in notification content, e.g. "Critical: 3".
    _SEVERITY_COUNT_PATTERNS = {
        'critical': re.compile(r'Critical:\s*(\d+)'),
        'high': re.compile(r'High:\s*(\d+)'),
        'medium': re.compile(r'Medium:\s*(\d+)'),
    }

    # (severity, config key, default label name), highest severity first.
    _PR_LABEL_CONFIG = (
        ('critical', 'pr_label_critical', 'security: critical'),
        ('high', 'pr_label_high', 'security: high'),
        ('medium', 'pr_label_medium', 'security: medium'),
    )

    # (keyword, hex color, description) used when creating severity labels;
    # checked in order, so "critical" wins over "high" and so on.
    _SEVERITY_LABEL_STYLES = (
        ('critical', 'D73A4A', 'Critical security vulnerabilities'),
        ('high', 'D93F0B', 'High severity security issues'),
        ('medium', 'FBCA04', 'Medium severity security issues'),
        ('low', 'E4E4E4', 'Low severity security issues'),
    )

    def __init__(self, params: Optional[Dict[str, Any]] = None):
        """Initialise the notifier from ``params``.

        Args:
            params: Optional notifier configuration; ``token`` and
                ``repository`` override the values discovered by the
                ``socket_basics.core.config`` helpers.
        """
        super().__init__(params or {})
        # GitHub token from params, falling back to the config helper
        self.token = self.config.get('token') or get_github_token()
        self.api_base = "https://api.github.com"
        # Repository ("owner/name") from params, falling back to the helper
        self.repository = self.config.get('repository') or get_github_repository()
        # Set per notify() call; used to link to full results when truncating
        self.full_scan_url: Optional[str] = None

    def notify(self, facts: Dict[str, Any]) -> None:
        """Post or update PR comments for ``facts['notifications']``.

        Args:
            facts: Scan facts. ``notifications`` must be a list of dicts with
                ``title`` and string ``content``; ``full_scan_html_url`` is
                optional and used in the truncation notice.
        """
        notifications = facts.get('notifications', []) or []
        if not isinstance(notifications, list):
            logger.error('GithubPRNotifier: only supports new format - list of dicts with title/content')
            return
        if not notifications:
            logger.info('GithubPRNotifier: no notifications present; skipping')
            return

        # Stored on the instance so the comment helpers can use it
        self.full_scan_url = facts.get('full_scan_html_url')

        # Validate format; content must be a string because it is searched
        # with regular expressions further down.
        valid_notifications = []
        for item in notifications:
            if (
                isinstance(item, dict)
                and 'title' in item
                and isinstance(item.get('content'), str)
            ):
                valid_notifications.append({'title': item['title'], 'content': item['content']})
            else:
                logger.warning('GithubPRNotifier: skipping invalid notification item: %s', type(item))
        if not valid_notifications:
            return

        # Every call below is authenticated; without a token they all fail.
        if not self.token:
            logger.warning('GithubPRNotifier: no GitHub token available')
            return

        pr_number = self._get_pr_number()
        if not pr_number:
            logger.warning('GithubPRNotifier: unable to determine PR number for current branch')
            return

        existing_comments = self._get_pr_comments(pr_number)

        # comment id -> body as currently stored / body after our edits
        original_bodies: Dict[Any, str] = {}
        comment_updates: Dict[Any, str] = {}
        new_sections: List[str] = []
        for notification in valid_notifications:
            content = notification['content']
            section = self._extract_section_markers(content)
            existing_comment = (
                self._find_comment_with_section(existing_comments, section['type'])
                if section
                else None
            )
            if not existing_comment:
                # No markers, or no comment carries this section yet
                new_sections.append(content)
                continue
            comment_id = existing_comment['id']
            if comment_id not in comment_updates:
                stored_body = existing_comment.get('body') or ''
                original_bodies[comment_id] = stored_body
                comment_updates[comment_id] = stored_body
            comment_updates[comment_id] = self._update_section_in_comment(
                comment_updates[comment_id], section['type'], content
            )

        for comment_id, updated_body in comment_updates.items():
            # Compare what would actually be sent (after truncation) with what
            # is stored, so an unchanged oversized comment is not re-sent.
            final_body = self._truncate_comment_if_needed(updated_body, self.full_scan_url)
            if final_body == original_bodies[comment_id]:
                logger.info(
                    'GithubPRNotifier: comment %s content unchanged; skipping update',
                    comment_id,
                )
                continue
            if self._update_comment(pr_number, comment_id, final_body):
                logger.info('GithubPRNotifier: updated existing comment %s', comment_id)
            else:
                logger.error('GithubPRNotifier: failed to update comment %s', comment_id)

        # Each new section gets its own comment to avoid merging issues
        for section_content in new_sections:
            if self._post_comment(pr_number, section_content):
                logger.info('GithubPRNotifier: posted individual comment for section')
            else:
                logger.error('GithubPRNotifier: failed to post individual comment')

        # Add labels to PR if enabled
        if self.config.get('pr_labels_enabled', True):
            labels = self._determine_pr_labels(valid_notifications)
            if labels:
                self._add_pr_labels(pr_number, labels)

    def _send_pr_comment(self, facts: Dict[str, Any], title: str, content: str) -> None:
        """Send a single PR comment with title and content.

        Args:
            facts: Unused; kept for signature compatibility.
            title: Heading of the comment, also used to build its marker.
            content: Pre-formatted comment content.
        """
        if not self.token:
            logger.warning('GithubPRNotifier: no GitHub token available')
            return

        branch = self.config.get('branch')
        if not self.repository or not branch:
            logger.warning('GithubPRNotifier: repository (%s) or branch (%s) not available in config',
                           self.repository, branch)
            return

        pr_number = self._get_pr_number()
        if not pr_number:
            logger.info('GithubPRNotifier: no PR found for branch %s in %s', branch, self.repository)
            return

        # Hidden HTML marker identifying this comment
        uid = f"socket-security:{self.repository}:{branch}:{title.lower().replace(' ', '-')}"
        marker = f"<!-- {uid} -->"
        comment_body = f"{marker}\n\n### {title}\n\n{content}\n\n---\n*Generated by Socket Security*"

        if self._post_comment(pr_number, comment_body):
            logger.info('GithubPRNotifier: posted comment for "%s"', title)
        else:
            logger.error('GithubPRNotifier: failed to post comment for "%s"', title)

    def _api_request(self, method: str, url: str, **kwargs: Any) -> Any:
        """Send an authenticated GitHub API request and return the response.

        Args:
            method: Name of the ``requests`` function to call
                (``'get'``, ``'post'`` or ``'patch'``).
            url: Full request URL.
            **kwargs: Extra keyword arguments forwarded to ``requests``
                (for example ``params`` or ``json``).

        Returns:
            The ``requests`` response object.

        Raises:
            Exception: Anything raised by importing or calling ``requests``;
                callers wrap this helper in their own ``try``/``except``.
        """
        # Imported lazily so a missing ``requests`` package is reported by the
        # callers' error handling instead of breaking module import.
        import requests

        headers = {
            'Authorization': f'token {self.token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        send = getattr(requests, method)
        return send(url, headers=headers, timeout=self._API_TIMEOUT, **kwargs)

    def _get_pr_number(self) -> Optional[int]:
        """Get PR number from environment or API.

        Returns:
            The PR number, or ``None`` when it cannot be determined.
        """
        # Try environment variables first; coerce to text so an integer value
        # from the helper is handled the same way as a string.
        pr_env = get_github_pr_number()
        pr_text = str(pr_env).strip() if pr_env else ''
        if pr_text.isdigit():
            logger.info("GithubPRNotifier: Using PR number from environment: %s", pr_text)
            return int(pr_text)
        logger.debug(
            "GithubPRNotifier: No PR number in environment (GITHUB_PR_NUMBER: %s)",
            pr_env or 'not set',
        )

        # Try to find via API
        pr_number = self._find_pr_for_branch()
        if pr_number:
            logger.info("GithubPRNotifier: Found PR number via API: %s", pr_number)
        else:
            logger.debug("GithubPRNotifier: Could not find PR number via API")
        return pr_number

    def _find_pr_for_branch(self) -> Optional[int]:
        """Find PR number for the configured branch using the API.

        Returns:
            Number of the first open PR whose head is the configured branch,
            or ``None`` when none is found or the lookup fails.
        """
        branch = self.config.get('branch')
        logger.debug(
            "GithubPRNotifier: Searching for PR - repository: %s, branch: %s",
            self.repository, branch,
        )
        if not self.repository or not branch:
            logger.debug(
                "GithubPRNotifier: Missing required info - repository: %s, branch: %s",
                bool(self.repository), bool(branch),
            )
            return None

        try:
            url = f"{self.api_base}/repos/{self.repository}/pulls"
            owner = self.repository.split('/')[0]
            params = {'head': f"{owner}:{branch}", 'state': 'open'}
            logger.debug("GithubPRNotifier: API request to %s with params: %s", url, params)
            resp = self._api_request('get', url, params=params)
            if resp.status_code != 200:
                logger.warning("GithubPRNotifier: API request failed with status %s", resp.status_code)
                return None
            prs = resp.json()
            if prs:
                logger.debug("GithubPRNotifier: Found %s open PR(s) for branch %s", len(prs), branch)
                return prs[0]['number']
            logger.debug("GithubPRNotifier: No open PRs found for branch %s", branch)
        except Exception as e:
            logger.debug('GithubPRNotifier: failed to find PR for branch %s: %s', branch, e)
        return None

    def _get_pr_comments(self, pr_number: int) -> List[Dict[str, Any]]:
        """Get all comments for a PR, following pagination.

        Args:
            pr_number: PR number.

        Returns:
            The comments fetched so far; empty when the repository is not
            configured or the first request fails.
        """
        if not self.repository:
            return []

        url = f"{self.api_base}/repos/{self.repository}/issues/{pr_number}/comments"
        comments: List[Dict[str, Any]] = []
        try:
            for page in range(1, self._MAX_COMMENT_PAGES + 1):
                resp = self._api_request(
                    'get', url,
                    params={'per_page': self._COMMENTS_PER_PAGE, 'page': page},
                )
                if resp.status_code != 200:
                    logger.warning('GithubPRNotifier: failed to get comments: %s', resp.status_code)
                    break
                batch = resp.json()
                if not isinstance(batch, list):
                    break
                comments.extend(batch)
                # A short page is the last page
                if len(batch) < self._COMMENTS_PER_PAGE:
                    break
        except Exception as e:
            logger.error('GithubPRNotifier: exception getting comments: %s', e)
        return comments

    def _extract_section_markers(self, content: str) -> Optional[Dict[str, str]]:
        """Extract section type and content from HTML comment markers.

        Args:
            content: Notification content to inspect.

        Returns:
            ``{'type': TYPE, 'content': content}`` for the first
            ``<!-- TYPE start --> ... <!-- TYPE end -->`` pair found (the full
            content, markers included, is kept), or ``None`` when there is no
            such pair or ``content`` is not a string.
        """
        if not isinstance(content, str):
            return None
        match = self._SECTION_PATTERN.search(content)
        if match is None:
            return None
        return {'type': match.group(1), 'content': content}

    def _find_comment_with_section(self, comments: List[Dict[str, Any]], section_type: str) -> Optional[Dict[str, Any]]:
        """Find an existing comment that contains the given section type.

        Args:
            comments: Comment dicts as returned by the API.
            section_type: Section identifier used in the markers.

        Returns:
            The first comment whose body contains the section's start marker,
            or ``None``.
        """
        start_marker = f'<!-- {section_type} start -->'
        for comment in comments:
            # ``body`` may be missing or null; treat both as empty
            if start_marker in (comment.get('body') or ''):
                return comment
        return None

    def _update_section_in_comment(self, comment_body: str, section_type: str, new_section_content: str) -> str:
        """Update a specific section within a comment body.

        Args:
            comment_body: Current comment body.
            section_type: Section identifier used in the markers.
            new_section_content: Replacement text (markers included).

        Returns:
            The body with every ``start ... end`` span of the section replaced.
            If the start marker is present but the end marker is not (for
            example because the comment was truncated), everything from the
            start marker onward is replaced. If the start marker is absent the
            body is returned unchanged.
        """
        start_marker = f'<!-- {section_type} start -->'
        end_marker = f'<!-- {section_type} end -->'
        pattern = re.escape(start_marker) + '.*?' + re.escape(end_marker)
        # A callable replacement avoids backslash/group interpretation in the
        # new content.
        updated_body, replaced = re.subn(
            pattern, lambda _match: new_section_content, comment_body, flags=re.DOTALL
        )
        if replaced:
            return updated_body

        start_index = comment_body.find(start_marker)
        if start_index == -1:
            return comment_body
        return comment_body[:start_index] + new_section_content

    def _truncate_comment_if_needed(
        self,
        comment_body: str,
        full_scan_url: Optional[str] = None,
        *,
        max_length: Optional[int] = None,
    ) -> str:
        """Truncate comment if it exceeds GitHub's character limit.

        Args:
            comment_body: The comment body to check
            full_scan_url: Optional URL to the full scan results
            max_length: Maximum allowed length; defaults to
                ``GITHUB_COMMENT_MAX_LENGTH``

        Returns:
            Potentially truncated comment body with a link to full results
        """
        limit = GITHUB_COMMENT_MAX_LENGTH if max_length is None else max_length
        if len(comment_body) <= limit:
            return comment_body

        truncation_msg = "\n\n---\n\n⚠️ **Results truncated due to size limits.**"
        if full_scan_url:
            truncation_msg += f"\n\n🔗 [View complete scan results in Socket Report]({full_scan_url})"
        else:
            truncation_msg += "\n\nThe complete results exceed GitHub's comment size limit."

        # Reserve space for the truncation message plus an extra buffer;
        # never negative, which would slice from the wrong end.
        max_content_length = max(0, limit - len(truncation_msg) - 100)
        truncated = comment_body[:max_content_length]

        # Prefer to cut at a line break if one falls in the last 20%
        last_newline = truncated.rfind('\n')
        if last_newline > max_content_length * 0.8:
            truncated = truncated[:last_newline]

        logger.warning(
            'GithubPRNotifier: comment truncated from %s to %s characters',
            len(comment_body), len(truncated),
        )
        return truncated + truncation_msg

    def _update_comment(self, pr_number: int, comment_id: int, comment_body: str) -> bool:
        """Update an existing comment.

        Args:
            pr_number: PR number (unused by the request; kept for interface
                compatibility).
            comment_id: Id of the comment to update.
            comment_body: New body; truncated if too long.

        Returns:
            True if the API answered 200, False otherwise.
        """
        if not self.repository:
            return False

        comment_body = self._truncate_comment_if_needed(
            comment_body, getattr(self, 'full_scan_url', None)
        )
        try:
            url = f"{self.api_base}/repos/{self.repository}/issues/comments/{comment_id}"
            resp = self._api_request('patch', url, json={'body': comment_body})
            if resp.status_code == 200:
                logger.debug('GithubPRNotifier: comment updated successfully')
                return True
            logger.warning('GithubPRNotifier: API error updating comment %s: %s', resp.status_code, resp.text[:200])
            return False
        except Exception as e:
            logger.error('GithubPRNotifier: exception updating comment: %s', e)
            return False

    def _post_comment(self, pr_number: int, comment_body: str) -> bool:
        """Post a comment to the PR.

        Args:
            pr_number: PR number.
            comment_body: Comment body; truncated if too long.

        Returns:
            True if the API answered 201, False otherwise.
        """
        if not self.repository:
            logger.warning('GithubPRNotifier: no repository configured')
            return False

        comment_body = self._truncate_comment_if_needed(
            comment_body, getattr(self, 'full_scan_url', None)
        )
        try:
            url = f"{self.api_base}/repos/{self.repository}/issues/{pr_number}/comments"
            resp = self._api_request('post', url, json={'body': comment_body})
            if resp.status_code == 201:
                logger.debug('GithubPRNotifier: comment posted successfully')
                return True
            logger.warning('GithubPRNotifier: API error %s: %s', resp.status_code, resp.text[:200])
            return False
        except Exception as e:
            logger.error('GithubPRNotifier: exception posting comment: %s', e)
            return False

    def _ensure_label_exists_with_color(self, label_name: str, color: str, description: str = '') -> bool:
        """Ensure a label exists in the repository with the specified color.

        If the label doesn't exist, it will be created with the given color.
        If it already exists, we leave it alone (don't update existing labels).

        Args:
            label_name: Name of the label
            color: Hex color code (without #), e.g., 'D73A4A'
            description: Optional description for the label

        Returns:
            True if label exists/was created, False otherwise
        """
        if not self.repository:
            return False

        try:
            # The name goes into the URL path, so it must be percent-encoded
            # (label names may contain spaces, ':' or '/').
            check_url = f"{self.api_base}/repos/{self.repository}/labels/{quote(label_name, safe='')}"
            resp = self._api_request('get', check_url)
            if resp.status_code == 200:
                # Label already exists, don't modify it
                logger.debug('GithubPRNotifier: label "%s" already exists', label_name)
                return True
            if resp.status_code != 404:
                logger.warning('GithubPRNotifier: unexpected response checking label: %s', resp.status_code)
                return False

            # Label doesn't exist, create it
            create_url = f"{self.api_base}/repos/{self.repository}/labels"
            payload = {
                'name': label_name,
                'color': color,
                'description': description,
            }
            create_resp = self._api_request('post', create_url, json=payload)
            if create_resp.status_code == 201:
                logger.info('GithubPRNotifier: created label "%s" with color #%s', label_name, color)
                return True
            logger.warning('GithubPRNotifier: failed to create label "%s": %s',
                           label_name, create_resp.status_code)
            return False
        except Exception as e:
            logger.debug('GithubPRNotifier: exception ensuring label exists: %s', e)
            return False

    def _severity_label_style(self, label: str) -> Optional[tuple]:
        """Return ``(color, description)`` for a severity label, if any.

        Only labels containing ':' are considered; the severity is inferred
        from the first of "critical", "high", "medium", "low" found in the
        lower-cased name. The default ``security: <severity>`` labels resolve
        through the same rule.
        """
        if not isinstance(label, str) or ':' not in label:
            return None
        label_lower = label.lower()
        for keyword, color, description in self._SEVERITY_LABEL_STYLES:
            if keyword in label_lower:
                return color, description
        return None

    def _add_pr_labels(self, pr_number: int, labels: List[str]) -> bool:
        """Add labels to a PR, ensuring they exist with appropriate colors.

        Args:
            pr_number: PR number
            labels: List of label names to add

        Returns:
            True if successful, False otherwise
        """
        if not self.repository or not labels:
            return False

        # Create recognisable severity labels with their color first; failures
        # here are not fatal, the labels are still attached below.
        for label in labels:
            style = self._severity_label_style(label)
            if style:
                color, description = style
                self._ensure_label_exists_with_color(label, color, description)

        try:
            url = f"{self.api_base}/repos/{self.repository}/issues/{pr_number}/labels"
            resp = self._api_request('post', url, json={'labels': labels})
            if resp.status_code == 200:
                logger.info('GithubPRNotifier: added labels to PR %s: %s', pr_number, ', '.join(labels))
                return True
            logger.warning('GithubPRNotifier: failed to add labels: %s', resp.status_code)
            return False
        except Exception as e:
            logger.error('GithubPRNotifier: exception adding labels: %s', e)
            return False

    def _determine_pr_labels(self, notifications: List[Dict[str, Any]]) -> List[str]:
        """Determine which labels to add based on notifications.

        Looks for "Critical: N", "High: N" and "Medium: N" counters with
        N > 0 in each notification's content.

        Args:
            notifications: List of notification dictionaries

        Returns:
            A list holding the label for the highest severity found (label
            names are configurable via ``pr_label_critical`` /
            ``pr_label_high`` / ``pr_label_medium``), or an empty list.
        """
        severities_found = set()
        for notif in notifications:
            content = notif.get('content') or ''
            if not isinstance(content, str):
                continue
            for severity, pattern in self._SEVERITY_COUNT_PATTERNS.items():
                match = pattern.search(content)
                if match and int(match.group(1)) > 0:
                    severities_found.add(severity)

        # Only the most severe level is reported
        for severity, config_key, default_label in self._PR_LABEL_CONFIG:
            if severity in severities_found:
                return [self.config.get(config_key, default_label)]
        return []