You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
"source": "# 01 Load Excel Files\n\n* Author: Jeremiah Hansen\n* Last Updated: 2/12/2026\n\nThis notebook will load data into the `LOCATION` and `ORDER_DETAIL` tables from Excel files.\n\nThis currently does not use Snowpark File Access as it doesn't yet work in Notebooks. So for now we copy the file locally first.",
19
+
"execution_count": null
20
+
},
21
+
{
22
+
"id": "6300f36b-5d89-4f7e-95ec-7ba84941d501",
23
+
"cell_type": "code",
24
+
"metadata": {
25
+
"language": "python",
26
+
"name": "py_initialize",
27
+
"title": "py_initialize"
28
+
},
29
+
"source": "# Import python packages\nimport sys\nimport logging\n\n# Set up the logger\nlogger_name = 'demo_logger'\nlogger = logging.getLogger(logger_name)\nlogger.setLevel(logging.INFO)\n\n# Set default values for debugging\nnotebook_name = '01_load_excel_files.ipynb'\ndatabase_name = 'DEMO_DB'\nschema_name = 'DEV_SCHEMA'\nrole_name = 'DEMO_ROLE'\n\n# Override values with passed notebook arguments\nif sys.argv[0].endswith('.ipynb'):\n import argparse\n parser = argparse.ArgumentParser()\n parser.add_argument('--database-name', type=str)\n parser.add_argument('--schema-name', type=str)\n parser.add_argument('--role-name', type=str)\n args, args_unknown = parser.parse_known_args()\n\n notebook_name = parser.prog # same as argv[0]\n database_name = args.database_name or database_name\n schema_name = args.schema_name or schema_name\n role_name = args.role_name or role_name\n\n# Get a Snowpark session\nfrom snowflake.snowpark.context import get_active_session\nsession = get_active_session()\n\n# Set the default database and schema for the following cells\nsession.use_schema(f\"{database_name}.{schema_name}\")\n\n# Set the role\n# Needed when running EXECUTE NOTEBOOK PROJECT directly (since it ignores session context and uses the user's default role)\nsession.use_role(role_name)\n\n# Get details about the current state\ncurrent_state_df = session.sql(f\"\"\"\n SELECT OBJECT_CONSTRUCT(\n 'current_user', CURRENT_USER(),\n 'current_role', CURRENT_ROLE(),\n 'current_secondary_roles', PARSE_JSON(CURRENT_SECONDARY_ROLES()),\n 'current_database', CURRENT_DATABASE(),\n 'current_schema', CURRENT_SCHEMA()\n )::STRING AS session_context;\n \"\"\").collect()\n\nlogger.info(f\"Begin executing notebook {notebook_name}\", extra = {'logger_name': logger_name})\nlogger.info(f\"Using parameters database: {database_name}, schema: {schema_name}, role: {role_name}\", extra = {'logger_name': logger_name})\nlogger.info(f\"Using session context {current_state_df[0]['SESSION_CONTEXT']}\", extra = {'logger_name': logger_name})",
30
+
"outputs": [],
31
+
"execution_count": null
32
+
},
33
+
{
34
+
"id": "c698b8ce-63c6-46ea-93f3-5cb40b13b46f",
35
+
"cell_type": "code",
36
+
"metadata": {
37
+
"language": "python",
38
+
"name": "py_pip_install",
39
+
"title": "py_pip_install"
40
+
},
41
+
"source": "!pip install openpyxl",
42
+
"outputs": [],
43
+
"execution_count": null
44
+
},
45
+
{
46
+
"id": "387772d4-3652-403f-bf19-f47747e1224f",
47
+
"cell_type": "code",
48
+
"metadata": {
49
+
"resultVariableName": "dataframe_1",
50
+
"language": "sql",
51
+
"name": "sql_get_spreadsheets",
52
+
"title": "sql_get_spreadsheets"
53
+
},
54
+
"source": "%%sql -r dataframe_1\n-- Temporary solution to load in the metadata, this should be replaced with a directy query to a directory table (or a metadata table)\nSELECT '@INTEGRATIONS.FROSTBYTE_RAW_STAGE/intro/order_detail.xlsx' AS STAGE_FILE_PATH, 'order_detail' AS WORKSHEET_NAME, 'ORDER_DETAIL' AS TARGET_TABLE\nUNION\nSELECT '@INTEGRATIONS.FROSTBYTE_RAW_STAGE/intro/location.xlsx', 'location', 'LOCATION';",
55
+
"outputs": [],
56
+
"execution_count": null
57
+
},
58
+
{
59
+
"id": "5f4fc654-1a1f-4aae-a20d-466ecd83f022",
60
+
"cell_type": "markdown",
61
+
"metadata": {
62
+
"collapsed": false,
63
+
"codeCollapsed": true
64
+
},
65
+
"source": "## Create a function to load Excel worksheet to table\n\nCreate a reusable function to load an Excel worksheet to a table in Snowflake.\n\nNote: Until we can use scoped URLs in Notebooks, via the `BUILD_SCOPED_FILE_URL()` function, we need to temporarily copy the file to a temp stage and then process from there."
66
+
},
67
+
{
68
+
"id": "85f77673-fca8-46aa-83bd-cb0dfa8d80ba",
69
+
"cell_type": "code",
70
+
"metadata": {
71
+
"language": "python",
72
+
"name": "py_load_excel_function",
73
+
"title": "py_load_excel_function"
74
+
},
75
+
"source": "from snowflake.snowpark.files import SnowflakeFile\nfrom openpyxl import load_workbook\nimport pandas as pd\n\n# 1. Create a temp internal stage (once at the start)\nsession.sql(\"CREATE TEMP STAGE IF NOT EXISTS temp_excel_stage\").collect()\n\ndef load_excel_worksheet_to_table(session, external_path, worksheet_name, target_table):\n \"\"\"Load an Excel worksheet by copying to internal stage first\"\"\"\n \n # Extract filename from path\n filename = external_path.split('/')[-1]\n \n # 2. Copy file from external to internal stage\n session.sql(f\"\"\"\n COPY FILES INTO @temp_excel_stage\n FROM {external_path}\n \"\"\").collect()\n \n # 3. Now SnowflakeFile.open() works on internal stage\n with SnowflakeFile.open(f'@temp_excel_stage/{filename}', 'rb') as f:\n workbook = load_workbook(f)\n sheet = workbook[worksheet_name]\n \n # Convert to DataFrame\n data = sheet.values\n columns = next(data)\n df = pd.DataFrame(data, columns=columns)\n \n # Write to Snowflake table\n snowpark_df = session.create_dataframe(df)\n snowpark_df.write.mode(\"overwrite\").save_as_table(target_table)\n \n logger.info(f\"Loaded {len(df)} rows from '{worksheet_name}' to {target_table}\", extra = {'logger_name': logger_name})",
76
+
"outputs": [],
77
+
"execution_count": null
78
+
},
79
+
{
80
+
"id": "3fbc225c-332c-42e6-a5aa-e812da54e1dc",
81
+
"cell_type": "markdown",
82
+
"metadata": {
83
+
"collapsed": false,
84
+
"codeCollapsed": true
85
+
},
86
+
"source": "## Process all Excel worksheets\n\nLoop through each Excel worksheet to process and call our `load_excel_worksheet_to_table_local()` function."
87
+
},
88
+
{
89
+
"id": "4d5b0921-3b3a-45cb-a709-476ffd962ea6",
90
+
"cell_type": "code",
91
+
"metadata": {
92
+
"language": "python",
93
+
"name": "py_process_spreadsheets",
94
+
"title": "py_process_spreadsheets"
95
+
},
96
+
"source": "# Process each file from the sql_get_spreadsheets cell above\nfiles_to_load = dataframe_1\nfor index, excel_file in files_to_load.iterrows():\n print(f\"Processing Excel file {excel_file['STAGE_FILE_PATH']}\")\n load_excel_worksheet_to_table(session, excel_file['STAGE_FILE_PATH'], excel_file['WORKSHEET_NAME'], excel_file['TARGET_TABLE'])\n\nlogger.info(f\"Finish executing notebook {notebook_name}\", extra = {'logger_name': logger_name})",
97
+
"outputs": [],
98
+
"execution_count": null
99
+
},
100
+
{
101
+
"id": "42ddde31-f39d-442a-b788-b161c820ecdd",
102
+
"cell_type": "markdown",
103
+
"metadata": {
104
+
"collapsed": false,
105
+
"codeCollapsed": true
106
+
},
107
+
"source": "### Debugging"
108
+
},
109
+
{
110
+
"id": "620b67d4-cde5-4599-9cf0-353023847a51",
111
+
"cell_type": "code",
112
+
"metadata": {
113
+
"resultVariableName": "dataframe_2",
114
+
"language": "sql",
115
+
"name": "sql_debugging",
116
+
"title": "sql_debugging"
117
+
},
118
+
"source": "%%sql -r dataframe_2\n--DESCRIBE TABLE LOCATION;\n--SELECT * FROM LOCATION;\nSHOW TABLES;",
"source": "# 02 Load Daily City Metrics\n\n* Author: Jeremiah Hansen\n* Last Updated: 2/12/2026\n\nThis notebook will load data into the `DAILY_CITY_METRICS` table with support for incremental processing.",
19
+
"execution_count": null
20
+
},
21
+
{
22
+
"id": "61d0aa49-693f-4bc2-803e-9e351b3211c4",
23
+
"cell_type": "code",
24
+
"metadata": {
25
+
"language": "python",
26
+
"name": "py_initialize",
27
+
"title": "py_initialize"
28
+
},
29
+
"source": "# Import python packages\nimport sys\nimport logging\n\n# Set up the logger\nlogger_name = 'demo_logger'\nlogger = logging.getLogger(logger_name)\nlogger.setLevel(logging.INFO)\n\n# Set default values for debugging\nnotebook_name = '02_load_daily_city_metrics.ipynb'\ndatabase_name = 'DEMO_DB'\nschema_name = 'DEV_SCHEMA'\nrole_name = 'DEMO_ROLE'\n\n# Override values with passed notebook arguments\nif sys.argv[0].endswith('.ipynb'):\n import argparse\n parser = argparse.ArgumentParser()\n parser.add_argument('--database-name', type=str)\n parser.add_argument('--schema-name', type=str)\n parser.add_argument('--role-name', type=str)\n args, args_unknown = parser.parse_known_args()\n\n notebook_name = parser.prog # same as argv[0]\n database_name = args.database_name or database_name\n schema_name = args.schema_name or schema_name\n role_name = args.role_name or role_name\n\n# Get a Snowpark session\nfrom snowflake.snowpark.context import get_active_session\nsession = get_active_session()\n\n# Set the default database and schema for the following cells\nsession.use_schema(f\"{database_name}.{schema_name}\")\n\n# Set the role\n# Needed when running EXECUTE NOTEBOOK PROJECT directly (since it ignores session context and uses the user's default role)\nsession.use_role(role_name)\n\n# Get details about the current state\ncurrent_state_df = session.sql(f\"\"\"\n SELECT OBJECT_CONSTRUCT(\n 'current_user', CURRENT_USER(),\n 'current_role', CURRENT_ROLE(),\n 'current_secondary_roles', PARSE_JSON(CURRENT_SECONDARY_ROLES()),\n 'current_database', CURRENT_DATABASE(),\n 'current_schema', CURRENT_SCHEMA()\n )::STRING AS session_context;\n \"\"\").collect()\n\nlogger.info(f\"Begin executing notebook {notebook_name}\", extra = {'logger_name': logger_name})\nlogger.info(f\"Using parameters database: {database_name}, schema: {schema_name}, role: {role_name}\", extra = {'logger_name': logger_name})\nlogger.info(f\"Using session context {current_state_df[0]['SESSION_CONTEXT']}\", extra = {'logger_name': logger_name})",
30
+
"outputs": [],
31
+
"execution_count": null
32
+
},
33
+
{
34
+
"id": "2f8be802-753e-4bcc-b80d-b7ed17c06b7d",
35
+
"cell_type": "markdown",
36
+
"metadata": {
37
+
"collapsed": false,
38
+
"codeCollapsed": true
39
+
},
40
+
"source": "## Create a function to check if a table exists\n\nThis function uses the [Snowflake Python Management API](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview)."
41
+
},
42
+
{
43
+
"id": "8de74798-23fc-4de4-ab36-b158ce37faf5",
44
+
"cell_type": "code",
45
+
"metadata": {
46
+
"language": "python",
47
+
"name": "py_table_exists",
48
+
"title": "py_table_exists"
49
+
},
50
+
"source": "def table_exists(session, database_name='', schema_name='', table_name=''):\n from snowflake.core import Root\n\n root = Root(session)\n tables = root.databases[database_name].schemas[schema_name].tables.iter(like=table_name)\n for table_obj in tables:\n if table_obj.name == table_name:\n return True\n\n return False\n\n# Not used, SQL alternative to Python version above\ndef table_exists2(session, database_name='', schema_name='', table_name=''):\n exists = session.sql(\"SELECT EXISTS (SELECT * FROM {}.INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS\".format(database_name, schema_name, table_name)).collect()[0]['TABLE_EXISTS']\n return exists",
51
+
"outputs": [],
52
+
"execution_count": null
53
+
},
54
+
{
55
+
"id": "6e76c9a3-7c9b-4645-bcbb-d05509d4a80b",
56
+
"cell_type": "markdown",
57
+
"metadata": {
58
+
"collapsed": false,
59
+
"codeCollapsed": true
60
+
},
61
+
"source": "## Pipeline to update daily_city_metrics"
62
+
},
63
+
{
64
+
"id": "6c608229-284c-4a84-9627-b807e9ea8295",
65
+
"cell_type": "code",
66
+
"metadata": {
67
+
"language": "python",
68
+
"name": "py_process_dcm",
69
+
"title": "py_process_dcm"
70
+
},
71
+
"source": "import snowflake.snowpark.functions as F\n\ntable_name = \"DAILY_CITY_METRICS\"\n\n# Define the tables\norder_detail = session.table(\"ORDER_DETAIL\")\nhistory_day = session.table(\"FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY\")\nlocation = session.table(\"LOCATION\")\n\n# Join the tables\norder_detail = order_detail.join(location, order_detail['LOCATION_ID'] == location['LOCATION_ID'])\norder_detail = order_detail.join(history_day, (F.builtin(\"DATE\")(order_detail['ORDER_TS']) == history_day['DATE_VALID_STD']) & (location['ISO_COUNTRY_CODE'] == history_day['COUNTRY']) & (location['CITY'] == history_day['CITY_NAME']))\n\n# Aggregate the data\nfinal_agg = order_detail.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('ISO_COUNTRY_CODE')) \\\n .agg( \\\n F.sum('PRICE').alias('DAILY_SALES_SUM'), \\\n F.avg('AVG_TEMPERATURE_AIR_2M_F').alias(\"AVG_TEMPERATURE_F\"), \\\n F.avg(\"TOT_PRECIPITATION_IN\").alias(\"AVG_PRECIPITATION_IN\"), \\\n ) \\\n .select(F.col(\"DATE_VALID_STD\").alias(\"DATE\"), F.col(\"CITY_NAME\"), F.col(\"ISO_COUNTRY_CODE\").alias(\"COUNTRY_DESC\"), \\\n F.builtin(\"ZEROIFNULL\")(F.col(\"DAILY_SALES_SUM\")).alias(\"DAILY_SALES\"), \\\n F.round(F.col(\"AVG_TEMPERATURE_F\"), 2).alias(\"AVG_TEMPERATURE_FAHRENHEIT\"), \\\n F.round(F.col(\"AVG_PRECIPITATION_IN\"), 2).alias(\"AVG_PRECIPITATION_INCHES\"), \\\n )\n\n# If the table doesn't exist then create it\nif not table_exists(session, database_name=database_name, schema_name=schema_name, table_name=table_name):\n final_agg.write.mode(\"overwrite\").save_as_table(table_name)\n\n logger.info(f\"Successfully created {table_name}\", extra = {'logger_name': logger_name})\n# Otherwise update it\nelse:\n cols_to_update = {c: final_agg[c] for c in final_agg.schema.names}\n\n dcm = session.table(table_name)\n dcm.merge(final_agg, (dcm['DATE'] == final_agg['DATE']) & (dcm['CITY_NAME'] == final_agg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == final_agg['COUNTRY_DESC']), \\\n [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)])\n\n logger.info(f\"Successfully updated {table_name}\", extra = {'logger_name': logger_name})\n\nlogger.info(f\"Finish executing notebook {notebook_name}\", extra = {'logger_name': logger_name})",
72
+
"outputs": [],
73
+
"execution_count": null
74
+
},
75
+
{
76
+
"id": "602e3d31-d4de-4a98-b02b-ba53b864f15d",
77
+
"cell_type": "markdown",
78
+
"metadata": {
79
+
"collapsed": false,
80
+
"codeCollapsed": true
81
+
},
82
+
"source": "## Debugging"
83
+
},
84
+
{
85
+
"id": "1fa54381-ad5c-44be-bf63-f05e3e9ac64d",
86
+
"cell_type": "code",
87
+
"metadata": {
88
+
"resultVariableName": "dataframe_1",
89
+
"language": "sql",
90
+
"name": "sql_debugging",
91
+
"title": "sql_debugging"
92
+
},
93
+
"source": "%%sql -r dataframe_1\n--SELECT * FROM DAILY_CITY_METRICS LIMIT 10;",
0 commit comments