|
2 | 2 | import importlib.util |
3 | 3 | import os |
4 | 4 | import sys |
| 5 | +import urllib.error |
5 | 6 | import urllib.parse |
6 | 7 | import urllib.request |
7 | 8 | from importlib import metadata |
@@ -34,21 +35,40 @@ def plugin(f, *args, **kwargs) -> None: # type: ignore[no-untyped-def] |
34 | 35 | plugins[f.__name__] = f |
35 | 36 |
|
36 | 37 |
|
37 | | -def _localize_path(path: Path) -> Path: |
| 38 | +def _localize_path(path: str | Path) -> Path: |
38 | 39 | """ |
39 | 40 | Support structures for load_plugin() |
40 | 41 | """ |
41 | | - url = urllib.parse.urlparse(str(path)) |
| 42 | + # Keep as string to preserve URL format (Path normalization breaks URLs) |
| 43 | + path_str = str(path) |
| 44 | + url = urllib.parse.urlparse(path_str) |
42 | 45 |
|
43 | 46 | if url.scheme and url.scheme in ('https', 'http'): |
44 | | - converted_path = Path(f'/tmp/{path.stem}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') |
| 47 | + # Extract filename from the URL path component |
| 48 | + # Use os.path.basename instead of path.stem to handle URLs correctly |
| 49 | + url_path = url.path |
| 50 | + filename = os.path.basename(url_path) if url_path else 'plugin' |
| 51 | + # Remove .py extension if present for the temporary filename format |
| 52 | + if filename.endswith('.py'): |
| 53 | + filename_base = filename.replace('.py', '') |
| 54 | + else: |
| 55 | + filename_base = filename |
| 56 | + |
| 57 | + converted_path = Path(f'/tmp/{filename_base}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') |
45 | 58 |
|
46 | 59 | with open(converted_path, 'w') as temp_file: |
47 | | - temp_file.write(urllib.request.urlopen(url.geturl()).read().decode('utf-8')) |
| 60 | + # Use the original path string directly instead of reconstructing from parsed URL |
| 61 | + # This avoids issues with url.geturl() producing malformed URLs |
| 62 | + try: |
| 63 | + response = urllib.request.urlopen(path_str) |
| 64 | + temp_file.write(response.read().decode('utf-8')) |
| 65 | + except urllib.error.URLError as e: |
| 66 | + error(f'Failed to download plugin from {path_str}: {e}') |
| 67 | + raise |
48 | 68 |
|
49 | 69 | return converted_path |
50 | 70 | else: |
51 | | - return path |
| 71 | + return Path(path) |
52 | 72 |
|
53 | 73 |
|
54 | 74 | def _import_via_path(path: Path, namespace: str | None = None) -> str: |
@@ -80,18 +100,20 @@ def _import_via_path(path: Path, namespace: str | None = None) -> str: |
80 | 100 | return namespace |
81 | 101 |
|
82 | 102 |
|
83 | | -def load_plugin(path: Path) -> None: |
| 103 | +def load_plugin(path: str | Path) -> None: |
84 | 104 | namespace: str | None = None |
85 | | - parsed_url = urllib.parse.urlparse(str(path)) |
| 105 | + # Keep URL as string to preserve scheme (avoid Path normalization) |
| 106 | + path_str = str(path) if isinstance(path, Path) else path |
| 107 | + parsed_url = urllib.parse.urlparse(path_str) |
86 | 108 | info(f'Loading plugin from url {parsed_url}') |
87 | 109 |
|
88 | 110 | # The Profile was not a direct match on a remote URL |
89 | 111 | if not parsed_url.scheme: |
90 | 112 | # Path was not found in any known examples, check if it's an absolute path |
91 | | - if os.path.isfile(path): |
92 | | - namespace = _import_via_path(path) |
| 113 | + if os.path.isfile(path_str): |
| 114 | + namespace = _import_via_path(Path(path_str)) |
93 | 115 | elif parsed_url.scheme in ('https', 'http'): |
94 | | - localized = _localize_path(path) |
| 116 | + localized = _localize_path(path_str) |
95 | 117 | namespace = _import_via_path(localized) |
96 | 118 |
|
97 | 119 | if namespace and namespace in sys.modules: |
|
0 commit comments