|
44 | 44 | is_target_closed_error, |
45 | 45 | rewrite_error, |
46 | 46 | ) |
47 | | -from playwright._impl._glob import glob_to_regex |
| 47 | +from playwright._impl._glob import glob_to_regex_pattern |
48 | 48 | from playwright._impl._greenlets import RouteGreenlet |
49 | 49 | from playwright._impl._str_utils import escape_regex_flags |
50 | 50 |
|
@@ -144,31 +144,103 @@ class FrameNavigatedEvent(TypedDict): |
144 | 144 |
|
145 | 145 |
|
def url_matches(
    base_url: Optional[str],
    url_string: str,
    match: Optional[URLMatch],
    websocket_url: Optional[bool] = None,
) -> bool:
    """Report whether ``url_string`` satisfies the matcher ``match``.

    Args:
        base_url: Base URL that a string glob is resolved against; may be None.
        url_string: The URL being tested.
        match: A glob string, a compiled regular expression, or a callable
            taking the URL string. A falsy value matches every URL.
        websocket_url: Forwarded to ``resolve_glob_to_regex_pattern`` when
            ``match`` is a glob string.

    Returns:
        True when the URL matches. For a callable matcher the callable's own
        return value is passed through unchanged.
    """
    # No matcher at all (None or empty string) means "match everything".
    if not match:
        return True
    if isinstance(match, str):
        # Globs are resolved against the base URL and turned into a regex.
        match = re.compile(
            resolve_glob_to_regex_pattern(base_url, match, websocket_url)
        )
    if isinstance(match, Pattern):
        # search(), not fullmatch(): anchoring is left to the pattern itself.
        return bool(match.search(url_string))
    return match(url_string)
170 | 161 |
|
171 | 162 |
|
def resolve_glob_to_regex_pattern(
    base_url: Optional[str], glob: str, websocket_url: Optional[bool] = None
) -> str:
    """Resolve ``glob`` against ``base_url`` and return it as a regex source string.

    Args:
        base_url: Base URL the glob is resolved against; may be None.
        glob: The glob pattern to resolve.
        websocket_url: When truthy, the base URL is first passed through
            ``to_websocket_base_url`` before the glob is resolved.

    Returns:
        The string produced by ``glob_to_regex_pattern`` for the resolved glob.
    """
    if websocket_url:
        base_url = to_websocket_base_url(base_url)
    glob = resolve_glob_base(base_url, glob)
    return glob_to_regex_pattern(glob)
| 170 | + |
| 171 | + |
def to_websocket_base_url(base_url: Optional[str]) -> Optional[str]:
    """Swap a leading ``http``/``https`` scheme for ``ws``/``wss``.

    Any other value, including None, is returned unchanged.
    """
    is_http = base_url is not None and re.match(r"^https?://", base_url)
    if not is_http:
        return base_url
    # Only the leading "http" is replaced, so "https" becomes "wss".
    return re.sub(r"^http", "ws", base_url)
| 176 | + |
| 177 | + |
def resolve_glob_base(base_url: Optional[str], match: str) -> str:
    """Resolve the glob ``match`` against ``base_url`` while keeping glob syntax intact.

    Globs that start with ``*`` are returned untouched. Otherwise each path
    segment is swapped for a placeholder, the result is joined onto the base
    URL with ``urljoin``, and the original segments are substituted back in
    before the URL is passed through ``ensure_trailing_slash``.

    Args:
        base_url: Base URL to resolve against; None is treated as "".
        match: The glob pattern.

    Returns:
        The resolved glob string.
    """
    # startswith() instead of match[0] so an empty glob does not raise IndexError.
    if match.startswith("*"):
        return match

    # placeholder -> original text, in insertion order.
    token_map: Dict[str, str] = {}

    def map_token(original: str, replacement: str) -> str:
        # Empty pieces need no placeholder and must not be registered.
        if not original:
            return ""
        token_map[replacement] = original
        return replacement

    # Escaped `\\?` behaves the same as `?` in our glob patterns.
    match = match.replace(r"\\?", "?")
    # Glob symbols may be escaped in the URL and some of them such as ? affect
    # resolution, so we replace them with safe components first.
    processed_parts = []
    for index, token in enumerate(match.split("/")):
        if token in (".", "..", ""):
            # Relative-path markers and empty segments are left for urljoin.
            processed_parts.append(token)
            continue
        # Handle special case of http*://, note that the new schema has to be
        # a web schema so that slashes are properly inserted after domain.
        if index == 0 and token.endswith(":"):
            processed_parts.append(map_token(token, "http:"))
            continue
        question_index = token.find("?")
        if question_index == -1:
            processed_parts.append(map_token(token, f"$_{index}_$"))
        else:
            # Keep a literal "?" in the placeholder so the query part is
            # still seen as a query during resolution.
            new_prefix = map_token(token[:question_index], f"$_{index}_$")
            new_suffix = map_token(token[question_index:], f"?$_{index}_$")
            processed_parts.append(new_prefix + new_suffix)

    relative_path = "/".join(processed_parts)
    resolved_url = urljoin(base_url if base_url is not None else "", relative_path)

    # Restore the original segments; each placeholder is replaced once.
    for replacement, original in token_map.items():
        resolved_url = resolved_url.replace(replacement, original, 1)

    return ensure_trailing_slash(resolved_url)
| 220 | + |
| 221 | + |
# In Node.js, new URL('http://localhost') returns 'http://localhost/'.
# To ensure the same url matching behavior, do the same.
def ensure_trailing_slash(url: str) -> str:
    """Return ``url`` with a ``/`` path added when its path component is empty.

    URLs that already have a path are returned exactly as given.
    """
    scheme, separator, remainder = url.partition("://")
    if separator:
        # URL parser doesn't like strange/unknown schemes, so parse with a
        # stand-in "http" scheme and put the real one back afterwards.
        parsed = urlparse("http://" + remainder, allow_fragments=True)
        parsed = parsed._replace(scheme=scheme)
    else:
        # Given current rules, this should never happen _and_ still be a valid
        # matcher: the protocol is required to be part of the match, so either
        # the glob starts with "*" (and this code is not running) or the match
        # contains `something://`.
        parsed = urlparse(url, allow_fragments=True)

    if parsed.path != "":
        return url
    return parsed._replace(path="/").geturl()
| 242 | + |
| 243 | + |
172 | 244 | class HarLookupResult(TypedDict, total=False): |
173 | 245 | action: Literal["error", "redirect", "fulfill", "noentry"] |
174 | 246 | message: Optional[str] |
|
0 commit comments