-
Notifications
You must be signed in to change notification settings - Fork 742
Expand file tree
/
Copy path_utils.py
More file actions
109 lines (83 loc) · 3.37 KB
/
_utils.py
File metadata and controls
109 lines (83 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from __future__ import annotations
import asyncio
from contextlib import suppress
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from playwright.async_api import Page
from playwright.async_api import Request as PlaywrightRequest
# URL patterns that `block_requests` blocks when the caller does not supply `url_patterns`.
# Every entry starts with '.', so on the non-Chromium code path of `block_requests` each one
# is treated as a file extension; on Chromium the entries are passed unchanged to the CDP
# command `Network.setBlockedURLs`.
_DEFAULT_BLOCK_REQUEST_URL_PATTERNS = [
'.css',
'.webp',
'.jpg',
'.jpeg',
'.png',
'.svg',
'.gif',
'.woff',
'.pdf',
'.zip',
]
async def infinite_scroll(page: Page) -> None:
    """Scroll to the bottom of a page, handling loading of additional items.

    The page is scrolled repeatedly with the mouse wheel while a background task polls,
    once per second, how many `xhr`, `fetch`, `websocket` and `other` requests the page
    has issued. Scrolling stops once that count has stayed unchanged for
    `match_count_threshold` consecutive polls.

    Args:
        page: Playwright page to scroll.
    """
    finished = False

    match_count = 0
    match_count_threshold = 4

    old_request_count = 0
    new_request_count = 0

    def track_request(request: PlaywrightRequest) -> None:
        # Count only the request kinds that indicate dynamically loaded content.
        nonlocal new_request_count
        if request.resource_type in ['xhr', 'fetch', 'websocket', 'other']:
            new_request_count += 1

    async def scroll() -> None:
        body_scroll_height = await page.evaluate('() => document.body.scrollHeight')
        # Fall back to a fixed distance when the reported height is falsy (e.g. 0).
        delta = body_scroll_height or 10000
        await page.mouse.wheel(delta_x=0, delta_y=delta)

    async def check_finished() -> None:
        nonlocal old_request_count, match_count, finished

        while True:
            if old_request_count == new_request_count:
                match_count += 1
                if match_count >= match_count_threshold:
                    finished = True
                    return
            else:
                # New requests were seen since the last poll; restart the quiet-period count.
                match_count = 0
                old_request_count = new_request_count

            await asyncio.sleep(1)

    page.on('request', track_request)
    try:
        check_task = asyncio.create_task(check_finished(), name='infinite_scroll_check_finished_task')
        try:
            while not finished:
                await scroll()
                await page.wait_for_timeout(250)
        finally:
            # Make sure the poller never outlives this call, even if scrolling raised.
            if not check_task.done():
                check_task.cancel()
            with suppress(asyncio.CancelledError):
                await check_task
    finally:
        # Detach the counter so repeated calls do not accumulate listeners on the page.
        page.remove_listener('request', track_request)
async def block_requests(
    page: Page, url_patterns: list[str] | None = None, extra_url_patterns: list[str] | None = None
) -> None:
    """Blocks network requests matching specified URL patterns.

    On Chromium the patterns are passed to the browser through the CDP command
    `Network.setBlockedURLs`. On any other browser (or when the browser cannot be
    determined) matching requests are aborted by `page.route` handlers instead.

    Args:
        page: Playwright Page object to block requests on.
        url_patterns: List of URL patterns to block. If None, uses default patterns.
            An explicitly empty list disables the defaults, so only `extra_url_patterns`
            are blocked.
        extra_url_patterns: Additional URL patterns to append to the main patterns list.
    """
    # Compare against None explicitly: `url_patterns or defaults` would silently
    # re-enable the defaults for an empty list, contradicting the documented contract.
    base_patterns = _DEFAULT_BLOCK_REQUEST_URL_PATTERNS if url_patterns is None else url_patterns
    patterns = [*base_patterns, *(extra_url_patterns or [])]

    if not patterns:
        # Nothing to block; avoid opening a CDP session or registering empty routes.
        return

    browser_type = page.context.browser.browser_type.name if page.context.browser else 'undefined'

    if browser_type == 'chromium':
        client = await page.context.new_cdp_session(page)
        await client.send('Network.enable')
        await client.send('Network.setBlockedURLs', {'urls': patterns})
        return

    # Patterns starting with '.' or '*.' are treated as file extensions; everything
    # else is matched as a specific file name.
    extensions: list[str] = []
    specific_files: list[str] = []
    for pattern in patterns:
        if pattern.startswith(('*.', '.')):
            extensions.append(pattern.strip('*.'))
        else:
            specific_files.append(pattern)

    if extensions:
        await page.route(f'**/*.{{{",".join(extensions)}}}*', lambda route, _: route.abort())

    if specific_files:
        await page.route(f'**/{{{",".join(specific_files)}}}*', lambda route, _: route.abort())