Skip to content

Commit 2babca4

Browse files
authored
Merge pull request #8 from BigDataIA-Spring2025-4/feature-snow_setup
Update 01 and 02 notebooks
2 parents a9ec988 + e88149e commit 2babca4

2 files changed

Lines changed: 55 additions & 23 deletions

File tree

notebooks/01_load_files/01_load_files.ipynb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"outputs": [],
3636
"source": [
3737
"import logging\n",
38-
"\n",
38+
"import datetime\n",
3939
"# Set up logging\n",
4040
"logger = logging.getLogger(\"fred_logger\")\n",
4141
"\n",
@@ -78,9 +78,9 @@
7878
"outputs": [],
7979
"source": [
8080
"session.use_schema(f\"{database_name}.{schema_name}\")\n",
81-
"\n",
82-
"load_csv_to_table(session, \"@INTEGRATIONS.S3_FRED_STAGE/FRED_10Y/DGS10.csv\", \"FRED_DGS10Y\")\n",
83-
"load_csv_to_table(session, \"@INTEGRATIONS.S3_FRED_STAGE/FRED_2Y/DGS2.csv\", \"FRED_DGS2Y\")\n",
81+
"t_date = datetime.datetime.today().strftime(\"%Y-%m-%d\")\n",
82+
"load_csv_to_table(session, f\"@INTEGRATIONS.S3_FRED_STAGE/{t_date}/DGS10_data.csv\", \"FRED_DGS10Y\")\n",
83+
"load_csv_to_table(session, f\"@INTEGRATIONS.S3_FRED_STAGE/{t_date}/DGS2_data.csv\", \"FRED_DGS2Y\")\n",
8484
"\n",
8585
"logger.info(\"01_load_files end\")"
8686
]

notebooks/02_raw_to_harmonized/02_raw_to_harmonized.ipynb

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,4 @@
11
{
2-
"metadata": {
3-
"kernelspec": {
4-
"display_name": "Streamlit Notebook",
5-
"name": "streamlit"
6-
},
7-
"lastEditStatus": {
8-
"notebookId": "gjmtbtbofcart6nlju5h",
9-
"authorId": "6690310009356",
10-
"authorName": "YOHANMARKOSE",
11-
"authorEmail": "markose.y@northeastern.edu",
12-
"sessionId": "d73d2d9f-2f54-4d4b-96d0-fe0c676a14f9",
13-
"lastEditTime": 1740634222219
14-
}
15-
},
16-
"nbformat_minor": 5,
17-
"nbformat": 4,
182
"cells": [
193
{
204
"cell_type": "code",
@@ -64,7 +48,39 @@
6448
"name": "cell3"
6549
},
6650
"outputs": [],
67-
"source": "import snowflake.snowpark.functions as F\n\nenv = schema_name[:3]\ndef create_fred_view(session):\n raw_fred_10Y = session.table(f\"{database_name}.{env}_RAW_FRED.FRED_DGS10Y\").select(\n F.to_date(F.col('\"observation_date\"')).alias(\"OBSERVATION_DATE\"),\n F.col('\"DGS10\"').cast(\"float\").alias(\"10Y_YIELD\")\n )\n \n raw_fred_2Y = session.table(f\"{database_name}.{env}_RAW_FRED.FRED_DGS2Y\").select(\n F.to_date(F.col('\"observation_date\"')).alias(\"OBSERVATION_DATE\"),\n F.col('\"DGS2\"').cast(\"float\").alias(\"2Y_YIELD\")\n )\n\n harmonized_data = raw_fred_10Y.join(raw_fred_2Y, raw_fred_10Y['OBSERVATION_DATE'] == raw_fred_2Y['OBSERVATION_DATE']).select(\n raw_fred_10Y[\"OBSERVATION_DATE\"].alias(\"OBSERVATION_DATE\"), # Keep only one OBSERVATION_DATE\n F.col(\"10Y_YIELD\"),\n F.col(\"2Y_YIELD\")\n )\n\n # harmonized_data = harmonized_data.filter(\n # (F.col(\"YIELD_SPREAD\").isNotNull()))\n \n session.use_schema(f\"{database_name}.{schema_name}\")\n harmonized_data.write.mode(\"overwrite\").save_as_table('FRED_FLATTENED')\n\ndef create_fred_view_stream(session):\n _ = session.sql('CREATE OR REPLACE STREAM FRED_STREAM \\\n ON VIEW FRED_FLATTENED \\\n SHOW_INITIAL_ROWS = TRUE').collect()\n \n"
51+
"source": [
52+
"import snowflake.snowpark.functions as F\n",
53+
"\n",
54+
"env = schema_name[:3]\n",
55+
"def create_fred_view(session):\n",
56+
" raw_fred_10Y = session.table(f\"{database_name}.{env}_RAW_FRED.FRED_DGS10Y\").select(\n",
57+
" F.to_date(F.col('\"observation_date\"')).alias(\"OBSERVATION_DATE\"),\n",
58+
" F.col('\"DGS10\"').cast(\"float\").alias(\"10Y_YIELD\")\n",
59+
" )\n",
60+
" \n",
61+
" raw_fred_2Y = session.table(f\"{database_name}.{env}_RAW_FRED.FRED_DGS2Y\").select(\n",
62+
" F.to_date(F.col('\"observation_date\"')).alias(\"OBSERVATION_DATE\"),\n",
63+
" F.col('\"DGS2\"').cast(\"float\").alias(\"2Y_YIELD\")\n",
64+
" )\n",
65+
"\n",
66+
" harmonized_data = raw_fred_10Y.join(raw_fred_2Y, raw_fred_10Y['OBSERVATION_DATE'] == raw_fred_2Y['OBSERVATION_DATE']).select(\n",
67+
" raw_fred_10Y[\"OBSERVATION_DATE\"].alias(\"OBSERVATION_DATE\"), # Keep only one OBSERVATION_DATE\n",
68+
" F.col(\"10Y_YIELD\"),\n",
69+
" F.col(\"2Y_YIELD\")\n",
70+
" )\n",
71+
"\n",
72+
" # harmonized_data = harmonized_data.filter(\n",
73+
" # (F.col(\"YIELD_SPREAD\").isNotNull()))\n",
74+
" \n",
75+
" session.use_schema(f\"{database_name}.{schema_name}\")\n",
76+
" harmonized_data.write.mode(\"overwrite\").save_as_table('FRED_FLATTENED')\n",
77+
"\n",
78+
"def create_fred_view_stream(session):\n",
79+
" _ = session.sql('CREATE OR REPLACE STREAM FRED_STREAM \\\n",
80+
" ON TABLE FRED_FLATTENED \\\n",
81+
" SHOW_INITIAL_ROWS = TRUE').collect()\n",
82+
" \n"
83+
]
6884
},
6985
{
7086
"cell_type": "code",
@@ -81,5 +97,21 @@
8197
"create_fred_view_stream(session)"
8298
]
8399
}
84-
]
85-
}
100+
],
101+
"metadata": {
102+
"kernelspec": {
103+
"display_name": "Streamlit Notebook",
104+
"name": "streamlit"
105+
},
106+
"lastEditStatus": {
107+
"authorEmail": "markose.y@northeastern.edu",
108+
"authorId": "6690310009356",
109+
"authorName": "YOHANMARKOSE",
110+
"lastEditTime": 1740634222219,
111+
"notebookId": "gjmtbtbofcart6nlju5h",
112+
"sessionId": "d73d2d9f-2f54-4d4b-96d0-fe0c676a14f9"
113+
}
114+
},
115+
"nbformat": 4,
116+
"nbformat_minor": 5
117+
}

0 commit comments

Comments
 (0)