|
| 1 | +""" |
| 2 | +Import as: |
| 3 | +
|
| 4 | +import causal_automl.download_fred_data as cadofrda |
| 5 | +""" |
| 6 | + |
| 7 | +import logging as log |
| 8 | +import os |
| 9 | +import time |
| 10 | +from typing import Optional |
| 11 | + |
| 12 | +import fredapi |
| 13 | +import helpers.hdbg as hdbg |
| 14 | +import pandas as pd |
| 15 | +import ratelimit |
| 16 | + |
| 17 | +_LOG = log.getLogger(__name__) |
| 18 | + |
| 19 | + |
| 20 | +# ############################################################################# |
| 21 | +# FredDataDownloader |
| 22 | +# ############################################################################# |
| 23 | + |
| 24 | + |
class FredDataDownloader:
    """
    Download historical data from FRED.

    Requests go through a `fredapi.Fred` client. Each call to
    `download_series()` is rate-limited to 60 calls per 60 seconds and
    retries a bounded number of times on transient API errors.
    """

    # Total number of download attempts before giving up.
    _MAX_ATTEMPTS = 4
    # Fixed wait, in seconds, before retrying after an "Internal Server Error".
    _SERVER_ERROR_DELAY_SEC = 10
    # Base of the exponential backoff, in seconds, used after a
    # "Too Many Requests" error (wait = base ** attempt).
    _RATE_LIMIT_BACKOFF_BASE = 4
    # Frequency codes accepted by `download_series()`.
    _VALID_FREQUENCIES = ("q", "sa", "a")

    def __init__(self, api_key: Optional[str] = None) -> None:
        """
        Initialize the FRED data downloader with the API key.

        If no FRED API key is passed as a parameter, it is read from the
        `FRED_API_KEY` environment variable.

        :param api_key: FRED API key
        :raises ValueError: if no key is passed and the environment
            variable is unset or empty
        """
        key = api_key or os.getenv("FRED_API_KEY")
        if not key:
            raise ValueError("FRED API key is required")
        self._client = fredapi.Fred(api_key=key)

    @ratelimit.sleep_and_retry
    @ratelimit.limits(calls=60, period=60)
    def download_series(
        self,
        id_: str,
        start_timestamp: Optional[pd.Timestamp] = None,
        end_timestamp: Optional[pd.Timestamp] = None,
        frequency: Optional[str] = None,
    ) -> Optional[pd.DataFrame]:
        """
        Download historical series data.

        When no start and end timestamps are passed, no date bounds are sent
        to the API. When no frequency is passed, no frequency is requested
        (presumably the API then returns the series at its native frequency).

        Example of a returned series:

        ```
                          GDP
        2019-10-01  21933.217
        2020-01-01  21727.657
        2020-04-01  19935.444
        ```

        Errors whose message contains "Internal Server Error" or
        "Too Many Requests" are retried, up to `_MAX_ATTEMPTS` attempts in
        total. Any other error is re-raised immediately.

        :param id_: FRED series identifier (e.g., "GDP")
        :param start_timestamp: first observation date
        :param end_timestamp: last observation date
        :param frequency: series data frequency
            - "q": quarter
            - "sa": semi-annual
            - "a": annual
        :return: single-column dataframe named after `id_` holding the
            series data
        :raises RuntimeError: if every attempt failed with a retryable error
        """
        # Validate the passed frequency value.
        if frequency is not None:
            hdbg.dassert_in(
                frequency,
                self._VALID_FREQUENCIES,
                "Invalid frequency '%s'.",
                frequency,
            )
        # Forward only the arguments that were actually provided.
        candidate_kwargs = {
            "observation_start": start_timestamp,
            "observation_end": end_timestamp,
            "frequency": frequency,
        }
        loading_kwargs = {
            name: value
            for name, value in candidate_kwargs.items()
            if value is not None
        }
        err_msgs = {}
        last_err: Optional[Exception] = None
        # Every retryable failure consumes one attempt, so the loop always
        # terminates and the rate-limit backoff grows with each attempt.
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            try:
                # Download the data for the series.
                series = self._client.get_series(id_, **loading_kwargs)
            except Exception as err:
                # The client is matched on message text because no dedicated
                # exception types are distinguished here.
                err_text = str(err)
                if "Internal Server Error" in err_text:
                    delay = self._SERVER_ERROR_DELAY_SEC
                elif "Too Many Requests" in err_text:
                    delay = self._RATE_LIMIT_BACKOFF_BASE**attempt
                else:
                    # Not a transient error: let the caller see it as is.
                    raise
                err_msgs[f"Attempt {attempt}"] = err_text
                # Keep a reference: `err` is unbound once the handler exits.
                last_err = err
                if attempt < self._MAX_ATTEMPTS:
                    _LOG.error(
                        "Attempt %d: %s Retrying after %ds...",
                        attempt,
                        err,
                        delay,
                    )
                    time.sleep(delay)
                else:
                    # No retry follows, so waiting would only delay the error.
                    _LOG.error("Attempt %d: %s Giving up.", attempt, err)
                continue
            # Package the output.
            df = series.to_frame(name=id_)
            _LOG.info(
                "Downloaded series %s with %d records",
                id_,
                len(df),
            )
            return df
        raise RuntimeError(
            f"Failed to fetch after {self._MAX_ATTEMPTS} attempts. "
            f"Errors per run: {err_msgs}"
        ) from last_err
0 commit comments