@@ -19,13 +19,21 @@ def add_analysis(self, analysis: StorageAnalysis) -> None:
1919 """Add analysis for capacity planning"""
2020 self .predictor .add_analysis (analysis )
2121
22- def calculate_capacity_utilization (self , analysis : StorageAnalysis , total_capacity : int ) -> Dict [str , Any ]:
22+ def calculate_capacity_utilization (
23+ self , analysis : StorageAnalysis , total_capacity : int
24+ ) -> Dict [str , Any ]:
2325 """Calculate current capacity utilization"""
2426 current_size = analysis .get ("totalSize" , 0 )
25- utilization_percent = (current_size / total_capacity * 100 ) if total_capacity > 0 else 0
26-
27- status = "critical" if utilization_percent > 90 else "warning" if utilization_percent > 75 else "healthy"
28-
27+ utilization_percent = (
28+ (current_size / total_capacity * 100 ) if total_capacity > 0 else 0
29+ )
30+
31+ status = (
32+ "critical"
33+ if utilization_percent > 90
34+ else "warning" if utilization_percent > 75 else "healthy"
35+ )
36+
2937 return {
3038 "currentSize" : current_size ,
3139 "totalCapacity" : total_capacity ,
@@ -35,7 +43,9 @@ def calculate_capacity_utilization(self, analysis: StorageAnalysis, total_capaci
3543 "timestamp" : datetime .now ().isoformat (),
3644 }
3745
38- def estimate_time_to_capacity (self , total_capacity : int , threshold_percent : float = 90.0 ) -> Dict [str , Any ]:
46+ def estimate_time_to_capacity (
47+ self , total_capacity : int , threshold_percent : float = 90.0
48+ ) -> Dict [str , Any ]:
3949 """Estimate time until capacity threshold is reached"""
4050 if len (self .predictor .history ) < 2 :
4151 return {
@@ -44,35 +54,37 @@ def estimate_time_to_capacity(self, total_capacity: int, threshold_percent: floa
4454 "confidence" : "low" ,
4555 "message" : "Insufficient historical data" ,
4656 }
47-
57+
4858 current_size = self .predictor .history [- 1 ]["totalSize" ]
4959 threshold_size = total_capacity * (threshold_percent / 100 )
50-
60+
5161 if current_size >= threshold_size :
5262 return {
5363 "estimatedDays" : 0 ,
5464 "estimatedDate" : datetime .now ().isoformat (),
5565 "status" : "threshold_reached" ,
5666 "message" : f"Already at { threshold_percent } % capacity" ,
5767 }
58-
68+
5969 # Predict growth
6070 prediction = self .predictor .predict_growth (days = 365 )
6171 growth_rate = prediction .get ("growthRate" , 0 ) / 100
62-
72+
6373 if growth_rate <= 0 :
6474 return {
6575 "estimatedDays" : None ,
6676 "estimatedDate" : None ,
6777 "status" : "no_growth" ,
6878 "message" : "No growth detected, capacity threshold not expected" ,
6979 }
70-
80+
7181 # Calculate days to threshold
7282 remaining_space = threshold_size - current_size
7383 daily_growth = current_size * growth_rate
74- estimated_days = int (remaining_space / daily_growth ) if daily_growth > 0 else None
75-
84+ estimated_days = (
85+ int (remaining_space / daily_growth ) if daily_growth > 0 else None
86+ )
87+
7688 if estimated_days :
7789 estimated_date = datetime .now () + timedelta (days = estimated_days )
7890 return {
@@ -91,62 +103,74 @@ def estimate_time_to_capacity(self, total_capacity: int, threshold_percent: floa
91103 "status" : "unable_to_calculate" ,
92104 }
93105
94- def get_capacity_recommendations (self , analysis : StorageAnalysis , total_capacity : int ) -> List [Dict [str , Any ]]:
106+ def get_capacity_recommendations (
107+ self , analysis : StorageAnalysis , total_capacity : int
108+ ) -> List [Dict [str , Any ]]:
95109 """Get capacity planning recommendations"""
96110 recommendations = []
97111 utilization = self .calculate_capacity_utilization (analysis , total_capacity )
98-
112+
99113 if utilization ["utilizationPercent" ] > 90 :
100- recommendations .append ({
101- "type" : "urgent" ,
102- "severity" : "critical" ,
103- "message" : "Database is at critical capacity (>90%)" ,
104- "recommendation" : "Immediate action required: Consider archiving old data or increasing capacity" ,
105- })
114+ recommendations .append (
115+ {
116+ "type" : "urgent" ,
117+ "severity" : "critical" ,
118+ "message" : "Database is at critical capacity (>90%)" ,
119+ "recommendation" : "Immediate action required: Consider archiving old data or increasing capacity" ,
120+ }
121+ )
106122 elif utilization ["utilizationPercent" ] > 75 :
107- recommendations .append ({
108- "type" : "warning" ,
109- "severity" : "high" ,
110- "message" : "Database is approaching capacity (>75%)" ,
111- "recommendation" : "Plan for capacity increase or data archiving in the near future" ,
112- })
113-
123+ recommendations .append (
124+ {
125+ "type" : "warning" ,
126+ "severity" : "high" ,
127+ "message" : "Database is approaching capacity (>75%)" ,
128+ "recommendation" : "Plan for capacity increase or data archiving in the near future" ,
129+ }
130+ )
131+
114132 # Check growth trends
115133 trends = self .predictor .get_growth_trends ()
116134 if trends .get ("trend" ) == "rapid_growth" :
117- recommendations .append ({
118- "type" : "growth_alert" ,
119- "severity" : "medium" ,
120- "message" : "Rapid growth detected" ,
121- "recommendation" : "Monitor growth closely and plan capacity expansion" ,
122- })
123-
135+ recommendations .append (
136+ {
137+ "type" : "growth_alert" ,
138+ "severity" : "medium" ,
139+ "message" : "Rapid growth detected" ,
140+ "recommendation" : "Monitor growth closely and plan capacity expansion" ,
141+ }
142+ )
143+
124144 # Time to capacity estimate
125145 time_estimate = self .estimate_time_to_capacity (total_capacity )
126146 if time_estimate .get ("estimatedDays" ) and time_estimate ["estimatedDays" ] < 90 :
127- recommendations .append ({
128- "type" : "capacity_forecast" ,
129- "severity" : "high" ,
130- "message" : f"Capacity threshold may be reached in { time_estimate ['estimatedDays' ]} days" ,
131- "recommendation" : "Plan capacity increase or data management strategy" ,
132- })
133-
147+ recommendations .append (
148+ {
149+ "type" : "capacity_forecast" ,
150+ "severity" : "high" ,
151+ "message" : f"Capacity threshold may be reached in { time_estimate ['estimatedDays' ]} days" ,
152+ "recommendation" : "Plan capacity increase or data management strategy" ,
153+ }
154+ )
155+
134156 return recommendations
135157
136- def generate_capacity_report (self , analysis : StorageAnalysis , total_capacity : int ) -> Dict [str , Any ]:
158+ def generate_capacity_report (
159+ self , analysis : StorageAnalysis , total_capacity : int
160+ ) -> Dict [str , Any ]:
137161 """Generate comprehensive capacity planning report"""
138162 utilization = self .calculate_capacity_utilization (analysis , total_capacity )
139163 trends = self .predictor .get_growth_trends ()
140164 time_estimate = self .estimate_time_to_capacity (total_capacity )
141165 recommendations = self .get_capacity_recommendations (analysis , total_capacity )
142-
166+
143167 # Predictions for different timeframes
144168 predictions = {
145169 "30days" : self .predictor .predict_growth (30 ),
146170 "90days" : self .predictor .predict_growth (90 ),
147171 "365days" : self .predictor .predict_growth (365 ),
148172 }
149-
173+
150174 return {
151175 "currentState" : utilization ,
152176 "growthTrends" : trends ,
@@ -155,4 +179,3 @@ def generate_capacity_report(self, analysis: StorageAnalysis, total_capacity: in
155179 "recommendations" : recommendations ,
156180 "reportDate" : datetime .now ().isoformat (),
157181 }
158-
0 commit comments