-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path__main__.py
More file actions
193 lines (155 loc) · 6.39 KB
/
__main__.py
File metadata and controls
193 lines (155 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import asyncio
import sys
import typing
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from os import cpu_count
from pathlib import Path
from binaryornot.check import is_binary
from plumbum import cli
from .core import CombinedReplacerFactory, ReplaceContext
from .core.InBufferReplacer import InBufferReplacer
from .core.InFileReplacer import InFileReplacer
from .replacers.HEReplacer import HEReplacer
from .replacers.HSTSPreloadReplacer import HSTSPreloadReplacer
class OurInBufferReplacer(InBufferReplacer):
    """In-buffer replacer configured with this tool's two replacer classes.

    ``FACS`` is built from a mapping whose keys match the keyword arguments of
    ``__init__`` (``preloads`` and ``heRulesets``) and whose values are the
    corresponding replacer classes. How the base class consumes ``FACS`` is
    defined in ``.core`` and is not visible from this module.
    """

    __slots__ = ()

    FACS = CombinedReplacerFactory(
        dict(
            preloads=HSTSPreloadReplacer,
            heRulesets=HEReplacer,
        )
    )

    def __init__(self, preloads=None, heRulesets=None):
        """Forward both optional data sources to the base class by keyword."""
        replacerArgs = {"preloads": preloads, "heRulesets": heRulesets}
        super().__init__(**replacerArgs)
class OurInFileReplacer(InFileReplacer):
    """File-level replacer that delegates to an ``OurInBufferReplacer``."""

    def __init__(self, preloads=None, heRulesets=None):
        """Build the in-buffer replacer from the given sources and pass it to the base class."""
        bufferReplacer = OurInBufferReplacer(preloads=preloads, heRulesets=heRulesets)
        super().__init__(bufferReplacer)
# Root plumbum application. It has no behaviour of its own; subcommands
# register themselves on it through the ``CLI.subcommand`` decorator.
class CLI(cli.Application):
    """HTTPSEverywhere-like URI rewriter"""
class FileClassifier:
    """Decide whether a path may be processed.

    Calling an instance returns an empty string when the path is allowed, or a
    short reason string (``"dotfile"``, ``"binary"`` or ``"not regular file"``)
    when it has to be skipped.
    """

    __slots__ = ("noSkipDot", "noSkipBinary")

    def __init__(self, noSkipDot: bool, noSkipBinary: bool):
        """Store the two switches.

        Args:
            noSkipDot: when true, path components starting with a dot are allowed.
            noSkipBinary: when true, files are never probed with ``is_binary``.
        """
        self.noSkipDot = noSkipDot
        self.noSkipBinary = noSkipBinary

    def __call__(self, p: Path) -> str:
        """Classify ``p``.

        Args:
            p: the path to inspect. Every component of the path is checked for
                a leading dot, not only the last one.

        Returns:
            ``""`` if the path is allowed (directories included), otherwise the
            reason it is disallowed.
        """
        # The flag does not change per component, so test it once up front.
        if not self.noSkipDot and any(part.startswith(".") for part in p.parts):
            return "dotfile"

        # Directories are always allowed; return the documented empty string
        # explicitly instead of falling off the end and returning None.
        if p.is_dir():
            return ""

        if not p.is_file():
            return "not regular file"

        # Short-circuit keeps `is_binary` from being called when binary
        # detection is disabled.
        if self.noSkipBinary or not is_binary(p):
            return ""
        return "binary"
class FilesEnumerator:
    """Recursively yield the allowed files below a path.

    ``classifier`` is called with each path and must return a falsy value for
    allowed paths or a reason for disallowed ones. Disallowed paths are handed
    to ``disallowedReportingCallback(path, reason)`` and are not descended into.
    """

    __slots__ = ("classifier", "disallowedReportingCallback")

    def __init__(self, classifier, disallowedReportingCallback):
        """Remember the classifier and the callback used to report skipped paths."""
        self.disallowedReportingCallback = disallowedReportingCallback
        self.classifier = classifier

    def __call__(self, fileOrDir: Path):
        """Generate every allowed non-directory path at or below ``fileOrDir``."""
        verdict = self.classifier(fileOrDir)
        if verdict:
            # Disallowed: report it and produce nothing for this subtree.
            self.disallowedReportingCallback(fileOrDir, verdict)
            return

        if not fileOrDir.is_dir():
            yield fileOrDir
            return

        for child in fileOrDir.iterdir():
            yield from self(child)
@CLI.subcommand("bulk")
class FileRewriteCLI(cli.Application):
    """Rewrites URIs in files. Use - to consume list of files from stdin. Don't use `find`, it is a piece of shit which is impossible to configure to skip .git dirs."""

    # NOTE(review): the docstring above is presumably rendered by plumbum as
    # the subcommand's help text, so it is kept verbatim; maintainer notes
    # live in `#` comments instead.

    __slots__ = ("_repl",)

    @property
    def repl(self) -> InFileReplacer:
        """Return the replacer, creating it on first access.

        Creation also prints the number of HSTS preloads and HE rules that
        were loaded. ``main`` must have initialised ``self._repl`` to ``None``
        before this is used.
        """
        if self._repl is None:
            self._repl = OurInFileReplacer()
            print(
                len(self._repl.inBufferReplacer.singleURIReplacer.children[0].preloads),
                "HSTS preloads",
            )
            print(len(self._repl.inBufferReplacer.singleURIReplacer.children[1].rulesets), "HE rules")
        return self._repl

    def processEachFileName(self, ctx: ReplaceContext, l: "typing.Union[bytes, str]") -> None:
        """Process one line of input that names a file or directory.

        Args:
            ctx: replace context passed through to the replacer.
            l: a raw line, normally ``bytes`` read from stdin; ``str`` is
                accepted as well. Surrounding whitespace is stripped and blank
                lines are ignored.
        """
        l = l.strip()
        if l:
            if isinstance(l, bytes):
                l = l.decode("utf-8")
            p = Path(l).resolve().absolute()
            self.processEachFilePath(ctx, p)

    def processEachFilePath(self, ctx: ReplaceContext, p: Path) -> None:
        """Run the replacer over every allowed file at or below ``p``.

        When ``--trace`` is set, a line is written to stderr before and after
        each file.
        """
        for pp in self.fe(p):
            if self.trace:
                print("Processing", pp, file=sys.stderr)
            self.repl(ctx, pp)
            if self.trace:
                print("Processed", pp, file=sys.stderr)

    async def asyncMainPathsFromStdIn(self):
        """Read paths from stdin, one per line, and process each of them."""
        # One context for the whole run, mirroring asyncMainPathsFromCLI.
        # It was previously never created, so processEachFileName was called
        # with a missing argument and raised TypeError on the first line.
        ctx = ReplaceContext(None)
        asyncStdin = asyncio.StreamReader(loop=self.loop)
        await self.loop.connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(asyncStdin, loop=self.loop), sys.stdin
        )
        with ThreadPoolExecutor(max_workers=cpu_count()) as pool:
            while not asyncStdin.at_eof():
                l = await asyncStdin.readline()
                # Each job is awaited before the next line is read, so lines
                # are processed strictly in order.
                await self.loop.run_in_executor(pool, partial(self.processEachFileName, ctx, l))

    async def asyncMainPathsFromCLI(self, filesOrDirs: typing.Iterable[typing.Union[Path, str]]):
        """Process the files and directories given on the command line.

        Args:
            filesOrDirs: paths to process; directories are walked recursively
                through ``self.fe``.
        """
        try:
            from tqdm import tqdm
        except ImportError:

            class tqdm:
                """Minimal stand-in offering the parts of tqdm used below."""

                def __init__(self, iterable):
                    self.iterable = iterable

                def __enter__(self):
                    return self

                def __exit__(self, *excInfo):
                    return False

                def __iter__(self):
                    return iter(self.iterable)

                @staticmethod
                def write(s, file=None):
                    print(s, file=sys.stderr if file is None else file)

        ctx = ReplaceContext(None)
        replaceInEachFileWithContext = partial(self.repl, ctx)

        with tqdm(filesOrDirs) as pb, ThreadPoolExecutor(max_workers=cpu_count()) as pool:
            for fileOrDir in pb:
                fileOrDir = Path(fileOrDir).resolve().absolute()
                # Materialise the listing first so that all "Skipping" reports
                # for this argument are emitted before any file is rewritten.
                files = tuple(self.fe(fileOrDir))
                for f in files:
                    if self.trace:
                        # A single write() per message: print(..., file=pb)
                        # would call pb.write once per token. stderr matches
                        # the trace output of processEachFilePath.
                        pb.write("Processing {}".format(f), file=sys.stderr)
                    # NOTE: each job is awaited before the next is submitted,
                    # so files are processed one at a time.
                    await self.loop.run_in_executor(pool, partial(replaceInEachFileWithContext, f))
                    if self.trace:
                        pb.write("Processed {}".format(f), file=sys.stderr)

    noSkipBinary = cli.Flag(
        ["--no-skip-binary", "-n"],
        help="Don't skip binary files. Allows usage without `binaryornot`",
        default=False,
    )

    noSkipDot = cli.Flag(
        ["--no-skip-dotfiles", "-d"],
        help="Don't skip files and dirs which name stem begins from dot.",
        default=False,
    )

    trace = cli.Flag(
        ["--trace", "-t"],
        help="Print info about processing of regular files",
        default=False,
    )

    noReportSkipped = cli.Flag(
        ["--no-report-skipped", "-s"],
        help="Don't report about skipped files",
        default=False,
    )

    def disallowedReportingCallback(self, fileOrDir: Path, reasonOfDisallowal: str) -> None:
        """Print why a path was skipped, unless ``--no-report-skipped`` is set."""
        if not self.noReportSkipped:
            print("Skipping ", fileOrDir, ":", reasonOfDisallowal)

    def main(self, *filesOrDirs):
        """Entry point of the ``bulk`` subcommand.

        A single argument of ``-`` (as the help text says) or ``0`` (the value
        historically checked by the code) selects reading paths from stdin;
        anything else is treated as a list of files and directories.
        """
        self._repl = None  # type: OurInFileReplacer
        # Own the loop explicitly instead of relying on the implicit current
        # loop, which is deprecated when no loop is running.
        self.loop = asyncio.new_event_loop()
        self.fc = FileClassifier(self.noSkipDot, self.noSkipBinary)
        self.fe = FilesEnumerator(self.fc, self.disallowedReportingCallback)
        try:
            if len(filesOrDirs) == 1 and filesOrDirs[0] in ("-", "0"):
                t = self.loop.create_task(self.asyncMainPathsFromStdIn())
            else:
                t = self.loop.create_task(self.asyncMainPathsFromCLI(filesOrDirs))
            self.loop.run_until_complete(t)
        finally:
            self.loop.close()
# Run the root CLI application only when this module is executed as a script.
if __name__ == "__main__":
    CLI.run()