-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Expand file tree
/
Copy path04_create_pos_view.py
More file actions
122 lines (110 loc) · 6.5 KB
/
Copy path04_create_pos_view.py
File metadata and controls
122 lines (110 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#------------------------------------------------------------------------------
# Hands-On Lab: Data Engineering with Snowpark
# Script: 04_create_pos_view.py
# Author: Jeremiah Hansen, Caleb Baechtold
# Last Updated: 1/9/2023
#------------------------------------------------------------------------------
# SNOWFLAKE ADVANTAGE: Snowpark DataFrame API
# SNOWFLAKE ADVANTAGE: Streams for incremental processing (CDC)
# SNOWFLAKE ADVANTAGE: Streams on views
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
from dotenv import load_dotenv
import os
def create_pos_view(session):
    """Create or replace the ``POS_FLATTENED_V`` view from the RAW_POS tables.

    Switches the session to the ``HARMONIZED`` schema, projects each
    ``RAW_POS`` source table down to the columns it contributes, joins the
    projections together and saves the result as the view.

    Args:
        session: Snowpark session used to read the tables and create the view.
    """
    session.use_schema('HARMONIZED')

    def cols(*names):
        # Wrap plain column names in Column objects for select().
        return [F.col(name) for name in names]

    # Selecting before the join keeps each table's contribution explicit.
    # Joining on the full session.table(...) DataFrames instead would give
    # the same end result; it is mostly a readability question.
    order_detail = session.table("RAW_POS.ORDER_DETAIL").select(
        *cols(
            "ORDER_DETAIL_ID",
            "LINE_NUMBER",
            "MENU_ITEM_ID",
            "QUANTITY",
            "UNIT_PRICE",
            "PRICE",
            "ORDER_ID",
        )
    )
    order_header = session.table("RAW_POS.ORDER_HEADER").select(
        *cols("ORDER_ID", "TRUCK_ID", "ORDER_TS"),
        # Date-only companion of the order timestamp.
        F.to_date(F.col("ORDER_TS")).alias("ORDER_TS_DATE"),
        *cols(
            "ORDER_AMOUNT",
            "ORDER_TAX_AMOUNT",
            "ORDER_DISCOUNT_AMOUNT",
            "LOCATION_ID",
            "ORDER_TOTAL",
        ),
    )
    truck = session.table("RAW_POS.TRUCK").select(
        *cols(
            "TRUCK_ID",
            "PRIMARY_CITY",
            "REGION",
            "COUNTRY",
            "FRANCHISE_FLAG",
            "FRANCHISE_ID",
        )
    )
    menu = session.table("RAW_POS.MENU").select(
        *cols("MENU_ITEM_ID", "TRUCK_BRAND_NAME", "MENU_TYPE", "MENU_ITEM_NAME")
    )
    franchise = session.table("RAW_POS.FRANCHISE").select(
        F.col("FRANCHISE_ID"),
        F.col("FIRST_NAME").alias("FRANCHISEE_FIRST_NAME"),
        F.col("LAST_NAME").alias("FRANCHISEE_LAST_NAME"),
    )
    location = session.table("RAW_POS.LOCATION").select(F.col("LOCATION_ID"))

    # Trucks together with their franchise details.
    trucks_with_franchise = truck.join(
        franchise,
        truck['FRANCHISE_ID'] == franchise['FRANCHISE_ID'],
        rsuffix='_f',
    )

    # Order headers joined to truck/franchise, then to location.
    headers_with_truck = order_header.join(
        trucks_with_franchise,
        order_header['TRUCK_ID'] == trucks_with_franchise['TRUCK_ID'],
        rsuffix='_t',
    )
    headers_enriched = headers_with_truck.join(
        location,
        order_header['LOCATION_ID'] == location['LOCATION_ID'],
        rsuffix='_l',
    )

    # Order lines joined to their enriched header, then to the menu item.
    lines_with_header = order_detail.join(
        headers_enriched,
        order_detail['ORDER_ID'] == headers_enriched['ORDER_ID'],
        rsuffix='_oh',
    )
    flattened = lines_with_header.join(
        menu,
        order_detail['MENU_ITEM_ID'] == menu['MENU_ITEM_ID'],
        rsuffix='_m',
    )

    # Final column list and order of the view.
    flattened = flattened.select(
        *cols(
            "ORDER_ID",
            "TRUCK_ID",
            "ORDER_TS",
            'ORDER_TS_DATE',
            "ORDER_DETAIL_ID",
            "LINE_NUMBER",
            "TRUCK_BRAND_NAME",
            "MENU_TYPE",
            "PRIMARY_CITY",
            "REGION",
            "COUNTRY",
            "FRANCHISE_FLAG",
            "FRANCHISE_ID",
            "FRANCHISEE_FIRST_NAME",
            "FRANCHISEE_LAST_NAME",
            "LOCATION_ID",
            "MENU_ITEM_ID",
            "MENU_ITEM_NAME",
            "QUANTITY",
            "UNIT_PRICE",
            "PRICE",
            "ORDER_AMOUNT",
            "ORDER_TAX_AMOUNT",
            "ORDER_DISCOUNT_AMOUNT",
            "ORDER_TOTAL",
        )
    )
    flattened.create_or_replace_view('POS_FLATTENED_V')
def create_pos_view_stream(session):
    """Create or replace the ``POS_FLATTENED_V_STREAM`` stream on the view.

    Switches the session to the ``HARMONIZED`` schema and issues the
    ``CREATE OR REPLACE STREAM`` statement with ``SHOW_INITIAL_ROWS = TRUE``.

    Args:
        session: Snowpark session used to run the statement.
    """
    session.use_schema('HARMONIZED')
    stream_ddl = (
        'CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM '
        'ON VIEW POS_FLATTENED_V '
        'SHOW_INITIAL_ROWS = TRUE'
    )
    # The result rows are not needed; collect() is called only to run the DDL.
    session.sql(stream_ddl).collect()
def test_pos_view(session):
    """Show the first five rows of ``POS_FLATTENED_V`` as a quick check.

    Args:
        session: Snowpark session; it is switched to the ``HARMONIZED`` schema.
    """
    session.use_schema('HARMONIZED')
    preview = session.table('POS_FLATTENED_V').limit(5)
    preview.show()
# For local debugging
if __name__ == "__main__":
    # Load settings from a local .env file (if one exists) into the process
    # environment. Without this call the imported load_dotenv is never used
    # and the os.getenv lookups below only see variables that were already
    # exported. By default load_dotenv does not override variables that are
    # already set, and it does nothing when no .env file is found.
    load_dotenv()

    # Connection settings are read from SNOWFLAKE_* environment variables;
    # any variable that is not set yields None.
    connection_parameters = {
        "account": os.getenv("SNOWFLAKE_ACCOUNT"),
        "user": os.getenv("SNOWFLAKE_USER"),
        "password": os.getenv("SNOWFLAKE_PASSWORD"),
        "role": os.getenv("SNOWFLAKE_ROLE"),
        "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
        "database": os.getenv("SNOWFLAKE_DATABASE"),
    }

    # The with-block closes the session when the work is done.
    with Session.builder.configs(connection_parameters).create() as session:
        create_pos_view(session)
        create_pos_view_stream(session)
        # test_pos_view(session)