Skip to content

Commit 8875907

Browse files
Merge pull request #23 from target/StringRegexCheck
String regex check
2 parents d52c2d1 + 71782e1 commit 8875907

5 files changed

Lines changed: 379 additions & 0 deletions

File tree

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,17 @@ At least one of `minLength` or `maxLength` must be specified. The data type of `
282282
| `maxLength` | Integer | Upper bound of the length of the string, inclusive.
283283
| `threshold` | String | See above description of threshold.
284284

285+
#### `stringRegexCheck`
286+
287+
Takes 2 to 3 parameters, described in the table below. If the `column` value does not match the pattern specified by the `regex`, the check will fail.
288+
A value for `regex` must be specified. The data type of `column` must be String.
289+
290+
| Arg | Type | Description |
291+
|-------------|--------|-------------------------------------------------------------------------|
292+
| `column` | String | Table column to be checked. The DataType of the column must be a String |
293+
| `regex` | String | POSIX regex. |
294+
| `threshold` | String | See above description of threshold. |
295+
285296
#### `rowCount`
286297

287298
The minimum number of rows a table must have to pass the validator.
@@ -372,6 +383,15 @@ tables:
372383
column: occupation
373384
minLength: 1
374385
maxLength: 5
386+
387+
# stringRegexCheck - checks if the string in the column matches the pattern specified by `regex`, counts number of rows in which there is a mismatch.
388+
- type: stringRegexCheck
389+
column: occupation
390+
regex: ^ENGINEER$ (matches the word ENGINEER)
391+
392+
- type: stringRegexCheck
393+
column: occupation
394+
regex: \w (matches any alphanumeric string)
375395
```
376396
377397
## Working with OOZIE Workflows

src/main/scala/com/target/data_validator/validator/JsonDecoders.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ object JsonDecoders extends LazyLogging {
1616
case "rangeCheck" => RangeCheck.fromJson(c)
1717
case "uniqueCheck" => UniqueCheck.fromJson(c)
1818
case "stringLengthCheck" => StringLengthCheck.fromJson(c)
19+
case "stringRegexCheck" => StringRegexCheck.fromJson(c)
1920
case x => logger.error(s"Unknown Check `$x` in config!")
2021
throw new RuntimeException(s"Unknown Check in config `$x`")
2122
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.target.data_validator.validator
2+
3+
import com.target.data_validator.{JsonEncoders, ValidatorError, VarSubstitution}
4+
import com.target.data_validator.JsonUtils.debugJson
5+
import com.target.data_validator.validator.ValidatorBase._
6+
import com.typesafe.scalalogging.LazyLogging
7+
import io.circe.{DecodingFailure, HCursor, Json}
8+
import io.circe.syntax._
9+
import org.apache.spark.sql.DataFrame
10+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
11+
import org.apache.spark.sql.catalyst.expressions._
12+
import org.apache.spark.sql.types.{StringType, StructType}
13+
14+
case class StringRegexCheck(
15+
column: String,
16+
regex: Option[Json],
17+
threshold: Option[String]
18+
) extends RowBased {
19+
20+
override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
21+
22+
val ret = StringRegexCheck(
23+
getVarSub(column, "column", dict),
24+
regex.map(getVarSubJson(_, "regex", dict)),
25+
threshold.map(getVarSub(_, "threshold", dict))
26+
)
27+
getEvents.foreach(ret.addEvent)
28+
ret
29+
}
30+
31+
override def colTest(schema: StructType, dict: VarSubstitution): Expression = {
32+
33+
val colExp = UnresolvedAttribute(column)
34+
35+
val regexExpression = regex.map { r => RLike(colExp, createLiteralOrUnresolvedAttribute(StringType, r)) }
36+
37+
val ret = regexExpression match {
38+
/*
39+
RLike returns false if the column value is null.
40+
To avoid counting null values as validation failures (like other validations),
41+
an explicit non null check on the column value is required.
42+
*/
43+
case Some(x) => And(Not(x), IsNotNull(colExp))
44+
case _ => throw new RuntimeException("Must define a regex.")
45+
}
46+
logger.debug(s"Expr: $ret")
47+
ret
48+
}
49+
50+
override def configCheck(df: DataFrame): Boolean = {
51+
52+
// Verify if regex is specified.
53+
val values = (regex::Nil).flatten
54+
if (values.isEmpty) {
55+
addEvent(ValidatorError("Must define a regex."))
56+
}
57+
58+
// Verify that the data type of the specified column is a String.
59+
val colType = findColumnInDataFrame(df, column)
60+
if (colType.isDefined) {
61+
val dataType = colType.get.dataType
62+
if (!(dataType.isInstanceOf[StringType])) {
63+
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
64+
}
65+
}
66+
67+
failed
68+
}
69+
70+
override def toJson: Json = {
71+
import JsonEncoders.eventEncoder
72+
val fields = Seq(
73+
("type", Json.fromString("stringRegexCheck")),
74+
("column", Json.fromString(column))
75+
) ++
76+
regex.map(r => ("regex", r)) ++
77+
Seq(
78+
("events", getEvents.asJson)
79+
)
80+
Json.obj(fields: _*)
81+
}
82+
}
83+
84+
object StringRegexCheck extends LazyLogging {
85+
def fromJson(c: HCursor): Either[DecodingFailure, ValidatorBase] = {
86+
val column = c.downField("column").as[String].right.get
87+
val regex = c.downField("regex").as[Json].right.toOption
88+
val threshold = c.downField("threshold").as[String].right.toOption
89+
90+
logger.debug(s"column: $column")
91+
logger.debug(s"regex: $regex type: ${regex.getClass.getCanonicalName}")
92+
logger.debug(s"threshold: $threshold type: ${threshold.getClass.getCanonicalName}")
93+
94+
c.focus.foreach {f => logger.info(s"StringRegexCheckJson: ${f.spaces2}")}
95+
scala.util.Right(StringRegexCheck(column, regex, threshold))
96+
}
97+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.target.data_validator.validator
2+
3+
import com.target.data_validator._
4+
import org.apache.spark.sql.types.StructType
5+
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
6+
import io.circe.Json
7+
8+
trait Mocker{
9+
10+
def mkDataFrame(spark: SparkSession, data: List[Row], schema: StructType): DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
11+
12+
def mkParams(params: List[Tuple2[String, Any]] = List.empty): VarSubstitution = {
13+
val dict = new VarSubstitution
14+
params.foreach { pair =>
15+
pair._2 match {
16+
case p: Json => dict.add(pair._1, pair._2.asInstanceOf[Json])
17+
case p: String => dict.addString(pair._1, pair._2.asInstanceOf[String])
18+
}
19+
}
20+
dict
21+
}
22+
}

0 commit comments

Comments
 (0)