Adds a new dataset-level check is_aggr_not_anomalous to check_funcs.py,
sitting alongside the other is_aggr_* primitives. The check fires when
the current bucket's aggregate deviates by more than N standard deviations
from the rolling mean of the preceding lookback_num_intervals buckets.
Key design points:
- Params: aggr_type, sigma, lookback_num_intervals, warmup_num_intervals,
  time_interval (minute/hour/day/week/month), group_by, row_filter, aggr_params
- Warmup guard: passes silently when fewer than warmup_num_intervals buckets exist
- Constant-series guard: passes when stddev_pop == 0
- Missing-current guard: passes when the most-recent bucket is absent
- Grouped checks fire per-group with independent rolling bands
- row_filter uses F.when() pattern consistent with _is_aggr_compare
- 2 auxiliary columns (condition + pre-built message string) inside apply()
- Unit tests: registration, parameter validation (10 cases)
- Integration tests: spike detection, warmup, constant series, group_by,
  row_filter, hour truncation, YAML round-trip, count(*), sum+group_by,
  warmup==lookback boundary
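
For reviewers, the decision rule and the three guards described above can be sketched in plain Python. This is an illustration only, not the PySpark code in check_funcs.py: `is_anomalous` and its arguments are hypothetical names, the history is assumed to be a list of per-bucket aggregates ordered oldest first, and `statistics.pstdev` stands in for `stddev_pop`.

```python
from statistics import fmean, pstdev
from typing import Optional, Sequence


def is_anomalous(
    history: Sequence[float],
    current: Optional[float],
    sigma: float = 3.0,
    lookback: int = 14,
    warmup: int = 7,
) -> bool:
    """Return True when `current` falls outside mean +/- sigma * stddev.

    `history` holds the aggregates of the buckets preceding the current one
    (oldest first); `current` is None when the most recent bucket is absent.
    """
    if current is None:
        return False  # missing-current guard: nothing to compare
    window = list(history)[-lookback:]  # keep only the preceding `lookback` buckets
    if len(window) < warmup:
        return False  # warmup guard: too little history to build a band
    mean = fmean(window)
    std = pstdev(window)  # population standard deviation, like stddev_pop
    if std == 0:
        return False  # constant-series guard: the band would have zero width
    return abs(current - mean) > sigma * std


daily_counts = [100, 102, 98, 101, 99, 100, 103, 97]
print(is_anomalous(daily_counts, 110))  # True: 10 away from a mean of 100, band is about 5.6
print(is_anomalous(daily_counts, 104))  # False: inside the 3-sigma band
```

For a grouped check, the same function would be applied once per group with that group's own history, which is what gives each group an independent band.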
docs/dqx/docs/reference/quality_checks.mdx (41 additions, 0 deletions)
@@ -1808,6 +1808,7 @@ You can also define your own custom dataset-level checks (see [Creating custom c
| `is_aggr_not_less_than` | Checks whether the aggregated values over group of rows or all rows are not less than the provided limit. | `column`: column to check (can be a string column name or a column expression), optional for 'count' aggregation; `limit`: limit as number, column name or sql expression (string literals must be single quoted, e.g. 'string_value'); `aggr_type`: aggregation function (default: "count"), supports 20 curated functions (count, sum, avg, stddev, percentile, etc.) plus any Databricks built-in aggregate; `group_by`: (optional) list of columns or column expressions to group the rows for aggregation (no grouping by default); `aggr_params`: (optional) dict of parameters for aggregates requiring them |
| `is_aggr_equal` | Checks whether the aggregated values over group of rows or all rows are equal to the provided limit. | `column`: column to check (can be a string column name or a column expression), optional for 'count' aggregation; `limit`: limit as number, column name or sql expression (string literals must be single quoted, e.g. 'string_value'); `aggr_type`: aggregation function (default: "count"), supports 20 curated functions (count, sum, avg, stddev, percentile, etc.) plus any Databricks built-in aggregate; `group_by`: (optional) list of columns or column expressions to group the rows for aggregation (no grouping by default); `aggr_params`: (optional) dict of parameters for aggregates requiring them; `abs_tolerance`: (optional) absolute tolerance for equality comparison of numeric aggregations (formula: `abs(a - b) <= tolerance`); `rel_tolerance`: relative tolerance for equality comparison of numeric aggregations (formula: `abs(a - b) <= rel_tolerance * max(abs(a), abs(b))`) |
| `is_aggr_not_equal` | Checks whether the aggregated values over group of rows or all rows are not equal to the provided limit. | `column`: column to check (can be a string column name or a column expression), optional for 'count' aggregation; `limit`: limit as number, column name or sql expression (string literals must be single quoted, e.g. 'string_value'); `aggr_type`: aggregation function (default: "count"), supports 20 curated functions (count, sum, avg, stddev, percentile, etc.) plus any Databricks built-in aggregate; `group_by`: (optional) list of columns or column expressions to group the rows for aggregation (no grouping by default); `aggr_params`: (optional) dict of parameters for aggregates requiring them; `abs_tolerance`: (optional) absolute tolerance for equality comparison of numeric aggregations (formula: `abs(a - b) <= tolerance`); `rel_tolerance`: relative tolerance for equality comparison of numeric aggregations (formula: `abs(a - b) <= rel_tolerance * max(abs(a), abs(b))`) |
+| `is_aggr_not_anomalous` | Rolling-window sigma anomaly check for a time-series aggregate. Fires when the current bucket's aggregate deviates by more than N standard deviations from the rolling mean of the preceding `lookback_num_intervals` buckets. All logic is pure PySpark. Passes silently during warmup (fewer than `warmup_num_intervals` historical buckets), when the series is constant (stddev == 0), or when the current bucket is missing. Complements `is_aggr_not_less_than` / `is_aggr_not_greater_than` for dynamic, data-driven anomaly detection on daily/hourly metrics. | `column`: column name (str) or Column expression to aggregate, e.g. `"revenue"` or `F.col("a") - F.col("b")`; `time_column`: name of the timestamp/date column used to bucket rows into time grains; `aggr_type`: aggregation type applied per bucket (default: `"avg"`); `sigma`: number of standard deviations defining the anomaly band (default: `3.0`, must be > 0); `lookback_num_intervals`: number of preceding buckets used to build the rolling baseline (default: `14`, must be >= 2); `warmup_num_intervals`: minimum historical buckets required before the check fires (default: `7`, must satisfy `1 <= warmup_num_intervals <= lookback_num_intervals`); `time_interval`: granularity for bucketing the `time_column` — one of `"minute"`, `"hour"`, `"day"`, `"week"`, `"month"` (default: `"day"`); `group_by`: (optional) list of columns or column expressions to segment the anomaly band per group; `row_filter`: (optional) SQL expression to filter rows before aggregation; `aggr_params`: (optional) dict of extra parameters for aggregate functions that require them |
| `foreign_key` (aka is_in_list) | Checks whether input column or columns can be found in the reference DataFrame or Table (foreign key check). It supports foreign key check on single and composite keys. This check can be used to validate whether values in the input column(s) exist in a predefined list of allowed values (stored in the reference DataFrame or Table). It serves as a scalable alternative to `is_in_list` row-level checks, when working with large lists. | `columns`: columns to check (can be a list of string column names or column expressions); `ref_columns`: columns to check for existence in the reference DataFrame or Table (can be a list string column name or a column expression); `ref_df_name`: (optional) name of the reference DataFrame (dictionary of DataFrames can be passed when applying checks); `ref_table`: (optional) fully qualified reference table name; either `ref_df_name` or `ref_table` must be provided but never both; the number of passed `columns` and `ref_columns` must match and keys are checks in the given order; negate: if True the condition is negated (i.e. the check fails when the foreign key values exist in the reference DataFrame/Table), if False the check fails when the foreign key values do not exist in the reference |
| `sql_query` | Checks whether the condition column produced by a SQL query is satisfied. The check supports two modes: **Row-level validation** (when `merge_columns` is provided) - query results are joined back to the input DataFrame to mark specific rows; **Dataset-level validation** (when `merge_columns` is None or empty) - the check result applies to all rows (or filtered rows if `filter` is used), making it ideal for aggregate validations with custom metrics. The query must return a boolean condition column (True = fail, False = pass). For row-level checks: if merge columns aren't unique, multiple query rows can attach to a single input row, potentially causing false positives. Performance tip: for complex queries, writing a custom dataset-level rule is usually more performant than `sql_query` check. | `query`: query string, must return condition column (and merge columns if provided); `input_placeholder`: name to be used in the sql query as `{{ input_placeholder }}` to refer to the input DataFrame, optional reference DataFrames are referred by the name provided in the dictionary of reference DataFrames (e.g. `{{ ref_df_key }}`, dictionary of DataFrames can be passed when applying checks); `merge_columns`: (optional) list of columns used for merging with the input DataFrame which must exist in the input DataFrame and be present in output of the sql query; when not provided (None or empty list), the check result applies to all rows in the dataset (dataset-level validation); `condition_column`: name of the column indicating a violation (False = pass, True = fail); `msg`: (optional) message to output; `name`: (optional) name of the resulting check (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated |
| `compare_datasets` | Compares two DataFrames at both row and column levels, providing detailed information about differences, including new or missing rows and column-level changes. Only columns present in both the source and reference DataFrames are compared. Use with caution if `check_missing_records` is enabled, as this may increase the number of rows in the output beyond the original input DataFrame. The comparison does not support Map types (any column comparison on map type is skipped automatically). Comparing datasets is valuable for validating data during migrations, detecting drift, performing regression testing, or verifying synchronization between source and target systems. | `columns`: columns to use for row matching with the reference DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'df.columns'; `ref_columns`: list of columns in the reference DataFrame or Table to row match against the source DataFrame (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'), if not having primary keys or wanting to match against all columns you can pass 'ref_df.columns'; note that `columns` are matched with `ref_columns` by position, so the order of the provided columns in both lists must be exactly aligned; `exclude_columns`: (optional) list of columns to exclude from the value comparison but not from row matching (can be a list of string column names or column expressions, but only simple column expressions are allowed such as 'F.col("col1")'); the `exclude_columns` field does not alter the list of columns used to determine row matches (columns), it only controls which columns are skipped during the value comparison; `ref_df_name`: (optional) name of the reference DataFrame (dictionary of DataFrames can be passed 
when applying checks); `ref_table`: (optional) fully qualified reference table name; either `ref_df_name` or `ref_table` must be provided but never both; the number of passed `columns` and `ref_columns` must match and keys are checks in the given order; `check_missing_records`: perform a FULL OUTER JOIN to identify records that are missing from source or reference DataFrames, default is False; use with caution as this may increase the number of rows in the output, as unmatched rows from both sides are included; `null_safe_row_matching`: (optional) treat NULLs as equal when matching rows using `columns` and `ref_columns` (default: True); `null_safe_column_value_matching`: (optional) treat NULLs as equal when comparing column values (default: True); `abs_tolerance`: (optional) numeric values are considered equal if the absolute difference is less than or equal to the tolerance (formula: `abs(a - b) <= tolerance`); `rel_tolerance`: differences in numeric values within this relative tolerance are ignored (formula: `abs(a - b) <= rel_tolerance * max(abs(a), abs(b))`) |
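
The parameter constraints stated in the `is_aggr_not_anomalous` row (`sigma > 0`, `lookback_num_intervals >= 2`, `1 <= warmup_num_intervals <= lookback_num_intervals`, and the five allowed `time_interval` values) can be expressed as a small validation sketch. `validate_anomaly_params` is a hypothetical helper written for illustration; the actual validation in check_funcs.py may be structured differently and may raise a different exception type.

```python
# Allowed bucketing grains, as listed in the documentation row.
VALID_TIME_INTERVALS = ("minute", "hour", "day", "week", "month")


def validate_anomaly_params(
    sigma: float = 3.0,
    lookback_num_intervals: int = 14,
    warmup_num_intervals: int = 7,
    time_interval: str = "day",
) -> None:
    """Raise ValueError when a parameter violates the documented constraints."""
    if sigma <= 0:
        raise ValueError(f"sigma must be > 0, got {sigma}")
    if lookback_num_intervals < 2:
        raise ValueError(
            f"lookback_num_intervals must be >= 2, got {lookback_num_intervals}"
        )
    if not 1 <= warmup_num_intervals <= lookback_num_intervals:
        raise ValueError(
            "warmup_num_intervals must satisfy "
            "1 <= warmup_num_intervals <= lookback_num_intervals, "
            f"got {warmup_num_intervals} with lookback {lookback_num_intervals}"
        )
    if time_interval not in VALID_TIME_INTERVALS:
        raise ValueError(
            f"time_interval must be one of {VALID_TIME_INTERVALS}, got {time_interval!r}"
        )


# The documented defaults pass; warmup == lookback is the allowed boundary case.
validate_anomaly_params()
validate_anomaly_params(lookback_num_intervals=7, warmup_num_intervals=7)
```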
@@ -2042,6 +2043,46 @@ Complex data types are supported as well.
       limit:
         __decimal__: "200.30"

+# is_aggr_not_anomalous check — daily row count over all rows (3-sigma band, 14-interval history)
+- criticality: warn
+  check:
+    function: is_aggr_not_anomalous
+    arguments:
+      column: '*'
+      time_column: event_date
+      aggr_type: count  # other types: sum, avg, min, max, stddev, etc.
+      sigma: 3.0  # fire when |current - baseline| > 3 * stddev
+      lookback_num_intervals: 14  # number of preceding buckets for the rolling baseline
+      warmup_num_intervals: 7  # suppress the check until at least 7 historical buckets exist
+
+# is_aggr_not_anomalous check — daily sum of revenue grouped by region
+- criticality: warn
+  check:
+    function: is_aggr_not_anomalous
+    arguments:
+      column: revenue
+      time_column: event_date
+      aggr_type: sum
+      sigma: 3.0
+      lookback_num_intervals: 14
+      warmup_num_intervals: 7
+      group_by:
+        - region
+
+# is_aggr_not_anomalous check — hourly average latency with a row filter
+- criticality: error
+  check:
+    function: is_aggr_not_anomalous
+    arguments:
+      column: latency_ms
+      time_column: request_time
+      aggr_type: avg
+      sigma: 2.5
+      lookback_num_intervals: 48  # 48 hourly buckets ≈ 2 days of history
+      warmup_num_intervals: 12
+      time_interval: hour  # bucket by hour instead of day
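
The `time_interval` setting used in the last example groups rows by a truncated timestamp before the aggregate is computed. A rough standard-library sketch of that bucketing step follows. `truncate_to_interval` and `bucket_avg` are hypothetical names, weeks are assumed to start on Monday, and the check itself does this in PySpark rather than with Python datetimes.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


def truncate_to_interval(ts: datetime, interval: str) -> datetime:
    """Truncate a timestamp to the start of its minute/hour/day/week/month bucket."""
    if interval == "minute":
        return ts.replace(second=0, microsecond=0)
    if interval == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    day = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if interval == "day":
        return day
    if interval == "week":
        return day - timedelta(days=day.weekday())  # assumption: weeks start on Monday
    if interval == "month":
        return day.replace(day=1)
    raise ValueError(f"unsupported time_interval: {interval!r}")


def bucket_avg(
    rows: Iterable[Tuple[datetime, float]], interval: str
) -> Dict[datetime, float]:
    """Average the values of (timestamp, value) rows per time bucket."""
    totals = defaultdict(lambda: [0.0, 0])  # bucket start -> [running sum, row count]
    for ts, value in rows:
        entry = totals[truncate_to_interval(ts, interval)]
        entry[0] += value
        entry[1] += 1
    return {bucket: total / n for bucket, (total, n) in sorted(totals.items())}


latencies = [
    (datetime(2024, 5, 15, 13, 5), 100.0),
    (datetime(2024, 5, 15, 13, 55), 200.0),
    (datetime(2024, 5, 15, 14, 10), 50.0),
]
hourly = bucket_avg(latencies, "hour")  # two buckets: 13:00 and 14:00
```

The resulting per-bucket series (here one average per hour) is what a rolling sigma band would then be computed over.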