@@ -26,15 +26,21 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
2626 - interval_between_versions: Period in minutes between the versions we will keep. (default: 90)
2727 - period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false)
2828 - delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule.
29+ Useful to keep the second and second to last instead of first and last.
30+ - mw_deletion_delay: delete moving-window data entirely once it is older than this number of minutes. If not present or negative, don't delete.
31+ As an extra safety, and because it is designed for moving windows, we only delete an object if its path contains `/mw/`.
2932
3033 It is implemented like this :
3134 Map of buckets: run[+pass+period] -> list of versions
3235 Go through all objects: Add the object to the corresponding key (run[+pass+period])
3336 Sort the versions in the bucket
3437 Remove the empty run from the map (we ignore objects without a run)
3538 Go through the map: for each run (resp. run+pass+period)
36-
3739 Get SOR (validity of first object)
40+
41+ if SOR < now - mw_deletion_delay
42+ delete the data for this run (only the versions with `/mw/` in their path; the others are preserved)
43+
3844 if SOR < now - delay
3945 if delete_first_last
4046 Get flag cleaner_2nd from first object (if there)
@@ -75,6 +81,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
7581 logger .debug (f"migrate_to_EOS : { migrate_to_EOS } " )
7682 delete_first_last = (extra_params .get ("delete_first_last" , False ) is True )
7783 logger .debug (f"delete_first_last : { delete_first_last } " )
84+ mw_deletion_delay = int (extra_params .get ("mw_deletion_delay" , - 1 ))
85+ logger .debug (f"mw_deletion_delay : { mw_deletion_delay } " )
7886
7987 # Find all the runs and group the versions (by run or by a combination of multiple attributes)
8088 policies_utils .group_versions (ccdb , object_path , period_pass , versions_buckets_dict )
@@ -95,9 +103,19 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
95103 if policies_utils .in_grace_period (first_object , delay ):
96104 logger .debug (f" in grace period, skip this bucket" )
97105 preservation_list .extend (run_versions )
98- elif not (from_timestamp < first_object .createdAt < to_timestamp ): # in the allowed period
106+ elif not (from_timestamp < first_object .createdAt < to_timestamp ): # not in the allowed period
99107 logger .debug (f" not in the allowed period, skip this bucket" )
100108 preservation_list .extend (run_versions )
109+ elif first_object .createdAtDt < datetime .now () - timedelta (minutes = mw_deletion_delay ):
110+ logger .debug (f" after mw_deletion_delay period, delete this bucket" )
111+ for v in run_versions :
112+ if "/mw/" in v .path : # this is because we really don't want to take the risk of batch deleting non moving windows
113+ logger .debug (f" deleting { v } " )
114+ deletion_list .append (v )
115+ ccdb .deleteVersion (v )
116+ else :
117+ logger .debug (f" deletion is aborted as path does not contain `mw` ({ v } )" )
118+ preservation_list .append (v )
101119 else :
102120 logger .debug (f" not in the grace period" )
103121
0 commit comments