Skip to content

Commit 7689a91

Browse files
authored
Repo cleaner improvements (#1128)
* make sure we don't fail silently if we cannot load a module * Add a parameter for production to say whether to delete the objects without runs or not * fix the logger in the rules * logger in ccdb is set externally * Doc for accessing qcdb * fix * leftover
1 parent c1c7ad5 commit 7689a91

11 files changed

Lines changed: 147 additions & 91 deletions

File tree

Framework/script/RepoCleaner/Ccdb.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import requests
88

99

10+
logger = logging # default logger
11+
12+
1013
class ObjectVersion:
1114
'''
1215
A version of an object in the CCDB.
@@ -50,7 +53,7 @@ class Ccdb:
5053
counter_preserved: int = 0
5154

5255
def __init__(self, url):
53-
logging.info(f"Instantiate CCDB at {url}")
56+
logger.info(f"Instantiate CCDB at {url}")
5457
self.url = url
5558

5659
def getObjectsList(self, added_since: int = 0) -> List[str]:
@@ -60,16 +63,16 @@ def getObjectsList(self, added_since: int = 0) -> List[str]:
6063
:param added_since: if specified, only return objects added since this timestamp in epoch milliseconds.
6164
:return A list of strings, each containing a path to an object in the CCDB.
6265
'''
63-
logging.debug(f"added_since : {added_since}")
66+
logger.debug(f"added_since : {added_since}")
6467
url_for_all_obj = self.url + '/latest/.*'
65-
logging.debug(f"Ccdb::getObjectsList -> {url_for_all_obj}")
68+
logger.debug(f"Ccdb::getObjectsList -> {url_for_all_obj}")
6669
headers = {'Accept':'application/json', 'If-Not-Before':str(added_since)}
6770
r = requests.get(url_for_all_obj, headers=headers)
6871
r.raise_for_status()
6972
try:
7073
json = r.json()
7174
except JSONDecodeError as err:
72-
logging.error(f"JSON decode error: {err}")
75+
logger.error(f"JSON decode error: {err}")
7376
raise
7477
paths = []
7578
for item in json['objects']:
@@ -83,7 +86,7 @@ def getVersionsList(self, object_path: str) -> List[ObjectVersion]:
8386
:return A list of ObjectVersion.
8487
'''
8588
url_browse_all_versions = self.url + '/browse/' + object_path
86-
logging.debug(f"Ccdb::getVersionsList -> {url_browse_all_versions}")
89+
logger.debug(f"Ccdb::getVersionsList -> {url_browse_all_versions}")
8790
headers = {'Accept':'application/json', 'Connection': 'close'}
8891
r = requests.get(url_browse_all_versions, headers=headers)
8992
r.raise_for_status()
@@ -106,7 +109,7 @@ def deleteVersion(self, version: ObjectVersion):
106109
:param version: The version of the object to delete, as an instance of ObjectVersion.
107110
'''
108111
url_delete = self.url + '/' + version.path + '/' + str(version.validFrom) + '/' + version.uuid
109-
logging.debug(f"Delete version at url {url_delete}")
112+
logger.debug(f"Delete version at url {url_delete}")
110113
headers = {'Connection': 'close'}
111114
try:
112115
r = requests.delete(url_delete, headers=headers)
@@ -126,9 +129,9 @@ def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int,
126129
:param metadata: Add or modify metadata
127130
'''
128131
full_path = self.url + '/' + version.path + '/' + str(valid_from) + '/' + str(valid_to) + '/' + str(version.uuid) + '?'
129-
logging.debug(f"Update end limit validity of {version.path} ({version.uuid}) from {version.validTo} to {valid_to}")
132+
logger.debug(f"Update end limit validity of {version.path} ({version.uuid}) from {version.validTo} to {valid_to}")
130133
if metadata is not None:
131-
logging.debug(f"{metadata}")
134+
logger.debug(f"{metadata}")
132135
for key in metadata:
133136
full_path += key + "=" + metadata[key] + "&"
134137
try:
@@ -140,7 +143,7 @@ def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int,
140143
print(e)
141144
sys.exit(1) # really ?
142145

143-
def putVersion(self, version: ObjectVersion, data):
146+
def putVersion(self, version: ObjectVersion, data):
144147
'''
145148
:param version: An ObjectVersion that describes the data to be uploaded.
146149
:param data: the actual data to send. E.g.:{'somekey': 'somevalue'}
@@ -150,17 +153,17 @@ def putVersion(self, version: ObjectVersion, data):
150153
if version.metadata is not None:
151154
for key in version.metadata:
152155
full_path += key + "=" + version.metadata[key] + "/"
153-
logging.debug(f"fullpath: {full_path}")
156+
logger.debug(f"fullpath: {full_path}")
154157
headers = {'Connection': 'close'}
155158
r = requests.post(full_path, files=data, headers=headers)
156159
if r.ok:
157-
logging.debug(f"Version pushed to {version.path}")
160+
logger.debug(f"Version pushed to {version.path}")
158161
else:
159-
logging.error(f"Could not post a new version of {version.path}: {r.text}")
162+
logger.error(f"Could not post a new version of {version.path}: {r.text}")
160163

161164
def main():
162-
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
163-
logging.getLogger().setLevel(int(10))
165+
logger.basicConfig(level=logger.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
166+
logger.getLogger().setLevel(int(10))
164167

165168
ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
166169

Framework/script/RepoCleaner/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,9 @@ Most of the classes and Rules have a main to help test them. To run do e.g. `pyt
4242

4343
## Installation
4444
CMake will install the python scripts in bin and the config file in etc.
45+
46+
## Example
47+
48+
```
49+
PYTHONPATH=./rules:$PYTHONPATH ./o2-qc-repo-cleaner --dry-run --config config-test.yaml --dry-run --only-path qc/DAQ --log-level 10
50+
```

Framework/script/RepoCleaner/config-test.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ Rules:
55
- object_path: QcTask/example
66
delay: 120
77
policy: 1_per_hour
8-
8+
- object_path: qc/TST/MO/repo/test/.*
9+
delay: 60
10+
policy: production
11+
delete_when_no_run: True
12+
913
Ccdb:
1014
Url: http://ccdb-test.cern.ch:8080

Framework/script/RepoCleaner/o2-qc-repo-cleaner

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
#
1818
# Usage
1919
# # run with debug logs and don't actually touch the database
20-
# ./repoCleaner --dry-run --log-level 10
20+
# PYTHONPATH=./rules:$PYTHONPATH./o2-qc-repo-cleaner --dry-run --log-level 10
2121

2222
import argparse
2323
import logging
2424
import requests
2525
import re
2626
import sys
27+
import importlib
2728
from typing import List
2829
import tempfile
2930
import socket
@@ -289,17 +290,25 @@ def read_config(args):
289290
return ccdb_url, rules
290291

291292

292-
def process_object(object_path, rules, ccdb):
293+
def process_object(object_path, rules, ccdb, args):
293294
logger = create_parallel_logger()
295+
logger.setLevel(int(args.log_level))
294296
logger.info(f"Processing {object_path}")
295297

296298
# Take the first matching rule, if any
297299
rule = findMatchingRule(rules, object_path)
300+
298301
if rule is None:
302+
logger.info(f" no matching rule")
299303
return
300304

301305
# Apply rule on object (find the plug-in script and apply)
302-
module = __import__(rule.policy)
306+
try:
307+
module = importlib.import_module(rule.policy)
308+
module.logger = logger
309+
except ModuleNotFoundError as err:
310+
logger.error(f"could not load module {rule.policy}")
311+
return
303312
stats = module.process(ccdb, object_path, int(rule.delay), # rule.migration == "True",
304313
rule.extra_params)
305314
logger.info(f"{rule.policy} applied on {object_path}: {stats}")
@@ -309,17 +318,17 @@ def run(args, ccdb_url, rules):
309318

310319
# Get list of objects from CCDB
311320
ccdb = Ccdb(ccdb_url)
321+
ccdb.logger = logging.getLogger
312322
paths = ccdb.getObjectsList(getTimestampLastExecution())
313323
if args.only_path != '':
314324
paths = [item for item in paths if item is not None and item.startswith(args.only_path)]
315325
logging.debug(paths)
316-
logging.debug(len(paths))
317326
# For each object call the first matching rule
318327
logging.info("Loop through the objects and apply first matching rule.")
319328

320329
logging.info(f"workers: {args.workers}")
321330
pool = mp.Pool(int(args.workers))
322-
[pool.apply_async(process_object, args=(object_path, rules, ccdb)) for object_path in paths]
331+
[pool.apply_async(process_object, args=(object_path, rules, ccdb, args)) for object_path in paths]
323332
pool.close()
324333
pool.join()
325334

Framework/script/RepoCleaner/rules/1_per_hour.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
from datetime import datetime
22
from datetime import timedelta
3-
import logging
3+
import logger
44
from typing import Dict
55

66
from Ccdb import Ccdb, ObjectVersion
77

88

9+
logger = logging # default logger
10+
11+
912
def process(ccdb: Ccdb, object_path: str, delay: int, #migration: bool,
1013
extra_params: Dict[str, str]):
1114
'''
@@ -23,7 +26,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, #migration: bool,
2326
:return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved.
2427
'''
2528

26-
logging.debug(f"Plugin 1_per_hour processing {object_path}")
29+
logger.debug(f"Plugin 1_per_hour processing {object_path}")
2730

2831
versions = ccdb.getVersionsList(object_path)
2932

@@ -41,22 +44,22 @@ def process(ccdb: Ccdb, object_path: str, delay: int, #migration: bool,
4144
else:
4245
if v.validFromAsDt < datetime.now() - timedelta(minutes=delay):
4346
deletion_list.append(v)
44-
logging.debug(f"not in the grace period, we delete {v}")
47+
logger.debug(f"not in the grace period, we delete {v}")
4548
ccdb.deleteVersion(v)
4649
else:
4750
preservation_list.append(v)
4851

49-
logging.debug("deleted : ")
52+
logger.debug("deleted : ")
5053
for v in deletion_list:
51-
logging.debug(f" {v}")
54+
logger.debug(f" {v}")
5255

53-
logging.debug("preserved : ")
56+
logger.debug("preserved : ")
5457
for v in preservation_list:
55-
logging.debug(f" {v}")
58+
logger.debug(f" {v}")
5659

57-
logging.debug("updated : ")
60+
logger.debug("updated : ")
5861
for v in update_list:
59-
logging.debug(f" {v}")
62+
logger.debug(f" {v}")
6063

6164
return {"deleted" : len(deletion_list), "preserved": len(preservation_list), "updated" : len(update_list)}
6265

Framework/script/RepoCleaner/rules/1_per_run.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,14 @@
66
import dryable
77
from typing import Dict
88

9+
10+
logger = logging # default logger
11+
12+
913
def in_grace_period(version: ObjectVersion, delay: int):
1014
return not (version.validFromAsDt < datetime.now() - timedelta(minutes=delay))
1115

16+
1217
def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, str]):
1318
'''
1419
Process this deletion rule on the object. We use the CCDB passed by argument.
@@ -29,41 +34,41 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
2934
:return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved.
3035
'''
3136

32-
logging.debug(f"Plugin 1_per_run processing {object_path}")
37+
logger.debug(f"Plugin 1_per_run processing {object_path}")
3338

3439
preservation_list: List[ObjectVersion] = []
3540
deletion_list: List[ObjectVersion] = []
3641
update_list: List[ObjectVersion] = []
3742
runs_dict: DefaultDict[str, List[ObjectVersion]] = defaultdict(list)
3843

3944
delete_when_no_run = (extra_params.get("delete_when_no_run", False) == True)
40-
logging.debug(f"delete_when_no_run : {delete_when_no_run}")
45+
logger.debug(f"delete_when_no_run : {delete_when_no_run}")
4146

4247
# Find all the runs and group the versions
4348
versions = ccdb.getVersionsList(object_path)
4449
for v in versions:
45-
logging.debug(f"Processing {v}")
50+
logger.debug(f"Processing {v}")
4651
if "Run" in v.metadata:
4752
runs_dict[v.metadata['Run']].append(v)
4853
elif "RunNumber" in v.metadata:
4954
runs_dict[v.metadata['RunNumber']].append(v)
5055
else:
5156
runs_dict[-1].append(v) # the ones with no run specified
5257

53-
logging.debug(f"Number of runs : {len(runs_dict)}")
54-
logging.debug(f"Number of versions without runs : {len(runs_dict[-1])}")
58+
logger.debug(f"Number of runs : {len(runs_dict)}")
59+
logger.debug(f"Number of versions without runs : {len(runs_dict[-1])}")
5560

5661
# if we should not touch the files with no runs, let's just remove them from the map
5762
if not delete_when_no_run:
5863
del runs_dict[-1]
5964

6065
# Dispatch the versions to deletion and preservation lists
6166
for r, run_versions in runs_dict.items():
62-
# logging.debug(f"- run {r}")
67+
# logger.debug(f"- run {r}")
6368

6469
freshest: ObjectVersion = None
6570
for v in run_versions:
66-
# logging.debug(f" - version {v}")
71+
# logger.debug(f" - version {v}")
6772
if freshest is None or freshest.validFromAsDt < v.validFromAsDt:
6873
if freshest is not None:
6974
if in_grace_period(freshest, delay):
@@ -82,23 +87,23 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
8287
for d in deletion_list:
8388
ccdb.deleteVersion(d)
8489

85-
logging.debug(f"deleted ({len(deletion_list)}) : ")
90+
logger.debug(f"deleted ({len(deletion_list)}) : ")
8691
for v in deletion_list:
87-
logging.debug(f" {v}")
92+
logger.debug(f" {v}")
8893

89-
logging.debug(f"preserved ({len(preservation_list)}) : ")
94+
logger.debug(f"preserved ({len(preservation_list)}) : ")
9095
for v in preservation_list:
91-
logging.debug(f" {v}")
96+
logger.debug(f" {v}")
9297

93-
logging.debug(f"updated ({len(update_list)}) : ")
98+
logger.debug(f"updated ({len(update_list)}) : ")
9499
for v in update_list:
95-
logging.debug(f" {v}")
100+
logger.debug(f" {v}")
96101

97102
return {"deleted" : len(deletion_list), "preserved": len(preservation_list), "updated" : len(update_list)}
98103

99104

100105
def main():
101-
logging.getLogger().setLevel(int(10))
106+
logger.getLogger().setLevel(int(10))
102107
dryable.set( True )
103108
ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
104109
process(ccdb, "qc/testRunCleanup", 0)

Framework/script/RepoCleaner/rules/last_only.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
from Ccdb import Ccdb, ObjectVersion
77

88

9+
logger = logging # default logger
10+
11+
912
def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, str]):
1013
'''
1114
Process this deletion rule on the object. We use the CCDB passed by argument.
@@ -19,7 +22,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
1922
:return a dictionary with the number of deleted, preserved and updated versions. Total = deleted+preserved.
2023
'''
2124

22-
logging.debug(f"Plugin last_only processing {object_path}")
25+
logger.debug(f"Plugin last_only processing {object_path}")
2326

2427
versions = ccdb.getVersionsList(object_path)
2528

@@ -30,7 +33,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
3033
for v in versions:
3134
if earliest == None or v.validFromAsDt > earliest.validFromAsDt:
3235
earliest = v
33-
logging.debug(f"earliest : {earliest}")
36+
logger.debug(f"earliest : {earliest}")
3437

3538
# delete the non-earliest if we are not in the grace period
3639
for v in versions:
@@ -44,13 +47,13 @@ def process(ccdb: Ccdb, object_path: str, delay: int, extra_params: Dict[str, st
4447
else:
4548
preservation_list.append(v)
4649

47-
logging.debug("deleted : ")
50+
logger.debug("deleted : ")
4851
for v in deletion_list:
49-
logging.debug(f" {v}")
52+
logger.debug(f" {v}")
5053

51-
logging.debug("preserved : ")
54+
logger.debug("preserved : ")
5255
for v in preservation_list:
53-
logging.debug(f" {v}")
56+
logger.debug(f" {v}")
5457

5558
return {"deleted" : len(deletion_list), "preserved": len(preservation_list)}
5659

0 commit comments

Comments
 (0)