Skip to content

Commit dfe5bc6

Browse files
Indrayudd Roy Chowdhury
authored and committed
TutorTask541: Implementation and Saving of a different View for the Gridstatus Metadata in S3
1 parent 1c6e6a0 commit dfe5bc6

1 file changed

Lines changed: 172 additions & 0 deletions

File tree

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
"""
2+
Import as:
3+
4+
import causal_automl.postprocess_gridstatus_metadata as capogrme
5+
"""
6+
7+
import ast
8+
import io
9+
import logging
10+
import os
11+
import re
12+
from typing import Dict, Iterable, List
13+
14+
import helpers.hdbg as hdbg
15+
import helpers.henv as henv
16+
import helpers.hio as hio
17+
import helpers.hpandas as hpandas
18+
import helpers.hs3 as hs3
19+
import pandas as pd
20+
21+
# Initialize logging at INFO verbosity and create the module-level logger.
hdbg.init_logger(verbosity=logging.INFO)
_LOG = logging.getLogger(__name__)

# Log the first element of the system signature.
_system_signature = henv.get_system_signature()
_LOG.info("%s", _system_signature[0])
27+
28+
29+
# #############################################################################
30+
# _GridstatusMetadataWriter
31+
# #############################################################################
32+
33+
34+
class _GridstatusMetadataWriter:
    """
    Save Gridstatus metadata and upload to S3.
    """

    def __init__(self, bucket_path: str, aws_profile: str) -> None:
        """
        Initialize the writer for saving metadata and facet values to S3.

        :param bucket_path: base S3 path where files will be uploaded
            (e.g., "s3://bucket/dir/"); a missing trailing "/" is added
        :param aws_profile: AWS CLI profile name used for authentication
        """
        # File names are appended directly to the base path, so make sure a
        # non-empty path ends with a separator.
        if bucket_path and not bucket_path.endswith("/"):
            bucket_path += "/"
        self._bucket_path = bucket_path
        self._aws_profile = aws_profile

    def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None:
        """
        Save the data as a local CSV file and upload it to S3.

        :param df: data to be saved to S3
        :param file_name: name of the CSV file, used for the local copy and
            appended to the bucket path to form the S3 destination
        """
        cache_dir = "tmp.download_metadata_cache/"
        local_file_path = os.path.join(cache_dir, file_name)
        # `file_name` may contain sub-directories, so create the parent dir of
        # the final local path rather than just `cache_dir`.
        hio.create_dir(os.path.dirname(local_file_path), incremental=True)
        # Save CSV locally.
        df.to_csv(local_file_path, index=False)
        _LOG.debug("Saved CSV locally to: %s", local_file_path)
        # Upload CSV to the specified S3 bucket.
        bucket_file_path = self._bucket_path + file_name
        hs3.copy_file_to_s3(local_file_path, bucket_file_path, self._aws_profile)
        _LOG.debug("Uploaded to S3: %s", bucket_file_path)
67+
68+
69+
def _load_data(file_path: str, aws_profile: str = "ck") -> pd.DataFrame:
    """
    Load CSV data from a file path into a dataframe.

    :param file_path: path of the CSV data to load from
    :param aws_profile: AWS profile passed to `hs3.from_file()`
    :return: dataframe of the loaded data
    """
    csv_text = hs3.from_file(file_path, aws_profile=aws_profile)
    df = pd.read_csv(io.StringIO(csv_text))
    # Log a summary of what was loaded.
    _LOG.info("shape: %s", df.shape)
    _LOG.info("columns: %s", df.columns)
    _LOG.info("df: \n %s", hpandas.df_to_str(df, log_level=logging.INFO))
    return df
82+
83+
84+
def _prettify(col: str) -> str:
    """
    Turn a snake_case name into space-separated capitalized words.

    E.g., "spinning_reserves" becomes "Spinning Reserves".

    :param col: column name to prettify
    :return: prettified column name
    """
    # Split on runs of underscores and/or whitespace; leading or trailing
    # separators produce empty pieces, which are dropped.
    words = [word for word in re.split(r"[_\s]+", col) if word]
    return " ".join(map(str.capitalize, words))
94+
95+
96+
def _build_series_row(
    base_row: pd.Series,
    col_name: str,
    dataset_id: str,
    dataset_name: str,
) -> Dict[str, object]:
    """
    Copy a dataset row and add the `id_series` and `name_series` fields.

    :param base_row: original dataset row
    :param col_name: name of the column the series is built from
    :param dataset_id: dataset identifier, used as the prefix of `id_series`
    :param dataset_name: dataset name, used as the prefix of `name_series`
    :return: the original row as a dict, with the two series fields added
    """
    series_fields: Dict[str, object] = {
        "id_series": f"{dataset_id}.{col_name}",
        "name_series": f"{dataset_name} / {_prettify(col_name)}",
    }
    # The series fields come last so they win over any same-named key already
    # present in the original row.
    return {**base_row.to_dict(), **series_fields}
115+
116+
117+
def _explode_dataset_row(row: pd.Series) -> Iterable[Dict[str, object]]:
    """
    Generate the row-per-series view of a single dataset row.

    One row is yielded for each entry of `all_columns` that is neither flagged
    as datetime nor listed in `primary_key_columns`.

    :param row: dataset row to transform; its `primary_key_columns` and
        `all_columns` values are parsed with `ast.literal_eval()`
    :return: the series rows generated from the dataset row
    """
    dataset_id: str = row["id"]
    dataset_name: str = row["name"]
    # Primary key columns do not become series.
    primary_keys = set(ast.literal_eval(row["primary_key_columns"]))
    columns_meta = ast.literal_eval(row["all_columns"])
    for meta in columns_meta:
        name: str = meta["name"]
        is_series = not meta.get("is_datetime") and name not in primary_keys
        if is_series:
            yield _build_series_row(row, name, dataset_id, dataset_name)
134+
135+
136+
def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transform the whole dataset into the row-per-series view.

    :param df: data to transform, one row per dataset
    :return: transformed data, one row per series, with `id_series` and
        `name_series` as the leading columns; an empty dataframe with the same
        column layout when no series is produced
    """
    leading = ["id_series", "name_series"]
    exploded_rows: List[Dict[str, object]] = [
        row
        for _, dataset_row in df.iterrows()
        for row in _explode_dataset_row(dataset_row)
    ]
    if not exploded_rows:
        # A dataframe built from an empty list has no columns, so selecting
        # the leading columns would raise `KeyError`. Build the empty result
        # from the input columns instead.
        remaining = [c for c in df.columns if c not in leading]
        return pd.DataFrame(columns=leading + remaining)
    result = pd.DataFrame(exploded_rows)
    # Arrange according to desired ordering.
    remaining = [c for c in result.columns if c not in leading]
    return result[leading + remaining]
153+
154+
155+
# Main flow.
156+
if __name__ == "__main__":
    # Configure S3.
    aws_profile = "ck"
    bucket_path = "s3://causify-data-collaborators/causal_automl/metadata/"
    writer = _GridstatusMetadataWriter(bucket_path, aws_profile)
    # Load the v1.0 metadata, stored under the same S3 prefix the output is
    # written to.
    v1_path = bucket_path + "gridstatus_metadata_original_v1.0.csv"
    gs_meta = _load_data(v1_path)
    # Transform data to a row-per-series view.
    gs_meta_rps = create_series_metadata(gs_meta)
    # Save transformed dataset to S3.
    file_name = "gridstatus_metadata_original_v2.0.csv"
    writer.write_df_to_s3(gs_meta_rps, file_name)

0 commit comments

Comments
 (0)