Skip to content

Commit 3861e32

Browse files
indrayuddIndrayudd Roy Chowdhury
andauthored
TutorTask541_Save_GridStatus_metadata_with_series_as_minimal_unit (#545)
Co-authored-by: Indrayudd Roy Chowdhury <indro@Indrayudds-MacBook-Air.local>
1 parent 4c7d960 commit 3861e32

1 file changed

Lines changed: 305 additions & 0 deletions

File tree

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
#!/usr/bin/env python
2+
"""
3+
Convert the dataset-per-row schema of the Gridstatus metadata into a series-
4+
per-row schema and upload the result back into the same S3 bucket.
5+
6+
> causal_automl/postprocess_gridstatus_metadata.py \
7+
--aws_profile ck \
8+
--bucket_path s3://causify-data-collaborators/causal_automl/metadata/ \
9+
--input_version v1.0 \
10+
--output_version v2.0
11+
12+
Import as:
13+
14+
import causal_automl.postprocess_gridstatus_metadata as capogrme
15+
"""
16+
17+
import argparse
18+
import ast
19+
import io
20+
import logging
21+
import os
22+
import re
23+
from typing import Any, Dict, List
24+
25+
import helpers.hdbg as hdbg
26+
import helpers.hio as hio
27+
import helpers.hs3 as hs3
28+
import pandas as pd
29+
30+
# Configure logger.
31+
_LOG = logging.getLogger(__name__)
32+
33+
34+
# #############################################################################
35+
# _GridstatusMetadataWriter
36+
# #############################################################################
37+
38+
39+
class _GridstatusMetadataWriter:
40+
"""
41+
Save Gridstatus metadata and upload to S3.
42+
"""
43+
44+
def __init__(
45+
self,
46+
bucket_path: str,
47+
aws_profile: str,
48+
*,
49+
cache_dir: str = "tmp.download_metadata_cache/",
50+
) -> None:
51+
"""
52+
Initialize the writer for saving postprocessed metadata to S3.
53+
54+
:param bucket_path: base S3 path where files will be uploaded
55+
(e.g., "s3://bucket/dir/")
56+
:param aws_profile: AWS CLI profile name used for authentication
57+
:param cache_dir: cache directory path
58+
"""
59+
self._bucket_path = bucket_path
60+
self._aws_profile = aws_profile
61+
self.cache_dir = cache_dir
62+
63+
def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None:
64+
"""
65+
Save the data as a local CSV file and upload it to S3.
66+
67+
:param df: data to be saved to S3
68+
:param file_name: local file name for saving
69+
"""
70+
local_file_path = os.path.join(self.cache_dir, file_name)
71+
hio.create_dir(os.path.dirname(local_file_path), incremental=True)
72+
# Save CSV locally.
73+
df.to_csv(local_file_path, index=False)
74+
_LOG.debug("Saved CSV locally to: %s", local_file_path)
75+
# Upload CSV to the specified S3 bucket.
76+
bucket_file_path = self._bucket_path + file_name
77+
hs3.copy_file_to_s3(local_file_path, bucket_file_path, self._aws_profile)
78+
_LOG.debug("Uploaded to S3: %s", bucket_file_path)
79+
80+
81+
def _load_data(file_path: str, aws_profile: str) -> pd.DataFrame:
82+
"""
83+
Load data from S3 path to a dataframe.
84+
85+
:param file_path: S3 path of the data to load from
86+
:param aws_profile: aws profile that accesses S3 bucket
87+
:return: the loaded data
88+
"""
89+
file = hs3.from_file(file_path, aws_profile=aws_profile)
90+
df = pd.read_csv(io.StringIO(file))
91+
_LOG.info("Data successfully loaded from %s.", file_path)
92+
return df
93+
94+
95+
def _prettify(col: str) -> str:
96+
"""
97+
Convert snake_case to Title Case.
98+
99+
E.g., “spinning_reserves” to “Spinning Reserves”
100+
101+
:param col: column name to prettify
102+
:return: prettified column name
103+
"""
104+
tokens = re.sub(r"[_\s]+", " ", col).strip().split()
105+
prettified = " ".join(t.capitalize() for t in tokens)
106+
return prettified
107+
108+
109+
def _build_series_row(
110+
base_row: pd.Series,
111+
col_name: str,
112+
dataset_id: str,
113+
dataset_name: str,
114+
) -> Dict[str, Any]:
115+
"""
116+
Build new rows with the `id_series` and `name_series` columns.
117+
118+
:param base_row: original row
119+
:param col_name: name of the column representing the series
120+
:param dataset_id: id of the collection of series
121+
:param dataset_name: name of the collection of series
122+
:return: modified row with the new columns added
123+
"""
124+
# Start with the original row.
125+
new_row: Dict[str, Any] = base_row.to_dict()
126+
# Add the two series identifiers.
127+
new_row["id_series"] = f"{dataset_id}.{col_name}"
128+
new_row["name_series"] = f"{dataset_name} / {_prettify(col_name)}"
129+
return new_row
130+
131+
132+
def _expand_dataset_row(row: pd.Series) -> List[Dict[str, Any]]:
133+
"""
134+
Expand a row with the dataset info into rows for its series.
135+
136+
E.g.,
137+
Input row:
138+
```
139+
id name ....
140+
caiso_as_prices CAISO AS Prices ....
141+
/
142+
all_columns
143+
[{'name': 'interval_start_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True},\
144+
{'name': 'interval_end_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True}, \
145+
{'name': 'region', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
146+
{'name': 'market', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
147+
{'name': 'non_spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
148+
'is_datetime': False}, \
149+
{'name': 'regulation_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
150+
'is_datetime': False}, \
151+
{'name': 'regulation_mileage_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
152+
'is_datetime': False}, \
153+
{'name': 'regulation_mileage_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
154+
'is_datetime': False}, \
155+
{'name': 'regulation_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
156+
'is_datetime': False}, \
157+
{'name': 'spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
158+
'is_datetime': False}]
159+
```
160+
Output rows:
161+
```
162+
id name ....
163+
caiso_as_prices CAISO AS Prices ....
164+
caiso_as_prices CAISO AS Prices ....
165+
caiso_as_prices CAISO AS Prices ....
166+
caiso_as_prices CAISO AS Prices ....
167+
caiso_as_prices CAISO AS Prices ....
168+
caiso_as_prices CAISO AS Prices ....
169+
/
170+
id_series name_series
171+
caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves
172+
caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down
173+
caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down
174+
caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up
175+
caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up
176+
caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves
177+
```
178+
179+
:param row: row to transform
180+
:return: the collection of expanded rows
181+
"""
182+
dataset_id: str = row["id"]
183+
dataset_name: str = row["name"]
184+
# Iterate through all columns and generate the row-per-series view.
185+
expanded: List[Dict[str, Any]] = []
186+
for col_meta in ast.literal_eval(row["all_columns"]):
187+
col_name: str = col_meta["name"]
188+
# Expand only with columns that contain numeric time series.
189+
if not col_meta.get("is_numeric"):
190+
continue
191+
expanded.append(
192+
_build_series_row(row, col_name, dataset_id, dataset_name)
193+
)
194+
return expanded
195+
196+
197+
def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame:
198+
"""
199+
Transform the whole dataset into the row-per-series view.
200+
201+
E.g.,
202+
Input dataset:
203+
```
204+
id name ....
205+
caiso_as_prices CAISO AS Prices ....
206+
...
207+
/
208+
all_columns
209+
[{'name': 'interval_start_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True},\
210+
{'name': 'interval_end_utc', 'type': 'TIMESTAMP', 'is_numeric': False, 'is_datetime': True}, \
211+
{'name': 'region', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
212+
{'name': 'market', 'type': 'VARCHAR', 'is_numeric': False, 'is_datetime': False}, \
213+
{'name': 'non_spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
214+
'is_datetime': False}, \
215+
{'name': 'regulation_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
216+
'is_datetime': False}, \
217+
{'name': 'regulation_mileage_down', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
218+
'is_datetime': False}, \
219+
{'name': 'regulation_mileage_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
220+
'is_datetime': False}, \
221+
{'name': 'regulation_up', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
222+
'is_datetime': False}, \
223+
{'name': 'spinning_reserves', 'type': 'DOUBLE PRECISION', 'is_numeric': True, \
224+
'is_datetime': False}]
225+
...
226+
```
227+
Output dataset:
228+
```
229+
id_series name_series
230+
caiso_as_prices.non_spinning_reserves CAISO AS Prices / Non Spinning Reserves
231+
caiso_as_prices.regulation_down CAISO AS Prices / Regulation Down
232+
caiso_as_prices.regulation_mileage_down CAISO AS Prices / Regulation Mileage Down
233+
caiso_as_prices.regulation_mileage_up CAISO AS Prices / Regulation Mileage Up
234+
caiso_as_prices.regulation_up CAISO AS Prices / Regulation Up
235+
caiso_as_prices.spinning_reserves CAISO AS Prices / Spinning Reserves
236+
...
237+
/
238+
id name ....
239+
caiso_as_prices CAISO AS Prices ....
240+
caiso_as_prices CAISO AS Prices ....
241+
caiso_as_prices CAISO AS Prices ....
242+
caiso_as_prices CAISO AS Prices ....
243+
caiso_as_prices CAISO AS Prices ....
244+
caiso_as_prices CAISO AS Prices ....
245+
...
246+
```
247+
248+
:param df: data to transform
249+
:return: transformed data
250+
"""
251+
expanded_rows: List[Dict[str, Any]] = []
252+
for _, dataset_row in df.iterrows():
253+
expanded_rows.extend(_expand_dataset_row(dataset_row))
254+
result = pd.DataFrame(expanded_rows)
255+
# Move the series-defining columns to the beginning.
256+
leading = ["id_series", "name_series"]
257+
remaining = [c for c in result.columns if c not in leading]
258+
transformed_df = result[leading + remaining]
259+
return transformed_df
260+
261+
262+
def _parse() -> argparse.Namespace:
263+
parser = argparse.ArgumentParser(
264+
description=__doc__,
265+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
266+
)
267+
parser.add_argument(
268+
"--aws_profile", default="ck", help="AWS CLI profile for authentication"
269+
)
270+
parser.add_argument(
271+
"--bucket_path",
272+
default="s3://causify-data-collaborators/causal_automl/metadata/",
273+
help="Destination S3 directory (trailing slash optional)",
274+
)
275+
parser.add_argument(
276+
"--input_version",
277+
help="Version of the source metadata file",
278+
)
279+
parser.add_argument(
280+
"--output_version", help="Version tag for the result file"
281+
)
282+
parser.add_argument(
283+
"--log_level", type=int, default=logging.INFO, help="Logging level"
284+
)
285+
return parser.parse_args()
286+
287+
288+
def _main(args: argparse.Namespace) -> None:
289+
hdbg.init_logger(verbosity=args.log_level, use_exec_path=True)
290+
# Load data.
291+
src_file = (
292+
f"{args.bucket_path.rstrip('/')}/gridstatus_metadata_original_"
293+
f"{args.input_version}.csv"
294+
)
295+
gs_meta = _load_data(src_file, args.aws_profile)
296+
# Transform data to a row-per-series view.
297+
gs_meta_rps = create_series_metadata(gs_meta)
298+
# Save transformed dataset to S3.
299+
writer = _GridstatusMetadataWriter(args.bucket_path, args.aws_profile)
300+
dst_file = f"gridstatus_metadata_original_{args.output_version}.csv"
301+
writer.write_df_to_s3(gs_meta_rps, dst_file)
302+
303+
304+
if __name__ == "__main__":
305+
_main(_parse())

0 commit comments

Comments
 (0)