|
1 | 1 | from pathlib import Path |
2 | | -from typing import Iterator, List, Optional, Type |
| 2 | +from typing import Any, Dict, Iterator, List, Optional, Tuple, Type |
3 | 3 |
|
4 | 4 | import os |
5 | 5 | import re |
| 6 | +import shlex |
6 | 7 | import types |
7 | 8 | import urllib.parse |
8 | 9 |
|
@@ -37,88 +38,73 @@ def is_git_version(version: str) -> bool: |
37 | 38 | return len([p for p in url.path.split('/') if p]) == 2 |
38 | 39 | return False |
39 | 40 |
|
| 41 | + def parse_lockfile(self, lockfile: Path) -> Dict[str, Any]: |
| 42 | + def _iter_lines() -> Iterator[Tuple[int, str]]: |
| 43 | + indent = ' ' |
| 44 | + for line in lockfile.open(): |
| 45 | + level = 0 |
| 46 | + while line.startswith(indent): |
| 47 | + level += 1 |
| 48 | + line = line[len(indent) :] |
| 49 | + yield level, line.strip() |
| 50 | + |
| 51 | + root_entry: Dict[str, Any] = {} |
| 52 | + parent_entries = [root_entry] |
| 53 | + |
| 54 | + for level, line in _iter_lines(): |
| 55 | + if line.startswith('#') or not line: |
| 56 | + continue |
| 57 | + assert level <= len(parent_entries) - 1 |
| 58 | + parent_entries = parent_entries[: level + 1] |
| 59 | + if line.endswith(':'): |
| 60 | + key = line[:-1] |
| 61 | + child_entry = parent_entries[-1][key] = {} |
| 62 | + parent_entries.append(child_entry) |
| 63 | + else: |
| 64 | + # NOTE shlex.split is handy, but slow; |
| 65 | + # to speed up parsing we can use something less robust, e.g. |
| 66 | + # _key, _value = line.split(' ', 1) |
| 67 | + # parent_entries[-1][self.unquote(_key)] = self.unquote(_value) |
| 68 | + key, value = shlex.split(line) |
| 69 | + parent_entries[-1][key] = value |
| 70 | + |
| 71 | + return root_entry |
| 72 | + |
40 | 73 | def unquote(self, string: str) -> str: |
41 | 74 | if string.startswith('"'): |
42 | 75 | assert string.endswith('"') |
43 | 76 | return string[1:-1] |
44 | 77 | else: |
45 | 78 | return string |
46 | 79 |
|
47 | | - def parse_package_section(self, lockfile: Path, section: List[str]) -> Package: |
48 | | - assert section |
49 | | - name_line = section[0] |
50 | | - assert name_line.endswith(':'), name_line |
51 | | - name_line = name_line[:-1] |
| 80 | + def process_package( |
| 81 | + self, lockfile: Path, name_line: str, entry: Dict[str, Any] |
| 82 | + ) -> Package: |
| 83 | + assert name_line and entry |
52 | 84 |
|
53 | 85 | name = self.unquote(name_line.split(',', 1)[0]) |
54 | 86 | name, version_constraint = name.rsplit('@', 1) |
55 | 87 |
|
56 | | - version: Optional[str] = None |
57 | | - resolved: Optional[str] = None |
58 | | - integrity: Optional[Integrity] = None |
59 | | - |
60 | | - section_indent = 0 |
61 | | - |
62 | | - line = None |
63 | | - for line in section[1:]: |
64 | | - indent = 0 |
65 | | - while line[indent].isspace(): |
66 | | - indent += 1 |
67 | | - |
68 | | - assert indent, line |
69 | | - if not section_indent: |
70 | | - section_indent = indent |
71 | | - elif indent > section_indent: |
72 | | - # Inside some nested section. |
73 | | - continue |
74 | | - |
75 | | - line = line.strip() |
76 | | - |
77 | | - if line.startswith('"'): |
78 | | - # XXX: assuming no spaces in the quoted region! |
79 | | - key, value = line.split(' ', 1) |
80 | | - line = f'{self.unquote(key)} {value}' |
81 | | - |
82 | | - if line.startswith('version'): |
83 | | - version = self.unquote(line.split(' ', 1)[1]) |
84 | | - elif line.startswith('resolved'): |
85 | | - resolved = self.unquote(line.split(' ', 1)[1]) |
86 | | - elif line.startswith('integrity'): |
87 | | - _, values_str = line.split(' ', 1) |
88 | | - values = self.unquote(values_str).split(' ') |
89 | | - integrity = Integrity.parse(values[0]) |
90 | | - |
91 | | - assert version, section |
92 | | - |
93 | 88 | source: PackageSource |
94 | 89 | if self._LOCAL_PKG_RE.match(version_constraint): |
95 | 90 | source = LocalSource(path=self._LOCAL_PKG_RE.sub('', version_constraint)) |
96 | 91 | else: |
97 | | - assert resolved, section |
98 | | - if self.is_git_version(resolved): |
99 | | - source = self.parse_git_source(version=resolved) |
| 92 | + if self.is_git_version(entry['resolved']): |
| 93 | + source = self.parse_git_source(version=entry['resolved']) |
100 | 94 | else: |
101 | | - source = ResolvedSource(resolved=resolved, integrity=integrity) |
| 95 | + if 'integrity' in entry: |
| 96 | + integrity = Integrity.parse(entry['integrity']) |
| 97 | + else: |
| 98 | + integrity = None |
| 99 | + source = ResolvedSource(resolved=entry['resolved'], integrity=integrity) |
102 | 100 |
|
103 | | - return Package(name=name, version=version, source=source, lockfile=lockfile) |
| 101 | + return Package( |
| 102 | + name=name, version=entry['version'], source=source, lockfile=lockfile |
| 103 | + ) |
104 | 104 |
|
105 | 105 | def process_lockfile(self, lockfile: Path) -> Iterator[Package]: |
106 | | - section: List[str] = [] |
107 | | - |
108 | | - with open(lockfile) as fp: |
109 | | - for line in map(str.rstrip, fp): |
110 | | - if not line.strip() or line.strip().startswith('#'): |
111 | | - continue |
112 | | - |
113 | | - if not line[0].isspace(): |
114 | | - if section: |
115 | | - yield self.parse_package_section(lockfile, section) |
116 | | - section = [] |
117 | | - |
118 | | - section.append(line) |
119 | | - |
120 | | - if section: |
121 | | - yield self.parse_package_section(lockfile, section) |
| 106 | + for name_line, package in self.parse_lockfile(lockfile).items(): |
| 107 | + yield self.process_package(lockfile, name_line, package) |
122 | 108 |
|
123 | 109 |
|
124 | 110 | class YarnRCFileProvider(RCFileProvider): |
|
0 commit comments