Skip to content

Commit 30d8c14

Browse files
authored
Merge pull request #11 from BigDataIA-Spring2025-4/feature-snowflakedags
Snowflake start here notebook addition with - snowflake setup requirements and dag creation snowflake objects
2 parents 3aad0dc + 275c498 commit 30d8c14

2 files changed

Lines changed: 146 additions & 33 deletions

File tree

FRED_0_START.ipynb

Lines changed: 140 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,28 @@
1818
"metadata": {
1919
"collapsed": false,
2020
"language": "sql",
21-
"name": "cell5"
21+
"name": "github_secrets"
2222
},
2323
"outputs": [],
2424
"source": [
2525
"SET MY_USER = CURRENT_USER();\n",
2626
"\n",
2727
"-- Check on this \n",
28-
"SET GITHUB_SECRET_USERNAME = '';\n",
29-
"SET GITHUB_SECRET_PASSWORD = '';\n",
30-
"SET GITHUB_URL_PREFIX = 'https://github.com/BigDataIA-Spring2025-4';\n",
31-
"SET GITHUB_REPO_ORIGIN = 'https://github.com/BigDataIA-Spring2025-4/DAMG7245_Assignment03_Part02.git';"
28+
"SET GITHUB_SECRET_USERNAME = '##############';\n",
29+
"SET GITHUB_SECRET_PASSWORD = '#####################';\n",
30+
"SET GITHUB_URL_PREFIX = 'https://github.com/#####################';\n",
31+
"SET GITHUB_REPO_ORIGIN = 'https://github.com/##############################';"
3232
]
3333
},
3434
{
3535
"cell_type": "code",
3636
"execution_count": null,
3737
"id": "152b48af-537c-402e-b087-29d333bb3f48",
3838
"metadata": {
39+
"codeCollapsed": false,
3940
"collapsed": false,
4041
"language": "sql",
41-
"name": "cell6"
42+
"name": "role_creation"
4243
},
4344
"outputs": [],
4445
"source": [
@@ -83,8 +84,6 @@
8384
"-- ----------------------------------------------------------------------------\n",
8485
"-- Create the database level objects\n",
8586
"-- ----------------------------------------------------------------------------\n",
86-
"\n",
87-
"\n",
8887
"-- Schemas\n",
8988
"CREATE OR REPLACE SCHEMA INTEGRATIONS;;\n",
9089
"CREATE OR REPLACE SCHEMA DEV_RAW_FRED;\n",
@@ -97,9 +96,9 @@
9796
"USE SCHEMA INTEGRATIONS;\n",
9897
"\n",
9998
"CREATE OR REPLACE STAGE S3_FRED_STAGE\n",
100-
" URL = 's3://snowpipeline120/'\n",
101-
" CREDENTIALS = (AWS_KEY_ID = '' \n",
102-
" AWS_SECRET_KEY = '');\n",
99+
" URL = 's3://#############/'\n",
100+
" CREDENTIALS = (AWS_KEY_ID = '#########################' \n",
101+
" AWS_SECRET_KEY = '#########################');\n",
103102
"\n",
104103
"\n",
105104
"-- Secrets (schema level)\n",
@@ -173,60 +172,169 @@
173172
},
174173
"outputs": [],
175174
"source": [
176-
"EXECUTE NOTEBOOK FRED_DB.DEV_ANALYTICS.\"DEV_03_analytics_table_processing\"()"
175+
"EXECUTE NOTEBOOK FRED_DB.PROD_ANALYTICS.\"PROD_03_analytics_table_processing\"()"
176+
]
177+
},
178+
{
179+
"cell_type": "markdown",
180+
"id": "e0ca6d33-b94a-48ca-abbb-c89369300de8",
181+
"metadata": {
182+
"collapsed": false,
183+
"name": "cell17"
184+
},
185+
"source": [
186+
"# DAG Creation Script"
177187
]
178188
},
179189
{
180190
"cell_type": "code",
181191
"execution_count": null,
182-
"id": "e1b75120-92d4-4b16-9c70-edab9a798eaf",
192+
"id": "dfd28d67-fa7c-4334-b302-310754bcc9d7",
183193
"metadata": {
194+
"collapsed": false,
184195
"language": "python",
185-
"name": "cell2"
196+
"name": "cell15"
186197
},
187198
"outputs": [],
188199
"source": [
189-
"# To call the sproc\n",
190-
"# from snowflake.snowpark.context import get_active_session\n",
191-
"# session = get_active_session()\n",
200+
"# Import necessary packages\n",
201+
"from snowflake.core import Root\n",
202+
"from snowflake.snowpark import Session\n",
203+
"from snowflake.snowpark.context import get_active_session\n",
204+
"from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask\n",
205+
"from datetime import timedelta"
206+
]
207+
},
208+
{
209+
"cell_type": "code",
210+
"execution_count": null,
211+
"id": "280b5c2b-d34b-4de8-91e6-072025e77060",
212+
"metadata": {
213+
"codeCollapsed": false,
214+
"collapsed": false,
215+
"language": "python",
216+
"name": "cell14"
217+
},
218+
"outputs": [],
219+
"source": [
220+
"database_name=\"FRED_DB\"\n",
221+
"env=\"PROD\"\n",
192222
"\n",
193-
"# session.use_schema(\"FRED_DB.DEV_ANALYTICS\")\n",
194-
"# # env = schema_name[:3]\n",
195-
"# session.sql(f\"CALL merge_fred_updates_sp('FRED_DB', 'DEV_ANALYTICS', 'DEV')\").collect()"
223+
"session = get_active_session()\n",
224+
"session.use_role(\"FRED_ROLE\")\n",
225+
"session.use_warehouse(\"FRED_WH\")\n",
226+
"\n",
227+
"database_name = \"FRED_DB\"\n",
228+
"schema1 = f\"{env}_RAW_FRED\"\n",
229+
"schema2 = f\"{env}_HARMONIZED\"\n",
230+
"schema3 = f\"{env}_ANALYTICS\"\n",
231+
"schema_name= \"INTEGRATIONS\""
196232
]
197233
},
198234
{
199235
"cell_type": "code",
200236
"execution_count": null,
201-
"id": "3faa73a1-e3c2-4300-8321-a3b143de1966",
237+
"id": "350978d9-4705-46a7-b637-cbcc43e75abc",
202238
"metadata": {
239+
"codeCollapsed": false,
203240
"language": "python",
204-
"name": "cell10"
241+
"name": "cell5"
205242
},
206243
"outputs": [],
207244
"source": [
208-
"# session.sql(f\"CALL create_analytical_tables_sp('DEV_ANALYTICS', 'FRED_10Y_2Y')\").collect()"
245+
"## Task 3: Merge FRED updates\n",
246+
"sql_query = f\"\"\"\n",
247+
"CREATE OR REPLACE TASK FRED_DB.INTEGRATIONS.SPOC_TASK_MERGE_FRED_UPDATES\n",
248+
"WAREHOUSE = FRED_WH\n",
249+
"WHEN SYSTEM$STREAM_HAS_DATA('FRED_DB.{env}_HARMONIZED.FRED_STREAM')\n",
250+
"AS\n",
251+
"BEGIN\n",
252+
" CALL FRED_DB.{env}_ANALYTICS.merge_fred_updates_sp('FRED_DB', '{env}_ANALYTICS', '{env}');\n",
253+
" CALL FRED_DB.{env}_ANALYTICS.create_analytical_tables_sp('{env}_ANALYTICS', 'FRED_10Y_2Y');\n",
254+
"END;\n",
255+
"\"\"\"\n",
256+
"session.sql(sql_query)"
209257
]
210258
},
211259
{
212260
"cell_type": "code",
213261
"execution_count": null,
214-
"id": "395bc402-be4a-4211-8e8b-c537212d72f6",
262+
"id": "b7cff76e-bdbe-4c6e-a714-24cb3a78b9c0",
215263
"metadata": {
216-
"language": "sql",
217-
"name": "cell11"
264+
"codeCollapsed": false,
265+
"collapsed": false,
266+
"language": "python",
267+
"name": "cell16"
218268
},
219269
"outputs": [],
220270
"source": [
221-
"-- select * from FRED_10Y_2Y where OBSERVATION_DATE > '2025-02-24';"
271+
"session.use_schema(f\"{database_name}.{schema_name}\")\n",
272+
"\n",
273+
"# Create a Root object and obtain the schema\n",
274+
"api_root = Root(session)\n",
275+
"schema = api_root.databases[database_name].schemas[schema_name]\n",
276+
"\n",
277+
"dag_op = DAGOperation(schema)\n",
278+
"\n",
279+
"warehouse_name = \"FRED_WH\"\n",
280+
"dag_name = f\"{env}_FRED_ANALYTICS_DAG\"\n",
281+
"\n",
282+
"with DAG(dag_name, schedule=timedelta(days=1), warehouse=warehouse_name) as dag:\n",
283+
" dag_task1 = DAGTask(\n",
284+
" \"AWS_S3_DATA_STAGING\", \n",
285+
" definition=f'EXECUTE NOTEBOOK \"{database_name}\".\"{schema1}\".\"{env}_01_load_files\"()', \n",
286+
" warehouse=warehouse_name\n",
287+
" )\n",
288+
"\n",
289+
" dag_task2 = DAGTask(\n",
290+
" \"DATA_TRANSFORMATION_TO_HARMONIZED\", \n",
291+
" definition=f'EXECUTE NOTEBOOK \"{database_name}\".\"{schema2}\".\"{env}_02_raw_to_harmonized\"()', \n",
292+
" warehouse=warehouse_name\n",
293+
" )\n",
294+
" \n",
295+
" dag_task3 = DAGTask(\n",
296+
" \"SPOC_TASK_MERGE_FRED_UPDATES\",\n",
297+
" definition=f'EXECUTE TASK \"{database_name}\".\"{schema_name}\".\"SPOC_TASK_MERGE_FRED_UPDATES\";', \n",
298+
" warehouse=warehouse_name\n",
299+
" )\n",
300+
"\n",
301+
" # Define task dependencies\n",
302+
" dag_task1 >> dag_task2 >> dag_task3\n",
303+
"\n",
304+
"# Deploy the DAG\n",
305+
"dag_op.deploy(dag, mode=\"orreplace\")\n"
222306
]
223307
},
224308
{
225309
"cell_type": "code",
226310
"execution_count": null,
227-
"id": "5a73c7d9-6eaf-47b0-8c0b-e88bbce1d3ab",
311+
"id": "fe0c0c3e-d11f-45c6-a241-d83eb1bcd1ec",
312+
"metadata": {
313+
"language": "sql",
314+
"name": "cell12"
315+
},
316+
"outputs": [],
317+
"source": [
318+
"-- SELECT * FROM FRED_DB.DEV_ANALYTICS.FRED_10Y_2Y ORDER BY observation_date DESC LIMIT 10;\n",
319+
"SELECT * FROM FRED_DB.DEV_ANALYTICS.FRED_COMBINED_DAILY ORDER BY obs_date DESC LIMIT 10;"
320+
]
321+
},
322+
{
323+
"cell_type": "markdown",
324+
"id": "40bdc08b-01f2-4811-8ecc-7fdd1cb4e345",
228325
"metadata": {
229326
"collapsed": false,
327+
"name": "cell2"
328+
},
329+
"source": [
330+
"## Teardown Scripts "
331+
]
332+
},
333+
{
334+
"cell_type": "code",
335+
"execution_count": null,
336+
"id": "5e0ea6fe-e895-4e15-a30e-e17890e12e60",
337+
"metadata": {
230338
"language": "sql",
231339
"name": "cell1"
232340
},
@@ -236,8 +344,7 @@
236344
"-- DROP ROLE FRED_ROLE;\n",
237345
"-- DROP API INTEGRATION FRED_GITHUB_API_INTEGRATION;\n",
238346
"-- DROP DATABASE FRED_DB;\n",
239-
"-- DROP WAREHOUSE FRED_WH;\n",
240-
"\n"
347+
"-- DROP WAREHOUSE FRED_WH;"
241348
]
242349
}
243350
],
@@ -250,9 +357,9 @@
250357
"authorEmail": "markose.y@northeastern.edu",
251358
"authorId": "6690310009356",
252359
"authorName": "YOHANMARKOSE",
253-
"lastEditTime": 1740688224407,
254-
"notebookId": "ym2jm7mboxidom324g4k",
255-
"sessionId": "ae5ae0eb-d657-4df5-a1e6-c07ee55fdbb2"
360+
"lastEditTime": 1740719590239,
361+
"notebookId": "ctd7cq24mw6njaq6xyy5",
362+
"sessionId": "2199f6f4-5da7-4e81-96ad-1752081b52d2"
256363
}
257364
},
258365
"nbformat": 4,
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
# External Access Integrations
3+
4+
Within Snowflake user can create an external access integration for access to external network locations from a UDF or procedure handler. And by default, Snowflake does not enable external access for trial accounts.
5+
6+
https://docs.snowflake.com/en/developer-guide/external-network-access/external-network-access-limitations

0 commit comments

Comments
 (0)