Skip to content

Commit 2695dfa

Browse files
indrayuddIndrayudd Roy Chowdhurysonniki
authored
TutorTask527_Create_a_downloader_for_GridStatus_data (#532)
Co-authored-by: Indrayudd Roy Chowdhury <indro@Indrayudds-MacBook-Air.local> Co-authored-by: Sonya Nikiforova <son.nik@mail.ru>
1 parent 79a545b commit 2695dfa

2 files changed

Lines changed: 195 additions & 10 deletions

File tree

causal_automl/download_fred_data.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,17 @@ class FredDataDownloader:
2727
Download historical data from FRED.
2828
"""
2929

30-
def __init__(self, api_key: Optional[str] = None) -> None:
30+
def __init__(self) -> None:
3131
"""
3232
Initialize the FRED data downloader with the API key.
33-
34-
If no FRED API key is passed as a parameter, it is read from the
35-
environment variable.
36-
37-
:param api_key: FRED API key
3833
"""
39-
key = api_key or os.getenv("FRED_API_KEY")
40-
if not key:
41-
raise ValueError("FRED API key is required")
42-
self._client = fredapi.Fred(api_key=key)
34+
hdbg.dassert_in(
35+
"FRED_API_KEY",
36+
os.environ,
37+
msg="FRED_API_KEY is not found in environment variables",
38+
)
39+
api_key = os.getenv("FRED_API_KEY")
40+
self._client = fredapi.Fred(api_key=api_key)
4341

4442
@ratelimit.sleep_and_retry
4543
@ratelimit.limits(calls=60, period=60)
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
"""
2+
Import as:
3+
4+
import causal_automl.download_gridstatus_data as cadogrda
5+
"""
6+
7+
import logging
8+
import os
9+
import time
10+
from typing import Dict, Optional, Union
11+
12+
import gridstatusio
13+
import helpers.hdbg as hdbg
14+
import pandas as pd
15+
import ratelimit
16+
17+
_LOG = logging.getLogger(__name__)
18+
19+
20+
# #############################################################################
21+
# GridstatusDataDownloader
22+
# #############################################################################
23+
24+
25+
class GridstatusDataDownloader:
26+
"""
27+
Download historical data from GridStatus.io.
28+
"""
29+
30+
def __init__(self) -> None:
31+
"""
32+
Initialize the GridStatus data downloader with the API key.
33+
"""
34+
hdbg.dassert_in(
35+
"GRIDSTATUS_API_KEY",
36+
os.environ,
37+
msg="GRIDSTATUS_API_KEY is not found in environment variables",
38+
)
39+
api_key = os.getenv("GRIDSTATUS_API_KEY")
40+
self._client = gridstatusio.GridStatusClient(api_key=api_key)
41+
42+
@ratelimit.sleep_and_retry
43+
@ratelimit.limits(calls=60, period=60)
44+
def download_series(
45+
self,
46+
id_: str,
47+
start_timestamp: Optional[Union[str, pd.Timestamp]] = None,
48+
end_timestamp: Optional[Union[str, pd.Timestamp]] = None,
49+
) -> Optional[pd.DataFrame]:
50+
"""
51+
Download historical series data.
52+
53+
When no start and end timestamps are passed, the entire time series is downloaded.
54+
55+
Example of a returned series:
56+
57+
```
58+
interval_start_utc interval_end_utc region market
59+
2010-01-01 08:00:00+00:00 2010-01-01 09:00:00+00:00 AS_CAISO DAM
60+
2010-01-01 08:00:00+00:00 2010-01-01 09:00:00+00:00 AS_CAISO_EXP DAM
61+
/
62+
non_spinning_reserves
63+
0.0
64+
0.5
65+
```
66+
67+
:param id_: Gridstatus series identifier (e.g., "caiso_as_prices.spinning_reserves")
68+
:param start_timestamp: first observation timestamp
69+
(e.g., "2010-01-01 08:00:00+00:00" or pd.Timestamp("2023-04-01 01:00:00"))
70+
:param end_timestamp: last observation timestamp
71+
:return: relevant Gridstatus series data
72+
"""
73+
# Build request parameters.
74+
id_dataset, name_series = id_.split(".", 1)
75+
request_kwargs: Dict[str, str] = {}
76+
if start_timestamp is not None:
77+
request_kwargs["start"] = start_timestamp
78+
if end_timestamp is not None:
79+
request_kwargs["end"] = end_timestamp
80+
# Start attempts.
81+
attempt = 1
82+
max_attempts = 4
83+
err_msgs: Dict[str, str] = {}
84+
while attempt <= max_attempts:
85+
try:
86+
# Download the data for the dataset.
87+
df = self._client.get_dataset(
88+
dataset=id_dataset,
89+
columns=[name_series],
90+
**request_kwargs,
91+
)
92+
except Exception as err:
93+
msg = str(err)
94+
if msg.startswith("Error 5"):
95+
_LOG.error("Attempt %d: %s Retrying...", attempt, msg)
96+
# Wait before retrying.
97+
time.sleep(10)
98+
else:
99+
raise
100+
err_msgs[f"Attempt {attempt}"] = msg
101+
attempt += 1
102+
continue
103+
# Log success and return.
104+
_LOG.info(
105+
"Downloaded series %s with %d records",
106+
id_,
107+
len(df),
108+
)
109+
return df
110+
raise RuntimeError(
111+
f"Failed to fetch after {max_attempts} attempts. Errors per run: {err_msgs}"
112+
)
113+
114+
def filter_series(
115+
self,
116+
df: pd.DataFrame,
117+
id_: str,
118+
filters: Dict[str, str],
119+
) -> pd.DataFrame:
120+
"""
121+
Filter out a single time series from a Gridstatus dataset.
122+
123+
- Apply single filters across columns (e.g., `region`, `market`)
124+
- Drop NaN values
125+
- Set the end timestamp as index
126+
127+
E.g.,
128+
129+
Input series (caiso_as_prices.non_spinning_reserves):
130+
```
131+
interval_start_utc interval_end_utc region market
132+
2022-01-01 08:00:00+00:00 2022-01-01 09:00:00+00:00 AS_CAISO DAM
133+
2022-01-01 08:00:00+00:00 2022-01-01 09:00:00+00:00 AS_CAISO_EXP DAM
134+
2022-01-01 08:00:00+00:00 2022-01-01 09:00:00+00:00 AS_NP26 DAM
135+
2022-01-01 08:00:00+00:00 2022-01-01 09:00:00+00:00 AS_NP26_EXP DAM
136+
2022-01-01 08:00:00+00:00 2022-01-01 09:00:00+00:00 AS_SP26 DAM
137+
... ... ... ...
138+
/
139+
non_spinning_reserves
140+
0.00
141+
0.15
142+
0.00
143+
0.00
144+
0.00
145+
...
146+
```
147+
Output series (with filters - {"region": "AS_CAISO_EXP", "market": "DAM"})):
148+
```
149+
non_spinning_reserves
150+
interval_end_utc
151+
2022-01-01 09:00:00+00:00 0.15
152+
2022-01-01 10:00:00+00:00 0.15
153+
2022-01-01 11:00:00+00:00 0.15
154+
2022-01-01 12:00:00+00:00 0.15
155+
2022-01-01 13:00:00+00:00 0.15
156+
... ...
157+
```
158+
159+
:param df: data series to filter
160+
:param id_: series identifier (e.g., "caiso_as_prices.spinning_reserves")
161+
:param filters: filters to apply on the dataset
162+
(e.g., {"region": "AS_CAISO_EXP", "market": "DAM"})
163+
:return: filtered series
164+
"""
165+
# Filter data.
166+
filtered_data = df.copy()
167+
for k, v in filters.items():
168+
hdbg.dassert_in(
169+
k,
170+
filtered_data.columns,
171+
"%s not found in columns: %s",
172+
k,
173+
list(filtered_data.columns),
174+
)
175+
filtered_data = filtered_data[filtered_data[k] == v]
176+
if filtered_data.empty:
177+
_LOG.warning("No data remaining after applying filters")
178+
# Find the series name.
179+
name_series = id_.split(".", 1)[1]
180+
# Drop missing value rows.
181+
filtered_data = filtered_data.dropna(subset=[name_series])
182+
if filtered_data.empty:
183+
_LOG.warning("No data remaining after dropping NaN values")
184+
filtered_data = filtered_data[["interval_end_utc", name_series]]
185+
filtered_data = filtered_data.set_index("interval_end_utc")
186+
filtered_data = filtered_data.sort_index()
187+
return filtered_data

0 commit comments

Comments
 (0)