Skip to content

Commit 750a4fe

Browse files
committed
feat: New cleanDirectory utility function
1 parent 907dae5 commit 750a4fe

8 files changed

Lines changed: 350 additions & 92 deletions

File tree

src/DIRAC/Core/Utilities/File.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
import errno
1414
import stat
1515
import tempfile
16+
import fnmatch
17+
import time
18+
from collections.abc import Callable
1619
from contextlib import contextmanager
20+
from pathlib import Path
1721

1822
# Translation table of a given unit to Bytes
1923
# I know, it should be kB...
@@ -256,6 +260,77 @@ def convertSizeUnits(size, srcUnit, dstUnit):
256260
return -sys.maxsize
257261

258262

263+
def cleanDirectory(
264+
workDir: str | os.PathLike[str],
265+
maxSecs: int | float | None = None,
266+
filePatterns: list[str] = [],
267+
maxDepth: int = 0,
268+
callbackFn: Callable[[Path], bool] | None = None,
269+
delEmptyDirs: bool = False,
270+
) -> list[str]:
271+
"""Recursively clean files older than a threshold.
272+
273+
Walks ``workDir`` bottom-up and deletes (or invokes ``callbackFn`` on)
274+
regular files that are older than ``maxSecs`` seconds which match the ``filePatterns``
275+
glob. Empty directories can also be removed with ``delEmptyDirs``=``True``.
276+
277+
:param workDir: directory to scan
278+
:param maxSecs: age threshold in seconds (files older are deleted); pass
279+
``None`` to skip the age filter entirely
280+
:param filePatterns: list of globs, only files matching this will be considered
281+
:param maxDepth: maximum directory depth to process (0 = unlimited, 1 = root only)
282+
:param callbackFn: If ``None`` files will be unlinked, otherwise this function
283+
will be called for matchin files instead. Function should take a
284+
single``Path`` object argument. Returning True indicates the
285+
file was processed without error. Returning False will add the
286+
path onto the list of failed files.
287+
:param delEmptyDirs: if ``True``, delete directories that are empty after
288+
file cleanup. All directories are considered, the filePatterns
289+
glob is not used to filter these.
290+
:returns: list of file (and dir) paths that could not be deleted (empty on success)
291+
"""
292+
errFiles = []
293+
timeThresh = time.time() - maxSecs if maxSecs is not None else None
294+
rootPath = Path(workDir)
295+
if not rootPath.exists() or not rootPath.is_dir():
296+
return errFiles
297+
298+
for curRoot, dirs, files in os.walk(workDir, topdown=False):
299+
curPath = Path(curRoot)
300+
depth = len(curPath.relative_to(rootPath).parts)
301+
302+
# Only process files if we're within maxDepth
303+
if not maxDepth or depth < maxDepth:
304+
for fileName in files:
305+
if any(fnmatch.fnmatch(fileName, p) for p in filePatterns):
306+
filePath = curPath / fileName
307+
try:
308+
if not filePath.is_file() or filePath.is_symlink():
309+
continue # Not a regular file
310+
if timeThresh is not None and filePath.stat().st_mtime >= timeThresh:
311+
continue # file is not old enough
312+
if callbackFn:
313+
if not callbackFn(filePath):
314+
errFiles.append(str(filePath))
315+
continue
316+
else:
317+
filePath.unlink()
318+
continue
319+
except OSError:
320+
errFiles.append(str(filePath))
321+
continue
322+
323+
# Now files are processed, see if dir is empty if we're deleting empty dirs
324+
if delEmptyDirs and curPath != rootPath:
325+
try:
326+
if not any(curPath.iterdir()):
327+
curPath.rmdir()
328+
except OSError:
329+
errFiles.append(str(curPath))
330+
331+
return errFiles
332+
333+
259334
@contextmanager
260335
def secureOpenForWrite(filename=None, *, text=True):
261336
"""Securely open a file for writing.

src/DIRAC/Core/Utilities/test/Test_File.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,22 @@
77

88
# imports
99
import os
10+
import time
1011
from os.path import abspath
1112
import re
1213
import sys
14+
from pathlib import Path
1315

1416
from hypothesis import given
1517
from hypothesis.strategies import floats
1618

1719
from pytest import mark
20+
from unittest.mock import MagicMock
1821

1922
# sut
2023
from DIRAC.Core.Utilities.File import (
2124
checkGuid,
25+
cleanDirectory,
2226
makeGuid,
2327
getSize,
2428
getMD5ForFiles,
@@ -134,3 +138,185 @@ def test_convert_loop(nb, srcUnit, dstUnit):
134138
# We exclude the infinity case
135139
if converted != float("Inf"):
136140
assert converted == nb
141+
142+
143+
def _set_old_mtime(path: str, age_seconds: int = 3600) -> None:
144+
"""Set a file's mtime to `age_seconds` ago."""
145+
old_time = time.time() - age_seconds
146+
os.utime(path, (old_time, old_time))
147+
148+
149+
def _build_tree(tmp_path, files, subdirs=None):
150+
"""Create a file tree and return a mapping of name -> pathlib.Path."""
151+
mapping = {}
152+
if subdirs:
153+
for sd in subdirs:
154+
(tmp_path / sd).mkdir(parents=True, exist_ok=True)
155+
for name, content in files.items():
156+
mapping[name] = tmp_path / name
157+
mapping[name].write_text(content)
158+
return mapping
159+
160+
161+
def test_clean_directory_basic(tmp_path):
    """Only the old, pattern-matching file goes; recent or non-matching files stay."""
    tree = _build_tree(
        tmp_path,
        {"DIRAC_old_job": "abc", "DIRAC_new_job": "def", "other.log": "ghi"},
    )
    for aged in ("DIRAC_old_job", "other.log"):
        _set_old_mtime(str(tree[aged]))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"])

    assert failures == []
    assert not tree["DIRAC_old_job"].exists()
    assert tree["DIRAC_new_job"].exists()
    assert tree["other.log"].exists()
175+
176+
177+
def test_clean_directory_empty_dir(tmp_path):
    """Scanning an empty directory with default arguments reports no failures."""
    failures = cleanDirectory(str(tmp_path))
    assert failures == []
179+
180+
181+
def test_clean_directory_maxDepth_restricts_recursion(tmp_path):
    """With maxDepth=1 a matching file in the root is removed, one in a subdir is kept."""
    tree = _build_tree(
        tmp_path,
        {"root": "a", "DIRAC_root_job": "b"},
        subdirs=["subdir"],
    )
    nested_file = tmp_path / "subdir" / "DIRAC_deep"
    nested_file.write_text("c")
    for aged in (tree["root"], tree["DIRAC_root_job"], nested_file):
        _set_old_mtime(str(aged))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"], maxDepth=1)

    assert failures == []
    assert not tree["DIRAC_root_job"].exists()
    assert nested_file.exists()
197+
198+
199+
def test_clean_directory_recursive_without_maxDepth(tmp_path):
    """Without a maxDepth, a matching old file two levels down is still removed."""
    leaf_dir = tmp_path / "a" / "b"
    leaf_dir.mkdir(parents=True)
    buried = leaf_dir / "DIRAC_nested.log"
    buried.write_text("data")
    _set_old_mtime(str(buried))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.log"])

    assert failures == []
    assert not buried.exists()
209+
210+
211+
def test_clean_directory_symlinks_not_deleted(tmp_path):
    """A symlink whose name matches the pattern is left alone, as is its target."""
    real_file = tmp_path / "real"
    real_file.write_text("data")
    link = tmp_path / "DIRAC_link"
    link.symlink_to(real_file)
    _set_old_mtime(str(link))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"])

    assert failures == []
    assert link.exists()
    assert real_file.exists()
222+
223+
224+
def test_clean_directory_multiple_patterns(tmp_path):
    """A file is cleaned when it matches any one of several patterns."""
    tree = _build_tree(
        tmp_path,
        {"a.out": "x", "b.err": "y", "c.txt": "z"},
    )
    for created in tree.values():
        _set_old_mtime(str(created))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.out", "*.err"])

    assert failures == []
    assert not tree["a.out"].exists()
    assert not tree["b.err"].exists()
    assert tree["c.txt"].exists()
237+
238+
239+
def test_clean_directory_returns_errors_on_unable_delete(mocker, tmp_path):
    """A file whose unlink raises an OSError subclass ends up in the returned list."""
    blocked = tmp_path / "DIRAC_blocked"
    blocked.write_text("data")
    _set_old_mtime(str(blocked))

    # Every Path.unlink call now fails as if permission were denied
    mocker.patch("pathlib.Path.unlink", side_effect=PermissionError("Denied"))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"])

    assert len(failures) == 1
    assert "DIRAC_blocked" in failures[0]
250+
251+
252+
def test_clean_directory_empty_dirs_removed(tmp_path):
    """With delEmptyDirs=True a directory emptied by the cleanup is removed too."""
    inner_dir = tmp_path / "subdir" / "nested"
    inner_dir.mkdir(parents=True)
    stale_log = inner_dir / "old.log"
    stale_log.write_text("data")
    _set_old_mtime(str(stale_log))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.log"], delEmptyDirs=True)

    assert failures == []
    assert not inner_dir.exists()
263+
264+
265+
def test_clean_directory_empty_dirs_kept_by_default(tmp_path):
    """By default a directory emptied by the cleanup is left in place."""
    kept_dir = tmp_path / "subdir"
    kept_dir.mkdir(parents=True)
    stale_log = kept_dir / "old.log"
    stale_log.write_text("data")
    _set_old_mtime(str(stale_log))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.log"])

    assert failures == []
    assert kept_dir.exists()
276+
277+
278+
def test_clean_directory_callback_fn(mocker, tmp_path):
    """A supplied callbackFn is invoked for the matching file in place of a plain unlink."""

    def remove_and_succeed(p):
        p.unlink()
        return True

    callback = mocker.Mock(side_effect=remove_and_succeed)

    stale = tmp_path / "DIRAC_test"
    stale.write_text("data")
    _set_old_mtime(str(stale))

    failures = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"], callbackFn=callback)

    assert failures == []
    assert not stale.exists()
    callback.assert_called_once()
290+
291+
292+
def test_clean_directory_callback_false_recorded_as_error(tmp_path):
    """A callback returning False puts the file in the error list and leaves it on disk."""
    stale = tmp_path / "DIRAC_test"
    stale.write_text("data")
    _set_old_mtime(str(stale))

    failures = cleanDirectory(
        str(tmp_path),
        maxSecs=60,
        filePatterns=["DIRAC_*"],
        callbackFn=lambda _path: False,
    )

    assert len(failures) == 1
    assert "DIRAC_test" in failures[0]
    assert stale.exists()
306+
307+
308+
def test_clean_directory_callback_oserror_recorded(mocker, tmp_path):
    """An OSError escaping from the callback is recorded as a failed path."""
    # Make the unlink performed inside the callback fail
    mocker.patch("pathlib.Path.unlink", side_effect=PermissionError("Denied"))

    def failing_callback(p):
        p.unlink()
        return True

    stale = tmp_path / "DIRAC_test"
    stale.write_text("data")
    _set_old_mtime(str(stale))

    failures = cleanDirectory(
        str(tmp_path),
        maxSecs=60,
        filePatterns=["DIRAC_*"],
        callbackFn=failing_callback,
    )
    assert len(failures) == 1

src/DIRAC/FrameworkSystem/private/SecurityFileLog.py

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import os
2-
import re
32
import time
43
import gzip
54
import queue
65
import shutil
76
import threading
87
from DIRAC import gLogger, S_OK, S_ERROR
98
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
10-
from DIRAC.Core.Utilities.File import mkDir
9+
from DIRAC.Core.Utilities.File import cleanDirectory, mkDir
1110

1211

1312
class SecurityFileLog(threading.Thread):
@@ -52,24 +51,27 @@ def run(self):
5251
fd.close()
5352

5453
def __launchCleaningOldLogFiles(self):
    """Run the two cleanup passes over the log base path.

    First pass: hand ``*.security.log.csv`` files older than 86400 s (one day)
    to the zip callback. Second pass: hand ``*.security.log.csv.gz`` files older
    than the configured retention to the unlink callback.
    """
    logRoot = self.__basePath
    self._cleanupLogs(logRoot, 86400, self.__zipOldLog, "*.security.log.csv")
    self._cleanupLogs(logRoot, self.__secsToLog, self.__unlinkOldLog, "*.security.log.csv.gz")
56+
57+
def _cleanupLogs(self, basePath, maxSecs, functor, pattern):
    """Apply ``functor`` to old log files below ``basePath`` and log every failure.

    :param basePath: directory tree to scan (no depth limit, empty dirs are removed)
    :param maxSecs: minimum age in seconds for a file to be processed
    :param functor: callback invoked with the path of each matching file
    :param pattern: single glob selecting the file names to process
    """
    failedPaths = cleanDirectory(
        basePath,
        maxSecs=maxSecs,
        filePatterns=[pattern],
        maxDepth=0,
        callbackFn=functor,
        delEmptyDirs=True,
    )
    # Looping over an empty list is a no-op, so no separate emptiness check
    for failedPath in failedPaths:
        gLogger.error("Failed to clean security log", failedPath)
6466

6567
def __unlinkOldLog(self, filePath):
    """Delete ``filePath``, logging the outcome.

    :returns: True when the file was removed, False when any exception occurred
              (the exception is logged, not propagated)
    """
    try:
        gLogger.info(f"Unlinking file {filePath}")
        os.unlink(filePath)
    except Exception as e:
        gLogger.error("Can't unlink old log file", f"{filePath}: {str(e)}")
        return False
    else:
        return True
7375

7476
def __zipOldLog(self, filePath):
7577
try:
@@ -79,30 +81,8 @@ def __zipOldLog(self, filePath):
7981
shutil.copyfileobj(f_in, f_out)
8082
except Exception:
8183
gLogger.exception("Can't compress old log file", filePath)
82-
return 1
83-
return self.__unlinkOldLog(filePath) + 1
84-
85-
def __walkOldLogs(self, path, nowEpoch, reLog, executionInSecs, functor):
86-
initialEntries = os.listdir(path)
87-
numEntries = 0
88-
for entry in initialEntries:
89-
entryPath = os.path.join(path, entry)
90-
if os.path.isdir(entryPath):
91-
numEntries += 1
92-
numEntriesSubDir = self.__walkOldLogs(entryPath, nowEpoch, reLog, executionInSecs, functor)
93-
if numEntriesSubDir == 0:
94-
gLogger.info(f"Removing dir {entryPath}")
95-
try:
96-
os.rmdir(entryPath)
97-
numEntries -= 1
98-
except Exception as e:
99-
gLogger.error("Can't delete directory", f"{entryPath}: {str(e)}")
100-
elif os.path.isfile(entryPath):
101-
numEntries += 1
102-
if reLog.match(entry):
103-
if nowEpoch - os.stat(entryPath)[8] > executionInSecs:
104-
numEntries += functor(entryPath) - 1
105-
return numEntries
84+
return False
85+
return self.__unlinkOldLog(filePath)
10686

10787
def logAction(self, msg):
10888
if len(msg) != len(self.__requiredFields):

0 commit comments

Comments
 (0)