Skip to content

Commit 85f30bc

Browse files
authored
[QC-996] policy multiple_per_run can delete first and last (#1921)
* [QC-996] policy multiple_per_run can delete first and last * fix and preserve all versions if there are fewer than 4 * use a dedicated method to update the metadata. At the moment the QCCDB requires specifying the start of validity behind the scenes.
1 parent 2f86036 commit 85f30bc

5 files changed

Lines changed: 271 additions & 4 deletions

File tree

Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,24 @@ def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int,
200200
except requests.exceptions.RequestException as e:
201201
logging.error(f"Exception in updateValidity: {traceback.format_exc()}")
202202

203+
@dryable.Dryable()
def updateMetadata(self, version: ObjectVersion, metadata):
    '''
    Update the metadata of an existing version with an HTTP PUT request.

    The metadata is sent as query parameters appended to
    <url>/<path>/<validFrom>/<uuid>. Keys and values are converted to strings and
    percent-encoded, so values containing characters such as '&', '=', '#' or spaces
    cannot corrupt the query string, and non-string values no longer raise a TypeError.
    Request failures are logged and not propagated to the caller.

    :param version: The ObjectVersion to update; its path, validFrom and uuid identify the object.
    :param metadata: Mapping of metadata keys to values to set, or None to send no extra metadata.
    '''
    # Local import so that this method does not depend on a change to the file-level imports.
    from urllib.parse import quote

    logger.debug(f"update metadata : {metadata}")
    full_path = self.url + '/' + version.path + '/' + str(version.validFrom) + '/' + str(version.uuid) + '?'
    if metadata is not None:
        for key in metadata:
            # safe='' makes sure that '/' is escaped as well inside keys and values
            full_path += quote(str(key), safe='') + "=" + quote(str(metadata[key]), safe='') + "&"
    if self.set_adjustable_eov:
        logger.debug("As the parameter force is set, we add metadata adjustableEOV")
        full_path += "adjustableEOV=1&"
    try:
        headers = {'Connection': 'close'}
        r = requests.put(full_path, headers=headers)
        r.raise_for_status()
    except requests.exceptions.RequestException:
        # Best effort: report the failure with its traceback but do not interrupt the caller.
        logging.error(f"Exception in updateMetadata: {traceback.format_exc()}")
203219

220+
@dryable.Dryable()
204221
def putVersion(self, version: ObjectVersion, data):
205222
'''
206223
:param version: An ObjectVersion that describes the data to be uploaded.

Framework/script/RepoCleaner/qcrepocleaner/policies_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def get_run(v: ObjectVersion) -> str:
2424
def group_versions(ccdb, object_path, period_pass, versions_buckets_dict: DefaultDict[str, List[ObjectVersion]]):
2525
# Find all the runs and group the versions (by run or by a combination of multiple attributes)
2626
versions = ccdb.getVersionsList(object_path)
27+
logger.debug(f"group_versions: found {len(versions)} versions")
2728
for v in versions:
2829
logger.debug(f"Assigning {v} to a bucket")
2930
run = get_run(v)

Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
logger = logging # default logger
1313

14+
1415
def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_timestamp: int,
1516
extra_params: Dict[str, str]):
1617
'''
@@ -24,12 +25,22 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
2425
- migrate_to_EOS: Migrate the object to EOS. (default: false)
2526
- interval_between_versions: Period in minutes between the versions we will keep. (default: 90)
2627
- period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false)
28+
- delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule.
2729
2830
It is implemented like this :
2931
Map of buckets: run[+pass+period] -> list of versions
3032
Go through all objects: Add the object to the corresponding key (run[+pass+period])
33+
Sort the versions in the bucket
3134
Remove the empty run from the map (we ignore objects without a run)
3235
Go through the map: for each run (resp. run+pass+period)
36+
37+
if delete_first_last
38+
Get flag cleaner_2nd from first object (if there)
39+
if cleaner_2nd
40+
continue # we do not want to reprocess the same run twice
41+
flag second with `cleaner_2nd`
42+
delete first and last versions in the bucket
43+
3344
Get SOR (validity of first object)
3445
if SOR < now - delay
3546
do
@@ -62,6 +73,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
6273
logger.debug(f"interval_between_versions : {interval_between_versions}")
6374
migrate_to_EOS = (extra_params.get("migrate_to_EOS", False) is True)
6475
logger.debug(f"migrate_to_EOS : {migrate_to_EOS}")
76+
delete_first_last = (extra_params.get("delete_first_last", False) is True)
77+
logger.debug(f"delete_first_last : {delete_first_last}")
6578

6679
# Find all the runs and group the versions (by run or by a combination of multiple attributes)
6780
policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict)
@@ -85,7 +98,28 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
8598
logger.debug(f" not in the allowed period, skip this bucket")
8699
preservation_list.extend(run_versions)
87100
else:
88-
logger.debug(f" not in the grace period")
101+
logger.debug(f" not in the grace period")
102+
103+
if delete_first_last:
104+
logger.debug(f" delete_first_last is set")
105+
run_versions.sort(key=lambda x: x.createdAt)
106+
# Get flag cleaner_2nd from first object (if there)
107+
cleaner_2nd = "cleaner_2nd" in run_versions[0].metadata
108+
if cleaner_2nd or len(run_versions) < 4:
109+
logger.debug(f" first version has flag cleaner_2nd or there are less than 4 version, "
110+
f"we continue to next bucket")
111+
preservation_list.extend(run_versions)
112+
continue
113+
# flag second with `cleaner_2nd`
114+
ccdb.updateMetadata(run_versions[1], {'cleaner_2nd': 'true'})
115+
# delete first and last versions in the bucket
116+
logger.debug(f" delete the first and last versions")
117+
deletion_list.append(run_versions[-1])
118+
ccdb.deleteVersion(run_versions[-1])
119+
del run_versions[-1]
120+
deletion_list.append(run_versions[0])
121+
ccdb.deleteVersion(run_versions[0])
122+
del run_versions[0]
89123

90124
last_preserved: ObjectVersion = None
91125
for v in run_versions:
@@ -98,7 +132,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
98132
logger.debug(f" --> preserve")
99133
last_preserved = v
100134
if migrate_to_EOS:
101-
ccdb.updateValidity(v, v.validFrom, v.validTo, metadata_for_preservation)
135+
ccdb.updateMetadata(v, metadata_for_preservation)
102136
preservation_list.append(last_preserved)
103137
else: # in between period --> delete
104138
logger.debug(f" --> delete")

Framework/script/RepoCleaner/tests/test_MultiplePerRun.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class TestProduction(unittest.TestCase):
2121
one_minute = 60000
2222

2323
def setUp(self):
24-
self.ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
24+
self.ccdb = Ccdb('http://137.138.47.222:8080')
2525
self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False}
2626
self.path = "qc/TST/MO/repo/test"
2727

@@ -77,7 +77,8 @@ def test_5_runs(self):
7777

7878
# Prepare data
7979
test_path = self.path + "/test_5_runs"
80-
self.prepare_data(test_path, [3, 3, 3, 3, 3], [60, 120, 190, 240, 24*60], 123)
80+
self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60],
81+
[60, 120, 190, 240, 24*60], 123)
8182

8283
stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
8384
to_timestamp=self.in_ten_years, extra_params=self.extra)
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import logging
2+
import time
3+
import unittest
4+
from datetime import timedelta, date, datetime
5+
from typing import List
6+
7+
from qcrepocleaner.Ccdb import Ccdb, ObjectVersion
8+
from qcrepocleaner.rules import multiple_per_run
9+
10+
11+
class TestProduction(unittest.TestCase):
    """
    Tests of the rule multiple_per_run with the option delete_first_last enabled.

    Each test pushes data to the CCDB, runs the rule and then checks the statistics it returns.
    It does it for several use cases.
    One should truncate /qc/TST/MO/repo/test before running it.
    """

    # Durations in milliseconds
    thirty_minutes = 1800000
    one_hour = 3600000
    one_minute = 60000
    # Upper bound passed as to_timestamp so that the rule is not limited in time
    in_ten_years = 1975323342000

    def setUp(self):
        """Create the CCDB client and the rule parameters, then enable debug logging."""
        self.ccdb = Ccdb('http://137.138.47.222:8080')
        self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False, "delete_first_last": True}
        self.path = "qc/TST/MO/repo/test"

        # Logging setup shared by all the tests. It is done after the creation of the client,
        # i.e. at the same point as when each test was configuring it on its own.
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
                            datefmt='%d-%b-%y %H:%M:%S')
        logging.getLogger().setLevel(logging.DEBUG)

    def test_1_finished_run(self):
        """
        1 run of 2.5 hours finished 22 hours ago.
        Expected output: 3 versions preserved, 147 deleted. The first and last remaining versions
        are the second and the second-to-last of the original ones.
        """
        # Prepare data
        test_path = self.path + "/test_1_finished_run"
        self.prepare_data(test_path, [150], [22*60], 123)
        objects_before = self.ccdb.getVersionsList(test_path)

        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
                                         to_timestamp=self.in_ten_years, extra_params=self.extra)
        objects_after = self.ccdb.getVersionsList(test_path)

        self.assertEqual(stats["deleted"], 147)
        self.assertEqual(stats["preserved"], 3)
        self.assertEqual(stats["updated"], 0)

        # The original first and last versions must be gone
        self.assertEqual(objects_after[0].validFrom, objects_before[1].validFrom)
        self.assertEqual(objects_after[2].validFrom, objects_before[-2].validFrom)

    def test_2_runs(self):
        """
        2 runs of 2.5 hours, separated by 3 hours, second finished 20h ago.
        Expected output: 3 versions preserved for the first one, all for the second
        """
        # Prepare data
        test_path = self.path + "/test_2_runs"
        self.prepare_data(test_path, [150, 150], [3*60, 20*60], 123)

        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
                                         to_timestamp=self.in_ten_years, extra_params=self.extra)

        self.assertEqual(stats["deleted"], 147)
        self.assertEqual(stats["preserved"], 3+150)
        self.assertEqual(stats["updated"], 0)

    def test_5_runs(self):
        """
        1 hour Run - 1h - 2 hours Run - 2h - 3h10 run - 3h10 - 4 hours run - 4 hours - 5 hours run - 5 h
        All more than 24 hours
        Expected output: 2 + 3 + 4 + 4 + 5
        A second execution of the rule must not delete anything more.
        """
        # Prepare data
        test_path = self.path + "/test_5_runs"
        self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60],
                          [60, 120, 190, 240, 24*60], 123)

        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
                                         to_timestamp=self.in_ten_years, extra_params=self.extra)
        self.assertEqual(stats["deleted"], 60+120+190+240+300-18)
        self.assertEqual(stats["preserved"], 18)
        self.assertEqual(stats["updated"], 0)

        # and now re-run it to make sure we preserve the state
        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
                                         to_timestamp=self.in_ten_years, extra_params=self.extra)

        self.assertEqual(stats["deleted"], 0)
        self.assertEqual(stats["preserved"], 18)
        self.assertEqual(stats["updated"], 0)

    def test_run_one_object(self):
        """
        A run with a single object
        Expected output: keep the object
        """
        # Prepare data
        test_path = self.path + "/test_run_one_object"
        self.prepare_data(test_path, [1], [25*60], 123)

        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
                                         to_timestamp=self.in_ten_years, extra_params=self.extra)

        self.assertEqual(stats["deleted"], 0)
        self.assertEqual(stats["preserved"], 1)
        self.assertEqual(stats["updated"], 0)

    def test_run_two_object(self):
        """
        A run with 2 objects
        Expected output: keep the 2 objects
        """
        # Prepare data
        test_path = self.path + "/test_run_two_object"
        self.prepare_data(test_path, [2], [25*60], 123)

        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
                                         to_timestamp=self.in_ten_years, extra_params=self.extra)

        self.assertEqual(stats["deleted"], 0)
        self.assertEqual(stats["preserved"], 2)
        self.assertEqual(stats["updated"], 0)

    def test_3_runs_with_period(self):
        """
        3 runs more than 24h in the past but only the middle one starts in the period that is allowed.
        Expected output: second run is trimmed, not the other
        """
        # Prepare data
        test_path = self.path + "/test_3_runs_with_period"
        self.prepare_data(test_path, [30, 30, 30], [120, 120, 25*60], 123)

        current_timestamp = int(time.time() * 1000)
        stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24,
                                         from_timestamp=current_timestamp-29*60*60*1000,
                                         to_timestamp=current_timestamp-26*60*60*1000, extra_params=self.extra)

        self.assertEqual(stats["deleted"], 28)
        self.assertEqual(stats["preserved"], 90-28)
        self.assertEqual(stats["updated"], 0)

    def test_asdf(self):
        """
        Only populate a data set of 3 runs of 70 minutes under <path>/asdf.
        The rule is not executed and nothing is asserted.
        """
        test_path = self.path + "/asdf"
        self.prepare_data(test_path, [70, 70, 70], [6*60, 6*60, 25*60], 55555)

    def prepare_data(self, path, run_durations: List[int], time_till_next_run: List[int], first_run_number: int):
        """
        Prepare a data set populated with a number of runs.
        One version is stored per minute of run, each valid for a day.
        run_durations contains the duration of each of these runs in minutes
        time_till_next_run is the time between the end of a run and the start of the next one in minutes.
        The first element of time_till_next_run is used to separate the first two runs.
        The last element is the time between the end of the last run and now.
        Both lists must have the same number of elements.

        :param path: Path of the object in the CCDB.
        :param run_durations: Duration of each run in minutes.
        :param time_till_next_run: Time following each run in minutes.
        :param first_run_number: Run number of the first run, incremented by 1 for each following run.
        :return: The timestamp (ms) of the very first version.
        :raises ValueError: if the two lists do not have the same length.
        """

        if len(run_durations) != len(time_till_next_run):
            # Raise instead of calling exit(): exit() is a helper of the interactive shell and
            # would tear down the whole test session.
            raise ValueError("run_durations and time_till_next_run must have the same length")

        total_duration = sum(run_durations) + sum(time_till_next_run)
        logging.info(f"Total duration : {total_duration}")

        # Start far enough in the past so that the whole data set ends now
        current_timestamp = int(time.time() * 1000)
        cursor = current_timestamp - total_duration * 60 * 1000
        first_ts = cursor
        data = {'part': 'part'}
        run = first_run_number

        for run_duration, time_till_next in zip(run_durations, time_till_next_run):
            metadata = {'RunNumber': str(run)}
            logging.debug(f"cursor: {cursor}")
            logging.debug(f"time_till_next: {time_till_next}")

            for _ in range(run_duration):
                to_ts = cursor + 24 * 60 * 60 * 1000  # a day
                metadata2 = {**metadata, 'Created': str(cursor)}
                version_info = ObjectVersion(path=path, validFrom=cursor, validTo=to_ts, metadata=metadata2,
                                             createdAt=cursor)
                self.ccdb.putVersion(version=version_info, data=data)
                cursor += 1 * 60 * 1000

            run += 1
            cursor += time_till_next * 60 * 1000

        return first_ts
211+
212+
213+
# Run all the tests of this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()

0 commit comments

Comments
 (0)