1+ #!/usr/bin/env python
12"""
3+ Convert the dataset-per-row metadata of the Gridstatus metadata into a series-
4+ per-row schema and upload the result back into the same S3 bucket.
5+
6+ > python causal_automl/postprocess_gridstatus_metadata.py \
7+ --aws_profile ck \
8+ --bucket_path s3://causify-data-collaborators/causal_automl/metadata/ \
9+ --input_version v1.0 \
10+ --output_version v2.0
11+
212Import as:
313
414import causal_automl.postprocess_gridstatus_metadata as capogrme
515"""
616
17+ import argparse
718import ast
819import io
920import logging
1021import os
1122import re
12- from typing import Dict , Iterable , List
23+ from typing import Any , Dict , List
1324
1425import helpers .hdbg as hdbg
15- import helpers .henv as henv
1626import helpers .hio as hio
17- import helpers .hpandas as hpandas
1827import helpers .hs3 as hs3
1928import pandas as pd
2029
2130# Configure logger.
2231hdbg .init_logger (verbosity = logging .INFO )
2332_LOG = logging .getLogger (__name__ )
2433
25- # Print system signature.
26- _LOG .info ("%s" , henv .get_system_signature ()[0 ])
27-
2834
2935# #############################################################################
3036# _GridstatusMetadataWriter
@@ -36,16 +42,23 @@ class _GridstatusMetadataWriter:
3642 Save Gridstatus metadata and upload to S3.
3743 """
3844
39- def __init__ (self , bucket_path : str , aws_profile : str ) -> None :
45+ def __init__ (
46+ self ,
47+ bucket_path : str ,
48+ aws_profile : str ,
49+ cache_dir : str = "tmp.download_metadata_cache/" ,
50+ ) -> None :
4051 """
41- Initialize the writer for saving metadata and facet values to S3.
52+ Initialize the writer for saving postprocessed metadata to S3.
4253
4354 :param bucket_path: base S3 path where files will be uploaded
4455 (e.g., "s3://bucket/dir/")
4556 :param aws_profile: AWS CLI profile name used for authentication
57+ :param cache_dir: cache directory path
4658 """
4759 self ._bucket_path = bucket_path
4860 self ._aws_profile = aws_profile
61+ self .cache_dir = cache_dir
4962
5063 def write_df_to_s3 (self , df : pd .DataFrame , file_name : str ) -> None :
5164 """
@@ -54,8 +67,7 @@ def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None:
5467 :param df: data to be saved to S3
5568 :param file_name: local file name for saving
5669 """
57- cache_dir = "tmp.download_metadata_cache/"
58- local_file_path = os .path .join (cache_dir , file_name )
70+ local_file_path = os .path .join (self .cache_dir , file_name )
5971 hio .create_dir (os .path .dirname (local_file_path ), incremental = True )
6072 # Save CSV locally.
6173 df .to_csv (local_file_path , index = False )
@@ -66,18 +78,17 @@ def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None:
6678 _LOG .debug ("Uploaded to S3: %s" , bucket_file_path )
6779
6880
69- def _load_data (file_path : str ) -> pd .DataFrame :
81+ def _load_data (file_path : str , aws_profile : str ) -> pd .DataFrame :
7082 """
71- Load data from file path to a dataframe.
83+ Load data from S3 path to a dataframe.
7284
73- :param file_path: path of the data to load from
74- :return: dataframe of the loaded data
85+ :param file_path: S3 path of the data to load from
86+ :param aws_profile: aws profile that accesses S3 bucket
87+ :return: the queried metadata
7588 """
76- file = hs3 .from_file (file_path , aws_profile = "ck" )
89+ file = hs3 .from_file (file_path , aws_profile = aws_profile )
7790 df = pd .read_csv (io .StringIO (file ))
78- _LOG .info ("shape: %s" , df .shape )
79- _LOG .info ("columns: %s" , df .columns )
80- _LOG .info ("df: \n %s" , hpandas .df_to_str (df , log_level = logging .INFO ))
91+ _LOG .info ("Data Successfully Downloaded." )
8192 return df
8293
8394
@@ -91,34 +102,60 @@ def _prettify(col: str) -> str:
91102 :return: prettified column name
92103 """
93104 tokens = re .sub (r"[_\s]+" , " " , col ).strip ().split ()
94- return " " .join (t .capitalize () for t in tokens )
105+ prettified = " " .join (t .capitalize () for t in tokens )
106+ return prettified
95107
96108
97109def _build_series_row (
98110 base_row : pd .Series ,
99111 col_name : str ,
100112 dataset_id : str ,
101113 dataset_name : str ,
102- ) -> Dict [str , object ]:
114+ ) -> Dict [str , Any ]:
103115 """
104- Build new rows with the `id_series` and `num_series ` columns.
116+ Build new rows with the `id_series` and `name_series ` columns.
105117
106118 :param base_row: original row
107119 :param col_name: column name to prettify
120+ :param dataset_id: id of the data series
121+ :param dataset_name: name of the collection of series
122+ :return: modified row
108123 """
109- nice_col_name = _prettify (col_name )
110124 # Start with the original row.
111125 new_row : Dict [str , object ] = base_row .to_dict ()
112126 # Add the two series identifiers.
113127 new_row ["id_series" ] = f"{ dataset_id } .{ col_name } "
114- new_row ["name_series" ] = f"{ dataset_name } / { nice_col_name } "
128+ new_row ["name_series" ] = f"{ dataset_name } / { _prettify ( col_name ) } "
115129 return new_row
116130
117131
118- def _explode_dataset_row (row : pd .Series ) -> Iterable [Dict [str , object ]]:
132+ def _explode_dataset_row (row : pd .Series ) -> List [Dict [str , Any ]]:
119133 """
120134 Transform a single row into the row-per-series view.
121135
136+ E.g.,
137+ Input row:
138+ id name ....
139+ caiso_as_prices CAISO AS Prices ....
140+
141+ Output row:
142+ id name ....
143+ caiso_as_prices CAISO AS Prices ....
144+ caiso_as_prices CAISO AS Prices ....
145+ caiso_as_prices CAISO AS Prices ....
146+ caiso_as_prices CAISO AS Prices ....
147+ caiso_as_prices CAISO AS Prices ....
148+ caiso_as_prices CAISO AS Prices ....
149+ /
150+ id_series name_series
151+ caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves
152+ caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down
153+ caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down
154+ caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up
155+ caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up
156+ caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves
157+
158+
122159 :param row: row to transform
123160 :return: the exploded row
124161 """
@@ -127,47 +164,98 @@ def _explode_dataset_row(row: pd.Series) -> Iterable[Dict[str, object]]:
127164 # Ignore primary key columns.
128165 ignore_cols = set (ast .literal_eval (row ["primary_key_columns" ]))
129166 # Iterate through all columns and generate the row-per-series view.
167+ exploded : List [Dict [str , Any ]] = []
130168 for col_meta in ast .literal_eval (row ["all_columns" ]):
131169 col_name : str = col_meta ["name" ]
132170 if col_meta .get ("is_datetime" ) or col_name in ignore_cols :
133171 continue
134- yield _build_series_row (row , col_name , dataset_id , dataset_name )
172+ exploded .append (
173+ _build_series_row (row , col_name , dataset_id , dataset_name )
174+ )
175+ return exploded
135176
136177
137178def create_series_metadata (df : pd .DataFrame ) -> pd .DataFrame :
138179 """
139180 Transform the whole dataset into the row-per-series view.
140181
182+ E.g.,
183+ Input dataset:
184+ id name ....
185+ caiso_as_prices CAISO AS Prices ....
186+ ...
187+
188+ Output dataset:
189+ id_series name_series
190+ caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves
191+ caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down
192+ caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down
193+ caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up
194+ caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up
195+ caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves
196+ ...
197+ /
198+ id name ....
199+ caiso_as_prices CAISO AS Prices ....
200+ caiso_as_prices CAISO AS Prices ....
201+ caiso_as_prices CAISO AS Prices ....
202+ caiso_as_prices CAISO AS Prices ....
203+ caiso_as_prices CAISO AS Prices ....
204+ caiso_as_prices CAISO AS Prices ....
205+ ...
206+
141207 :param df: data to transform
142208 :return: transformed data
143209 """
144- exploded_rows : List [Dict [str , object ]] = [
145- row
146- for _ , dataset_row in df .iterrows ()
147- for row in _explode_dataset_row (dataset_row )
148- ]
210+ exploded_rows : List [Dict [str , Any ]] = []
211+ for _ , dataset_row in df .iterrows ():
212+ exploded_rows .extend (_explode_dataset_row (dataset_row ))
149213 result = pd .DataFrame (exploded_rows )
150214 # Arrange according to desired ordering.
151215 leading = ["id_series" , "name_series" ]
152216 remaining = [c for c in result .columns if c not in leading ]
153- return result [leading + remaining ]
217+ transformed_df = result [leading + remaining ]
218+ return transformed_df
154219
155220
156- # Main flow.
157- if __name__ == "__main__" :
158- # Configure S3.
159- aws_profile = "ck"
160- bucket_root = hs3 .get_s3_bucket_path (aws_profile )
161- bucket_path = "s3://causify-data-collaborators/causal_automl/metadata/"
162- file_name = "gridstatus_metadata_original_v2.0.csv"
163- writer = _GridstatusMetadataWriter (bucket_path , aws_profile )
221+ def _parse () -> argparse .Namespace :
222+ parser = argparse .ArgumentParser (
223+ description = __doc__ ,
224+ formatter_class = argparse .ArgumentDefaultsHelpFormatter ,
225+ )
226+ parser .add_argument (
227+ "--aws_profile" , default = "ck" , help = "AWS CLI profile for authentication"
228+ )
229+ parser .add_argument (
230+ "--bucket_path" ,
231+ default = "s3://causify-data-collaborators/causal_automl/metadata/" ,
232+ help = "Destination S3 directory (trailing slash optional)" ,
233+ )
234+ parser .add_argument (
235+ "--input_version" ,
236+ default = "v1.0" ,
237+ help = "Version of the source metadata file" ,
238+ )
239+ parser .add_argument (
240+ "--output_version" , default = "v2.0" , help = "Version tag for the result file"
241+ )
242+ return parser .parse_args ()
243+
244+
245+ def _main (args : argparse .Namespace ) -> None :
164246 # Load data.
165- v1_path = (
166- "s3://causify-data-collaborators/causal_automl/metadata/ "
167- "gridstatus_metadata_original_v1.0 .csv"
247+ src_file = (
248+ f" { args . bucket_path . rstrip ( '/' ) } /gridstatus_metadata_original_ "
249+ f" { args . input_version } .csv"
168250 )
169- gs_meta = _load_data (v1_path )
251+ gs_meta = _load_data (src_file , args . aws_profile )
170252 # Transform data to a row-per-series view.
171253 gs_meta_rps = create_series_metadata (gs_meta )
172254 # Save transformed dataset to S3.
173- writer .write_df_to_s3 (gs_meta_rps , file_name )
255+ writer = _GridstatusMetadataWriter (args .bucket_path , args .aws_profile )
256+ dst_file = f"gridstatus_metadata_original_{ args .output_version } .csv"
257+ writer .write_df_to_s3 (gs_meta_rps , dst_file )
258+
259+
260+ if __name__ == "__main__" :
261+ _main (_parse ())
0 commit comments