 try:
     import msstats as ms
 except ImportError as ex:
-    print(f"Error: msstats.py not found in PYTHONPATH. Place msstats.py next to this script. {ex}")
+    print(
+        f"Error: msstats.py not found in PYTHONPATH. Place msstats.py next to this script. {ex}"
+    )
     sys.exit(1)

 # ---------- Metric maps per product ----------
@@ -62,7 +64,14 @@
 # Helper label candidates
 REGION_LABELS = ("region", "location")
 ZONE_LABELS = ("zone",)
-NODETYPE_LABELS = ("node_type", "cluster_node_type", "tier", "service_tier", "instance_type")
+NODETYPE_LABELS = (
+    "node_type",
+    "cluster_node_type",
+    "tier",
+    "service_tier",
+    "instance_type",
+)
+

 def _pick(labels: Dict[str, str], keys) -> Optional[str]:
     for k in keys:
@@ -71,6 +80,7 @@ def _pick(labels: Dict[str, str], keys) -> Optional[str]:
             return v
     return None

+
 def _time_interval(duration_sec: int) -> monitoring_v3.TimeInterval:
     now = time.time()
     seconds = int(now)
@@ -82,6 +92,7 @@ def _time_interval(duration_sec: int) -> monitoring_v3.TimeInterval:
         }
     )

+
 def _make_rate_aggregation(step_sec: int) -> monitoring_v3.Aggregation:
     return monitoring_v3.Aggregation(
         {
@@ -91,9 +102,15 @@ def _make_rate_aggregation(step_sec: int) -> monitoring_v3.Aggregation:
         }
     )

-def _list_ts(client: monitoring_v3.MetricServiceClient, project_name: str, metric_type: str,
-             interval: monitoring_v3.TimeInterval, view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
-             aggregation: Optional[monitoring_v3.Aggregation] = None):
+
+def _list_ts(
+    client: monitoring_v3.MetricServiceClient,
+    project_name: str,
+    metric_type: str,
+    interval: monitoring_v3.TimeInterval,
+    view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+    aggregation: Optional[monitoring_v3.Aggregation] = None,
+):
     req = {
         "name": project_name,
         "filter": f'metric.type = "{metric_type}"',
@@ -104,7 +121,10 @@ def _list_ts(client: monitoring_v3.MetricServiceClient, project_name: str, metri
         req["aggregation"] = aggregation
     return list(client.list_time_series(request=req))

-def _ensure_node_entry(table: Dict[str, Dict[str, Dict[str, Any]]], inst_key: str, node_id: str) -> Dict[str, Any]:
+
+def _ensure_node_entry(
+    table: Dict[str, Dict[str, Dict[str, Any]]], inst_key: str, node_id: str
+) -> Dict[str, Any]:
     if inst_key not in table:
         table[inst_key] = {}
     if node_id not in table[inst_key]:
@@ -116,13 +136,14 @@ def _ensure_node_entry(table: Dict[str, Dict[str, Dict[str, Any]]], inst_key: st
             "NodeType": "",
             "Region": "",
             "Zone": "",
-            "Project ID": "",     # filled later
-            "InstanceId": "",     # full resource name if available
-            "InstanceType": "",   # Redis | Valkey | Redis Cluster
-            "points": {},         # timestamp -> {cmd: rate}
+            "Project ID": "",  # filled later
+            "InstanceId": "",  # full resource name if available
+            "InstanceType": "",  # Redis | Valkey | Redis Cluster
+            "points": {},  # timestamp -> {cmd: rate}
         }
     return table[inst_key][node_id]

+
 def _accumulate_commands(results, table, product_name: str, project_id: str):
     """
     Accumulate per-node command rates into table[instance][node]["points"][timestamp][cmd] = rate
@@ -132,21 +153,32 @@ def _accumulate_commands(results, table, product_name: str, project_id: str):
         mlabels = dict(ts.metric.labels)

         # Identify instance/cluster id & node
-        inst_key = rlabels.get("instance_id") or rlabels.get("cluster_id") or rlabels.get("resource_name") or "unknown"
+        inst_key = (
+            rlabels.get("instance_id")
+            or rlabels.get("cluster_id")
+            or rlabels.get("resource_name")
+            or "unknown"
+        )
         node_id = rlabels.get("node_id") or rlabels.get("shard_id") or "unknown"
         entry = _ensure_node_entry(table, inst_key, node_id)

         # Fill common attributes
         entry["Project ID"] = project_id
-        entry["InstanceId"] = rlabels.get("instance_id") or rlabels.get("cluster_id") or ""
+        entry["InstanceId"] = (
+            rlabels.get("instance_id") or rlabels.get("cluster_id") or ""
+        )
         entry["Region"] = _pick(rlabels, REGION_LABELS) or entry["Region"]
         entry["Zone"] = _pick(rlabels, ZONE_LABELS) or entry["Zone"]
         entry["NodeType"] = _pick(rlabels, NODETYPE_LABELS) or entry["NodeType"]

         # Node role if provided (e.g., 'primary'/'replica')
         role = mlabels.get("role") or rlabels.get("role") or ""
         if role:
-            entry["NodeRole"] = "Master" if role == "primary" else ("Replica" if role == "replica" else role)
+            entry["NodeRole"] = (
+                "Master"
+                if role == "primary"
+                else ("Replica" if role == "replica" else role)
+            )

         # Instance type label
         entry["InstanceType"] = product_name
@@ -178,6 +210,7 @@ def _accumulate_commands(results, table, product_name: str, project_id: str):
                 pv = 0.0
             entry["points"][t][cmd] = pv

+
 def _apply_processed_categories(table):
     """
     For each node entry that has points, compute processed per-timestamp categories using
@@ -194,10 +227,16 @@ def _apply_processed_categories(table):
             if "points" in entry:
                 del entry["points"]

+
 def _attach_memory_usage(results, table, key_name="BytesUsedForCache"):
     for ts in results:
         rlabels = dict(ts.resource.labels)
-        inst_key = rlabels.get("instance_id") or rlabels.get("cluster_id") or rlabels.get("resource_name") or "unknown"
+        inst_key = (
+            rlabels.get("instance_id")
+            or rlabels.get("cluster_id")
+            or rlabels.get("resource_name")
+            or "unknown"
+        )
         node_id = rlabels.get("node_id") or rlabels.get("shard_id") or "unknown"
         if inst_key not in table or node_id not in table[inst_key]:
             _ensure_node_entry(table, inst_key, node_id)
@@ -217,12 +256,18 @@ def _attach_memory_usage(results, table, key_name="BytesUsedForCache"):
         prev = entry.get(key_name, 0)
         entry[key_name] = max(prev, maxv)

+
 def _attach_capacity_scalar(results, table, key_name="MaxMemory"):
     """Attach a capacity scalar (e.g., memory size); applies to all nodes within the instance/cluster."""
     cap_by_inst = defaultdict(int)
     for ts in results:
         rlabels = dict(ts.resource.labels)
-        inst_key = rlabels.get("instance_id") or rlabels.get("cluster_id") or rlabels.get("resource_name") or "unknown"
+        inst_key = (
+            rlabels.get("instance_id")
+            or rlabels.get("cluster_id")
+            or rlabels.get("resource_name")
+            or "unknown"
+        )
         v_max = 0
         for point in ts.points:
             try:
@@ -242,6 +287,7 @@ def _attach_capacity_scalar(results, table, key_name="MaxMemory"):
         for node_id in nodes:
             nodes[node_id][key_name] = cap_by_inst[inst_key]

+
 def _flatten_rows(table, project_id: str, instance_type: str) -> List[Dict[str, Any]]:
     rows = []
     for inst_key, nodes in table.items():
@@ -254,8 +300,15 @@ def _flatten_rows(table, project_id: str, instance_type: str) -> List[Dict[str,
             rows.append(row)
     return rows

-def collect_for_product(client, project_id: str, duration: int, step: int,
-                        metric_map: Dict[str, str], instance_type_label: str) -> List[Dict[str, Any]]:
+
+def collect_for_product(
+    client,
+    project_id: str,
+    duration: int,
+    step: int,
+    metric_map: Dict[str, str],
+    instance_type_label: str,
+) -> List[Dict[str, Any]]:
     project_name = f"projects/{project_id}"
     interval = _time_interval(duration)
     agg = _make_rate_aggregation(step)
@@ -264,17 +317,24 @@ def collect_for_product(client, project_id: str, duration: int, step: int,

     # Commands (primary discovery)
     try:
-        cmd_results = _list_ts(client, project_name, metric_map["commands"], interval,
-                               view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
-                               aggregation=agg)
+        cmd_results = _list_ts(
+            client,
+            project_name,
+            metric_map["commands"],
+            interval,
+            view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+            aggregation=agg,
+        )
     except Exception:
         cmd_results = []
     _accumulate_commands(cmd_results, table, instance_type_label, project_id)

     # If nothing found, discover via memory usage series (so we still emit rows)
     if not table:
         try:
-            mem_results = _list_ts(client, project_name, metric_map["memory_usage"], interval)
+            mem_results = _list_ts(
+                client, project_name, metric_map["memory_usage"], interval
+            )
         except Exception:
             mem_results = []
         _attach_memory_usage(mem_results, table)
@@ -285,7 +345,9 @@ def collect_for_product(client, project_id: str, duration: int, step: int,

     # Memory usage (BytesUsedForCache)
     try:
-        mem_results = _list_ts(client, project_name, metric_map["memory_usage"], interval)
+        mem_results = _list_ts(
+            client, project_name, metric_map["memory_usage"], interval
+        )
         _attach_memory_usage(mem_results, table)
     except Exception:
         pass
@@ -303,13 +365,30 @@ def collect_for_product(client, project_id: str, duration: int, step: int,
     # Flatten to rows
     return _flatten_rows(table, project_id, instance_type_label)

+
 def main():
-    parser = argparse.ArgumentParser(description="Export Memorystore metrics for Redis, Valkey and Redis Cluster to CSV (using only Cloud Monitoring).")
+    parser = argparse.ArgumentParser(
+        description="Export Memorystore metrics for Redis, Valkey and Redis Cluster to CSV (using only Cloud Monitoring)."
+    )
     parser.add_argument("--project", required=True, help="GCP Project ID")
-    parser.add_argument("--credentials", required=True, help="Path to service account JSON with monitoring.viewer")
+    parser.add_argument(
+        "--credentials",
+        required=True,
+        help="Path to service account JSON with monitoring.viewer",
+    )
     parser.add_argument("--out", required=True, help="Output CSV file path")
-    parser.add_argument("--duration", type=int, default=604800, help="Lookback window in seconds (default 7 days)")
-    parser.add_argument("--step", type=int, default=60, help="Alignment step in seconds for rate metrics (default 60)")
+    parser.add_argument(
+        "--duration",
+        type=int,
+        default=604800,
+        help="Lookback window in seconds (default 7 days)",
+    )
+    parser.add_argument(
+        "--step",
+        type=int,
+        default=60,
+        help="Alignment step in seconds for rate metrics (default 60)",
+    )
     args = parser.parse_args()

     # Auth
@@ -324,17 +403,28 @@ def main():
         (VALKEY_METRICS, "Valkey"),
         (CLUSTER_METRICS, "Redis Cluster"),
     ):
-        rows = collect_for_product(client, args.project, args.duration, args.step, metric_map, label)
+        rows = collect_for_product(
+            client, args.project, args.duration, args.step, metric_map, label
+        )
         all_rows.extend(rows)

     if not all_rows:
         print("Warning: No metrics found; CSV will be created with no rows.")

     # Build header: union of keys across rows, with useful columns first
     base_order = [
-        "Source", "Project ID", "InstanceType",
-        "ClusterId", "InstanceId", "NodeId", "NodeRole", "NodeType", "Region", "Zone",
-        "BytesUsedForCache", "MaxMemory",
+        "Source",
+        "Project ID",
+        "InstanceType",
+        "ClusterId",
+        "InstanceId",
+        "NodeId",
+        "NodeRole",
+        "NodeType",
+        "Region",
+        "Zone",
+        "BytesUsedForCache",
+        "MaxMemory",
     ]
     category_keys = []
     for row in all_rows:
@@ -353,5 +443,6 @@ def main():

     print(f"Wrote {len(all_rows)} rows to {args.out}")

+
 if __name__ == "__main__":
     sys.exit(main())
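
For orientation while reading the hunks above: `_ensure_node_entry` and `_accumulate_commands` build a nested `table[inst_key][node_id]` structure whose "points" map timestamps to per-command rates, and `_flatten_rows` emits one CSV row per node. Below is a minimal sketch of that shape; every identifier and value is invented for illustration, and the flattening loop is a simplified stand-in for `_flatten_rows`, whose full body is elided in this view.

    from typing import Any, Dict

    # Hypothetical table contents after _accumulate_commands has run;
    # all names and numbers here are made up for illustration only.
    table: Dict[str, Dict[str, Dict[str, Any]]] = {
        "demo-instance": {  # inst_key: instance_id / cluster_id / resource_name
            "node-0": {
                "NodeType": "shared-core",  # picked via NODETYPE_LABELS candidates
                "Region": "us-central1",
                "Zone": "us-central1-a",
                "Project ID": "demo-project",
                "InstanceId": "demo-instance",
                "InstanceType": "Redis",  # Redis | Valkey | Redis Cluster
                "points": {1700000000: {"get": 12.5, "set": 3.2}},  # ts -> {cmd: rate}
            },
        },
    }

    # Simplified stand-in for _flatten_rows: one row per node, carrying the
    # instance key and node id into the row and dropping the raw points.
    rows = []
    for inst_key, nodes in table.items():
        for node_id, entry in nodes.items():
            row = {"ClusterId": inst_key, "NodeId": node_id}
            row.update({k: v for k, v in entry.items() if k != "points"})
            rows.append(row)

    print(rows[0]["InstanceType"])  # -> Redis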
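
Since the commit is formatting-only, the CLI is unchanged. Assuming the file is saved as memorystore_export.py (the filename is not visible in this view), a run would look like

    python memorystore_export.py --project my-project --credentials sa.json --out metrics.csv --duration 604800 --step 60

and the exported CSV can then be sanity-checked with the standard library:

    import csv

    # Read back the exported CSV; "metrics.csv" matches the --out value used
    # in the hypothetical invocation above.
    with open("metrics.csv", newline="") as fh:
        reader = csv.DictReader(fh)
        print(reader.fieldnames)  # base_order columns first, then per-command categories
        for row in reader:
            print(row.get("InstanceId"), row.get("BytesUsedForCache"))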