Skip to content

Commit adf911f

Browse files
committed
0.1 initial commit
1 parent d7f87e6 commit adf911f

9 files changed

Lines changed: 547 additions & 0 deletions

File tree

.gitignore

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/.idea/
2+
*.iml
3+
#local spark context data from unit tests
4+
spark-warehouse/
5+
6+
#Build dirctory for maven/sbt
7+
target/
8+
project/project/
9+
project/target/
10+
*.DS_Store
11+
/target/
12+
/project/

build.sbt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
name := "Rules_Engine"
2+
3+
organization := "com.databricks"
4+
5+
version := "0.1"
6+
7+
scalaVersion := "2.11.12"
8+
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
9+
10+
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
11+
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.0"
12+
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
13+
14+
lazy val commonSettings = Seq(
15+
version := "0.1",
16+
organization := "com.databricks",
17+
scalaVersion := "2.11.12"
18+
)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.databricks.labs.validation
2+
3+
import com.databricks.labs.validation.utils.{SparkSessionWrapper, MinMaxFunc}
4+
import com.databricks.labs.validation.utils.Structures._
5+
import org.apache.spark.sql.{Column, functions}
6+
import org.apache.spark.sql.functions.col
7+
8+
object Example extends App with SparkSessionWrapper {
9+
import spark.implicits._
10+
11+
/**
12+
* Validation example
13+
* Passing pre-built array of rules into a RuleSet and validating a non-grouped dataframe
14+
*/
15+
16+
/**
17+
* Example of a proper UDF to simplify rules logic. Simplification UDFs should take in zero or many
18+
* columns and return one column
19+
* @param retailPrice column 1
20+
* @param scanPrince column 2
21+
* @return result column of applied logic
22+
*/
23+
def getDiscountPercentage(retailPrice: Column, scanPrince: Column): Column = {
24+
((retailPrice - scanPrince) / retailPrice).alias("total_discount_percent")
25+
}
26+
27+
// Example of creating array of custom rules
28+
val specializedRules = Array(
29+
Rule("Reasonable_sku_counts", col("sku"), functions.count, "sku_count", Bounds(lower = 20.0, upper = 200.0)),
30+
Rule("Max_allowed_discount", getDiscountPercentage(col("retail_price"), col("scan_price")),
31+
functions.max, "total_discount", Bounds(upper = 90.0))
32+
)
33+
34+
// It's common to generate many mins and max boundaries. These can be generated easily
35+
// The generator function can easily be extended or overridden to satisfy more complex requirements
36+
val minMaxPriceDefs = Array(
37+
MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 350.0)),
38+
MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 350.0)),
39+
MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 350.0))
40+
41+
)
42+
val minMaxPriceRules = RuleSet.generateMinMaxRules(minMaxPriceDefs: _*)
43+
44+
val x = Array(("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 350.0)))
45+
46+
// Generate a single stand alone rule to demonstrate builder pattern below
47+
val standAloneRule = Rule("stddev_retail_price", col("retail_price"), functions.stddev, "stddev_price", Bounds(upper = 13.0))
48+
49+
//TODO - validate datetime
50+
// TODO - validate distinct keys
51+
// Test, example data frame
52+
val df = sc.parallelize(Seq(
53+
(1001, 123456, 9.32, 8.99, 4.23),
54+
(1001, 123256, 19.99, 16.49, 12.99),
55+
(1001, 123456, 0.99, 0.99, 0.10),
56+
(1001, 123456, 0.98, 0.90, 0.10), // non_distinct sku
57+
(1002, 122987, 9.99, 9.49, 6.49),
58+
(1002, 173544, 1.29, 0.99, 1.23),
59+
(1002, 168212, 3.29, 1.99, 1.23),
60+
(1002, 365423, 1.29, 0.99, 1.23),
61+
(1002, 3897615, 14.99, 129.99, 1.23),
62+
(1003, 163212, 3.29, 1.99, 1.23)
63+
)).toDF("region", "store_id", "sku", "retail_price", "scan_price", "cost")
64+
65+
// Doing the validation
66+
// The validate method will return the rules report dataframe which breaks down which rules passed and which
67+
// rules failed and how/why. The second return value returns a boolean to determine whether or not all tests passed
68+
val (rulesReport, passed) = RuleSet(df)
69+
.add(specializedRules)
70+
.add(standAloneRule)
71+
.add(minMaxPriceRules)
72+
.validate
73+
74+
rulesReport.show(false)
75+
76+
77+
78+
79+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package com.databricks.labs.validation
2+
3+
import com.databricks.labs.validation.utils.MinMaxFunc
4+
import com.databricks.labs.validation.utils.Structures.{Bounds, ValidNumerics, ValidStrings}
5+
import org.apache.spark.sql.Column
6+
7+
class Rule {
8+
9+
private var _ruleName: String = _
10+
private var _column: Column = _
11+
private var _aggFunc: Column => Column = _
12+
private var _alias: String = _
13+
private var _boundaries: Bounds = _
14+
private var _validNumerics: ValidNumerics = _
15+
private var _validStrings: ValidStrings = _
16+
private var _dateTimeLogic: Column = _
17+
private var _ruleType: String = _
18+
private var _by: Seq[Column] = _
19+
20+
private def setRuleName(value: String): this.type = {
21+
_ruleName = value
22+
this
23+
}
24+
private def setColumn(value: Column): this.type = {
25+
_column = value
26+
this
27+
}
28+
private def setAggFunc(value: Column => Column): this.type = {
29+
_aggFunc = value
30+
this
31+
}
32+
private def setAlias(value: String): this.type = {
33+
_alias = value
34+
this
35+
}
36+
private def setBoundaries(value: Bounds): this.type = {
37+
_boundaries = value
38+
this
39+
}
40+
private def setByCols(value: Seq[Column]): this.type = {
41+
_by = value
42+
this
43+
}
44+
private def setValidNumerics(value: ValidNumerics): this.type = {
45+
_validNumerics = value
46+
this
47+
}
48+
private def setValidStrings(value: ValidStrings): this.type = {
49+
_validStrings = value
50+
this
51+
}
52+
private def setDateTimeLogic(value: Column): this.type = {
53+
_dateTimeLogic = value
54+
this
55+
}
56+
private def setRuleType(value: String): this.type = {
57+
_ruleType = value
58+
this
59+
}
60+
61+
def ruleName: String = _ruleName
62+
def inputColumn: Column = _column
63+
def aggFunc: Column => Column = _aggFunc //TODO - add .toString
64+
def alias: String = _alias
65+
def boundaries: Bounds = _boundaries
66+
def groupByColumns: Seq[Column] = _by //TODO - add .toString
67+
def validNumerics: ValidNumerics = _validNumerics
68+
def validStrings: ValidStrings = _validStrings
69+
def dateTimeLogic: Column = _dateTimeLogic
70+
def ruleType: String = _ruleType
71+
72+
}
73+
74+
object Rule {
75+
76+
def apply(
77+
ruleName: String,
78+
column: Column,
79+
aggFunc: Column => Column,
80+
alias: String,
81+
boundaries: Bounds,
82+
by: Column*
83+
): Rule = {
84+
85+
new Rule()
86+
.setRuleName(ruleName)
87+
.setColumn(column)
88+
.setAggFunc(aggFunc)
89+
.setAlias(alias)
90+
.setBoundaries(boundaries)
91+
.setRuleType("bounds")
92+
.setByCols(by)
93+
}
94+
95+
def apply(
96+
ruleName: String,
97+
column: Column,
98+
aggFunc: Column => Column, // TODO -- Handle aggs
99+
alias: String,
100+
validNumerics: ValidNumerics,
101+
by: Column*
102+
): Rule = {
103+
104+
new Rule()
105+
.setRuleName(ruleName)
106+
.setColumn(column)
107+
.setAggFunc(aggFunc)
108+
.setAlias(alias)
109+
.setValidNumerics(validNumerics)
110+
.setRuleType("validNumerics")
111+
.setByCols(by)
112+
}
113+
114+
def apply(
115+
ruleName: String,
116+
column: Column,
117+
aggFunc: Column => Column, // TODO - Handle Aggs
118+
alias: String,
119+
validStrings: ValidStrings,
120+
by: Column*
121+
): Rule = {
122+
123+
new Rule()
124+
.setRuleName(ruleName)
125+
.setColumn(column)
126+
.setAggFunc(aggFunc)
127+
.setAlias(alias)
128+
.setValidStrings(validStrings)
129+
.setRuleType("validStrings")
130+
.setByCols(by)
131+
}
132+
133+
def apply(
134+
ruleName: String,
135+
column: Column,
136+
aggFunc: Column => Column, // TODO - handle aggs
137+
alias: String,
138+
dateTimeLogic: Column,
139+
by: Column*
140+
): Rule = {
141+
142+
new Rule()
143+
.setRuleName(ruleName)
144+
.setColumn(column)
145+
.setAggFunc(aggFunc)
146+
.setAlias(alias)
147+
.setDateTimeLogic(dateTimeLogic)
148+
.setRuleType("dateTime")
149+
.setByCols(by)
150+
}
151+
152+
}

0 commit comments

Comments
 (0)