1+ """
2+ Rule: azure.compute.snapshot.old
3+
4+ Intent:
5+ Detect Azure managed snapshots that are old enough to be cleanup review
6+ candidates. Age alone does not prove a snapshot is unused, orphaned, or
7+ safe to delete. This is a conservative review-candidate rule only.
8+
9+ Exclusions:
10+ - id or name absent or empty
11+ - outside optional region filter (exact lowercase match)
12+ - provisioning_state != "Succeeded"
13+ - timeCreated absent or unparsable
14+ - completionPercent present and < 100 (or present but non-numeric)
15+ - age_days < 30 (review_age_days)
16+
17+ Detection:
18+ - provisioning_state == "Succeeded"
19+ - timeCreated parseable and age_days >= 30
20+
21+ Confidence model (spec 8):
22+ LOW — 30 <= age_days < max_age_days
23+ MEDIUM — age_days >= max_age_days
24+ HIGH is never used; age alone cannot establish HIGH confidence
25+
26+ Cost model (spec 10):
27+ estimated_monthly_cost_usd = None (always)
28+ Azure bills snapshots on used size, not diskSizeGB — no per-snapshot
29+ cost estimate is possible without that data.
30+
31+ APIs:
32+ - Microsoft.Compute/snapshots/read (snapshots.list)
33+ """
34+
135from datetime import datetime , timezone
236from typing import List , Optional
337
842from cleancloud .core .finding import Finding
943from cleancloud .core .risk import RiskLevel
1044
11- MIN_AGE_DAYS_MEDIUM = 30
12- MIN_AGE_DAYS_HIGH = 90
45+ _RULE_ID = "azure.compute.snapshot.old"
46+ _RESOURCE_TYPE = "azure.compute.snapshot"
1347
48+ # Minimum age for a snapshot to become a review candidate (spec: review_age_days = 30).
49+ _REVIEW_AGE_DAYS = 30
1450
15- def _age_in_days (created_at : datetime ) -> int :
16- now = datetime .now (timezone .utc )
17- return (now - created_at ).days
51+
def _norm_location(s: str) -> str:
    """
    Return *s* lowercased, mapping any falsy value (None or "") to "".

    Only case is normalized, so region comparison stays an exact lowercase
    match (spec section 4).
    """
    if not s:
        return ""
    return s.lower()
55+
56+
def _parse_time_created(snapshot) -> Optional[datetime]:
    """
    Return the snapshot creation timestamp as a UTC datetime, or None.

    Reads ``snapshot.time_created`` and accepts datetime objects (aware or
    naive) and ISO-format strings; a trailing "Z"/"z" is accepted as UTC and
    surrounding whitespace is ignored. Naive values are treated as UTC.
    Aware values carrying another offset are converted, so the result always
    has a zero UTC offset. Missing, unparsable, or unsupported values
    return None.
    """
    raw = getattr(snapshot, "time_created", None)

    if isinstance(raw, str):
        text = raw.strip()
        # datetime.fromisoformat() only understands a literal "Z" suffix on
        # newer Python versions, so spell it as an explicit offset. Only the
        # suffix is rewritten; a "Z" elsewhere is left for the parser to reject.
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
    elif isinstance(raw, datetime):
        parsed = raw
    else:
        return None

    # A tzinfo whose utcoffset() is None still behaves as naive in datetime
    # arithmetic, so test the offset rather than the tzinfo attribute.
    if parsed.utcoffset() is None:
        return parsed.replace(tzinfo=timezone.utc)

    try:
        return parsed.astimezone(timezone.utc)
    except (OverflowError, ValueError):
        # Applying the offset moved the value outside the representable range.
        return None
1876
1977
2078def find_old_snapshots (
@@ -23,79 +81,133 @@ def find_old_snapshots(
2381 credential ,
2482 region_filter : str = None ,
2583 client : Optional [ComputeManagementClient ] = None ,
26- max_age_days : int = MIN_AGE_DAYS_HIGH ,
84+ max_age_days : int = 90 ,
2785) -> List [Finding ]:
2886 """
29- Find old Azure managed snapshots that may be orphaned .
87+ Find Azure managed snapshots that are review candidates based on age .
3088
31- Conservative rule (review-only):
32- - Snapshot age checked
33- - Other usage/ownership not inferred
89+ Does not infer unused, orphaned, or safe-to-delete from age alone.
90+ Confidence is LOW for [review_age_days, max_age_days) and MEDIUM for
91+ >= max_age_days. estimated_monthly_cost_usd is always None.
3492
3593 IAM permissions:
3694 - Microsoft.Compute/snapshots/read
3795 """
38-
3996 findings : List [Finding ] = []
4097
4198 compute_client = client or ComputeManagementClient (
4299 credential = credential ,
43100 subscription_id = subscription_id ,
44101 )
45102
103+ now = datetime .now (timezone .utc )
104+
46105 for snapshot in compute_client .snapshots .list ():
47- if region_filter and (snapshot .location or "" ).lower () != region_filter .lower ():
106+ # spec 6A: id must be present and non-empty
107+ snap_id = getattr (snapshot , "id" , None )
108+ if not snap_id :
109+ continue
110+
111+ # resource_name is required (spec 12.3); skip malformed records without a name
112+ snap_name = getattr (snapshot , "name" , None )
113+ if not snap_name :
114+ continue
115+
116+ # spec 6A: region filter — exact lowercase match
117+ location = _norm_location (getattr (snapshot , "location" , "" ) or "" )
118+ if region_filter and location != _norm_location (region_filter ):
48119 continue
49120
50- if not snapshot .time_created :
121+ # spec 6A: provisioning_state must be exactly "Succeeded"
122+ if getattr (snapshot , "provisioning_state" , None ) != "Succeeded" :
51123 continue
52124
53- age_days = _age_in_days (snapshot .time_created )
125+ # spec 6A: timeCreated must be parseable
126+ time_created = _parse_time_created (snapshot )
127+ if time_created is None :
128+ continue
129+
130+ # spec 6A: completionPercent — skip if present and < 100.
131+ # A non-numeric value is treated as malformed and causes a conservative skip.
132+ completion_percent = getattr (snapshot , "completion_percent" , None )
133+ if completion_percent is not None :
134+ try :
135+ if completion_percent < 100 :
136+ continue
137+ except TypeError :
138+ continue # non-numeric completionPercent → skip conservatively
139+
140+ # spec 4: compute age in whole UTC days; skip if below review threshold
141+ age_days = (now - time_created ).days
142+ if age_days < _REVIEW_AGE_DAYS :
143+ continue
54144
55- if age_days >= max_age_days :
56- confidence_value = ConfidenceLevel .HIGH
57- elif age_days >= MIN_AGE_DAYS_MEDIUM :
58- confidence_value = ConfidenceLevel .MEDIUM
59- else :
60- continue # too new, ignore
145+ # spec 8: confidence — LOW for lower band, MEDIUM for higher, never HIGH
146+ confidence = ConfidenceLevel .MEDIUM if age_days >= max_age_days else ConfidenceLevel .LOW
147+
148+ # spec 12.2: required signals
149+ signals_used = [
150+ f"Snapshot age is { age_days } days" ,
151+ "Snapshot provisioning state is Succeeded" ,
152+ ]
153+ if completion_percent is not None :
154+ # completionPercent was present and used as a best-effort gate
155+ signals_used .append (f"Snapshot completionPercent is { completion_percent } " )
61156
62157 evidence = Evidence (
63- signals_used = [ f"Snapshot age is { age_days } days" ] ,
158+ signals_used = signals_used ,
64159 signals_not_checked = [
65- "Disk usage by applications " ,
66- "IaC-managed ownership" ,
67- "Disaster recovery or backup intent" ,
68- "Future planned usage " ,
160+ "Business or application restore intent " ,
161+ "Azure Backup or external backup ownership" ,
162+ "Disaster recovery retention intent" ,
163+ "Whether deleting the snapshot reduces billed used size " ,
69164 ],
70- time_window = f" { MIN_AGE_DAYS_MEDIUM } - { max_age_days } days" ,
165+ time_window = None ,
71166 )
72167
73- # ~$0.05/GB-month for managed snapshots
74- snap_size = snapshot .disk_size_gb or 0
75- cost_usd = round (snap_size * 0.05 , 2 ) if snap_size > 0 else None
168+ # spec 12.3: source_resource_id from creation_data if present
169+ creation_data = getattr (snapshot , "creation_data" , None )
170+ source_resource_id = (
171+ getattr (creation_data , "source_resource_id" , None )
172+ if creation_data is not None
173+ else None
174+ )
76175
77176 findings .append (
78177 Finding (
79178 provider = "azure" ,
80- rule_id = "azure.compute.snapshot.old" ,
81- resource_type = "azure.compute.snapshot" ,
82- resource_id = snapshot .id ,
83- region = snapshot .location ,
84- estimated_monthly_cost_usd = cost_usd ,
85- title = "Old Azure managed snapshot" ,
86- summary = f"Snapshot has existed for { age_days } days" ,
87- reason = "Snapshot age exceeds configured threshold" ,
179+ rule_id = _RULE_ID ,
180+ resource_type = _RESOURCE_TYPE ,
181+ resource_id = snap_id ,
182+ region = location ,
183+ estimated_monthly_cost_usd = None , # spec 10: always None
184+ title = f"Old managed snapshot ({ age_days } days)" ,
185+ summary = (
186+ f"Snapshot '{ snap_name } ' has existed for { age_days } days "
187+ f"and is a cleanup review candidate"
188+ ),
189+ reason = (
190+ f"Snapshot age is { age_days } days, which meets the "
191+ f"{ _REVIEW_AGE_DAYS } -day review threshold"
192+ ),
88193 risk = RiskLevel .LOW ,
89- confidence = confidence_value ,
90- detected_at = datetime . now ( timezone . utc ) ,
194+ confidence = confidence ,
195+ detected_at = now ,
91196 evidence = evidence ,
92197 details = {
93- "resource_name" : snapshot . name ,
198+ "resource_name" : snap_name ,
94199 "subscription_id" : subscription_id ,
95200 "age_days" : age_days ,
96- "disk_size_gb" : snapshot .disk_size_gb ,
97- "sku" : snapshot .sku .name if snapshot .sku else None ,
98- "tags" : snapshot .tags ,
201+ "time_created" : time_created .isoformat (),
202+ "disk_size_gb" : getattr (snapshot , "disk_size_gb" , None ),
203+ "sku" : (
204+ getattr (snapshot .sku , "name" , None )
205+ if getattr (snapshot , "sku" , None ) is not None
206+ else None
207+ ),
208+ "incremental" : getattr (snapshot , "incremental" , None ),
209+ "source_resource_id" : source_resource_id ,
210+ "tags" : getattr (snapshot , "tags" , None ) or {},
99211 },
100212 )
101213 )
0 commit comments