Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/python/.idea
/.idea/
*.iml
#local spark context data from unit tests
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ evaluation specs and results
The summary report is meant to be just that, a summary of the failed rules. This will return only the records that
failed and only the rules that failed for that record; thus, if the `summaryReport.isEmpty` then all rules passed.


## Python Wraper
The Python Wrapper allows users to validate data quality of their PySpark DataFrames using Python.

They Python Wrapper can be found under the directory `/python`. A quickstart notebook is also located under `/python/examples`.

## Next Steps
Clearly, this is just a start. This is a small package and, as such, a GREAT place to start if you've never
contributed to a project before. Please feel free to fork the repo and/or submit PRs. We'd love to see what
Expand Down
47 changes: 47 additions & 0 deletions python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Python Connector for the DataFrame Rules Engine
The Python Connector allows users to validate data quality of their PySpark DataFrames using Python.

```python
validation_results = RuleSet(df)
.add(myRules)
.validate()
```

Currently, the Python Connector supports the following Rule types:
1. List of Values (Strings _only_)
2. Boolean Check
3. User-defined Functions (must evaluate to a Boolean)


### Boolean Check
Validate that an column expression evaluates to True.
```python
# Ensure that the temperature is a valid reading
valid_temp_rule = Rule("valid_temperature", F.col("temperature") > -100.0)
```

### List of Values (LOVs)
Validate that a Column only contains values present in a List of Strings.

```python
# Create a List of Strings (LOS)
building_sites = ["SiteA", "SiteB", "SiteC"]

# Build a Rule that validates that a column only contains values from LOS
building_name_rule = Rule("Building_LOV_Rule",
column=F.col("site_name"),
valid_strings=building_sites)
```

### User-Defined Functions (UDFs)
UDFs are great when you need to add custom business logic for validating dataset quality.
You can use User-defined Functions with the DataFrame Rules Engine that return a Boolean value.

```python
# Create a UDF to validate date entry
def valid_date_udf(ts_column):
return ts_column.isNotNull() & F.year(ts_column).isNotNull() & F.month(ts_column).isNotNull()

# Create a Rule that uses the UDF to validate data
valid_date_rule = Rule("valid_date_reading", valid_date_udf(F.col("reading_date")))
```
Comment thread
GeekSheikh marked this conversation as resolved.
Outdated
Binary file not shown.
Binary file added python/dist/dataframe_rules_engine-0.0.1.tar.gz
Comment thread
GeekSheikh marked this conversation as resolved.
Outdated
Binary file not shown.
60 changes: 60 additions & 0 deletions python/examples/01-Generate Mock Purchase Transactions.py
Comment thread
GeekSheikh marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Databricks notebook source
catalog_name = "REPLACE_ME"
schema_name = "REPLACE_ME"

# COMMAND ----------

import random
import datetime

def generate_sample_data():
"""Generates mock transaction data that randomly adds bad data"""

# randomly generate bad data
if bool(random.getrandbits(1)):
appl_id = None
acct_no = None
event_ts = None
cstone_last_updatetm = None
else:
appl_id = random.randint(1000000, 9999999)
acct_no = random.randint(1000000000000000, 9999999999999999)
event_ts = datetime.datetime.now()
cstone_last_updatetm = datetime.datetime.now()

# randomly generate an MCC description
categories = ["dining", "transportation", "merchendise", "hotels", "airfare", "grocery stores/supermarkets/bakeries"]
random_index = random.randint(0, len(categories)-1)
category = categories[random_index]

# randomly generate a transaction price
price = round(random.uniform(1.99, 9999.99), 2)

data = [
(appl_id, acct_no, event_ts, category, price, cstone_last_updatetm)
]
df = spark.createDataFrame(data,
"appl_id int, acct_no long, event_ts timestamp, category string, price float, cstone_last_updatetm timestamp")
return df

# COMMAND ----------

spark.sql(f"create schema if not exists {catalog_name}.{schema_name}")

# COMMAND ----------

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.purchase_transactions_bronze
(appl_id int, acct_no long, event_ts timestamp, category string, price float, cstone_last_updatetm timestamp)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# COMMAND ----------

df = generate_sample_data()
df.write.insertInto(f"{catalog_name}.{schema_name}.purchase_transactions_bronze")

# COMMAND ----------


152 changes: 152 additions & 0 deletions python/examples/02-Apply Purchase Transaction Rules.py
Comment thread
GeekSheikh marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Databricks notebook source
# MAGIC %run ./PythonWrapper

# COMMAND ----------

# MAGIC %md
# MAGIC # Ingest new Data

# COMMAND ----------

import datetime

starting_time = datetime.datetime.now() - datetime.timedelta(minutes=5)

catalog_name = "REPLACE_ME"
schema_name = "REPLACE_ME"

# COMMAND ----------

# Read table changes from 5 mins ago
df = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", starting_time) \
.table(f"{catalog_name}.{schema_name}.purchase_transactions_bronze")
purchase_transactions_df = df.select("appl_id", "acct_no", "event_ts", "category", "price", "cstone_last_updatetm")\
.where("_change_type='insert'")
purchase_transactions_df.display()

# COMMAND ----------

# MAGIC %md
# MAGIC # Define Rules using Builder Pattern

# COMMAND ----------

# MAGIC %md
# MAGIC ## Sample Rules
# MAGIC
# MAGIC From a DQ rule point of view, we would be looking at following scenarios:
# MAGIC
# MAGIC - **event_ts**: Should have a timestamp for every day (timestamp format doesn’t matter)
# MAGIC - **cstone_last_updatetm**: Should have a timestamp for every day
# MAGIC - **acct_no**: No null values for this column
# MAGIC - **appl_id**: No null values for this column
# MAGIC - **Changes in string length** - for all columns
# MAGIC

# COMMAND ----------

import pyspark.sql.functions as F

# First, begin by defining your RuleSet by passing in your input DataFrame
myRuleSet = RuleSet(purchase_transactions_df)

# Rule 1 - define a Rule that validates that the `acct_no` is never null
acct_num_rule = Rule("valid_acct_no_rule", F.col("acct_no").isNotNull())
myRuleSet.add(acct_num_rule)

# Rule 2 - add a Rule that validates that the `appl_id` is never null
appl_id_rule = Rule("valid_appl_id", F.col("appl_id").isNotNull())
myRuleSet.add(appl_id_rule)

# COMMAND ----------

# Rules can even be used in conjunction with User-Defined Functions
def valid_timestamp(ts_column):
return ts_column.isNotNull() & F.year(ts_column).isNotNull() & F.month(ts_column).isNotNull()

# COMMAND ----------

# Rule 3 - enforce a valid `event_ts` timestamp
valid_event_ts_rule = Rule("valid_event_ts_rule", valid_timestamp(F.col("event_ts")))
myRuleSet.add(valid_event_ts_rule)

# Rule 4 - enforce a valid `cstone_last_updatetm` timestamp
valid_cstone_last_updatetm_rule = Rule("valid_cstone_last_updatetm_rule", valid_timestamp(F.col("cstone_last_updatetm")))
myRuleSet.add(valid_cstone_last_updatetm_rule)

# COMMAND ----------

# Rule 5 - validate string lengths
import pyspark.sql.functions as F
import datetime

starting_timestamp = datetime.datetime.now() - datetime.timedelta(minutes=5)
ending_timestamp = datetime.datetime.now() - datetime.timedelta(minutes=1)

# Read table changes from 5 mins ago
df = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table(f"{catalog_name}.{schema_name}.purchase_transactions_bronze")
df_category = df.select("category").where("_change_type='insert'").agg(F.mean(F.length(F.col("category"))).alias("avg_category_len"))
avg_category_len = df_category.collect()[0]['avg_category_len']
print(avg_category_len)

# COMMAND ----------

def valid_category_len(category_column, avg_category_str_len):
return F.length(category_column) <= avg_category_str_len

# Rule 5 - validate `category` string lengths
valid_str_length_rule = Rule("valid_category_str_length_rule", valid_category_len(F.col("category"), avg_category_len))
myRuleSet.add(valid_str_length_rule)

# COMMAND ----------

# MAGIC %md
# MAGIC # Validate Rows

# COMMAND ----------

from pyspark.sql import DataFrame

# Finally, add the Rule to the RuleSet and validate!
summaryReport = myRuleSet.get_summary_report()
completeReport = myRuleSet.get_complete_report()

# Display the summary validation report
display(summaryReport)

# COMMAND ----------

# Display the complete validation report
display(completeReport)

# COMMAND ----------

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.purchase_transactions_validated
(appl_id int, acct_no long, event_ts timestamp, category string, price float, cstone_last_updatetm timestamp, failed_rules array<string>)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# COMMAND ----------

import pyspark.sql.functions as F
import pyspark.sql.types as T

if summaryReport.count() > 0:
summaryReport.write.insertInto(f"{catalog_name}.{schema_name}.purchase_transactions_validated")
else:
string_array_type = T.ArrayType(T.StringType())
purchase_transactions_df \
.withColumn("failed_rules", F.array(F.array().cast(string_array_type))) \
.write.insertInto(f"{catalog_name}.{schema_name}.purchase_transactions_validated")

# COMMAND ----------


Loading