Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions docs/dqx/docs/reference/quality_checks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -4080,6 +4080,85 @@ Using DQX classes:
When using dataset-level checks, the top-level `filter` condition is pushed down as `row_filter` to the check function and applied before aggregation, ensuring that the check operates only on the relevant subset of rows rather than on the aggregated results.
</Admonition>

## Customizing check messages

Users can override the default failure message of any `DQRule` by specifying a custom message expression. Set `message_expr` to either a Spark SQL expression string or a Spark `Column` expression that returns a string-valued message when a check fails.

<Admonition type="tip" title="Null-safe dynamic messages">
When your expression references a column, wrap the column with `coalesce` to avoid null messages: in Spark SQL, `concat(..., null)` returns `null`. Also note that a SQL expression string is parsed, not treated as a literal — a static message must be wrapped in SQL quotes (e.g. `"'Email must not be null'"`), otherwise each word is interpreted as a column reference and the check fails at evaluation time.
</Admonition>

<Tabs>
<TabItem value="Python" label="Python" default>
```python
import pyspark.sql.functions as F
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQRowRule

# static message: "Email must not be null"
checks = [
DQRowRule(
name="email_not_null",
criticality="error",
check_func=check_funcs.is_not_null,
column="email",
message_expr="'Email must not be null'",
)
]

# dynamic message using a SQL expression string: "age_positive: age <value> is not valid"
checks = [
DQRowRule(
name="age_positive",
criticality="error",
check_func=check_funcs.is_not_less_than,
column="age",
check_func_kwargs={"limit": 0},
message_expr="concat('age_positive: age ', coalesce(cast(age as string), 'null'), ' is not valid')",
)
]

# dynamic message using a Spark Column expression: "age_positive: age <value> is not valid"
checks = [
DQRowRule(
name="age_positive",
criticality="error",
check_func=check_funcs.is_not_less_than,
column="age",
check_func_kwargs={"limit": 0},
message_expr=F.concat(
F.lit("age_positive: age "),
F.coalesce(F.col("age").cast("string"), F.lit("null")),
F.lit(" is not valid"),
),
)
]
Comment thread
mwojtyczka marked this conversation as resolved.
```
</TabItem>
<TabItem value="YAML" label="YAML">
```yaml
# static message: "Email must not be null"
- name: email_not_null
criticality: error
message_expr: "'Email must not be null'"
check:
function: is_not_null
arguments:
column: email

# dynamic message using a SQL expression string: "age_positive: age <value> is not valid"
- name: age_positive
criticality: error
message_expr: "concat('age_positive: age ', coalesce(cast(age as string), 'null'), ' is not valid')"
check:
function: is_not_less_than
arguments:
column: age
limit: 0
```
</TabItem>
</Tabs>

## Converting checks between formats

In DQX, checks can be defined either as Python classes or YAML declarations. When using YAML, the files are first parsed into dictionaries and then transformed into DQX class instances under the hood. Since both formats share the same internal structure, they are interchangeable and can be safely converted between one another.
Expand Down
4 changes: 4 additions & 0 deletions src/databricks/labs/dqx/checks_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def deserialize(self, checks: list[dict]) -> list[DQRule]:
criticality = check_def.get("criticality", "error")
filter_str = check_def.get("filter")
user_metadata = check_def.get("user_metadata")
message_expr = check_def.get("message_expr")

# Exclude `column` and `columns` from check_func_kwargs
# as these are always included in the check function call
Expand All @@ -282,6 +283,7 @@ def deserialize(self, checks: list[dict]) -> list[DQRule]:
filter=filter_str,
check_func_kwargs=check_func_kwargs,
user_metadata=user_metadata,
message_expr=message_expr,
).get_rules()
else:
rule_type = CHECK_FUNC_REGISTRY.get(func_name)
Expand All @@ -296,6 +298,7 @@ def deserialize(self, checks: list[dict]) -> list[DQRule]:
criticality=criticality,
filter=filter_str,
user_metadata=user_metadata,
message_expr=message_expr,
)
)
else: # default to row-level rule
Expand All @@ -309,6 +312,7 @@ def deserialize(self, checks: list[dict]) -> list[DQRule]:
criticality=criticality,
filter=filter_str,
user_metadata=user_metadata,
message_expr=message_expr,
)
)

Expand Down
26 changes: 25 additions & 1 deletion src/databricks/labs/dqx/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ def _build_result_struct(self, condition: Column, skipped: bool = False) -> Colu
# or use literal run time if explicitly overridden
run_time_expr = F.current_timestamp() if self.run_time_overwrite is None else F.lit(self.run_time_overwrite)

message_col = self._build_message_col(condition)

return F.struct(
F.lit(self.check.name).alias("name"),
condition.alias("message"),
message_col.alias("message"),
self.check.columns_as_string_expr.alias("columns"),
F.lit(self.check.filter or None).cast("string").alias("filter"),
F.lit(self.check.check_func.__name__).alias("function"),
Expand All @@ -167,6 +169,28 @@ def _build_result_struct(self, condition: Column, skipped: bool = False) -> Colu
F.lit(skipped or None).alias("skipped"),
Comment thread
mwojtyczka marked this conversation as resolved.
).cast(dq_result_item_schema)

def _build_message_col(self, condition: Column) -> Column:
    """
    Build the message column for the check result struct.

    Uses the user-supplied ``message_expr`` from the rule definition when present,
    otherwise falls back to the default condition message produced by evaluating the
    check function. The expression is evaluated as-is — DQX does not substitute
    placeholders. Accepts either a Spark SQL expression string or a Spark Column.

    Args:
        condition: Default DQX condition message returned by evaluating the DQX check function.

    Returns:
        The custom DQX condition message if ``message_expr`` is set on the rule, otherwise the
        default DQX condition message.
    """
    if self.check.message_expr is None:
        return condition

    # Dispatch on `str`, not on `Column`: under Spark Connect the Column class differs
    # from the classic pyspark Column, so an isinstance(Column) check would mis-route a
    # Connect Column into F.expr and fail. A string is parsed as a SQL expression; any
    # other value is assumed to already be a Column-like expression.
    custom_message = (
        F.expr(self.check.message_expr) if isinstance(self.check.message_expr, str) else self.check.message_expr
    )
    # Emit the custom message only on failing rows; passing rows get a typed NULL so
    # downstream consumers can rely on message presence signalling a violation.
    return F.when(condition.isNotNull(), custom_message).otherwise(F.lit(None).cast("string"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No length cap on rendered message.

A pathological expression like concat(repeat('x', 100000)) produces a 100k-char message embedded in every result row's struct — bloating Delta files and breaking downstream consumers that assume short messages. Other parts of DQX cap at 500 chars (_LLM_FIELD_MAX_LEN in the anomaly explainer).

Consider wrapping with substring(message, 1, MAX_MESSAGE_LEN) here, or applying the cap as a Spark expression on custom_message before the F.when.


def _get_invalid_cols_message(self) -> str:
"""
Returns invalid columns message containing info about invalid columns to check should be applied to or filter.
Expand Down
13 changes: 13 additions & 0 deletions src/databricks/labs/dqx/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ class DQRule(abc.ABC, DQRuleTypeMixin, SingleColumnMixin, MultipleColumnsMixin):
* *check_func_args* (optional) - Positional arguments for the check function (excluding *column*).
* *check_func_kwargs* (optional) - Keyword arguments for the check function (excluding *column*).
* *user_metadata* (optional) - User-defined key-value pairs added to metadata generated by the check.
* *message_expr* (optional) - User-defined expression used as the check failure message. Accepts either
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No placeholder support for rule_name / check_func_name / column_value — document the limitation.

The PR description lists these as expected context, but the implementation evaluates the expression as-is. Users who want a message like "<rule_name> failed for value <column_value>" have to hand-type the rule name twice and inline the column reference (and the column reference doesn't generalise across DQForEachColRule instances).

This will be a frequent feature request as a follow-up. Worth either:

  • Documenting the limitation explicitly in the docstring ("placeholder substitution is not supported; reference column names directly via Spark SQL").
  • Considering whether to add {rule_name}, {column_value} placeholder substitution in this PR or as a tracked follow-up. If you choose to defer, link to a follow-up issue here so users searching the codebase find it.

a Spark SQL expression string or a Spark *Column* expression. The expression is evaluated as-is.
Any column references, casts, or rule-identifying literals must be supplied directly by the caller
(e.g., ``F.concat(F.lit('age_positive: value '), F.col('age').cast('string'))`` or
``"concat('age_positive: value ', cast(age as string))"``).
"""

check_func: Callable
Expand All @@ -176,6 +181,7 @@ class DQRule(abc.ABC, DQRuleTypeMixin, SingleColumnMixin, MultipleColumnsMixin):
check_func_args: list[Any] = field(default_factory=list)
check_func_kwargs: dict[str, Any] = field(default_factory=dict)
user_metadata: dict[str, str] | None = None
message_expr: str | Column | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overloaded parameter type — consider splitting before this becomes public API.

message_expr: str | Column forces callers (and serialisers, and future API extensions) to runtime-dispatch on type. Cleaner alternatives:

  • message_sql: str | None and message_col: Column | None with mutual-exclusion validation.
  • A single message: Column | None plus a message_from_sql(s: str) -> Column helper that wraps F.expr with the validation/sanitisation from the prior comments.

Not a blocker — your current shape works — but worth deciding now since changing the field name later is a breaking change.


def __post_init__(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add best-effort message_expr validation in __post_init__.

A typo'd SQL string ("concat(invalid") currently fails only when the rule is applied to a DataFrame, far from where it was defined. A best-effort validation here surfaces the error at the call site:

if isinstance(self.message_expr, str) and self.message_expr:
    try:
        F.expr(self.message_expr)
    except Exception as exc:
        raise InvalidParameterError(
            f"message_expr is not a valid Spark SQL expression: {exc}. "
            f"Note: literal strings must be wrapped in SQL quotes, e.g. \"'my message'\"."
        ) from exc

Cheap (build-only — no DataFrame eval), and the error message hints at the unquoted-string trap in the previous comment.

self._validate_rule_type(self.check_func)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM - Type annotation: The field is message: Callable[..., Column] | None = None here on DQRule, but on DQForEachColRule (line 441) it's message: Callable | None = None without the return type. These should be consistent — use Callable[..., Column] | None in both places.

Expand Down Expand Up @@ -259,6 +265,10 @@ def to_dict(self) -> dict:

if self.user_metadata:
metadata["user_metadata"] = self.user_metadata
# Only string expressions can be round-tripped through metadata; Column objects are
# in-process Spark expressions with no canonical YAML/JSON representation.
if isinstance(self.message_expr, str) and self.message_expr:
metadata["message_expr"] = self.message_expr
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to_dict() silently drops Column-typed message_expr — users won't notice.

A user who passes message_expr=F.concat(...) and calls to_dict() to persist their rules to YAML/Delta loses the message with no signal. They'll discover it only when re-loading and seeing the default message instead.

Options:

  • Log a WARNING when a Column-typed message_expr is encountered during serialisation: "Column message_expr cannot be serialised; falling back to default message on round-trip."
  • Or raise InvalidParameterError("Column-typed message_expr cannot be serialised; pass a SQL expression string if you need round-trip persistence").

Logging is friendlier; raising forces the user to make an explicit choice.

return metadata

def _initialize_column_if_missing(self):
Expand Down Expand Up @@ -428,6 +438,7 @@ class DQForEachColRule(DQRuleTypeMixin):
check_func_args: list[Any] = field(default_factory=list)
check_func_kwargs: dict[str, Any] = field(default_factory=dict)
user_metadata: dict[str, str] | None = None
message_expr: str | Column | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DQForEachColRule.message_expr is shared across all generated rules — flag in the field comment.

The integration test correctly notes this ("The same expression is used for every generated rule. To produce a per-column message, reference each column inline"), but the field itself has no docstring or warning. A user expecting per-column message substitution will be surprised when all generated rules emit identical messages — and won't know the workaround.

Add a one-line comment on the field describing the shared-expression behaviour, ideally with the inline-reference workaround pointer:

# Shared across every generated rule; reference column names inline (e.g. "concat('a=', cast(a as string))")
# if you need per-column messages, or construct rules individually instead of via DQForEachColRule.
message_expr: str | Column | None = None


def get_rules(self) -> list[DQRule]:
"""Build a list of rules for a set of columns.
Expand All @@ -453,6 +464,7 @@ def get_rules(self) -> list[DQRule]:
criticality=self.criticality,
filter=self.filter,
user_metadata=self.user_metadata,
message_expr=self.message_expr,
)
)
else: # default to row-level rule
Expand All @@ -467,6 +479,7 @@ def get_rules(self) -> list[DQRule]:
criticality=self.criticality,
filter=self.filter,
user_metadata=self.user_metadata,
message_expr=self.message_expr,
)
)
return rules
Expand Down
Loading
Loading