-
Notifications
You must be signed in to change notification settings - Fork 188
Expand file tree
/
Copy pathFile.py
More file actions
executable file
·357 lines (297 loc) · 11.5 KB
/
Copy pathFile.py
File metadata and controls
executable file
·357 lines (297 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
"""Collection of DIRAC useful file related modules.
.. warning::
   By default, on error these functions return None.
"""
import os
import hashlib
import random
import glob
import sys
import re
import errno
import stat
import tempfile
import fnmatch
import time
from collections.abc import Callable
from contextlib import contextmanager
from pathlib import Path
# Number of bytes in one of each supported size unit (binary, 1024-based multiples).
# Keys are upper case; yes, the kilo entry should arguably be spelled "kB".
SIZE_UNIT_CONVERSION = {
    unitName: 1024**exponent for exponent, unitName in enumerate(("B", "KB", "MB", "GB", "TB", "PB"))
}
def mkDir(path, mode=None):
    """Create a directory hierarchy, like ``mkdir -p``.

    Nothing happens (and nothing is raised) when ``path`` is already a directory.

    :param str path: directory hierarchy to create
    :param int mode: mode passed to ``os.makedirs`` for new directories;
                     when None, ``os.makedirs`` is called without a mode.
    :raises OSError: for any creation failure other than the directory already existing
    """
    if os.path.isdir(path):
        return
    # Only forward a mode when the caller supplied one.
    modeArgs = () if mode is None else (mode,)
    try:
        os.makedirs(path, *modeArgs)
    except OSError as creationError:
        # The directory may have appeared between the check above and the creation.
        if creationError.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def mkLink(src, dst):
    """Create the symbolic link ``dst`` pointing to ``src``.

    An already existing ``dst`` is tolerated only when it is a symbolic link
    whose resolved target equals ``src``.

    :param str src: target of the link
    :param str dst: path of the link to create
    :raises OSError: for any other failure of ``os.symlink``
    """
    try:
        os.symlink(src, dst)
    except OSError as linkError:
        sameLinkExists = (
            linkError.errno == errno.EEXIST and os.path.islink(dst) and os.path.realpath(dst) == src
        )
        if not sameLinkExists:
            raise
def makeGuid(fileName=None):
    """Create a GUID in the upper-case 8-4-4-4-12 format.

    With a file name, the GUID is built from the hexadecimal MD5 checksum of the
    file content (at most the first 10 MiB are read). Without one, the MD5 of a
    random 128-bit number is used instead.

    .. warning::
       Returns None when the file cannot be opened or read.

    :param string fileName: name of file
    :return: GUID string, or None on failure
    """
    digest = hashlib.md5(usedforsecurity=False)
    if not fileName:
        randomSeed = str(random.getrandbits(128)).encode()  # nosec B311
        digest.update(randomSeed)
    else:
        try:
            with open(fileName, "rb") as inputFile:
                digest.update(inputFile.read(10 * 1024 * 1024))
        except Exception:
            return None
    return generateGuid(digest.hexdigest().upper(), "MD5")
def generateGuid(checksum, checksumtype):
    """Generate an upper-case 8-4-4-4-12 GUID based on a file checksum.

    An ``"MD5"`` checksum is used as is, an ``"Adler32"`` checksum is converted to
    a string and left-padded with zeros to 32 characters. For an empty checksum or
    any other checksum type, the MD5 of a random 128-bit number is used instead.

    :param checksum: checksum value
    :param str checksumtype: ``"MD5"`` or ``"Adler32"``
    :return: GUID string
    """
    guidSource = ""
    if checksum:
        if checksumtype == "MD5":
            guidSource = checksum
        elif checksumtype == "Adler32":
            guidSource = str(checksum).zfill(32)

    if not guidSource:
        # Failed to use the checksum, fall back to a random GUID
        randomMd5 = hashlib.md5(usedforsecurity=False)
        randomMd5.update(str(random.getrandbits(128)).encode())  # nosec B311
        guidSource = randomMd5.hexdigest()

    # Character offsets delimiting the five GUID groups
    bounds = (0, 8, 12, 16, 20, 32)
    groups = [guidSource[start:stop] for start, stop in zip(bounds, bounds[1:])]
    return "{}-{}-{}-{}-{}".format(*groups).upper()
# Strict GUID layout: upper-case hexadecimal groups of length 8-4-4-4-12
_GUID_PATTERN = re.compile(r"[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}")


def checkGuid(guid):
    """Check whether a supplied GUID is of the correct format.

    The guid is a string of 36 characters [0-9A-F] long split into 5 parts of length 8-4-4-4-12.

    .. warning::
       As we are using GUIDs produced by various services and some of them may not follow
       the convention, this function also accepts a guid made of lower case chars, or even one
       that just has 5 parts of proper length with whatever chars.

    The whole string has to match: a GUID followed by a trailing newline is rejected.

    :param string guid: string to be checked
    :return: True (False) if supplied string is (not) a valid GUID.
    """
    # fullmatch, unlike match with a "$" anchor, does not tolerate a trailing newline
    if _GUID_PATTERN.fullmatch(guid.upper()):
        return True
    # Lenient fallback: only the lengths of the dash-separated parts are checked
    return [len(part) for part in guid.split("-")] == [8, 4, 4, 4, 12]
def getSize(fileName: os.PathLike) -> int:
"""Get size of a file.
:param string fileName: name of file to be checked
The os module claims only OSError can be thrown,
but just for curiosity it's catching all possible exceptions.
.. warning::
On any exception it returns -1.
"""
try:
return os.stat(fileName)[6]
except OSError:
return -1
def getGlobbedTotalSize(files):
    """Get total size of a list of files or a single file.

    Each string is expanded with ``glob.glob``, so shell-style wildcards are allowed.
    Directories that are not symbolic links are descended into recursively.
    Files whose size cannot be determined count as 0.

    :params list files: list or tuple of strings of files, or a single string
    :return: total size in bytes
    """
    totalSize = 0
    if isinstance(files, (list, tuple)):
        for entry in files:
            totalSize += getGlobbedTotalSize(entry)
        return totalSize

    for path in glob.glob(files):
        if os.path.isdir(path) and not os.path.islink(path):
            for content in os.listdir(path):
                # Directory entries are literal names: escape them so that characters
                # such as "[", "*" or "?" are not interpreted as wildcards again.
                totalSize += getGlobbedTotalSize(glob.escape(os.path.join(path, content)))
        if os.path.isfile(path):
            # getSize signals failure with -1, which must not be subtracted from the total
            totalSize += max(getSize(path), 0)
    return totalSize
def getGlobbedFiles(files):
    """Get list of files or a single file.

    Each string is expanded with ``glob.glob``, so shell-style wildcards are allowed.
    Directories that are not symbolic links are descended into recursively and only
    the files found are returned.

    :params list files: list or tuple of strings of files, or a single string
    :return: list of file paths
    """
    globbedFiles = []
    if isinstance(files, (list, tuple)):
        for entry in files:
            globbedFiles += getGlobbedFiles(entry)
        return globbedFiles

    for path in glob.glob(files):
        if os.path.isdir(path) and not os.path.islink(path):
            for content in os.listdir(path):
                # Directory entries are literal names: escape them so that characters
                # such as "[", "*" or "?" are not interpreted as wildcards again.
                globbedFiles += getGlobbedFiles(glob.escape(os.path.join(path, content)))
        if os.path.isfile(path):
            globbedFiles.append(path)
    return globbedFiles
def getMD5ForFiles(fileList):
    """Calculate md5 for the content of all the files.

    The files are hashed in sorted path order, so the result does not depend on the
    order of ``fileList``. Directories in the list are skipped. The list passed by
    the caller is left untouched.

    :param fileList: list of paths
    :type fileList: python:list
    :return: hexadecimal MD5 digest of the concatenated file contents
    """
    hashMD5 = hashlib.md5(usedforsecurity=False)
    # sorted() works on a copy, the caller's sequence is not reordered
    for filePath in sorted(fileList):
        if os.path.isdir(filePath):
            continue
        with open(filePath, "rb") as fd:
            while chunk := fd.read(4096):
                hashMD5.update(chunk)
    return hashMD5.hexdigest()
def convertSizeUnits(size, srcUnit, dstUnit):
    """Convert a number from a given source unit to a destination unit.

    Unit names are matched case-insensitively against ``SIZE_UNIT_CONVERSION``.

    Example:

    In [1]: convertSizeUnits(1024, 'B', 'kB')
    Out[1]: 1.0

    In [2]: convertSizeUnits(1024, 'MB', 'kB')
    Out[2]: 1048576.0

    :param size: number to convert
    :param srcUnit: unit of the number. Any of ( 'B', 'kB', 'MB', 'GB', 'TB', 'PB')
    :param dstUnit: unit expected for the return. Any of ( 'B', 'kB', 'MB', 'GB', 'TB', 'PB')
    :returns: the size converted to dstUnit, as a float. In case of problem
              -sys.maxsize is returned (negative)
    """
    sourceKey = srcUnit.upper()
    targetKey = dstUnit.upper()
    try:
        return float(size) * SIZE_UNIT_CONVERSION[sourceKey] / SIZE_UNIT_CONVERSION[targetKey]
    except (TypeError, ValueError, KeyError):
        # TypeError, ValueError: size is not a number
        # KeyError: srcUnit or dstUnit are not in the conversion table
        return -sys.maxsize
def cleanDirectory(
workDir: str | os.PathLike[str],
maxSecs: int | float | None = None,
filePatterns: list[str] = [],
maxDepth: int = 0,
callbackFn: Callable[[Path], bool] | None = None,
delEmptyDirs: bool = False,
) -> list[str]:
"""Recursively clean files older than a threshold.
Walks ``workDir`` bottom-up and deletes (or invokes ``callbackFn`` on)
regular files that are older than ``maxSecs`` seconds which match the ``filePatterns``
glob. Empty directories can also be removed with ``delEmptyDirs``=``True``.
:param workDir: directory to scan
:param maxSecs: age threshold in seconds (files older are deleted); pass
``None`` to skip the age filter entirely
:param filePatterns: list of globs, only files matching this will be considered
:param maxDepth: maximum directory depth to process (0 = unlimited, 1 = root only)
:param callbackFn: If ``None`` files will be unlinked, otherwise this function
will be called for matchin files instead. Function should take a
single``Path`` object argument. Returning True indicates the
file was processed without error. Returning False will add the
path onto the list of failed files.
:param delEmptyDirs: if ``True``, delete directories that are empty after
file cleanup. All directories are considered, the filePatterns
glob is not used to filter these.
:returns: list of file (and dir) paths that could not be deleted (empty on success)
"""
errFiles = []
timeThresh = time.time() - maxSecs if maxSecs is not None else None
rootPath = Path(workDir)
if not rootPath.exists() or not rootPath.is_dir():
return errFiles
for curRoot, dirs, files in os.walk(workDir, topdown=False):
curPath = Path(curRoot)
depth = len(curPath.relative_to(rootPath).parts)
# Only process files if we're within maxDepth
if not maxDepth or depth < maxDepth:
for fileName in files:
if any(fnmatch.fnmatch(fileName, p) for p in filePatterns):
filePath = curPath / fileName
try:
if not filePath.is_file() or filePath.is_symlink():
continue # Not a regular file
if timeThresh is not None and filePath.stat().st_mtime >= timeThresh:
continue # file is not old enough
if callbackFn:
if not callbackFn(filePath):
errFiles.append(str(filePath))
continue
else:
filePath.unlink()
continue
except OSError:
errFiles.append(str(filePath))
continue
# Now files are processed, see if dir is empty if we're deleting empty dirs
if delEmptyDirs and curPath != rootPath:
try:
if not any(curPath.iterdir()):
curPath.rmdir()
except OSError:
errFiles.append(str(curPath))
return errFiles
@contextmanager
def secureOpenForWrite(filename=None, *, text=True):
    """Securely open a file for writing.

    If filename is not provided, a file is created with ``tempfile.mkstemp``
    (i.e. in tempfile.gettempdir()). A named file is created if missing, truncated
    if present, and is always created with mode 600.

    :param string filename: name of file to be opened
    :param bool text: open in text mode with utf-8 encoding if True, in binary mode otherwise
    :return: context manager yielding a ``(file object, filename)`` tuple
    """
    if not filename:
        descriptor, filename = tempfile.mkstemp(text=text)
    else:
        descriptor = os.open(
            path=filename,
            flags=os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            mode=stat.S_IRUSR | stat.S_IWUSR,
        )

    if text:
        openMode, encoding = "w", "utf-8"
    else:
        openMode, encoding = "wb", None

    # The file object takes ownership of the descriptor and closes it on exit
    with open(descriptor, openMode, encoding=encoding) as fileObj:
        yield fileObj, filename
if __name__ == "__main__":
    # Command-line helper: print the globbed total size of every argument
    for cliPattern in sys.argv[1:]:
        globbedSize = getGlobbedTotalSize(cliPattern)
        print(f"{cliPattern} : {globbedSize} bytes")