-
Notifications
You must be signed in to change notification settings - Fork 112
TutorTask541_Save_GridStatus_metadata_with_series_as_minimal_unit #545
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
sonniki
merged 6 commits into
master
from
TutorTask541_Save_GridStatus_metadata_with_series_as_minimal_unit
Jun 3, 2025
+305
−0
Merged
Changes from 2 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
dfe5bc6
TutorTask541: Implementation and Saving of a different View for the G…
f7a20ba
TutorTask541: Docstring Improvements
52a73f6
TutorTask541: Reviewer Changes
indrayudd 4ddabd3
Merge branch 'master' into TutorTask541_Save_GridStatus_metadata_with…
indrayudd c9e21ec
TutorTask541: Reviewer Changes
indrayudd d978290
TutorTask541: Reviewer changes
indrayudd File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| """ | ||
| Import as: | ||
|
|
||
| import causal_automl.postprocess_gridstatus_metadata as capogrme | ||
| """ | ||
|
|
||
| import ast | ||
| import io | ||
| import logging | ||
| import os | ||
| import re | ||
| from typing import Dict, Iterable, List | ||
|
|
||
| import helpers.hdbg as hdbg | ||
| import helpers.henv as henv | ||
| import helpers.hio as hio | ||
| import helpers.hpandas as hpandas | ||
| import helpers.hs3 as hs3 | ||
| import pandas as pd | ||
|
|
||
| # Configure logger. | ||
| hdbg.init_logger(verbosity=logging.INFO) | ||
| _LOG = logging.getLogger(__name__) | ||
|
|
||
| # Print system signature. | ||
| _LOG.info("%s", henv.get_system_signature()[0]) | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
| # ############################################################################# | ||
| # _GridstatusMetadataWriter | ||
| # ############################################################################# | ||
|
|
||
|
|
||
| class _GridstatusMetadataWriter: | ||
| """ | ||
| Save Gridstatus metadata and upload to S3. | ||
| """ | ||
|
|
||
| def __init__(self, bucket_path: str, aws_profile: str) -> None: | ||
| """ | ||
| Initialize the writer for saving metadata and facet values to S3. | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
|
|
||
| :param bucket_path: base S3 path where files will be uploaded | ||
| (e.g., "s3://bucket/dir/") | ||
| :param aws_profile: AWS CLI profile name used for authentication | ||
| """ | ||
| self._bucket_path = bucket_path | ||
| self._aws_profile = aws_profile | ||
|
|
||
| def write_df_to_s3(self, df: pd.DataFrame, file_name: str) -> None: | ||
| """ | ||
| Save the data as a local CSV file and upload it to S3. | ||
|
|
||
| :param df: data to be saved to S3 | ||
| :param file_name: local file name for saving | ||
| """ | ||
| cache_dir = "tmp.download_metadata_cache/" | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| local_file_path = os.path.join(cache_dir, file_name) | ||
| hio.create_dir(os.path.dirname(local_file_path), incremental=True) | ||
| # Save CSV locally. | ||
| df.to_csv(local_file_path, index=False) | ||
| _LOG.debug("Saved CSV locally to: %s", local_file_path) | ||
| # Upload CSV to the specified S3 bucket. | ||
| bucket_file_path = self._bucket_path + file_name | ||
| hs3.copy_file_to_s3(local_file_path, bucket_file_path, self._aws_profile) | ||
| _LOG.debug("Uploaded to S3: %s", bucket_file_path) | ||
|
|
||
|
|
||
| def _load_data(file_path: str) -> pd.DataFrame: | ||
| """ | ||
| Load data from file path to a dataframe. | ||
|
|
||
| :param file_path: path of the data to load from | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| :return: dataframe of the loaded data | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| """ | ||
| file = hs3.from_file(file_path, aws_profile="ck") | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| df = pd.read_csv(io.StringIO(file)) | ||
| _LOG.info("shape: %s", df.shape) | ||
| _LOG.info("columns: %s", df.columns) | ||
| _LOG.info("df: \n %s", hpandas.df_to_str(df, log_level=logging.INFO)) | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| return df | ||
|
|
||
|
|
||
| def _prettify(col: str) -> str: | ||
| """ | ||
| Convert snake_case to Title Case. | ||
|
|
||
| E.g., “spinning_reserves” to “Spinning Reserves” | ||
|
|
||
| :param col: column name to prettify | ||
| :return: prettified column name | ||
| """ | ||
| tokens = re.sub(r"[_\s]+", " ", col).strip().split() | ||
| return " ".join(t.capitalize() for t in tokens) | ||
|
|
||
|
|
||
| def _build_series_row( | ||
| base_row: pd.Series, | ||
| col_name: str, | ||
| dataset_id: str, | ||
| dataset_name: str, | ||
| ) -> Dict[str, object]: | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| """ | ||
| Build new rows with the `id_series` and `num_series` columns. | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
|
|
||
| :param base_row: original row | ||
| :param col_name: column name to prettify | ||
|
indrayudd marked this conversation as resolved.
Outdated
indrayudd marked this conversation as resolved.
Outdated
|
||
| """ | ||
| nice_col_name = _prettify(col_name) | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| # Start with the original row. | ||
| new_row: Dict[str, object] = base_row.to_dict() | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| # Add the two series identifiers. | ||
| new_row["id_series"] = f"{dataset_id}.{col_name}" | ||
| new_row["name_series"] = f"{dataset_name} / {nice_col_name}" | ||
| return new_row | ||
|
|
||
|
|
||
| def _explode_dataset_row(row: pd.Series) -> Iterable[Dict[str, object]]: | ||
| """ | ||
| Transform a single row into the row-per-series view. | ||
|
indrayudd marked this conversation as resolved.
Outdated
indrayudd marked this conversation as resolved.
Outdated
|
||
|
|
||
| :param row: row to transform | ||
|
indrayudd marked this conversation as resolved.
|
||
| :return: the exploded row | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| """ | ||
| dataset_id: str = row["id"] | ||
| dataset_name: str = row["name"] | ||
| # Ignore primary key columns. | ||
| ignore_cols = set(ast.literal_eval(row["primary_key_columns"])) | ||
| # Iterate through all columns and generate the row-per-series view. | ||
| for col_meta in ast.literal_eval(row["all_columns"]): | ||
| col_name: str = col_meta["name"] | ||
| if col_meta.get("is_datetime") or col_name in ignore_cols: | ||
| continue | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| yield _build_series_row(row, col_name, dataset_id, dataset_name) | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
| def create_series_metadata(df: pd.DataFrame) -> pd.DataFrame: | ||
| """ | ||
| Transform the whole dataset into the row-per-series view. | ||
|
|
||
| :param df: data to transform | ||
| :return: transformed data | ||
|
indrayudd marked this conversation as resolved.
|
||
| """ | ||
| exploded_rows: List[Dict[str, object]] = [ | ||
| row | ||
| for _, dataset_row in df.iterrows() | ||
| for row in _explode_dataset_row(dataset_row) | ||
| ] | ||
| result = pd.DataFrame(exploded_rows) | ||
| # Arrange according to desired ordering. | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| leading = ["id_series", "name_series"] | ||
| remaining = [c for c in result.columns if c not in leading] | ||
| return result[leading + remaining] | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
| # Main flow. | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| if __name__ == "__main__": | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| # Configure S3. | ||
| aws_profile = "ck" | ||
| bucket_root = hs3.get_s3_bucket_path(aws_profile) | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| bucket_path = "s3://causify-data-collaborators/causal_automl/metadata/" | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| file_name = "gridstatus_metadata_original_v2.0.csv" | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| writer = _GridstatusMetadataWriter(bucket_path, aws_profile) | ||
| # Load data. | ||
|
indrayudd marked this conversation as resolved.
|
||
| v1_path = ( | ||
| "s3://causify-data-collaborators/causal_automl/metadata/" | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| "gridstatus_metadata_original_v1.0.csv" | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
| ) | ||
| gs_meta = _load_data(v1_path) | ||
| # Transform data to a row-per-series view. | ||
| gs_meta_rps = create_series_metadata(gs_meta) | ||
| # Save transformed dataset to S3. | ||
| writer.write_df_to_s3(gs_meta_rps, file_name) | ||
|
indrayudd marked this conversation as resolved.
Outdated
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.