-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path3_gold_processing.py
More file actions
60 lines (41 loc) · 1.34 KB
/
3_gold_processing.py
File metadata and controls
60 lines (41 loc) · 1.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC # Gold Processing

# COMMAND ----------

# Source-table coordinates: the gold layer is built from the customers
# table in the silver layer of the fmcg catalog.
catalog = 'fmcg'
silver_schema = 'silver'
data_source = 'customers'

# spark.table() reads the fully-qualified table directly. The original
# spark.sql(f"select * from ...;") embedded a trailing semicolon inside the
# SQL string, which raises a ParseException on several Spark versions, and
# string-interpolated SQL adds nothing over a plain table read.
df_silver = spark.table(f"{catalog}.{silver_schema}.{data_source}")

# Project only the columns the gold dimension needs.
df_gold = df_silver.select(
    "customer_id", "customer_name", "city",
    "customer", "market", "platform", "channel",
)

# COMMAND ----------

# Quick visual sanity check of the projected rows (full column widths).
df_gold.show(truncate=False)
# COMMAND ----------

# Persist the projected customer columns as the staging gold dimension
# table, fully replacing any previous contents on each run.
gold_schema = "gold"

(
    df_gold.write
    .mode("overwrite")          # full refresh every run
    .format("delta")
    # NOTE(review): writer options named like table properties are applied
    # as table properties by recent Delta versions — confirm CDF is actually
    # enabled on the target (DESCRIBE DETAIL / SHOW TBLPROPERTIES).
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(f"{catalog}.{gold_schema}.sb_dim_{data_source}")
)
# COMMAND ----------

# Prepare both sides of the upsert: the parent dimension table (merge
# target) and the freshly written staging/child table (merge source).
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Parent dimension table. Built from the same catalog/schema/source
# variables as the rest of the notebook — the original hardcoded
# "fmcg.gold.dim_customers", which would silently diverge if any of the
# variables above were changed.
delta_table = DeltaTable.forName(
    spark, f"{catalog}.{gold_schema}.dim_{data_source}"
)

# Child/staging rows, with the join key renamed to match the parent schema
# (customer_id -> customer_code).
df_child_customers = spark.table(
    f"{catalog}.{gold_schema}.sb_dim_{data_source}"
).select(
    F.col("customer_id").alias("customer_code"),
    "customer",
    "market",
    "platform",
    "channel",
)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Upsert Operation

# COMMAND ----------

# Upsert the child rows into the parent dimension: rows whose
# customer_code already exists in the target are fully updated; unseen
# customer_codes are inserted as new rows.
merge_builder = (
    delta_table.alias("target")
    .merge(
        source=df_child_customers.alias("source"),
        condition="target.customer_code = source.customer_code",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
merge_builder.execute()

# COMMAND ----------