-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy pathcheck_links.py
More file actions
230 lines (187 loc) · 7.31 KB
/
check_links.py
File metadata and controls
230 lines (187 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FlyPython Link Checker Tool
Periodically checks the validity of all external links in README files
"""
import re
import json
import os
from pathlib import Path
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class LinkChecker:
    """Check the validity of external links extracted from markdown files.

    Links are checked concurrently and categorized into ``working``,
    ``broken``, ``redirect``, ``timeout`` and ``unknown`` buckets stored
    in ``self.results``.
    """

    def __init__(self, timeout: int = 10, max_workers: int = 10):
        """
        Args:
            timeout: Per-request timeout in seconds.
            max_workers: Thread-pool size used by ``check_all_links``.
        """
        self.session = self._create_session()
        self.timeout = timeout
        self.max_workers = max_workers
        # One bucket per outcome; each bucket holds link dicts.
        self.results = {
            'working': [],
            'broken': [],
            'redirect': [],
            'timeout': [],
            'unknown': []
        }
        # URLs already dispatched, so duplicates are not re-fetched.
        self.processed_urls = set()

    def _create_session(self) -> requests.Session:
        """Create a requests session with retry strategy and headers."""
        session = requests.Session()
        # Retry transient server-side failures and rate limiting.
        retry_strategy = Retry(
            total=2,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        # Browser-like user agent: some sites reject the default
        # python-requests UA outright.
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        return session

    def extract_links_from_file(self, filename: str) -> List[Dict]:
        """Extract all external (http/https) links from a markdown file.

        Returns:
            A list of link dicts with ``text``, ``url``, ``file`` and
            ``type`` keys; empty list when the file is missing/unreadable.
        """
        filepath = Path(filename)
        if not filepath.exists():
            # BUG FIX: original printed a literal placeholder instead of
            # interpolating the path.
            print(f"File not found: {filepath}")
            return []
        try:
            content = filepath.read_text(encoding='utf-8')
        except Exception as e:
            # BUG FIX: interpolate the path (was a literal placeholder).
            print(f"Failed to read {filepath}: {e}")
            return []
        links = []
        # Markdown-style links: [text](url)
        markdown_links = re.findall(r'\[([^\]]*)\]\(([^)]+)\)', content)
        for text, url in markdown_links:
            if url.startswith('http'):
                links.append({
                    'text': text,
                    'url': url,
                    'file': str(filepath),
                    'type': 'markdown'
                })
        # Bare URLs not already captured above.
        plain_urls = re.findall(r'https?://[^\s\])\}]+', content)
        seen = {link['url'] for link in links}
        for url in plain_urls:
            if url not in seen:
                links.append({
                    'text': url,
                    'url': url,
                    'file': str(filepath),
                    'type': 'plain'
                })
                seen.add(url)
        return links

    def check_link(self, link: Dict) -> Dict:
        """Check a single link and file it into ``self.results``.

        Mutates ``link`` in place (adds ``status`` and possibly
        ``status_code``/``final_url``/``error``) and returns it.
        """
        url = link['url']
        if url in self.processed_urls:
            # Already checked (or in flight); leave the dict untouched.
            return link
        self.processed_urls.add(url)
        try:
            # HEAD first: cheap, no response body transferred.
            response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
            # BUG FIX: servers that reject HEAD (405/501) or block it
            # (403) previously got filed as broken with no retry — the
            # old GET fallback only ran on exceptions. Re-verify with GET.
            if response.status_code in (403, 405, 501):
                response = self.session.get(url, timeout=self.timeout, allow_redirects=True)
            return self._process_response(link, response)
        except requests.exceptions.Timeout:
            link['status'] = 'timeout'
            link['error'] = 'Request timeout'
            self.results['timeout'].append(link)
            return link
        except requests.exceptions.RequestException as e:
            # Some servers break on HEAD at the protocol level; fall
            # back to a plain GET before giving up.
            try:
                response = self.session.get(url, timeout=self.timeout)
                return self._process_response(link, response)
            except requests.exceptions.RequestException:
                link['status'] = 'unknown'
                link['error'] = str(e)
                self.results['unknown'].append(link)
                return link

    def _process_response(self, link: Dict, response: requests.Response) -> Dict:
        """Categorize ``link`` from an HTTP response and record it."""
        status_code = response.status_code
        link['status_code'] = status_code
        if 200 <= status_code < 300:
            # BUG FIX: requests follows redirects (allow_redirects=True),
            # so a redirected link arrives here with a 2xx final status
            # and a non-empty history. The original code filed these as
            # "working" and its 3xx branch was effectively dead code.
            if response.history:
                link['status'] = 'redirect'
                link['final_url'] = response.url
                self.results['redirect'].append(link)
            else:
                # BUG FIX: accept the whole 2xx range, not just 200
                # (204/206 responses were previously marked broken).
                link['status'] = 'working'
                self.results['working'].append(link)
        elif 300 <= status_code < 400:
            # Only reachable when redirects were not followed.
            link['status'] = 'redirect'
            link['final_url'] = response.url
            self.results['redirect'].append(link)
        else:
            link['status'] = 'broken'
            self.results['broken'].append(link)
        return link

    def check_all_links(self, links: List[Dict]) -> None:
        """Concurrently check all links, printing progress per result.

        NOTE: worker threads append to the shared ``self.results`` lists;
        ``list.append`` is atomic under CPython's GIL, so no lock is used.
        """
        print(f"Checking {len(links)} links...\n")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.check_link, link): link for link in links}
            for i, future in enumerate(as_completed(futures), 1):
                link = futures[future]
                try:
                    result = future.result()
                    status = result.get('status', 'unknown').upper()
                    print(f"[{i}/{len(links)}] {status}: {result['url']}")
                except Exception as e:
                    # A worker crash should not abort the whole run.
                    print(f"Error checking {link['url']}: {e}")

    def generate_report(self, output_dir: str = 'reports') -> None:
        """Print a summary report and save detailed JSON results.

        Args:
            output_dir: Directory (created if absent) for the JSON file.
        """
        total = sum(len(links) for links in self.results.values())
        report = f"""
{'='*60}
Link Check Report
{'='*60}
Total Links: {total}
✓ Working: {len(self.results['working'])}
→ Redirects: {len(self.results['redirect'])}
✗ Broken: {len(self.results['broken'])}
⏱ Timeouts: {len(self.results['timeout'])}
? Unknown: {len(self.results['unknown'])}
{'='*60}
"""
        print(report)
        # Save detailed results.
        output_path = Path(output_dir)
        # BUG FIX: parents=True so nested output dirs don't raise.
        output_path.mkdir(parents=True, exist_ok=True)
        results_file = output_path / 'link_check_results.json'
        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"Detailed results saved to: {results_file}")

    def deduplicate_links(self, links: List[Dict]) -> List[Dict]:
        """Return ``links`` with duplicate URLs removed (first wins,
        original order preserved)."""
        seen = set()
        unique = []
        for link in links:
            if link['url'] not in seen:
                unique.append(link)
                seen.add(link['url'])
        return unique
def main():
    """Entry point: extract links from the README files, dedupe, check,
    and write a report to ./reports."""
    # Paths are relative to the scripts/ directory this tool lives in.
    files = ['../README.md', '../README_cn.md']
    checker = LinkChecker(timeout=10, max_workers=10)
    all_links = []
    for filename in files:
        # BUG FIX: original printed a literal placeholder instead of
        # interpolating the file name being processed.
        print(f"Extracting links from {filename}...")
        links = checker.extract_links_from_file(filename)
        if links:
            all_links.extend(links)
            print(f"Found {len(links)} links\n")
    if not all_links:
        print("No links found!")
        return
    # Deduplicate before checking so each URL is fetched once.
    unique_links = checker.deduplicate_links(all_links)
    print(f"Checking {len(unique_links)} unique links\n")
    checker.check_all_links(unique_links)
    checker.generate_report()
# Run the checker only when executed as a script, not when imported.
if __name__ == '__main__':
    main()