1 change: 1 addition & 0 deletions databricks-skills/README.md
@@ -94,6 +94,7 @@ cp -r ai-dev-kit/databricks-skills/databricks-agent-bricks .claude/skills/
- **databricks-unity-catalog** - System tables for lineage, audit, billing

### 🔧 Data Engineering
- **databricks-dqx** - Data quality framework (DQX): profiling, rule generation (AI-assisted), quality checks, quarantining, dashboards
- **databricks-iceberg** - Apache Iceberg tables (Managed/Foreign), UniForm, Iceberg REST Catalog, Iceberg Clients Interoperability
- **databricks-spark-declarative-pipelines** - SDP (formerly DLT) in SQL/Python
- **databricks-jobs** - Multi-task workflows, triggers, schedules
167 changes: 167 additions & 0 deletions databricks-skills/databricks-dqx/SKILL.md
@@ -0,0 +1,167 @@
---
name: databricks-dqx
description: "Data quality framework for PySpark pipelines using DQX (Databricks Labs). Use when building data quality checks, profiling data, generating quality rules (including AI-assisted), applying validation checks, quarantining bad data, creating quality dashboards, or integrating quality checks into Lakeflow Pipelines (DLT), streaming, or batch workflows. Triggers: 'data quality', 'DQX', 'profiling', 'quality checks', 'data validation', 'quarantine', 'quality rules', 'data contract'."
---

# Databricks DQX — Data Quality Framework

DQX is a Databricks Labs data quality framework for Apache Spark. It lets you define, monitor, and address data quality issues in Python-based pipelines, whether batch, streaming, or Lakeflow Pipelines (DLT).

| Capability | Description |
|-----------|-------------|
| **Profiling** | Auto-profile DataFrames/tables and generate rule candidates |
| **AI-Assisted Rules** | Generate checks from natural language using LLMs |
| **Row & Dataset Rules** | Row-level (per-row) and dataset-level (aggregates, uniqueness) checks |
| **Batch & Streaming** | Spark batch, Structured Streaming, and Lakeflow Pipelines |
| **Quarantine** | Split valid/invalid data or annotate rows with error/warning columns |
| **Quality Dashboard** | Track metrics over time with a built-in Lakeview dashboard |
| **Data Contracts** | Generate rules from Open Data Contract Standard (ODCS) YAML |
| **Storage Backends** | YAML, JSON, Delta tables, Workspace files, Volumes, Lakebase |

## Installation

```bash
pip install databricks-labs-dqx # Core
pip install 'databricks-labs-dqx[llm]' # AI-assisted rule generation
pip install 'databricks-labs-dqx[pii]' # PII detection
pip install 'databricks-labs-dqx[datacontract]' # Data contract (ODCS) support
databricks labs install dqx # Workspace tool (CLI workflows)
```

## Quick Start — Profile → Generate → Apply

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine

ws = WorkspaceClient()
input_df = spark.read.table("catalog.schema.my_table")  # `spark` is predefined in Databricks notebooks

# Step 1: Profile
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)

# Step 2: Generate rules from profiles
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)

# Step 3: Apply checks — split valid and invalid rows
dq_engine = DQEngine(ws)
valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# Write results
valid_df.write.mode("overwrite").saveAsTable("catalog.schema.silver")
invalid_df.write.mode("overwrite").saveAsTable("catalog.schema.quarantine")
```
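
To annotate rows instead of splitting them, `apply_checks_by_metadata` returns a single DataFrame with result columns appended (`_errors`/`_warnings` by default); the output table name below is illustrative:

```python
# Keep all rows; failed checks are recorded in the `_errors` / `_warnings` columns
annotated_df = dq_engine.apply_checks_by_metadata(input_df, checks)
annotated_df.write.mode("overwrite").saveAsTable("catalog.schema.bronze_annotated")
```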

## Common Patterns

### AI-Assisted Rule Generation

```python
from databricks.labs.dqx.config import InputConfig

generator = DQGenerator(workspace_client=ws, spark=spark)
checks = generator.generate_dq_rules_ai_assisted(
    user_input="Username must not start with 's' if age < 18. All users need valid email. Age 0-120.",
    input_config=InputConfig(location="catalog.schema.customers")
)
```

Combine profiler + AI rules: `all_checks = profiler_checks + ai_checks`. See [profiling.md](profiling.md) for full options including custom models, summary stats context, and custom check functions.

### Lakeflow Pipelines (DLT) Integration

```python
import dlt
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks(config=...)

@dlt.view
def bronze_dq_check():
    return dq_engine.apply_checks_by_metadata(dlt.read_stream("bronze"), checks)

@dlt.table
def silver():
    return dq_engine.get_valid(dlt.read_stream("bronze_dq_check"))

@dlt.table
def quarantine():
    return dq_engine.get_invalid(dlt.read_stream("bronze_dq_check"))
```
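
The `config=...` argument above is left elided. As one illustration, checks saved to a Delta table could be loaded with a storage config (assuming the `TableChecksStorageConfig` class in `databricks.labs.dqx.config`):

```python
from databricks.labs.dqx.config import TableChecksStorageConfig  # assumed storage-config class

# Load checks previously saved to a Delta table
checks = dq_engine.load_checks(
    config=TableChecksStorageConfig(location="catalog.schema.checks_table")
)
```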

### Structured Streaming (foreachBatch)

```python
# `dq_engine` and `checks` are assumed defined, with checks built as DQX rule
# classes (DQRowRule, DQDatasetRule)
def validate_and_write(batch_df, batch_id):
    valid_df, invalid_df = dq_engine.apply_checks_and_split(batch_df, checks)
    valid_df.write.format("delta").mode("append").saveAsTable("catalog.schema.output")
    invalid_df.write.format("delta").mode("append").saveAsTable("catalog.schema.quarantine")

(spark.readStream.format("delta").table("catalog.schema.bronze")
    .writeStream
    .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/dq")  # placeholder path
    .foreachBatch(validate_and_write)
    .start())
```

### End-to-End with Config

```python
from databricks.labs.dqx.config import InputConfig, OutputConfig

dq_engine.apply_checks_by_metadata_and_save_in_table(
    checks=checks,
    input_config=InputConfig(location="catalog.schema.bronze", format="delta"),
    output_config=OutputConfig(location="catalog.schema.silver", format="delta", mode="append"),
    quarantine_config=OutputConfig(location="catalog.schema.quarantine", format="delta", mode="append"),
    metrics_config=OutputConfig(location="catalog.schema.dq_metrics", format="delta", mode="append")
)
```

### Multi-Table Checks

```python
from databricks.labs.dqx.config import RunConfig

# By explicit table list
dq_engine.apply_checks_and_save_in_tables(run_configs=[RunConfig(...), RunConfig(...)])

# By pattern matching
dq_engine.apply_checks_and_save_in_tables_for_patterns(
    patterns=["catalog.schema.*"], checks_location="catalog.schema.checks_table"
)
```

## Reference Files

| Topic | File | Description |
|-------|------|-------------|
| Quality Checks | [quality-checks-reference.md](quality-checks-reference.md) | Built-in row/dataset check functions, YAML format, custom checks |
| Profiling | [profiling.md](profiling.md) | Profiler options, AI-assisted generation, data contracts, primary key detection |
| Storage & Config | [storage-and-config.md](storage-and-config.md) | Storage backends, config.yml, metrics, dashboard, CLI reference, best practices |

## Common Issues

| Issue | Solution |
|-------|----------|
| `ModuleNotFoundError: databricks.labs.dqx` | `pip install databricks-labs-dqx` or add to job libraries |
| AI generation fails | `pip install 'databricks-labs-dqx[llm]'` and enable serverless clusters |
| PII detection not available | `pip install 'databricks-labs-dqx[pii]'` |
| Data contract rules missing | `pip install 'databricks-labs-dqx[datacontract]'` |
| Checks validation errors | `dq_engine.validate_checks(checks)` or `databricks labs dqx validate-checks` |
| Custom column name conflicts | `ExtraParams(result_column_names={"errors": "custom_errors", "warnings": "custom_warnings"})` (see the sketch after this table) |
| Streaming metrics missing | Use `get_streaming_metrics_listener` or end-to-end methods |
| Dashboard shows no tables | Tables must have `_errors`/`_warnings` columns (or custom names) |
| Profiler too slow | Adjust `sample_fraction` (default 0.3), `limit` (default 1000), or use `filter` |
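
A minimal sketch of the custom result-column fix from the table above (the `ExtraParams` module path is an assumption):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import ExtraParams  # module path is an assumption

# Rename the default `_errors` / `_warnings` result columns to avoid conflicts
extra_params = ExtraParams(result_column_names={"errors": "custom_errors", "warnings": "custom_warnings"})
dq_engine = DQEngine(WorkspaceClient(), extra_params=extra_params)
```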

## Related Skills

- **[databricks-spark-declarative-pipelines](../databricks-spark-declarative-pipelines/SKILL.md)** — Lakeflow Pipelines with DQX quality checks
- **[databricks-spark-structured-streaming](../databricks-spark-structured-streaming/SKILL.md)** — Streaming with foreachBatch quality validation
- **[databricks-aibi-dashboards](../databricks-aibi-dashboards/SKILL.md)** — Custom dashboards for quality metrics
- **[databricks-jobs](../databricks-jobs/SKILL.md)** — Schedule DQX workflows as Databricks Jobs
- **[databricks-asset-bundles](../databricks-asset-bundles/SKILL.md)** — Deploy DQX pipelines via Asset Bundles
- **[databricks-unity-catalog](../databricks-unity-catalog/SKILL.md)** — Manage catalogs, schemas, and tables for quality checks

## Resources

- [DQX Documentation](https://databrickslabs.github.io/dqx/)
- [DQX GitHub Repository](https://github.com/databrickslabs/dqx)
- [DQX Best Practices](https://databrickslabs.github.io/dqx/docs/guide/best_practices/)
205 changes: 205 additions & 0 deletions databricks-skills/databricks-dqx/profiling.md
@@ -0,0 +1,205 @@
# DQX Profiling & Rule Generation

## Profile a DataFrame

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator

ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(spark.read.table("catalog.schema.my_table"))
```

## Profile a Table Directly

```python
from databricks.labs.dqx.config import InputConfig

summary_stats, profiles = profiler.profile_table(
    input_config=InputConfig(location="catalog.schema.my_table"),
    columns=["col1", "col2", "col3"]  # optional
)
```

## Profile Multiple Tables

```python
# All tables matching a pattern
results = profiler.profile_tables_for_patterns(patterns=["main.data.*"])

# With exclusions
results = profiler.profile_tables_for_patterns(
    patterns=["main.*"], exclude_patterns=["*_dq_output", "*_quarantine"]
)
```

## Profiling Options

| Option | Default | Description |
|--------|---------|-------------|
| `sample_fraction` | 0.3 | Fraction of data to sample |
| `sample_seed` | None | Seed for reproducible sampling |
| `limit` | 1000 | Max records to analyze |
| `remove_outliers` | True | Remove outliers before min/max |
| `num_sigmas` | 3 | Std devs for outlier detection |
| `max_null_ratio` | 0.05 | Null ratio threshold for `is_not_null` |
| `trim_strings` | True | Trim whitespace before analysis |
| `max_empty_ratio` | 0.02 | Empty ratio threshold for `is_not_null_or_empty` |
| `distinct_ratio` | 0.01 | Distinct value ratio for `is_in` rule |
| `max_in_count` | 20 | Max items in `is_in_list` rules |
| `round` | True | Round min/max values |
| `filter` | None | SQL filter before profiling |
| `llm_primary_key_detection` | True | Use LLM to detect primary keys |
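
These options are passed to the profiler as a dict; a minimal sketch (the `options` parameter and keys follow the table above, but treat the exact signature as an assumption):

```python
# Sample 10% of rows, cap analysis at 500 records, keep outliers
summary_stats, profiles = profiler.profile(
    input_df,
    options={"sample_fraction": 0.1, "limit": 500, "remove_outliers": False},
)
```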

## Summary Statistics Fields

| Field | Meaning |
|-------|---------|
| `count` | Rows profiled (after sampling/limit) |
| `mean` / `stddev` | Average and standard deviation |
| `min` / `max` | Smallest/largest non-null value |
| `25` / `50` / `75` | Approximate percentiles |
| `count_non_null` / `count_null` | Non-null and null counts |
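
For illustration, a sketch of reading these fields, assuming `summary_stats` is a dict keyed by column name (the column `age` is hypothetical):

```python
age_stats = summary_stats["age"]  # hypothetical column name
print(age_stats["count"], age_stats["count_null"], age_stats["min"], age_stats["max"])
```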

## Profiler → Check Function Mapping

| Profile Type | Check Function | Column Types | Trigger |
|-------------|---------------|-------------|---------|
| `is_not_null` | `is_not_null` | All | Null ratio <= `max_null_ratio` |
| `is_not_null_or_empty` | `is_not_null_and_not_empty` | String | Null+empty <= thresholds |
| `is_in` | `is_in_list` | String, Int, Long | Distinct ratio <= threshold, count <= max |
| `min_max` | `is_in_range` | Numeric, Date, Timestamp | With outlier removal and rounding |
| `is_unique` | `is_unique` | All | Requires LLM primary key detection |

## Generate Rules from Profiles

```python
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)
```

## AI-Assisted Rule Generation

Generate quality rules from natural language. Requires `pip install 'databricks-labs-dqx[llm]'`.

```python
generator = DQGenerator(workspace_client=ws, spark=spark)

# Basic
checks = generator.generate_dq_rules_ai_assisted(
    user_input="Username must not start with 's' if age < 18. Valid email required. Age 0-120."
)

# With table schema context
checks = generator.generate_dq_rules_ai_assisted(
    user_input=user_input, input_config=InputConfig(location="catalog.schema.customers")
)

# With profiler summary stats
checks = generator.generate_dq_rules_ai_assisted(
    user_input="Validate sales data for anomalies", summary_stats=summary_stats
)
```

### With Custom Check Functions

A minimal sketch (the module paths for `make_condition` and `register_rule` are assumptions):

```python
import pyspark.sql.functions as F
from databricks.labs.dqx.check_funcs import make_condition  # assumed module path
from databricks.labs.dqx.rule import register_rule  # assumed module path

@register_rule("row")
def not_ends_with_suffix(column: str, suffix: str):
    # make_condition flags a row when the condition is true, i.e. when the check fails
    return make_condition(F.col(column).endswith(suffix), f"{column} ends with {suffix}", f"{column}_ends_with_suffix")

# Key should match the function name the generated checks will reference
generator = DQGenerator(ws, spark=spark, custom_check_functions={"not_ends_with_suffix": not_ends_with_suffix})
checks = generator.generate_dq_rules_ai_assisted(user_input=user_input)
```

### Custom Model

```python
from databricks.labs.dqx.config import LLMModelConfig

generator = DQGenerator(
    ws, spark=spark,
    llm_model_config=LLMModelConfig(model_name="databricks/databricks-claude-sonnet-4-5")
)
```

### Combine Profiler + AI Rules

```python
all_checks = generator.generate_dq_rules(profiles) + generator.generate_dq_rules_ai_assisted(
    user_input="GDPR compliance: no PII in public fields",
    input_config=InputConfig(location="catalog.schema.customers")
)
```

## Primary Key Detection

```python
result = profiler.detect_primary_keys_with_llm(
    input_config=InputConfig(location="catalog.schema.users")
)
# result: {"primary_key_columns": [...], "confidence": ..., "reasoning": ...}
```

## Data Contract Rules (ODCS)

Requires `pip install 'databricks-labs-dqx[datacontract]'`.

```python
rules = generator.generate_rules_from_contract(
    contract_file="/Workspace/Shared/contracts/customers.yaml"
)
```

### Constraint Mapping

| ODCS Constraint | DQX Rule | Example |
|----------------|----------|---------|
| `required: true` | `is_not_null` | Mandatory fields |
| `unique: true` | `is_unique` | Primary keys |
| `pattern` | `regex_match` | Email validation |
| `minimum` / `maximum` | `is_in_range` or `sql_expression` | Amount limits |
| `minLength` / `maxLength` | `sql_expression` | Length constraints |

### Explicit DQX Rules in Contract

```yaml
quality:
  - type: custom
    engine: dqx
    implementation:
      name: rule_name
      criticality: error
      check:
        function: check_function_name
        arguments:
          column: field_name
```

### Text-Based Rules (LLM-Processed)

```yaml
quality:
  - type: text
    description: |
      Email addresses must be valid and from approved corporate domains only.
```

Requires `pip install 'databricks-labs-dqx[datacontract,llm]'`.

## Lakeflow DLT Expectations Generation

Generate DLT expectations from profiled data:

```python
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator

dlt_generator = DQDltGenerator(ws)

sql_rules = dlt_generator.generate_dlt_rules(profiles, language="SQL")
# CONSTRAINT user_id_is_null EXPECT (user_id is not null)

sql_drop = dlt_generator.generate_dlt_rules(profiles, language="SQL", action="drop")
# CONSTRAINT user_id_is_null EXPECT (user_id is not null) ON VIOLATION DROP ROW

python_rules = dlt_generator.generate_dlt_rules(profiles, language="Python")
# @dlt.expect_all({"user_id_is_null": "user_id is not null"})

dict_rules = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
```