Skip to content

Commit f446df7

Browse files
committed
feat: New cleanDirectory utility function
1 parent 91f5d3e commit f446df7

7 files changed

Lines changed: 338 additions & 91 deletions

File tree

src/DIRAC/Core/Utilities/File.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import errno
1414
import stat
1515
import tempfile
16+
import fnmatch
17+
import time
1618
from contextlib import contextmanager
19+
from pathlib import Path
1720

1821
# Translation table of a given unit to Bytes
1922
# I know, it should be kB...
@@ -256,6 +259,68 @@ def convertSizeUnits(size, srcUnit, dstUnit):
256259
return -sys.maxsize
257260

258261

262+
def cleanDirectory(workDir, maxSecs=None, filePatterns=[], maxDepth=0, callbackFn=None, delEmptyDirs=False):
263+
"""Recursively clean files older than a threshold.
264+
265+
Walks ``workDir`` bottom-up and deletes (or invokes ``callbackFn`` on)
266+
regular files that are older than ``maxSecs`` seconds which match the ``filePatterns``
267+
glob. Empty directories can also be removed with ``delEmptyDirs``=``True``.
268+
269+
:param workDir: directory to scan
270+
:param maxSecs: age threshold in seconds (files older are deleted); pass
271+
``None`` to skip the age filter entirely
272+
:param filePatterns: list of globs, only files matching this will be considered
273+
:param maxDepth: maximum directory depth to process (0 = unlimited, 1 = root only)
274+
:param callbackFn: If ``None`` files will be unlinked, otherwise this function
275+
will be called for matchin files instead. Function should take a
276+
single``Path`` object argument. Returning True indicates the
277+
file was processed without error. Returning False will add the
278+
path onto the list of failed files.
279+
:param delEmptyDirs: if ``True``, delete directories that are empty after
280+
file cleanup. All directories are considered, the filePatterns
281+
glob is not used to filter these.
282+
:returns: list of file (and dir) paths that could not be deleted (empty on success)
283+
"""
284+
errFiles = []
285+
timeThresh = time.time() - maxSecs if maxSecs is not None else None
286+
rootPath = Path(workDir)
287+
288+
for curRoot, dirs, files in os.walk(workDir, topdown=False):
289+
curPath = Path(curRoot)
290+
depth = len(curPath.relative_to(rootPath).parts)
291+
292+
# Only process files if we're within maxDepth
293+
if not maxDepth or depth < maxDepth:
294+
for fileName in files:
295+
if any(fnmatch.fnmatch(fileName, p) for p in filePatterns):
296+
filePath = curPath / fileName
297+
try:
298+
if not filePath.is_file() or filePath.is_symlink():
299+
continue # Not a regular file
300+
if timeThresh is not None and filePath.stat().st_mtime >= timeThresh:
301+
continue # file is not old enough
302+
if callbackFn:
303+
if not callbackFn(filePath):
304+
errFiles.append(str(filePath))
305+
continue
306+
else:
307+
filePath.unlink()
308+
continue
309+
except OSError:
310+
errFiles.append(str(filePath))
311+
continue
312+
313+
# Now files are processed, see if dir is empty if we're deleting empty dirs
314+
if delEmptyDirs and curPath != rootPath:
315+
try:
316+
if not any(curPath.iterdir()):
317+
curPath.rmdir()
318+
except OSError:
319+
errFiles.append(str(curPath))
320+
321+
return errFiles
322+
323+
259324
@contextmanager
260325
def secureOpenForWrite(filename=None, *, text=True):
261326
"""Securely open a file for writing.

src/DIRAC/Core/Utilities/test/Test_File.py

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,22 @@
77

88
# imports
99
import os
10+
import time
1011
from os.path import abspath
1112
import re
1213
import sys
14+
from pathlib import Path
1315

1416
from hypothesis import given
1517
from hypothesis.strategies import floats
1618

1719
from pytest import mark
20+
from unittest.mock import MagicMock
1821

1922
# sut
2023
from DIRAC.Core.Utilities.File import (
2124
checkGuid,
25+
cleanDirectory,
2226
makeGuid,
2327
getSize,
2428
getMD5ForFiles,
@@ -134,3 +138,182 @@ def test_convert_loop(nb, srcUnit, dstUnit):
134138
# We exclude the infinity case
135139
if converted != float("Inf"):
136140
assert converted == nb
141+
142+
143+
def _set_old_mtime(path: str, age_seconds: int = 3600) -> None:
144+
"""Set a file's mtime to `age_seconds` ago."""
145+
old_time = time.time() - age_seconds
146+
os.utime(path, (old_time, old_time))
147+
148+
149+
def _build_tree(tmp_path, files, subdirs=None):
150+
"""Create a file tree and return a mapping of name -> pathlib.Path."""
151+
mapping = {}
152+
if subdirs:
153+
for sd in subdirs:
154+
(tmp_path / sd).mkdir(parents=True, exist_ok=True)
155+
for name, content in files.items():
156+
mapping[name] = tmp_path / name
157+
mapping[name].write_text(content)
158+
return mapping
159+
160+
161+
def test_clean_directory_basic(tmp_path):
162+
"""Check old file removed, newer file kept and old-non-matching kept."""
163+
files = _build_tree(
164+
tmp_path,
165+
{"DIRAC_old_job": "abc", "DIRAC_new_job": "def", "other.log": "ghi"},
166+
)
167+
_set_old_mtime(str(files["DIRAC_old_job"]))
168+
_set_old_mtime(str(files["other.log"]))
169+
170+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"])
171+
assert err == []
172+
assert not files["DIRAC_old_job"].exists()
173+
assert files["DIRAC_new_job"].exists()
174+
assert files["other.log"].exists()
175+
176+
177+
def test_clean_directory_empty_dir(tmp_path):
178+
assert cleanDirectory(str(tmp_path)) == []
179+
180+
181+
def test_clean_directory_maxDepth_restricts_recursion(tmp_path):
182+
files = _build_tree(
183+
tmp_path,
184+
{"root": "a", "DIRAC_root_job": "b"},
185+
subdirs=["subdir"],
186+
)
187+
_set_old_mtime(str(files["root"]))
188+
_set_old_mtime(str(files["DIRAC_root_job"]))
189+
deep = tmp_path / "subdir" / "DIRAC_deep"
190+
deep.write_text("c")
191+
_set_old_mtime(str(deep))
192+
193+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"], maxDepth=1)
194+
assert err == []
195+
assert not files["DIRAC_root_job"].exists()
196+
assert (tmp_path / "subdir" / "DIRAC_deep").exists()
197+
198+
199+
def test_clean_directory_recursive_without_maxDepth(tmp_path):
200+
nested = tmp_path / "a" / "b"
201+
nested.mkdir(parents=True)
202+
deep = nested / "DIRAC_nested.log"
203+
deep.write_text("data")
204+
_set_old_mtime(str(deep))
205+
206+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.log"])
207+
assert err == []
208+
assert not deep.exists()
209+
210+
211+
def test_clean_directory_symlinks_not_deleted(tmp_path):
212+
target = tmp_path / "real"
213+
target.write_text("data")
214+
symlink = tmp_path / "DIRAC_link"
215+
symlink.symlink_to(target)
216+
_set_old_mtime(str(symlink))
217+
218+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"])
219+
assert err == []
220+
assert symlink.exists()
221+
assert target.exists()
222+
223+
224+
def test_clean_directory_multiple_patterns(tmp_path):
225+
files = _build_tree(
226+
tmp_path,
227+
{"a.out": "x", "b.err": "y", "c.txt": "z"},
228+
)
229+
for f in files.values():
230+
_set_old_mtime(str(f))
231+
232+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.out", "*.err"])
233+
assert err == []
234+
assert not files["a.out"].exists()
235+
assert not files["b.err"].exists()
236+
assert files["c.txt"].exists()
237+
238+
239+
def test_clean_directory_returns_errors_on_unable_delete(mocker, tmp_path):
240+
"""When unlink raises OSError, errors are collected."""
241+
old_file = tmp_path / "DIRAC_blocked"
242+
old_file.write_text("data")
243+
_set_old_mtime(str(old_file))
244+
245+
mocker.patch("pathlib.Path.unlink", side_effect=PermissionError("Denied"))
246+
247+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"])
248+
assert len(err) == 1
249+
assert "DIRAC_blocked" in err[0]
250+
251+
252+
def test_clean_directory_empty_dirs_removed(tmp_path):
253+
"""With delEmptyDirs=True, empty directories are removed after file cleanup."""
254+
subdir = tmp_path / "subdir" / "nested"
255+
subdir.mkdir(parents=True)
256+
log_file = subdir / "old.log"
257+
log_file.write_text("data")
258+
_set_old_mtime(str(log_file))
259+
260+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.log"], delEmptyDirs=True)
261+
assert err == []
262+
assert not subdir.exists()
263+
264+
265+
def test_clean_directory_empty_dirs_kept_by_default(tmp_path):
266+
"""Without delEmptyDirs, empty directories remain after file cleanup."""
267+
subdir = tmp_path / "subdir"
268+
subdir.mkdir(parents=True)
269+
log_file = subdir / "old.log"
270+
log_file.write_text("data")
271+
_set_old_mtime(str(log_file))
272+
273+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["*.log"])
274+
assert err == []
275+
assert subdir.exists()
276+
277+
278+
def test_clean_directory_callback_fn(mocker, tmp_path):
279+
"""When callbackFn is provided, it replaces unlink."""
280+
callback = mocker.Mock(side_effect=lambda p: (p.unlink(), True)[1])
281+
282+
old_file = tmp_path / "DIRAC_test"
283+
old_file.write_text("data")
284+
_set_old_mtime(str(old_file))
285+
286+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"], callbackFn=callback)
287+
assert err == []
288+
assert not old_file.exists()
289+
callback.assert_called_once()
290+
291+
292+
def test_clean_directory_callback_false_recorded_as_error(tmp_path):
293+
"""Callback returning False records the file as an error; file is left untouched."""
294+
def skip_all(_path):
295+
return False
296+
297+
old_file = tmp_path / "DIRAC_test"
298+
old_file.write_text("data")
299+
_set_old_mtime(str(old_file))
300+
301+
err = cleanDirectory(str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"], callbackFn=skip_all)
302+
assert len(err) == 1
303+
assert "DIRAC_test" in err[0]
304+
assert old_file.exists()
305+
306+
307+
def test_clean_directory_callback_oserror_recorded(mocker, tmp_path):
308+
"""Callback raising OSError is treated as a deletion error."""
309+
mocker.patch("pathlib.Path.unlink", side_effect=PermissionError("Denied"))
310+
311+
old_file = tmp_path / "DIRAC_test"
312+
old_file.write_text("data")
313+
_set_old_mtime(str(old_file))
314+
315+
err = cleanDirectory(
316+
str(tmp_path), maxSecs=60, filePatterns=["DIRAC_*"],
317+
callbackFn=lambda p: (p.unlink(), True)[1],
318+
)
319+
assert len(err) == 1

src/DIRAC/FrameworkSystem/private/SecurityFileLog.py

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import os
2-
import re
32
import time
43
import gzip
54
import queue
65
import shutil
76
import threading
87
from DIRAC import gLogger, S_OK, S_ERROR
98
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
10-
from DIRAC.Core.Utilities.File import mkDir
9+
from DIRAC.Core.Utilities.File import cleanDirectory, mkDir
1110

1211

1312
class SecurityFileLog(threading.Thread):
@@ -52,24 +51,31 @@ def run(self):
5251
fd.close()
5352

5453
def __launchCleaningOldLogFiles(self):
55-
nowEpoch = time.time()
56-
self.__walkOldLogs(self.__basePath, nowEpoch, re.compile(r"^\d*\.security\.log\.csv$"), 86400, self.__zipOldLog)
57-
self.__walkOldLogs(
58-
self.__basePath,
59-
nowEpoch,
60-
re.compile(r"^\d*\.security\.log\.csv\.gz$"),
61-
self.__secsToLog,
62-
self.__unlinkOldLog,
54+
self._cleanupLogs(
55+
self.__basePath, 86400, self.__zipOldLog, "*.security.log.csv"
6356
)
57+
self._cleanupLogs(
58+
self.__basePath, self.__secsToLog, self.__unlinkOldLog, "*.security.log.csv.gz"
59+
)
60+
61+
def _cleanupLogs(self, basePath, maxSecs, functor, pattern):
62+
"""Clean old logs matching a pattern, optionally zipping first."""
63+
64+
errFiles = cleanDirectory(
65+
basePath, maxSecs=maxSecs, filePatterns=[pattern], maxDepth=0, callbackFn=functor, delEmptyDirs=True
66+
)
67+
if errFiles:
68+
for fp in errFiles:
69+
gLogger.error("Failed to clean security log", fp)
6470

6571
def __unlinkOldLog(self, filePath):
6672
try:
6773
gLogger.info(f"Unlinking file {filePath}")
6874
os.unlink(filePath)
6975
except Exception as e:
7076
gLogger.error("Can't unlink old log file", f"{filePath}: {str(e)}")
71-
return 1
72-
return 0
77+
return False
78+
return True
7379

7480
def __zipOldLog(self, filePath):
7581
try:
@@ -79,30 +85,8 @@ def __zipOldLog(self, filePath):
7985
shutil.copyfileobj(f_in, f_out)
8086
except Exception:
8187
gLogger.exception("Can't compress old log file", filePath)
82-
return 1
83-
return self.__unlinkOldLog(filePath) + 1
84-
85-
def __walkOldLogs(self, path, nowEpoch, reLog, executionInSecs, functor):
86-
initialEntries = os.listdir(path)
87-
numEntries = 0
88-
for entry in initialEntries:
89-
entryPath = os.path.join(path, entry)
90-
if os.path.isdir(entryPath):
91-
numEntries += 1
92-
numEntriesSubDir = self.__walkOldLogs(entryPath, nowEpoch, reLog, executionInSecs, functor)
93-
if numEntriesSubDir == 0:
94-
gLogger.info(f"Removing dir {entryPath}")
95-
try:
96-
os.rmdir(entryPath)
97-
numEntries -= 1
98-
except Exception as e:
99-
gLogger.error("Can't delete directory", f"{entryPath}: {str(e)}")
100-
elif os.path.isfile(entryPath):
101-
numEntries += 1
102-
if reLog.match(entry):
103-
if nowEpoch - os.stat(entryPath)[8] > executionInSecs:
104-
numEntries += functor(entryPath) - 1
105-
return numEntries
88+
return False
89+
return self.__unlinkOldLog(filePath)
10690

10791
def logAction(self, msg):
10892
if len(msg) != len(self.__requiredFields):

0 commit comments

Comments
 (0)