forked from micropython/micropython-lib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
187 lines (154 loc) · 5.47 KB
/
__init__.py
File metadata and controls
187 lines (154 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# MicroPython package installer
# MIT license; Copyright (c) 2022 Jim Mussared
from micropython import const
import requests
import sys
_PACKAGE_INDEX = const("https://micropython.org/pi/v2")
_CHUNK_SIZE = 128
allowed_mip_url_prefixes = ("http://", "https://", "github:", "gitlab:")
# This implements os.makedirs(os.dirname(path))
def _ensure_path_exists(path):
import os
split = path.split("/")
# Handle paths starting with "/".
if not split[0]:
split.pop(0)
split[0] = "/" + split[0]
prefix = ""
for i in range(len(split) - 1):
prefix += split[i]
try:
os.stat(prefix)
except:
os.mkdir(prefix)
prefix += "/"
# Copy from src (stream) to dest (function-taking-bytes)
def _chunk(src, dest):
buf = memoryview(bytearray(_CHUNK_SIZE))
while True:
n = src.readinto(buf)
if n == 0:
break
dest(buf if n == _CHUNK_SIZE else buf[:n])
# Check if the specified path exists and matches the hash.
def _check_exists(path, short_hash):
import os
try:
import binascii
import hashlib
with open(path, "rb") as f:
hs256 = hashlib.sha256()
_chunk(f, hs256.update)
existing_hash = str(binascii.hexlify(hs256.digest())[: len(short_hash)], "utf-8")
return existing_hash == short_hash
except:
return False
def _rewrite_url(url, branch=None):
if not branch:
branch = "HEAD"
if url.startswith("github:"):
url = url[7:].split("/")
url = (
"https://raw.githubusercontent.com/"
+ url[0]
+ "/"
+ url[1]
+ "/"
+ branch
+ "/"
+ "/".join(url[2:])
)
elif url.startswith("gitlab:"):
url = url[7:].split("/")
url = (
"https://gitlab.com/"
+ url[0]
+ "/"
+ url[1]
+ "/-/raw/"
+ branch
+ "/"
+ "/".join(url[2:])
)
return url
def _download_file(url, dest):
response = requests.get(url)
try:
if response.status_code != 200:
print("Error", response.status_code, "requesting", url)
return False
print("Copying:", dest)
_ensure_path_exists(dest)
with open(dest, "wb") as f:
_chunk(response.raw, f.write)
return True
finally:
response.close()
def _install_json(package_json_url, index, target, version, mpy):
response = requests.get(_rewrite_url(package_json_url, version))
try:
if response.status_code != 200:
print("Package not found:", package_json_url)
return False
package_json = response.json()
finally:
response.close()
for target_path, short_hash in package_json.get("hashes", ()):
fs_target_path = target + "/" + target_path
if _check_exists(fs_target_path, short_hash):
print("Exists:", fs_target_path)
else:
file_url = "{}/file/{}/{}".format(index, short_hash[:2], short_hash)
if not _download_file(file_url, fs_target_path):
print("File not found: {} {}".format(target_path, short_hash))
return False
base_url = package_json_url.rpartition("/")[0]
for target_path, url in package_json.get("urls", ()):
fs_target_path = target + "/" + target_path
is_full_url = any(url.startswith(p) for p in allowed_mip_url_prefixes)
if base_url and not is_full_url:
url = f"{base_url}/{url}" # Relative URLs
if not _download_file(_rewrite_url(url, version), fs_target_path):
print("File not found: {} {}".format(target_path, url))
return False
for dep, dep_version in package_json.get("deps", ()):
if not _install_package(dep, index, target, dep_version, mpy):
return False
return True
def _install_package(package, index, target, version, mpy):
if any(package.startswith(p) for p in allowed_mip_url_prefixes):
if package.endswith(".py") or package.endswith(".mpy"):
print("Downloading {} to {}".format(package, target))
return _download_file(
_rewrite_url(package, version), target + "/" + package.rsplit("/")[-1]
)
else:
if not package.endswith(".json"):
if not package.endswith("/"):
package += "/"
package += "package.json"
print("Installing {} to {}".format(package, target))
else:
if not version:
version = "latest"
print("Installing {} ({}) from {} to {}".format(package, version, index, target))
mpy_version = (
sys.implementation._mpy & 0xFF if mpy and hasattr(sys.implementation, "_mpy") else "py"
)
package = "{}/package/{}/{}/{}.json".format(index, mpy_version, package, version)
return _install_json(package, index, target, version, mpy)
def install(package, index=None, target=None, version=None, mpy=True):
if not target:
for p in sys.path:
if p.endswith("/lib"):
target = p
break
else:
print("Unable to find lib dir in sys.path")
return
if not index:
index = _PACKAGE_INDEX
if _install_package(package, index.rstrip("/"), target, version, mpy):
print("Done")
else:
print("Package may be partially installed")