-
Notifications
You must be signed in to change notification settings - Fork 136
Expand file tree
/
Copy pathcache.py
More file actions
188 lines (139 loc) · 5.24 KB
/
cache.py
File metadata and controls
188 lines (139 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from __future__ import annotations
import os
import re
import tempfile
import types
from collections.abc import Iterator
from pathlib import Path
from typing import IO
class Cache:
instance: Cache
@classmethod
def get_working_instance_if(cls, condition: bool) -> Cache:
return cls.instance if condition else NullCache()
class BucketReader:
def read_parts(self, size: int) -> Iterator[bytes]:
raise NotImplementedError
def read_all(self) -> bytes:
raise NotImplementedError
def close(self) -> None:
raise NotImplementedError
def __enter__(self) -> Cache.BucketReader:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
self.close()
class BucketWriter:
def write(self, data: bytes) -> None:
raise NotImplementedError
def cancel(self) -> None:
raise NotImplementedError
def seal(self) -> None:
raise NotImplementedError
def __enter__(self) -> Cache.BucketWriter:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
if traceback is None:
self.seal()
else:
self.cancel()
class BucketRef:
def __init__(self, key: str) -> None:
self.key = key
def open_read(self) -> Cache.BucketReader | None:
raise NotImplementedError
def open_write(self) -> Cache.BucketWriter:
raise NotImplementedError
def get(self, key: str) -> BucketRef:
raise NotImplementedError
class NullCache(Cache):
class NullBucketWriter(Cache.BucketWriter):
def write(self, data: bytes) -> None:
pass
def cancel(self) -> None:
pass
def seal(self) -> None:
pass
class NullBucketRef(Cache.BucketRef):
def __init__(self, key: str) -> None:
super().__init__(key)
def open_read(self) -> Cache.BucketReader | None:
return None
def open_write(self) -> Cache.BucketWriter:
return NullCache.NullBucketWriter()
def get(self, key: str) -> Cache.BucketRef:
return NullCache.NullBucketRef(key)
class FilesystemBasedCache(Cache):
_SUBDIR = 'flatpak-node-generator'
_KEY_CHAR_ESCAPE_RE = re.compile(r'[^A-Za-z0-9._\-]')
def __init__(self, cache_root: Path | None = None) -> None:
self._cache_root = cache_root or self._default_cache_root()
@staticmethod
def _escape_key(key: str) -> str:
return FilesystemBasedCache._KEY_CHAR_ESCAPE_RE.sub(
lambda m: f'_{ord(m.group()):02X}', key
)
class FilesystemBucketReader(Cache.BucketReader):
def __init__(self, file: IO[bytes]) -> None:
self.file = file
def close(self) -> None:
self.file.close()
def read_parts(self, size: int) -> Iterator[bytes]:
while True:
data = self.file.read(size)
if not data:
break
yield data
def read_all(self) -> bytes:
return self.file.read()
class FilesystemBucketWriter(Cache.BucketWriter):
def __init__(self, file: IO[bytes], temp: Path, target: Path) -> None:
self.file = file
self.temp = temp
self.target = target
def write(self, data: bytes) -> None:
self.file.write(data)
def cancel(self) -> None:
self.file.close()
self.temp.unlink()
def seal(self) -> None:
self.file.close()
self.temp.rename(self.target)
class FilesystemBucketRef(Cache.BucketRef):
def __init__(self, key: str, cache_root: Path) -> None:
super().__init__(key)
self._cache_root = cache_root
self._cache_path = self._cache_root / FilesystemBasedCache._escape_key(key)
def open_read(self) -> Cache.BucketReader | None:
try:
fp = self._cache_path.open('rb')
except FileNotFoundError:
return None
else:
return FilesystemBasedCache.FilesystemBucketReader(fp)
def open_write(self) -> Cache.BucketWriter:
target = self._cache_path
if not target.parent.exists():
target.parent.mkdir(exist_ok=True, parents=True)
fd, temp = tempfile.mkstemp(dir=self._cache_root, prefix='__temp__')
return FilesystemBasedCache.FilesystemBucketWriter(
os.fdopen(fd, 'wb'), Path(temp), target
)
@classmethod
def _default_cache_root(cls) -> Path:
xdg_cache_home = os.environ.get(
'XDG_CACHE_HOME', os.path.expanduser('~/.cache')
)
return Path(xdg_cache_home) / cls._SUBDIR
def get(self, key: str) -> Cache.BucketRef:
return FilesystemBasedCache.FilesystemBucketRef(key, self._cache_root)
Cache.instance = NullCache()