-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopenstax-scrape.py
More file actions
328 lines (261 loc) · 13.2 KB
/
openstax-scrape.py
File metadata and controls
328 lines (261 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import asyncio
import os
import re
import time
from pathlib import Path

import requests
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeout
class OpenStaxPlaywrightDownloader:
    """Download the PDF edition of OpenStax books, one folder per subject.

    Subject and book-detail pages are rendered in a headless Chromium page
    driven by Playwright; the PDF link is pulled out of the rendered HTML with
    regular expressions and fetched with ``requests``. Files are stored as
    ``<download_folder>/<subject name>/<book title>.pdf``.
    """

    # Lower-case substrings that mark a PDF URL as a companion document
    # (instructor/student/getting-started guides) rather than the book itself.
    _COMPANION_MARKERS = ('instructor', 'student', 'getting', 'guide', 'started')

    # Heading text that is rejected as a book title (presumably a generic
    # placeholder shown by the detail page - verify against the live site).
    _PLACEHOLDER_TITLE = "Book details"

    def __init__(self, base_url="https://openstax.org"):
        """Store the site root, the output folder and the subject -> slug map.

        Args:
            base_url: Site root that subject paths and relative book links
                are appended to.
        """
        self.base_url = base_url
        self.download_folder = "OpenStax_Books"
        # Display name (also used as folder name) -> slug used in /subjects/<slug>.
        self.subjects = {
            "Business": "business",
            "College Success": "college-success",
            "Computer Science": "computer-science",
            "Humanities": "humanities",
            "Math": "math",
            "Nursing": "nursing",
            "Science": "science",
            "Social Sciences": "social-sciences"
        }

    def create_folder(self, folder_name):
        """Create ``folder_name`` (and any missing parents) if it doesn't exist."""
        Path(folder_name).mkdir(parents=True, exist_ok=True)

    def sanitize_filename(self, filename):
        """Turn arbitrary text into a safe file or folder name.

        Removes the characters ``<>:"/\\|?*``, collapses every run of
        whitespace/control characters (for example newlines coming from a
        multi-line heading) into a single space, trims the ends and limits
        the result to 200 characters.

        Returns:
            The cleaned name, or ``"untitled"`` when nothing usable is left,
            so callers never end up with a bare ``".pdf"`` file name.
        """
        cleaned = re.sub(r'[<>:"/\\|?*]', '', filename)
        cleaned = re.sub(r'[\x00-\x1f\x7f\s]+', ' ', cleaned)
        # Trim again after truncating: the cut may land right after a space.
        cleaned = cleaned.strip()[:200].rstrip()
        return cleaned or "untitled"

    async def wait_for_page_load(self, page):
        """Wait for the SPA to render: app root populated, network idle, 2 s grace.

        Best-effort by design: a Playwright timeout in either wait is ignored
        so the caller can still try to work with whatever has rendered.
        """
        try:
            await page.wait_for_selector('#app > *', timeout=10000)
            await page.wait_for_load_state('networkidle', timeout=10000)
            await page.wait_for_timeout(2000)
        except PlaywrightTimeout:
            pass

    def _save_response_body(self, response, destination, total_size=0):
        """Stream ``response`` into ``destination`` and return the bytes written.

        Args:
            response: Object exposing ``iter_content(chunk_size=...)``
                (a streamed ``requests`` response).
            destination: Path of the file to (over)write.
            total_size: Expected size in bytes; progress is printed only when
                it is greater than zero.
        """
        downloaded = 0
        with open(destination, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                f.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    # Clamp: the header can be smaller than the decoded body.
                    progress = min(downloaded / total_size, 1.0) * 100
                    mb_downloaded = downloaded / 1024 / 1024
                    mb_total = total_size / 1024 / 1024
                    print(f" Progress: {progress:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)", end='\r')
        return downloaded

    def download_pdf_with_requests(self, pdf_url, filepath):
        """Download ``pdf_url`` to ``filepath`` with ``requests``.

        The body is streamed into ``<filepath>.part`` and renamed only after
        the transfer completed, so an interrupted run never leaves a truncated
        file that a later run would skip as already downloaded.

        Returns:
            True on success, False on any error (the error is printed).
        """
        partial_path = f"{filepath}.part"
        try:
            print(f" → Downloading from: {pdf_url[:80]}...")
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
            }
            # The context manager releases the connection even on errors.
            with requests.get(pdf_url, headers=headers, stream=True, timeout=120) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                downloaded = self._save_response_body(response, partial_path, total_size)
            if downloaded == 0:
                raise ValueError("server returned an empty response body")
            os.replace(partial_path, filepath)
            # Report the bytes actually written; Content-Length may be absent.
            print(f"\n ✓ Downloaded successfully ({downloaded / 1024 / 1024:.2f} MB)")
            return True
        except Exception as e:
            print(f"\n ✗ Download error: {str(e)}")
            return False
        finally:
            # Also runs on KeyboardInterrupt/cancellation; no-op after success.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    async def extract_pdf_from_page_content(self, page):
        """Return the most likely book-PDF URL found in the page's HTML.

        Patterns are tried from most to least specific; the first match whose
        URL contains none of the companion-document markers wins. If every
        match is filtered out, the first PDF URL containing "openstax" is
        returned as a fallback (this may be a guide).

        Returns:
            The URL string, or None when no PDF link is found or an error
            occurs (the error is printed).
        """
        try:
            content = await page.content()
            # Look for PDF URLs - prioritize actual book PDFs over guides.
            # The substring filter below is what reliably excludes guides;
            # the look-behinds in the first pattern only add a partial check.
            pdf_patterns = [
                r'https://assets\.openstax\.org/oscms-prodcms/media/documents/[^"\']*?(?<!Instructor)[^"\']*?(?<!Student)[^"\']*?(?<!Getting)[^"\']*?(?<!Guide)\.pdf[^"\']*',
                r'https://assets\.openstax\.org/[^"\']+\.pdf[^"\']*',
                r'https://[^"\']*openstax[^"\']*\.pdf[^"\']*',
            ]
            for pattern in pdf_patterns:
                for match in re.findall(pattern, content, re.IGNORECASE):
                    lowered = match.lower()
                    if any(marker in lowered for marker in self._COMPANION_MARKERS):
                        continue
                    print(f" → Found PDF: {match[:80]}...")
                    return match
            # Fallback: return any OpenStax PDF if no book PDF was found.
            all_pdfs = re.findall(r'https://[^"\']+\.pdf[^"\']*', content)
            fallback = next((pdf for pdf in all_pdfs if 'openstax' in pdf.lower()), None)
            if fallback is not None:
                print(f" → Found PDF (fallback): {fallback[:80]}...")
            return fallback
        except Exception as e:
            print(f" ✗ Error extracting PDF: {str(e)}")
            return None

    async def _read_book_title(self, page, book_url):
        """Return the book title from the page headings, else from the URL slug."""
        title_selectors = ['h1', 'h2', '[data-html="book-title"]']
        book_title = None
        for selector in title_selectors:
            try:
                title_elem = await page.query_selector(selector)
                if title_elem:
                    book_title = (await title_elem.inner_text()).strip()
                    if len(book_title) > 5 and book_title != self._PLACEHOLDER_TITLE:
                        break
            except Exception:
                # Not a bare except: cancellation and Ctrl-C must propagate.
                continue
        if not book_title or book_title == self._PLACEHOLDER_TITLE:
            # Derive the title from the last path segment of the URL;
            # rstrip keeps a trailing slash from producing an empty title.
            slug = book_url.rstrip('/').split('/')[-1].split('?')[0]
            book_title = slug.replace('-', ' ').title()
        return book_title

    async def download_from_book_detail_page(self, page, book_url, subject_folder, idx, total):
        """Open a book detail page and download its PDF into ``subject_folder``.

        Args:
            page: Playwright page used for navigation.
            book_url: Absolute URL of the book detail page.
            subject_folder: Existing folder the PDF is saved into.
            idx: 1-based position of this book, used for progress output.
            total: Number of books in the subject, used for progress output.

        Returns:
            True if the PDF was downloaded, None if the target file already
            exists (skipped), False on any failure.
        """
        try:
            print(f"\n [{idx}/{total}] Accessing: {book_url}")
            await page.goto(book_url, wait_until='domcontentloaded', timeout=30000)
            await self.wait_for_page_load(page)

            book_title = await self._read_book_title(page, book_url)
            print(f" Title: {book_title}")

            # Check if already downloaded.
            filename = f"{self.sanitize_filename(book_title)}.pdf"
            filepath = os.path.join(subject_folder, filename)
            if os.path.exists(filepath):
                file_size = os.path.getsize(filepath) / 1024 / 1024
                print(f" ⊘ Already exists ({file_size:.2f} MB)")
                return None

            pdf_url = await self.extract_pdf_from_page_content(page)
            if not pdf_url:
                print(" ✗ Could not find PDF download link")
                return False

            # Defensive clean-up; the extraction patterns already stop at quotes.
            pdf_url = pdf_url.split('"')[0].split("'")[0]

            # Plain HTTP download instead of a browser download.
            # NOTE: this call is synchronous and blocks the event loop while it runs.
            return self.download_pdf_with_requests(pdf_url, filepath)
        except Exception as e:
            print(f" ✗ Error: {str(e)}")
            return False

    async def process_subject(self, page, subject_name, subject_slug):
        """Download every book listed on one subject page.

        Returns:
            A ``(downloaded, failed, skipped)`` tuple of counts. If an error
            interrupts processing, the counts accumulated so far are returned.
        """
        subject_url = f"{self.base_url}/subjects/{subject_slug}"
        print(f"\n Navigating to: {subject_url}")
        downloaded = 0
        failed = 0
        skipped = 0
        try:
            await page.goto(subject_url, wait_until='domcontentloaded')
            await self.wait_for_page_load(page)

            print(" → Looking for book detail page links...")
            book_links = await page.query_selector_all('a[href*="/details/books/"]')

            # Collect unique detail-page URLs, ignoring resource links.
            unique_urls = set()
            for link in book_links:
                href = await link.get_attribute('href')
                if not href:
                    continue
                # Links whose query string targets instructor/student resources.
                if '?' in href and any(x in href.lower() for x in ['instructor', 'student', 'resource']):
                    continue
                base_href = href.split('?')[0]
                full_url = base_href if base_href.startswith('http') else f"{self.base_url}{base_href}"
                unique_urls.add(full_url)

            # Sorted so runs visit books in a reproducible order.
            book_urls = sorted(unique_urls)
            print(f" Found {len(book_urls)} unique books")
            if not book_urls:
                return 0, 0, 0

            subject_folder = os.path.join(self.download_folder, self.sanitize_filename(subject_name))
            self.create_folder(subject_folder)

            for idx, book_url in enumerate(book_urls, 1):
                result = await self.download_from_book_detail_page(page, book_url, subject_folder, idx, len(book_urls))
                if result is True:
                    downloaded += 1
                elif result is False:
                    failed += 1
                else:
                    skipped += 1
                # Pause between books.
                await page.wait_for_timeout(2000)
            return downloaded, failed, skipped
        except Exception as e:
            print(f" ✗ Error processing subject: {str(e)}")
            return downloaded, failed, skipped

    def _write_download_log(self, total_downloaded, total_failed, total_skipped):
        """Write the run totals to ``download_log.txt`` in the download folder."""
        log_file = os.path.join(self.download_folder, "download_log.txt")
        with open(log_file, 'w', encoding='utf-8') as f:
            f.write("OpenStax Download Log\n")
            f.write(f"{'='*50}\n")
            f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total Downloaded: {total_downloaded}\n")
            f.write(f"Total Failed: {total_failed}\n")
            f.write(f"Total Skipped: {total_skipped}\n")
            f.write(f"{'='*50}\n")

    async def download_all(self):
        """Process every configured subject, print a summary and write a log."""
        print(f"\n{'='*70}")
        print("OpenStax PDF Downloader - Headless Mode")
        print(f"{'='*70}\n")
        self.create_folder(self.download_folder)
        total_downloaded = 0
        total_failed = 0
        total_skipped = 0
        async with async_playwright() as p:
            # Headless launch: no browser window appears.
            # NOTE(review): '--no-sandbox' turns off Chromium's sandbox.
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-setuid-sandbox'
                ]
            )
            try:
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
                )
                page = await context.new_page()
                for idx, (subject_name, subject_slug) in enumerate(self.subjects.items(), 1):
                    print(f"\n{'='*70}")
                    print(f"[{idx}/{len(self.subjects)}] SUBJECT: {subject_name}")
                    print(f"{'='*70}")
                    try:
                        downloaded, failed, skipped = await self.process_subject(page, subject_name, subject_slug)
                        total_downloaded += downloaded
                        total_failed += failed
                        total_skipped += skipped
                        print(f"\n{'─'*70}")
                        print(f"✓ Completed: {subject_name}")
                        print(f" Downloaded: {downloaded} | Failed: {failed} | Skipped: {skipped}")
                        print(f"{'─'*70}")
                        if idx < len(self.subjects):
                            print("\n⏳ Waiting 3 seconds before next subject...")
                            await page.wait_for_timeout(3000)
                    except Exception as e:
                        print(f"\n✗ Error processing subject {subject_name}: {str(e)}")
            finally:
                # Close the browser even if setup or a subject run raised.
                await browser.close()
        print(f"\n{'='*70}")
        print("📊 FINAL SUMMARY")
        print(f"{'='*70}")
        print(f"✓ Successfully downloaded: {total_downloaded}")
        print(f"✗ Failed downloads: {total_failed}")
        print(f"⊘ Skipped (already exist): {total_skipped}")
        print(f"📁 Files saved to: {os.path.abspath(self.download_folder)}")
        print(f"{'='*70}\n")
        self._write_download_log(total_downloaded, total_failed, total_skipped)
async def main():
    """Announce the run, download everything with default settings, confirm."""
    for banner_line in (
        "Starting OpenStax PDF Downloader (Headless Mode)...",
        "Running in background - no browser windows will appear.\n",
    ):
        print(banner_line)
    await OpenStaxPlaywrightDownloader().download_all()
    print("\n✅ Download process complete!")
# Script entry point. `time` is imported with the other top-of-file imports,
# so the downloader also works when this module is imported rather than run.
if __name__ == "__main__":
    asyncio.run(main())