11#!/usr/bin/env python
22"""
3- Convert the dataset-per-row metadata of the Gridstatus metadata into a series-
3+ Convert the dataset-per-row schema of the Gridstatus metadata into a series-
44per-row schema and upload the result back into the same S3 bucket.
55
6- > python causal_automl/postprocess_gridstatus_metadata.py \
6+ > causal_automl/postprocess_gridstatus_metadata.py \
77 --aws_profile ck \
88 --bucket_path s3://causify-data-collaborators/causal_automl/metadata/ \
99 --input_version v1.0 \
2828import pandas as pd
2929
3030# Configure logger.
31- hdbg .init_logger (verbosity = logging .INFO )
3231_LOG = logging .getLogger (__name__ )
3332
3433
@@ -46,6 +45,7 @@ def __init__(
4645 self ,
4746 bucket_path : str ,
4847 aws_profile : str ,
48+ * ,
4949 cache_dir : str = "tmp.download_metadata_cache/" ,
5050 ) -> None :
5151 """
@@ -84,11 +84,11 @@ def _load_data(file_path: str, aws_profile: str) -> pd.DataFrame:
8484
8585 :param file_path: S3 path of the data to load from
8686 :param aws_profile: aws profile that accesses S3 bucket
87- :return: the queried metadata
87+ :return: the loaded data
8888 """
8989 file = hs3 .from_file (file_path , aws_profile = aws_profile )
9090 df = pd .read_csv (io .StringIO (file ))
91- _LOG .info ("Data Successfully Downloaded." )
91+ _LOG .info ("Data Successfully Downloaded from %s." , file_path )
9292 return df
9393
9494
@@ -116,63 +116,89 @@ def _build_series_row(
116116 Build new rows with the `id_series` and `name_series` columns.
117117
118118 :param base_row: original row
119- :param col_name: column name to prettify
120- :param dataset_id: id of the data series
119+ :param col_name: name of the column representing the series
120+ :param dataset_id: id of the collection of series
121121 :param dataset_name: name of the collection of series
122- :return: modified row
122+ :return: modified row with the new columns added
123123 """
124124 # Start with the original row.
125- new_row : Dict [str , object ] = base_row .to_dict ()
125+ new_row : Dict [str , Any ] = base_row .to_dict ()
126126 # Add the two series identifiers.
127127 new_row ["id_series" ] = f"{ dataset_id } .{ col_name } "
128128 new_row ["name_series" ] = f"{ dataset_name } / { _prettify (col_name )} "
129129 return new_row
130130
131131
132- def _explode_dataset_row (row : pd .Series ) -> List [Dict [str , Any ]]:
132+ def _expand_dataset_row (row : pd .Series ) -> List [Dict [str , Any ]]:
133133 """
134- Transform a single row into the row-per-series view.
134+ Expand a row representing a collection into multiple representing each
135+ series.
135136
136137 E.g.,
137138 Input row:
139+ ```
138140 id name ....
139141 caiso_as_prices CAISO AS Prices ....
140-
141- Output row:
142+ /
143+ all_columns
144+ [{'name': 'interval_start_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True},\
145+ {'name': 'interval_end_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True}, \
146+ {'name': 'region', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
147+ {'name': 'market', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
148+ {'name': 'non_spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
149+ 'is_datetime': False}, \
150+ {'name': 'regulation_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
151+ 'is_datetime': False}, \
152+ {'name': 'regulation_mileage_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
153+ 'is_datetime': False}, \
154+ {'name': 'regulation_mileage_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
155+ 'is_datetime': False}, \
156+ {'name': 'regulation_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
157+ 'is_datetime': False}, \
158+ {'name': 'spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
159+ 'is_datetime': False}]
160+ ```
161+ Output rows:
162+ ```
142163 id name ....
143164 caiso_as_prices CAISO AS Prices ....
144165 caiso_as_prices CAISO AS Prices ....
145166 caiso_as_prices CAISO AS Prices ....
146167 caiso_as_prices CAISO AS Prices ....
147168 caiso_as_prices CAISO AS Prices ....
148169 caiso_as_prices CAISO AS Prices ....
170+ caiso_as_prices CAISO AS Prices ....
171+ caiso_as_prices CAISO AS Prices ....
172+ caiso_as_prices CAISO AS Prices ....
173+ caiso_as_prices CAISO AS Prices ....
149174 /
150175 id_series name_series
176+ caiso_as_prices.interval_start_utc CAISO AS Prices / Interval Start Utc
177+ caiso_as_prices.interval_end_utc CAISO AS Prices / Interval End Utc
178+ caiso_as_prices.region CAISO AS Prices / Region
179+ caiso_as_prices.market CAISO AS Prices / Market
151180 caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves
152181 caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down
153182 caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down
154183 caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up
155184 caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up
156185 caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves
186+ ```
157187
158188
159189 :param row: row to transform
160- :return: the exploded row
190+ :return: the collection of expanded rows
161191 """
162192 dataset_id : str = row ["id" ]
163193 dataset_name : str = row ["name" ]
164- # Ignore primary key columns.
165- ignore_cols = set (ast .literal_eval (row ["primary_key_columns" ]))
166194 # Iterate through all columns and generate the row-per-series view.
167- exploded : List [Dict [str , Any ]] = []
195+ expanded : List [Dict [str , Any ]] = []
168196 for col_meta in ast .literal_eval (row ["all_columns" ]):
169197 col_name : str = col_meta ["name" ]
170- if col_meta .get ("is_datetime" ) or col_name in ignore_cols :
171- continue
172- exploded .append (
198+ expanded .append (
173199 _build_series_row (row , col_name , dataset_id , dataset_name )
174200 )
175- return exploded
201+ return expanded
176202
177203
178204def create_series_metadata (df : pd .DataFrame ) -> pd .DataFrame :
@@ -181,12 +207,37 @@ def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame:
181207
182208 E.g.,
183209 Input dataset:
210+ ```
184211 id name ....
185212 caiso_as_prices CAISO AS Prices ....
186213 ...
187-
214+ /
215+ all_columns
216+ [{'name': 'interval_start_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True},\
217+ {'name': 'interval_end_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True}, \
218+ {'name': 'region', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
219+ {'name': 'market', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
220+ {'name': 'non_spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
221+ 'is_datetime': False}, \
222+ {'name': 'regulation_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
223+ 'is_datetime': False}, \
224+ {'name': 'regulation_mileage_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
225+ 'is_datetime': False}, \
226+ {'name': 'regulation_mileage_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
227+ 'is_datetime': False}, \
228+ {'name': 'regulation_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
229+ 'is_datetime': False}, \
230+ {'name': 'spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
231+ 'is_datetime': False}]
232+ ...
233+ ```
188234 Output dataset:
235+ ```
189236 id_series name_series
237+ caiso_as_prices.interval_start_utc CAISO AS Prices / Interval Start Utc
238+ caiso_as_prices.interval_end_utc CAISO AS Prices / Interval End Utc
239+ caiso_as_prices.region CAISO AS Prices / Region
240+ caiso_as_prices.market CAISO AS Prices / Market
190241 caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves
191242 caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down
192243 caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down
@@ -202,16 +253,21 @@ def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame:
202253 caiso_as_prices CAISO AS Prices ....
203254 caiso_as_prices CAISO AS Prices ....
204255 caiso_as_prices CAISO AS Prices ....
256+ caiso_as_prices CAISO AS Prices ....
257+ caiso_as_prices CAISO AS Prices ....
258+ caiso_as_prices CAISO AS Prices ....
259+ caiso_as_prices CAISO AS Prices ....
205260 ...
261+ ```
206262
207263 :param df: data to transform
208264 :return: transformed data
209265 """
210- exploded_rows : List [Dict [str , Any ]] = []
266+ expanded_rows : List [Dict [str , Any ]] = []
211267 for _ , dataset_row in df .iterrows ():
212- exploded_rows .extend (_explode_dataset_row (dataset_row ))
213- result = pd .DataFrame (exploded_rows )
214- # Arrange according to desired ordering .
268+ expanded_rows .extend (_expand_dataset_row (dataset_row ))
269+ result = pd .DataFrame (expanded_rows )
270+ # Move the series-defining columns to the beginning .
215271 leading = ["id_series" , "name_series" ]
216272 remaining = [c for c in result .columns if c not in leading ]
217273 transformed_df = result [leading + remaining ]
@@ -233,16 +289,19 @@ def _parse() -> argparse.Namespace:
233289 )
234290 parser .add_argument (
235291 "--input_version" ,
236- default = "v1.0" ,
237292 help = "Version of the source metadata file" ,
238293 )
239294 parser .add_argument (
240- "--output_version" , default = "v2.0" , help = "Version tag for the result file"
295+ "--output_version" , help = "Version tag for the result file"
296+ )
297+ parser .add_argument (
298+ "--log_level" , type = int , default = logging .INFO , help = "Logging level"
241299 )
242300 return parser .parse_args ()
243301
244302
245303def _main (args : argparse .Namespace ) -> None :
304+ hdbg .init_logger (verbosity = args .log_level , use_exec_path = True )
246305 # Load data.
247306 src_file = (
248307 f"{ args .bucket_path .rstrip ('/' )} /gridstatus_metadata_original_"
0 commit comments