diff --git a/demos/dqx_demo_industry/dqx_banking_demo.py b/demos/dqx_demo_industry/dqx_banking_demo.py new file mode 100644 index 000000000..dfd4b4e58 --- /dev/null +++ b/demos/dqx_demo_industry/dqx_banking_demo.py @@ -0,0 +1,600 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # DQX - Banking / FSI Industry Accelerator Demo +# MAGIC ## Anti-Money Laundering (AML) & Transaction Monitoring +# MAGIC +# MAGIC This demo showcases DQX data quality rules tailored for the **Banking and Financial Services industry**, with a focus on **fraud detection, transaction structuring prevention, and regulatory compliance**. +# MAGIC +# MAGIC ### Use Case: Real-Time Transaction Quality & Fraud Monitoring +# MAGIC +# MAGIC Process incoming transaction batches and automatically: +# MAGIC * **Quarantine invalid transactions** (errors) for immediate investigation +# MAGIC * **Flag suspicious patterns** (warnings) for AML review while allowing transactions to process +# MAGIC * **Enforce data integrity** to ensure downstream systems receive clean, compliant data +# MAGIC +# MAGIC ### Implemented Data Quality Checks +# MAGIC +# MAGIC **1. Custom Fraud Detection (AML Red Flags)** +# MAGIC * **Structuring Detection**: Perfectly round amounts (\$5K, \$10K, \$15K) often indicate structuring to avoid reporting +# MAGIC * **Threshold Avoidance**: Amounts just below \$10K reporting threshold (\$9,950-\$9,999) +# MAGIC * **Anomalous Patterns**: Repeated digit patterns (\$7,777.77, \$3,333.33) statistically unlikely in legitimate transactions +# MAGIC +# MAGIC **2. Critical Field Validation (Errors)** +# MAGIC * Required fields: `transaction_id`, `currency` must not be null +# MAGIC * Amount constraints: Not null, not zero, within system bounds (-999M to 999M) +# MAGIC +# MAGIC **3. Business Logic Validation (Errors)** +# MAGIC * **Transaction Sign Consistency**: CREDIT/DEPOSIT/INCOMING_TRANSFER must be positive; DEBIT/WITHDRAWAL/FEE/CHARGEBACK must be negative +# MAGIC * Prevents accounting errors and reconciliation failures +# MAGIC +# MAGIC **4. Data Quality Standards (Warnings)** +# MAGIC * **Currency Validation**: ISO-4217 compliance (USD, EUR, GBP, etc.) +# MAGIC * **Precision Standards**: Maximum 2 decimal places for monetary amounts +# MAGIC +# MAGIC ### Key DQX Features Demonstrated +# MAGIC +# MAGIC * **Dual-Routing Split Behavior**: Warnings go to BOTH valid and invalid DataFrames (monitoring without blocking) +# MAGIC * **Custom Check Functions**: Industry-specific fraud detection logic integrated with built-in checks +# MAGIC * **Centralized Rule Management**: Rules stored in Delta tables for versioning and governance +# MAGIC * **Single-Pass Execution**: All checks applied in one data scan for optimal performance + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Install DQX + +# COMMAND ---------- + +# DBTITLE 1,Install DQX Library + +dbutils.widgets.text("test_library_ref", "", "Test Library Ref") + +if dbutils.widgets.get("test_library_ref") != "": + %pip install '{dbutils.widgets.get("test_library_ref")}' +else: + %pip install databricks-labs-dqx + +%restart_python + +# COMMAND ---------- + +import os + +workspace_root_path = os.getcwd() +quality_rules_path = f"{workspace_root_path}/quality_checks" + +# Cleanup existing DQ Rules files, if already exists +if os.path.exists(quality_rules_path): + for filename in os.listdir(quality_rules_path): + file_path = os.path.join(quality_rules_path, filename) + # Only delete files, not subdirectories + if os.path.isfile(file_path): + os.remove(file_path) + print(f"Deleted: {file_path}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Production Best Practice: Pin DQX Version +# MAGIC +# MAGIC To ensure consistent behavior and avoid unexpected issues from automatic upgrades, it is highly recommended to always pin DQX to a specific version in your production pipelines. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Setup Catalog and Schema +# MAGIC +# MAGIC Specify the catalog and schema where the quality rules table will be stored. + +# COMMAND ---------- + +# DBTITLE 1,Set Catalog and Schema for Demo +default_catalog_name = "main" +default_schema_name = "default" + +dbutils.widgets.text("demo_catalog", default_catalog_name, "Catalog Name") +dbutils.widgets.text("demo_schema", default_schema_name, "Schema Name") + +catalog = dbutils.widgets.get("demo_catalog") +schema = dbutils.widgets.get("demo_schema") + +print(f"Selected Catalog: {catalog}") +print(f"Selected Schema: {schema}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Setup Sample Financial Data +# MAGIC +# MAGIC We'll create a DataFrame simulating a typical transaction stream or daily batch (`transactions_df`). + +# COMMAND ---------- + +from pyspark.sql import SparkSession, Row +from datetime import datetime + +# Create a comprehensive sample DataFrame simulating international banking transactions +transactions = [ + # Valid transactions + Row(transaction_id="TRX-101", account_id="ACC-US-001", country="USA", transaction_type="CREDIT", currency="USD", amount=1247.89, timestamp="2024-03-15 10:30:00"), + Row(transaction_id="TRX-102", account_id="ACC-UK-002", country="GBR", transaction_type="DEPOSIT", currency="GBP", amount=876.45, timestamp="2024-03-15 14:20:00"), + Row(transaction_id="TRX-103", account_id="ACC-JP-003", country="JPN", transaction_type="WITHDRAWAL", currency="JPY", amount=-23456.00, timestamp="2024-03-15 09:15:00"), + Row(transaction_id="TRX-104", account_id="ACC-FR-004", country="FRA", transaction_type="CREDIT", currency="EUR", amount=543.21, timestamp="2024-03-15 11:45:00"), + + # Suspicious Pattern: Perfectly round amounts (structuring) + Row(transaction_id="TRX-201", account_id="ACC-US-005", country="USA", transaction_type="DEPOSIT", currency="USD", amount=10000.00, timestamp="2024-03-15 10:00:00"), # Warn: Perfectly round 10K + Row(transaction_id="TRX-202", account_id="ACC-CA-006", country="CAN", transaction_type="CREDIT", currency="CAD", amount=5000.00, timestamp="2024-03-15 15:30:00"), # Warn: Perfectly round 5K + Row(transaction_id="TRX-203", account_id="ACC-AU-007", country="AUS", transaction_type="DEPOSIT", currency="AUD", amount=15000.00, timestamp="2024-03-16 08:20:00"), # Warn: Perfectly round 15K + + # Suspicious Pattern: Just below reporting thresholds + Row(transaction_id="TRX-301", account_id="ACC-US-008", country="USA", transaction_type="DEPOSIT", currency="USD", amount=9995.00, timestamp="2024-03-16 11:00:00"), # Warn: Just below 10K threshold + Row(transaction_id="TRX-302", account_id="ACC-US-009", country="USA", transaction_type="CREDIT", currency="USD", amount=4975.50, timestamp="2024-03-16 13:45:00"), # Warn: Just below 5K threshold + Row(transaction_id="TRX-303", account_id="ACC-CH-010", country="CHE", transaction_type="DEPOSIT", currency="CHF", amount=2980.00, timestamp="2024-03-16 16:20:00"), # Warn: Just below 3K threshold + + # Suspicious Pattern: Repeated digit patterns + Row(transaction_id="TRX-401", account_id="ACC-BR-011", country="BRA", transaction_type="CREDIT", currency="BRL", amount=7777.77, timestamp="2024-03-17 09:30:00"), # Warn: Repeated 7s + Row(transaction_id="TRX-402", account_id="ACC-MX-012", country="MEX", transaction_type="DEPOSIT", currency="MXN", amount=3333.33, timestamp="2024-03-17 10:15:00"), # Warn: Repeated 3s + Row(transaction_id="TRX-403", account_id="ACC-IN-013", country="IND", transaction_type="CREDIT", currency="INR", amount=8888.88, timestamp="2024-03-17 14:00:00"), # Warn: Repeated 8s + + # Data quality errors: Sign inconsistency + Row(transaction_id="TRX-501", account_id="ACC-DE-014", country="DEU", transaction_type="CREDIT", currency="EUR", amount=-500.00, timestamp="2024-03-18 10:00:00"), # Error: Credit must be positive + Row(transaction_id="TRX-502", account_id="ACC-ES-015", country="ESP", transaction_type="WITHDRAWAL", currency="EUR", amount=300.00, timestamp="2024-03-18 11:30:00"), # Error: Withdrawal must be negative + Row(transaction_id="TRX-503", account_id="ACC-IT-016", country="ITA", transaction_type="CHARGEBACK", currency="EUR", amount=125.50, timestamp="2024-03-18 13:15:00"), # Error: Chargeback must be negative + + # Data quality errors: Precision issues + Row(transaction_id="TRX-601", account_id="ACC-SG-017", country="SGP", transaction_type="FEE", currency="USD", amount=-1.234, timestamp="2024-03-19 09:00:00"), # Warn: Too many decimals + Row(transaction_id="TRX-602", account_id="ACC-CN-018", country="CHN", transaction_type="CREDIT", currency="CNY", amount=567.8901, timestamp="2024-03-19 10:30:00"), # Warn: Too many decimals + + # Data quality errors: Null values + Row(transaction_id="TRX-701", account_id="ACC-NL-019", country="NLD", transaction_type="DEBIT", currency=None, amount=-150.00, timestamp="2024-03-20 08:45:00"), # Error: Null currency + Row(transaction_id=None, account_id="ACC-SE-020", country="SWE", transaction_type="CREDIT", currency="EUR", amount=200.00, timestamp="2024-03-20 09:30:00"), # Error: Null transaction_id + + # More valid international transactions + Row(transaction_id="TRX-801", account_id="ACC-KR-021", country="KOR", transaction_type="INCOMING_TRANSFER", currency="USD", amount=2345.67, timestamp="2024-03-20 11:00:00"), + Row(transaction_id="TRX-802", account_id="ACC-ZA-022", country="ZAF", transaction_type="FEE", currency="USD", amount=-5.50, timestamp="2024-03-20 12:15:00"), + Row(transaction_id="TRX-803", account_id="ACC-AR-023", country="ARG", transaction_type="WITHDRAWAL", currency="USD", amount=-789.25, timestamp="2024-03-20 14:30:00"), + Row(transaction_id="TRX-804", account_id="ACC-PL-024", country="POL", transaction_type="DEPOSIT", currency="EUR", amount=1456.88, timestamp="2024-03-20 15:45:00"), +] + +transactions_table = f"{catalog}.{schema}.banking_transactions" + +if spark.catalog.tableExists(transactions_table) and spark.table(transactions_table).count() > 0: + print(f"Table {transactions_table} already exists with demo data. Skipping data generation") +else: + transactions_df = spark.createDataFrame(transactions) + transactions_df.write.mode("overwrite").saveAsTable(transactions_table) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Understanding the Dataset +# MAGIC +# MAGIC ### Banking Transactions Dataset +# MAGIC +# MAGIC | Column Name | Data Type | Description | Example Value | +# MAGIC |--------------------|-----------|------------------------------------------------------|------------------------| +# MAGIC | `transaction_id` | string | Unique transaction identifier | TRX-101 | +# MAGIC | `account_id` | string | Account identifier | ACC-US-001 | +# MAGIC | `country` | string | Country code (ISO 3166-1 alpha-3) | USA | +# MAGIC | `transaction_type` | string | Type of transaction (CREDIT, DEBIT, DEPOSIT, etc.) | CREDIT | +# MAGIC | `currency` | string | Currency code (ISO 4217) | USD | +# MAGIC | `amount` | double | Transaction amount (positive for credits, negative for debits) | 1247.89 | +# MAGIC | `timestamp` | string | Transaction timestamp | 2024-03-15 10:30:00 | + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Some Sample Transaction Data + +# COMMAND ---------- + +# DBTITLE 1,Transactions Bronze Table +transactions_df = spark.read.table(transactions_table) +print("=== Transactions Data Sample ===") +display(transactions_df.limit(10)) + +# COMMAND ---------- + +# DBTITLE 1,Common Imports +import yaml +from pprint import pprint + +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator +from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.config import WorkspaceFileChecksStorageConfig, TableChecksStorageConfig + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Auto-Infer Quality Rules with DQProfiler +# MAGIC +# MAGIC Before defining rules manually, DQX can **automatically infer data quality rules** from the data profile. This is useful for bootstrapping a quality rule set for a new dataset. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Step 1**: Read raw data and instantiate DQX + +# COMMAND ---------- + +# Read Input Data +transactions_df = spark.read.table(transactions_table) + +# Instantiate DQX engine +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Step 2**: Run DQProfiler to infer quality rules + +# COMMAND ---------- + +# DBTITLE 1,Profile Data and Infer Quality Rules +profiler = DQProfiler(ws) +summary_stats, profiles = profiler.profile(transactions_df) + +generator = DQGenerator(ws) +inferred_checks = generator.generate_dq_rules(profiles) + +print("=== Inferred DQ Checks ===\n") +for idx, check in enumerate(inferred_checks): + print(f"========Check {idx} ==========\n") + pprint(check) + +# COMMAND ---------- + +# DBTITLE 1,Save Inferred Rules to Workspace File +banking_inferred_rules_yaml = f"{quality_rules_path}/banking_inferred_dq_rules.yml" +dq_engine.save_checks(inferred_checks, config=WorkspaceFileChecksStorageConfig(location=banking_inferred_rules_yaml)) +displayHTML(f'Banking Inferred Quality Rules YAML') + +# COMMAND ---------- + +# DBTITLE 1,Save Inferred Rules to Delta Table +banking_inferred_rules_table = f"{catalog}.{schema}.banking_inferred_quality_rules" +dq_engine.save_checks(inferred_checks, config=TableChecksStorageConfig(location=banking_inferred_rules_table, run_config_name="banking_inferred")) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Step 3**: Apply Inferred Quality Rules to Input Data + +# COMMAND ---------- + +# Load checks from workspace file +inferred_quality_checks = dq_engine.load_checks(config=WorkspaceFileChecksStorageConfig(location=banking_inferred_rules_yaml)) + +# Apply checks on input data +valid_inferred_df, quarantined_inferred_df = dq_engine.apply_checks_by_metadata_and_split(transactions_df, inferred_quality_checks) + +print("=== Transactions Quarantined by Inferred Rules ===") +display(quarantined_inferred_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Bring Your Own Rule: Suspicious Amount Pattern Detection +# MAGIC +# MAGIC This section demonstrates how to extend DQX with a **custom quality rule**. The workflow follows 3 steps: +# MAGIC 1. **Define** the custom rule function +# MAGIC 2. **Add** the rule to the YAML definition +# MAGIC 3. **Apply** the DQ rules on input data +# MAGIC +# MAGIC For this demo, we define a custom fraud detection rule that flags suspicious amount patterns commonly associated with financial crime and structuring: +# MAGIC * **Perfectly round amounts** above threshold (e.g., exactly \$5,000, \$10,000) which may indicate attempts to structure transactions +# MAGIC * **Just below reporting thresholds** (e.g., \$9,950-\$9,999) to avoid regulatory reporting +# MAGIC * **Repeated digit patterns** (e.g., \$7,777.77, \$3,333.33) which are statistically unlikely in legitimate transactions + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Step 1: Define the Custom Rule Function + +# COMMAND ---------- + +# DBTITLE 1,Custom Industry Check: Suspicious Amount Pattern Detection +import pyspark.sql.functions as F +from pyspark.sql import Column +from databricks.labs.dqx.check_funcs import make_condition, get_normalized_column_and_expr +from databricks.labs.dqx.rule import register_rule + +@register_rule("row") +def is_suspicious_amount_pattern( + column: str | Column, + threshold: float = 5000.0, + reporting_thresholds: float | list[float] = 10000.0, + margin: float = 50.0 +) -> Column: + """ + Detects suspicious amount patterns commonly associated with structuring or financial crime: + 1. Perfectly round amounts above threshold (e.g., exactly 5000.00, 10000.00) + 2. Amounts just below configurable reporting thresholds (e.g., 9950-9999 for a 10K threshold) + 3. Repeated digit patterns (e.g., 7777.77, 3333.33) + + These patterns may indicate attempts to avoid regulatory reporting requirements. + """ + col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column) + + # Take absolute value for the checks + abs_amount = F.abs(col_expr) + + # Check 1: Perfectly round amounts above threshold (e.g., 5000.00, 10000.00) + # Amount equals its floor and is divisible by 1000 or 5000 + is_round = ( + (abs_amount >= threshold) & + (abs_amount == F.floor(abs_amount)) & + ((abs_amount % 1000 == 0) | (abs_amount % 5000 == 0)) + ) + + # Check 2: Just below the reporting thresholds + if isinstance(reporting_thresholds, (int, float)): + reporting_thresholds = [float(reporting_thresholds)] + + near_threshold = F.lit(False) + for t in reporting_thresholds: + near_threshold = near_threshold | ((abs_amount >= t - margin) & (abs_amount < t)) + + # Check 3: Repeated digit patterns (e.g., 7777.77, 3333.33) + amount_str = F.format_number(abs_amount, 2) + # Remove decimal point and check if all digits are the same + digits_only = F.regexp_replace(amount_str, "[^0-9]", "") + first_digit = F.substring(digits_only, 1, 1) + all_same_digit = ( + (F.length(digits_only) >= 4) & + (F.length(F.regexp_replace(digits_only, first_digit, "")) == 0) + ) + + # Flag as suspicious if any pattern is detected + suspicious = is_round | near_threshold | all_same_digit + + return make_condition( + suspicious, + F.concat( + F.lit(f"Column '{col_expr_str}' value '"), + col_expr.cast("string"), + F.lit("' matches suspicious amount pattern (possible structuring/fraud indicator)") + ), + f"{col_str_norm}_suspicious_amount_pattern" + ) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Step 2: Add the Custom Rule to YAML Definition +# MAGIC +# MAGIC We define all quality checks — both built-in and custom — in a single YAML configuration. + +# COMMAND ---------- + +# Define industry-specific checks in YAML format +banking_checks_yaml = f""" +# 1. Critical fields must not be null +- criticality: error + check: + function: is_not_null_and_not_empty + for_each_column: + - transaction_id + - currency + user_metadata: + version: v1 + location: {catalog}.{schema}.banking_quality_rules + +# 2. Suspicious Amount Pattern Detection (Custom Fraud Check) +- name: suspicious_amount_pattern + criticality: warn + check: + function: is_suspicious_amount_pattern + arguments: + column: amount + threshold: 5000.0 + reporting_thresholds: [10000.0, 5000.0, 3000.0] + margin: 50.0 + user_metadata: + version: v1 + location: {catalog}.{schema}.banking_quality_rules + +# 3. Currency validation +- criticality: warn + check: + function: is_in_list + arguments: + column: currency + allowed: ["USD", "EUR", "GBP", "JPY", "CHF", "CAD", "AUD", "CNY", "INR", "BRL", "MXN"] + name: valid_iso_currency + user_metadata: + version: v1 + +# 4. Amount Basic Checks +- criticality: error + check: + function: is_not_null + arguments: + column: amount + name: amount_not_null + user_metadata: + version: v1 + +- criticality: error + check: + function: is_not_equal_to + arguments: + column: amount + value: 0 + name: amount_not_zero + user_metadata: + version: v1 + +# 5. Business Boundary Checks +- criticality: error + check: + function: is_in_range + arguments: + column: amount + min_limit: -999999999.99 + max_limit: 999999999.99 + name: amount_within_system_bounds + user_metadata: + version: v1 + +# 6. Transaction Sign Consistency +- criticality: error + check: + function: sql_expression + arguments: + expression: > + NOT ( + transaction_type IN ('CREDIT', 'DEPOSIT', 'INCOMING_TRANSFER') + AND amount < 0 + ) + name: credit_transaction_must_be_positive + user_metadata: + version: v1 + +- criticality: error + check: + function: sql_expression + arguments: + expression: > + NOT ( + transaction_type IN ('DEBIT', 'WITHDRAWAL', 'FEE', 'CHARGEBACK') + AND amount > 0 + ) + name: debit_transaction_must_be_negative + user_metadata: + version: v1 + +# 7. Banking Precision (max 2 decimals) +- criticality: warn + check: + function: sql_expression + arguments: + expression: "amount = ROUND(amount, 2)" + name: amount_max_two_decimal_places + user_metadata: + version: v1 +""" + +checks = yaml.safe_load(banking_checks_yaml) + +# COMMAND ---------- + +# DBTITLE 1,Validate Checks +status = DQEngine.validate_checks( + checks, + custom_check_functions={'is_suspicious_amount_pattern': is_suspicious_amount_pattern} +) +print(status) +assert not status.has_errors + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Production Best Practice: Centralized Rule Management +# MAGIC +# MAGIC Instead of hardcoding rules in every pipeline, DQX recommends maintaining them in a **centralized Delta table**. This enables versioning, governance, sharing across teams, and easier discoverability. +# MAGIC +# MAGIC **Recommended Practices:** +# MAGIC * **Access Control**: Grant read-only access to users; restrict write access to service principals. +# MAGIC * **Granularity**: Prioritize row-level checks for distributed performance. +# MAGIC * **One-Pass Execution**: Apply all checks (row, dataset, anomaly) in a single pass to minimize redundant data scans. + +# COMMAND ---------- + +# DBTITLE 1,Centralize Rules in Delta Table +banking_rules_table = f"{catalog}.{schema}.banking_quality_rules" + +# Note: Custom check functions (like is_suspicious_amount_pattern) cannot be saved to Delta tables. +# Only built-in checks are persisted. Custom checks must be applied directly from the YAML list. + +# Filter out custom checks for table storage (keep only built-in checks) +built_in_checks = [check for check in checks if check.get('check', {}).get('function') != 'is_suspicious_amount_pattern'] + +# 1. Save built-in checks to a Delta table +dq_engine.save_checks( + built_in_checks, + config=TableChecksStorageConfig(location=banking_rules_table, run_config_name="banking_prod_v1") +) + +# 2. Load checks from the Delta table +quality_checks = dq_engine.load_checks( + config=TableChecksStorageConfig(location=banking_rules_table, run_config_name="banking_prod_v1") +) + +# 3. Add the custom check back to the loaded checks for runtime application +custom_amount_check = next(check for check in checks if check.get('check', {}).get('function') == 'is_suspicious_amount_pattern') +quality_checks.append(custom_amount_check) + +# COMMAND ---------- + +# DBTITLE 1,Save Rules to Workspace File (Alternative Storage) +# DQX also supports storing rules as workspace YAML files +banking_rules_yaml = f"{quality_rules_path}/banking_quality_rules.yml" +dq_engine.save_checks(built_in_checks, config=WorkspaceFileChecksStorageConfig(location=banking_rules_yaml)) + +# Display a link to the saved checks file +displayHTML(f'Banking Quality Rules YAML') + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC ### Step 3: Apply Checks & Quarantine Invalid Records +# MAGIC +# MAGIC We'll use `apply_checks_by_metadata_and_split` to process the **loaded** checks. +# MAGIC +# MAGIC **Custom Function Handling**: +# MAGIC * **Better practice**: Pass only specific functions: `{'is_suspicious_amount_pattern': is_suspicious_amount_pattern}` +# MAGIC * **Production best practice**: Package custom checks as an importable module instead of notebook-defined functions +# MAGIC +# MAGIC **Split Behavior**: +# MAGIC * Records with **`error`** criticality violations → go **only** to `invalid_df` (quarantined) +# MAGIC * Records with **`warn`** criticality violations → go to **BOTH** `valid_df` and `invalid_df` (allows processing while flagging for review) +# MAGIC * This dual-routing for warnings enables fraud monitoring without blocking legitimate transactions +# MAGIC * For different behavior, use `apply_checks_by_metadata()` and filter manually + +# COMMAND ---------- + +# DQX Split Behavior: +# - Records with 'error' criticality violations → ONLY in invalid_df (quarantined) +# - Records with 'warn' criticality violations → BOTH valid_df AND invalid_df (dual-routing) +# +# This design allows suspicious patterns (warnings) to be monitored in the quarantine dataset +# while still flowing through the pipeline for processing. + +valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split( + transactions_df, + quality_checks, + custom_check_functions={'is_suspicious_amount_pattern': is_suspicious_amount_pattern} +) + +# COMMAND ---------- + +display(valid_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Invalid / Quarantined Transactions +# MAGIC Data quality issues are captured in the invalid DataFrame via the `_errors` and `_warnings` array columns mapping directly to our named checks. + +# COMMAND ---------- + +display(invalid_df) + +# COMMAND ---------- + +# DBTITLE 1,Persist Quarantine Table +quarantine_table = f"{catalog}.{schema}.banking_transactions_quarantine" +invalid_df.write.mode("overwrite").saveAsTable(quarantine_table) +print(f"Quarantined transactions saved to {quarantine_table}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Visualize Quality on Pre-Configured Dashboard +# MAGIC +# MAGIC When you deploy DQX as [`workspace tool`](https://databrickslabs.github.io/dqx/docs/installation/#dqx-installation-as-a-tool-in-a-databricks-workspace), it automatically generates a Quality Dashboard.
You can open the dashboard using Databricks CLI: `databricks labs dqx open-dashboards` diff --git a/demos/dqx_demo_industry/dqx_fashion_demo.py b/demos/dqx_demo_industry/dqx_fashion_demo.py new file mode 100644 index 000000000..66fcc7b36 --- /dev/null +++ b/demos/dqx_demo_industry/dqx_fashion_demo.py @@ -0,0 +1,641 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # DQX - Fashion & Retail Industry Accelerator Demo +# MAGIC ## Product Catalog Quality & Merchandising Compliance +# MAGIC +# MAGIC This demo showcases DQX data quality rules tailored for the **Fashion and Retail industry**, with a focus on **product catalog integrity, pricing compliance, and merchandising standards**. +# MAGIC +# MAGIC ### Use Case: Product Catalog Ingestion & Quality Gate +# MAGIC +# MAGIC Process incoming product catalog uploads from vendors and internal merchandising teams and automatically: +# MAGIC * **Quarantine invalid products** (errors) that must be fixed before going live on the storefront +# MAGIC * **Flag merchandising issues** (warnings) for buyer review while still allowing products into the pipeline +# MAGIC * **Enforce data integrity** to ensure downstream inventory, pricing, and recommendation systems receive clean data +# MAGIC +# MAGIC ### Implemented Data Quality Checks +# MAGIC +# MAGIC **1. Custom Pricing Intelligence (Suspicious Pricing Detection)** +# MAGIC * **Pennies-off psychological pricing**: Prices ending in .95, .97, .98, .99 are expected — but extreme penny-off patterns (e.g., $99.01) may indicate data entry errors +# MAGIC * **Suspiciously round wholesale prices**: Perfectly round prices ($100.00, $500.00) in retail contexts often indicate placeholder or draft pricing +# MAGIC * **Extreme margin outliers**: Unit cost exceeding retail price (negative margin) or margin above 95% (likely data error) +# MAGIC +# MAGIC **2. Critical Field Validation (Errors)** +# MAGIC * Required fields: `sku`, `product_name`, `category` must not be null or empty +# MAGIC * SKU format: Must follow standard pattern (e.g., `TSHIRT-BLK-M`, `JEANS-BLU-32`) +# MAGIC * Hex color code: Must be valid CSS hex color (#RRGGBB or #RGB) +# MAGIC * Positive price: Listing price must be greater than zero +# MAGIC +# MAGIC **3. Business Logic Validation (Errors)** +# MAGIC * **Margin consistency**: Unit cost must not exceed retail price (negative margin) +# MAGIC * **Weight plausibility**: Shipping weight must be within plausible bounds for apparel (0.01–25 kg) +# MAGIC +# MAGIC **4. Data Quality Standards (Warnings)** +# MAGIC * **Apparel sizing**: Must follow standard sizing codes (XS, S, M, L, XL, XXL, OS, or numeric like 32-L) +# MAGIC * **Category taxonomy**: Must belong to known product categories +# MAGIC * **Season code**: Must follow format (e.g., SS25, FW24, AW25, RS25) +# MAGIC * **Brand-category consistency**: Luxury brands should not have products priced below $50 +# MAGIC +# MAGIC ### Key DQX Features Demonstrated +# MAGIC +# MAGIC * **Dual-Routing Split Behavior**: Warnings go to BOTH valid and invalid DataFrames (monitoring without blocking) +# MAGIC * **Custom Check Functions**: Industry-specific pricing intelligence integrated with built-in checks +# MAGIC * **Centralized Rule Management**: Rules stored in Delta tables for versioning and governance +# MAGIC * **Single-Pass Execution**: All checks applied in one data scan for optimal performance + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Install DQX + +# COMMAND ---------- + +# DBTITLE 1,Install DQX Library + +dbutils.widgets.text("test_library_ref", "", "Test Library Ref") + +if dbutils.widgets.get("test_library_ref") != "": + %pip install '{dbutils.widgets.get("test_library_ref")}' +else: + %pip install databricks-labs-dqx + +%restart_python + +# COMMAND ---------- + +import os + +workspace_root_path = os.getcwd() +quality_rules_path = f"{workspace_root_path}/quality_checks" + +# Cleanup existing DQ Rules files, if already exists +if os.path.exists(quality_rules_path): + for filename in os.listdir(quality_rules_path): + file_path = os.path.join(quality_rules_path, filename) + # Only delete files, not subdirectories + if os.path.isfile(file_path): + os.remove(file_path) + print(f"Deleted: {file_path}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Production Best Practice: Pin DQX Version +# MAGIC +# MAGIC To ensure consistent behavior and avoid unexpected issues from automatic upgrades, it is highly recommended to always pin DQX to a specific version in your production pipelines. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Setup Catalog and Schema +# MAGIC +# MAGIC Specify the catalog and schema where the quality rules table will be stored. + +# COMMAND ---------- + +# DBTITLE 1,Set Catalog and Schema for Demo +default_catalog_name = "main" +default_schema_name = "default" + +dbutils.widgets.text("demo_catalog", default_catalog_name, "Catalog Name") +dbutils.widgets.text("demo_schema", default_schema_name, "Schema Name") + +catalog = dbutils.widgets.get("demo_catalog") +schema = dbutils.widgets.get("demo_schema") + +print(f"Selected Catalog: {catalog}") +print(f"Selected Schema: {schema}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Setup Sample Fashion Product Catalog Data +# MAGIC +# MAGIC We'll create a DataFrame simulating a typical product catalog upload from vendors or a merchandising system sync. + +# COMMAND ---------- + +from pyspark.sql import SparkSession, Row + +# Create a comprehensive sample DataFrame simulating a fashion product catalog ingestion +products = [ + # ── Valid products ────────────────────────────────────────────────────────── + Row(sku="TSHIRT-BLK-M", product_name="Classic Crew Tee", category="Tops", brand="BasicsLab", size_code="M", color_hex="#000000", retail_price=29.99, unit_cost=8.50, weight_kg=0.18, season_code="SS25"), + Row(sku="JEANS-BLU-32", product_name="Slim Fit Denim", category="Bottoms", brand="DenimCo", size_code="32-L", color_hex="#1A3A5C", retail_price=89.99, unit_cost=28.00, weight_kg=0.65, season_code="FW24"), + Row(sku="HOODIE-RED-XL", product_name="Oversized Hoodie", category="Outerwear", brand="StreetPulse", size_code="XL", color_hex="#C0392B", retail_price=65.00, unit_cost=22.00, weight_kg=0.55, season_code="AW25"), + Row(sku="DRESS-PNK-S", product_name="Midi Wrap Dress", category="Dresses", brand="Atelier Lux", size_code="S", color_hex="#F5A6C8", retail_price=185.00, unit_cost=52.00, weight_kg=0.35, season_code="SS25"), + Row(sku="SNEAKER-WHT-42", product_name="Retro Runner", category="Footwear", brand="StepForward", size_code="42", color_hex="#F8F8F8", retail_price=120.00, unit_cost=38.00, weight_kg=0.75, season_code="RS25"), + Row(sku="SCARF-GLD-OS", product_name="Silk Evening Scarf", category="Accessories", brand="Atelier Lux", size_code="OS", color_hex="#D4AF37", retail_price=95.00, unit_cost=30.00, weight_kg=0.08, season_code="FW24"), + Row(sku="BLAZER-NVY-L", product_name="Tailored Wool Blazer", category="Outerwear", brand="Atelier Lux", size_code="L", color_hex="#1B2631", retail_price=320.00, unit_cost=110.00, weight_kg=0.90, season_code="AW25"), + Row(sku="SHORTS-KHK-M", product_name="Chino Shorts", category="Bottoms", brand="BasicsLab", size_code="M", color_hex="#C8B88A", retail_price=45.00, unit_cost=12.00, weight_kg=0.28, season_code="SS25"), + + # ── Suspicious pricing patterns (warnings) ───────────────────────────────── + Row(sku="COAT-BLK-L", product_name="Trench Coat", category="Outerwear", brand="BasicsLab", size_code="L", color_hex="#0D0D0D", retail_price=200.00, unit_cost=60.00, weight_kg=1.20, season_code="FW24"), # Warn: Perfectly round retail price + Row(sku="POLO-GRN-M", product_name="Piqué Polo Shirt", category="Tops", brand="BasicsLab", size_code="M", color_hex="#2E8B57", retail_price=500.00, unit_cost=15.00, weight_kg=0.22, season_code="SS25"), # Warn: Round price + extreme 97% margin + Row(sku="BAG-TAN-OS", product_name="Leather Tote Bag", category="Accessories", brand="Atelier Lux", size_code="OS", color_hex="#D2B48C", retail_price=350.00, unit_cost=12.00, weight_kg=0.60, season_code="AW25"), # Warn: Round price + extreme 96.6% margin + + # ── Invalid sizing (warning) ──────────────────────────────────────────────── + Row(sku="SOCKS-WHT-PK3", product_name="Cotton Ankle Socks 3-Pack",category="Accessories", brand="BasicsLab", size_code="HUGE", color_hex="#FFFFFF", retail_price=12.50, unit_cost=3.00, weight_kg=0.10, season_code="SS25"), # Warn: Non-standard size "HUGE" + + # ── Invalid hex color (error) ─────────────────────────────────────────────── + Row(sku="HAT-GRN-OS", product_name="Bucket Hat", category="Accessories", brand="StreetPulse", size_code="OS", color_hex="GREEN", retail_price=25.00, unit_cost=7.00, weight_kg=0.12, season_code="SS25"), # Error: "GREEN" is not a valid hex color + Row(sku="VEST-CRM-M", product_name="Knit Vest", category="Tops", brand="BasicsLab", size_code="M", color_hex="#GGHHII",retail_price=55.00, unit_cost=18.00, weight_kg=0.30, season_code="FW24"), # Error: Invalid hex chars + + # ── Negative / zero price (error) ─────────────────────────────────────────── + Row(sku="BELT-BRN-36", product_name="Leather Belt", category="Accessories", brand="DenimCo", size_code="36", color_hex="#8B4513", retail_price=-15.00, unit_cost=5.00, weight_kg=0.15, season_code="SS25"), # Error: Negative price + Row(sku="TIE-BLU-OS", product_name="Silk Tie", category="Accessories", brand="Atelier Lux", size_code="OS", color_hex="#2C3E50", retail_price=0.00, unit_cost=25.00, weight_kg=0.05, season_code="FW24"), # Error: Zero price + + # ── Null required fields (error) ──────────────────────────────────────────── + Row(sku=None, product_name="Mystery Item", category="Tops", brand="BasicsLab", size_code="S", color_hex="#FFC0CB", retail_price=40.00, unit_cost=12.00, weight_kg=0.20, season_code="SS25"), # Error: Null SKU + Row(sku="JACKET-GRY-L", product_name=None, category="Outerwear", brand="StreetPulse", size_code="L", color_hex="#808080", retail_price=110.00, unit_cost=35.00, weight_kg=0.70, season_code="AW25"), # Error: Null product name + + # ── Invalid category (warning) ────────────────────────────────────────────── + Row(sku="LAMP-WHT-OS", product_name="Table Lamp", category="HomeGoods", brand="BasicsLab", size_code="OS", color_hex="#EEEEEE", retail_price=75.00, unit_cost=20.00, weight_kg=2.50, season_code="SS25"), # Warn: Category not in taxonomy + + # ── Invalid season code (warning) ─────────────────────────────────────────── + Row(sku="CARDIGAN-BRG-M", product_name="Cashmere Cardigan", category="Tops", brand="Atelier Lux", size_code="M", color_hex="#800020", retail_price=245.00, unit_cost=85.00, weight_kg=0.40, season_code="XMAS"), # Warn: Non-standard season code + + # ── Negative margin: cost > price (error) ─────────────────────────────────── + Row(sku="BOOT-BLK-40", product_name="Chelsea Boot", category="Footwear", brand="StepForward", size_code="40", color_hex="#1C1C1C", retail_price=89.00, unit_cost=120.00, weight_kg=1.10, season_code="AW25"), # Error: Cost exceeds price + + # ── Implausible weight (error) ────────────────────────────────────────────── + Row(sku="RING-SLV-OS", product_name="Sterling Silver Ring", category="Accessories", brand="Atelier Lux", size_code="OS", color_hex="#C0C0C0", retail_price=150.00, unit_cost=45.00, weight_kg=0.005,season_code="RS25"), # Error: Weight below 0.01 kg minimum + + # ── Brand-price inconsistency (warning) ───────────────────────────────────── + Row(sku="TEE-WHT-XS", product_name="Basic White Tee", category="Tops", brand="Atelier Lux", size_code="XS", color_hex="#FAFAFA", retail_price=15.00, unit_cost=4.00, weight_kg=0.16, season_code="SS25"), # Warn: Luxury brand priced under $50 + + # ── More valid products for pipeline flow ─────────────────────────────────── + Row(sku="PARKA-OLV-XL", product_name="Expedition Parka", category="Outerwear", brand="StreetPulse", size_code="XL", color_hex="#556B2F", retail_price=275.00, unit_cost=95.00, weight_kg=1.40, season_code="FW24"), +] + +products_table = f"{catalog}.{schema}.fashion_products" + +if spark.catalog.tableExists(products_table) and spark.table(products_table).count() > 0: + print(f"Table {products_table} already exists with demo data. Skipping data generation") +else: + products_df = spark.createDataFrame(products) + products_df.write.mode("overwrite").saveAsTable(products_table) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Understanding the Dataset +# MAGIC +# MAGIC ### Fashion Product Catalog Dataset +# MAGIC +# MAGIC | Column Name | Data Type | Description | Example Value | +# MAGIC |----------------|-----------|----------------------------------------------------------------|-------------------| +# MAGIC | `sku` | string | Stock Keeping Unit — `CATEGORY-COLOR-SIZE` format | TSHIRT-BLK-M | +# MAGIC | `product_name` | string | Human-readable product description | Classic Crew Tee | +# MAGIC | `category` | string | Product taxonomy category | Tops | +# MAGIC | `brand` | string | Brand attribution | BasicsLab | +# MAGIC | `size_code` | string | Apparel sizing (XS–XXL, OS, or numeric like 32-L) | M | +# MAGIC | `color_hex` | string | CSS hex color for storefront display | #000000 | +# MAGIC | `retail_price` | double | Listed retail price (must be positive) | 29.99 | +# MAGIC | `unit_cost` | double | Vendor unit cost (must not exceed retail price) | 8.50 | +# MAGIC | `weight_kg` | double | Shipping weight in kilograms (0.01–25 kg) | 0.18 | +# MAGIC | `season_code` | string | Merchandising season code (e.g., SS25, FW24, AW25, RS25) | SS25 | + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Some Sample Product Catalog Data + +# COMMAND ---------- + +# DBTITLE 1,Products Bronze Table +products_df = spark.read.table(products_table) +print("=== Product Catalog Data Sample ===") +display(products_df.limit(10)) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Auto-Infer Quality Rules with DQProfiler +# MAGIC +# MAGIC Before defining rules manually, DQX can **automatically infer data quality rules** from the data profile. This is useful for bootstrapping a quality rule set for a new dataset. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Step 1**: Read raw data and instantiate DQX + +# COMMAND ---------- + +import yaml +from pprint import pprint + +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator +from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.config import WorkspaceFileChecksStorageConfig, TableChecksStorageConfig + +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Step 2**: Run DQProfiler to infer quality rules + +# COMMAND ---------- + +# DBTITLE 1,Profile Data and Infer Quality Rules +profiler = DQProfiler(ws) +summary_stats, profiles = profiler.profile(products_df) + +generator = DQGenerator(ws) +inferred_checks = generator.generate_dq_rules(profiles) + +print("=== Inferred DQ Checks ===\n") +for idx, check in enumerate(inferred_checks): + print(f"========Check {idx} ==========\n") + pprint(check) + +# COMMAND ---------- + +# DBTITLE 1,Save Inferred Rules to Workspace File +fashion_inferred_rules_yaml = f"{quality_rules_path}/fashion_inferred_dq_rules.yml" +dq_engine.save_checks(inferred_checks, config=WorkspaceFileChecksStorageConfig(location=fashion_inferred_rules_yaml)) +displayHTML(f'Fashion Inferred Quality Rules YAML') + +# COMMAND ---------- + +# DBTITLE 1,Save Inferred Rules to Delta Table +fashion_inferred_rules_table = f"{catalog}.{schema}.fashion_inferred_quality_rules" +dq_engine.save_checks(inferred_checks, config=TableChecksStorageConfig(location=fashion_inferred_rules_table, run_config_name="fashion_inferred")) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC **Step 3**: Apply Inferred Quality Rules to Input Data + +# COMMAND ---------- + +# Load checks from workspace file +inferred_quality_checks = dq_engine.load_checks(config=WorkspaceFileChecksStorageConfig(location=fashion_inferred_rules_yaml)) + +# Apply checks on input data +valid_inferred_df, quarantined_inferred_df = dq_engine.apply_checks_by_metadata_and_split(products_df, inferred_quality_checks) + +print("=== Products Quarantined by Inferred Rules ===") +display(quarantined_inferred_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Bring Your Own Rule: Suspicious Pricing Pattern Detection +# MAGIC +# MAGIC This section demonstrates how to extend DQX with a **custom quality rule**. The workflow follows 3 steps: +# MAGIC 1. **Define** the custom rule function +# MAGIC 2. **Add** the rule to the YAML definition +# MAGIC 3. **Apply** the DQ rules on input data +# MAGIC +# MAGIC For this demo, we define a custom pricing intelligence rule that flags suspicious pricing patterns common in retail product catalogs: +# MAGIC * **Perfectly round retail prices** (e.g., exactly \$100.00, \$200.00) which often indicate placeholder or draft pricing that was never finalized +# MAGIC * **Extreme margin outliers** — margins above 95% typically indicate a data entry error (e.g., cost entered in the wrong currency) +# MAGIC * **Negative margins** — unit cost exceeding retail price signals a pricing or cost error + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Step 1: Define the Custom Rule Function + +# COMMAND ---------- + +# DBTITLE 1,Custom Industry Check: Suspicious Pricing Pattern Detection +import pyspark.sql.functions as F +from pyspark.sql import Column +from databricks.labs.dqx.check_funcs import make_condition, get_normalized_column_and_expr +from databricks.labs.dqx.rule import register_rule + +@register_rule("row") +def is_suspicious_pricing_pattern( + price_column: str | Column, + cost_column: str | Column, + round_price_threshold: float = 100.0, + max_margin_pct: float = 95.0, +) -> Column: + """ + Detects suspicious pricing patterns common in fashion/retail product catalogs: + 1. Perfectly round retail prices above threshold (e.g., $100.00, $200.00, $500.00) + — often indicates placeholder/draft pricing + 2. Extreme margin percentages above max_margin_pct (e.g., 95%+) + — likely a data entry error (wrong currency, missing digit, etc.) + 3. Negative margins (cost > price) + — pricing or cost error + + These patterns may indicate catalog data quality issues requiring merchandising review. + """ + price_norm, price_expr_str, price_expr = get_normalized_column_and_expr(price_column) + _cost_norm, cost_expr_str, cost_expr = get_normalized_column_and_expr(cost_column) + + # Check 1: Perfectly round retail prices above threshold + is_round = ( + (price_expr >= round_price_threshold) + & (price_expr == F.floor(price_expr)) + & ((price_expr % 50 == 0) | (price_expr % 100 == 0)) + ) + + # Check 2: Extreme margin (margin% = (price - cost) / price * 100) + margin_pct = ((price_expr - cost_expr) / price_expr) * 100 + extreme_margin = (price_expr > 0) & (cost_expr > 0) & (margin_pct > max_margin_pct) + + # Check 3: Negative margin (cost exceeds price) + negative_margin = (price_expr > 0) & (cost_expr > price_expr) + + suspicious = is_round | extreme_margin | negative_margin + + return make_condition( + suspicious, + F.concat( + F.lit(f"Suspicious pricing: '{price_expr_str}'="), + price_expr.cast("string"), + F.lit(f", '{cost_expr_str}'="), + cost_expr.cast("string"), + F.lit(" (possible placeholder, margin outlier, or cost/price inversion)"), + ), + f"{price_norm}_suspicious_pricing_pattern", + ) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Step 2: Add the Custom Rule to YAML Definition +# MAGIC +# MAGIC We define all quality checks — both built-in and custom — in a single YAML configuration. + +# COMMAND ---------- + +import yaml + +# Define industry-specific checks in YAML format +fashion_checks_yaml = f""" +# ──── 1. Critical Fields: Must not be null or empty ──────────────────────────── +- criticality: error + check: + function: is_not_null_and_not_empty + for_each_column: + - sku + - product_name + - category + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 2. SKU Format Validation ──────────────────────────────────────────────── +# Standard pattern: CATEGORY-COLOR-SIZE (e.g., TSHIRT-BLK-M, JEANS-BLU-32) +- criticality: error + check: + function: regex_match + arguments: + column: sku + regex: "^[A-Z]{{2,10}}-[A-Z]{{2,4}}-[A-Z0-9]{{1,4}}$" + name: sku_format_valid + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 3. Hex Color Code Validation ──────────────────────────────────────────── +# Must be valid CSS hex color: #RRGGBB or #RGB +- criticality: error + check: + function: regex_match + arguments: + column: color_hex + regex: "^#([A-Fa-f0-9]{{6}}|[A-Fa-f0-9]{{3}})$" + name: valid_css_hex_color + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 4. Listing Price: Must be positive ────────────────────────────────────── +- criticality: error + check: + function: is_not_less_than + arguments: + column: retail_price + limit: 0.01 + name: positive_listing_price + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 5. Suspicious Pricing Pattern (Custom Check) ─────────────────────────── +- name: suspicious_pricing_pattern + criticality: warn + check: + function: is_suspicious_pricing_pattern + arguments: + price_column: retail_price + cost_column: unit_cost + round_price_threshold: 100.0 + max_margin_pct: 95.0 + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 6. Apparel Sizing Validation ──────────────────────────────────────────── +# Accepted formats: XS, S, M, L, XL, XXL, OS, or numeric (32, 42, 32-L) +- criticality: warn + check: + function: regex_match + arguments: + column: size_code + regex: "^(XS|S|M|L|XL|XXL|OS|[0-9]{{2}}(-[A-Z])?)$" + name: standard_apparel_sizing + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 7. Product Category Taxonomy ──────────────────────────────────────────── +- criticality: warn + check: + function: is_in_list + arguments: + column: category + allowed: ["Tops", "Bottoms", "Dresses", "Outerwear", "Footwear", "Accessories", "Activewear", "Swimwear", "Loungewear"] + name: valid_product_category + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 8. Season Code Format ────────────────────────────────────────────────── +# Format: SS25 (Spring/Summer), FW24 (Fall/Winter), AW25 (Autumn/Winter), RS25 (Resort) +- criticality: warn + check: + function: regex_match + arguments: + column: season_code + regex: "^(SS|FW|AW|RS|PF|CR)[0-9]{{2}}$" + name: valid_season_code + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 9. Shipping Weight Plausibility ──────────────────────────────────────── +# Apparel and accessories typically weigh between 0.01 kg and 25 kg +- criticality: error + check: + function: is_in_range + arguments: + column: weight_kg + min_limit: 0.01 + max_limit: 25.0 + name: weight_within_bounds + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 10. Margin Consistency: Cost must not exceed Price ───────────────────── +- criticality: error + check: + function: sql_expression + arguments: + expression: "unit_cost <= retail_price OR retail_price <= 0" + name: cost_does_not_exceed_price + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules + +# ──── 11. Brand-Price Consistency: Luxury brands must be priced >= $50 ─────── +- criticality: warn + check: + function: sql_expression + arguments: + expression: > + NOT ( + brand IN ('Atelier Lux') + AND retail_price < 50 + ) + name: luxury_brand_minimum_price + user_metadata: + version: v1 + location: {catalog}.{schema}.fashion_quality_rules +""" + +checks = yaml.safe_load(fashion_checks_yaml) + +# COMMAND ---------- + +# DBTITLE 1,Validate Checks +status = DQEngine.validate_checks( + checks, + custom_check_functions={'is_suspicious_pricing_pattern': is_suspicious_pricing_pattern} +) +print(status) +assert not status.has_errors + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Production Best Practice: Centralized Rule Management +# MAGIC +# MAGIC Instead of hardcoding rules in every pipeline, DQX recommends maintaining them in a **centralized Delta table**. This enables versioning, governance, sharing across teams, and easier discoverability. +# MAGIC +# MAGIC **Recommended Practices:** +# MAGIC * **Access Control**: Grant read-only access to users; restrict write access to service principals. +# MAGIC * **Granularity**: Prioritize row-level checks for distributed performance. +# MAGIC * **One-Pass Execution**: Apply all checks (row, dataset, anomaly) in a single pass to minimize redundant data scans. + +# COMMAND ---------- + +# DBTITLE 1,Centralize Rules in Delta Table +fashion_rules_table = f"{catalog}.{schema}.fashion_quality_rules" + +# Note: Custom check functions (like is_suspicious_pricing_pattern) cannot be saved to Delta tables. +# Only built-in checks are persisted. Custom checks must be applied directly from the YAML list. + +# Filter out custom checks for table storage (keep only built-in checks) +built_in_checks = [check for check in checks if check.get('check', {}).get('function') != 'is_suspicious_pricing_pattern'] + +# 1. Save built-in checks to a Delta table +dq_engine.save_checks( + built_in_checks, + config=TableChecksStorageConfig(location=fashion_rules_table, run_config_name="fashion_prod_v1") +) + +# 2. Load checks from the Delta table +quality_checks = dq_engine.load_checks( + config=TableChecksStorageConfig(location=fashion_rules_table, run_config_name="fashion_prod_v1") +) + +# 3. Add the custom check back to the loaded checks for runtime application +custom_pricing_check = next(check for check in checks if check.get('check', {}).get('function') == 'is_suspicious_pricing_pattern') +quality_checks.append(custom_pricing_check) + +# COMMAND ---------- + +# DBTITLE 1,Save Rules to Workspace File (Alternative Storage) +# DQX also supports storing rules as workspace YAML files +fashion_rules_yaml = f"{quality_rules_path}/fashion_quality_rules.yml" +dq_engine.save_checks(built_in_checks, config=WorkspaceFileChecksStorageConfig(location=fashion_rules_yaml)) + +# Display a link to the saved checks file +displayHTML(f'Fashion Quality Rules YAML') + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Step 3: Apply Checks & Quarantine Invalid Products +# MAGIC +# MAGIC We'll use `apply_checks_by_metadata_and_split` to process the **loaded** checks. +# MAGIC +# MAGIC **Custom Function Handling**: +# MAGIC * **Better practice**: Pass only specific functions: `{'is_suspicious_pricing_pattern': is_suspicious_pricing_pattern}` +# MAGIC * **Production best practice**: Package custom checks as an importable module instead of notebook-defined functions +# MAGIC +# MAGIC **Split Behavior**: +# MAGIC * Records with **`error`** criticality violations → go **only** to `invalid_df` (quarantined) +# MAGIC * Records with **`warn`** criticality violations → go to **BOTH** `valid_df` and `invalid_df` (allows processing while flagging for review) +# MAGIC * This dual-routing for warnings enables merchandising review without blocking catalog publication +# MAGIC * For different behavior, use `apply_checks_by_metadata()` and filter manually + +# COMMAND ---------- + +# DQX Split Behavior: +# - Records with 'error' criticality violations → ONLY in invalid_df (quarantined) +# - Records with 'warn' criticality violations → BOTH valid_df AND invalid_df (dual-routing) +# +# This design allows merchandising issues (warnings) to be reviewed in the quarantine dataset +# while still flowing through the pipeline to the storefront. + +valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split( + products_df, + quality_checks, + custom_check_functions={'is_suspicious_pricing_pattern': is_suspicious_pricing_pattern} +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Valid Products — Ready for Storefront +# MAGIC Products that pass all error-level checks. Warning-flagged products still appear here for downstream processing. + +# COMMAND ---------- + +display(valid_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Invalid / Quarantined Products — Merchandising Review Queue +# MAGIC Data quality issues are captured in the invalid DataFrame via the `_errors` and `_warnings` array columns mapping directly to our named checks. + +# COMMAND ---------- + +display(invalid_df) + +# COMMAND ---------- + +# DBTITLE 1,Persist Quarantine Table +quarantine_table = f"{catalog}.{schema}.fashion_products_quarantine" +invalid_df.write.mode("overwrite").saveAsTable(quarantine_table) +print(f"Quarantined products saved to {quarantine_table}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Visualize Quality on Pre-Configured Dashboard +# MAGIC +# MAGIC When you deploy DQX as [`workspace tool`](https://databrickslabs.github.io/dqx/docs/installation/#dqx-installation-as-a-tool-in-a-databricks-workspace), it automatically generates a Quality Dashboard.
You can open the dashboard using Databricks CLI: `databricks labs dqx open-dashboards` diff --git a/demos/dqx_manufacturing_demo.py b/demos/dqx_demo_industry/dqx_manufacturing_demo.py similarity index 99% rename from demos/dqx_manufacturing_demo.py rename to demos/dqx_demo_industry/dqx_manufacturing_demo.py index 1b255b698..47c19588b 100644 --- a/demos/dqx_manufacturing_demo.py +++ b/demos/dqx_demo_industry/dqx_manufacturing_demo.py @@ -79,6 +79,13 @@ # COMMAND ---------- +# MAGIC %md +# MAGIC ### Production Best Practice: Pin DQX Version +# MAGIC +# MAGIC To ensure consistent behavior and avoid unexpected issues from automatic upgrades, it is highly recommended to always pin DQX to a specific version in your production pipelines. + +# COMMAND ---------- + # MAGIC %md # MAGIC ### Sample Data Generation # MAGIC diff --git a/docs/dqx/docs/demos.mdx b/docs/dqx/docs/demos.mdx index 9c136903d..bb6921aec 100644 --- a/docs/dqx/docs/demos.mdx +++ b/docs/dqx/docs/demos.mdx @@ -26,9 +26,13 @@ Import the following notebooks in the Databricks workspace to try DQX out: ## Deploy as Workspace Tool * [DQX Demo Notebook](https://github.com/databrickslabs/dqx/blob/v0.13.0/demos/dqx_demo_tool.py) - demonstrates how to use DQX as a tool when installed in the workspace. +## Industry Accelerators +* [DQX for Banking / FSI Industry Accelerator Demo](https://github.com/databrickslabs/dqx/blob/v0.13.0/demos/dqx_demo_industry/dqx_banking_demo.py) - showcases DQX data quality rules tailored for the Banking and Financial Services industry, focusing on fraud detection, transaction structuring, and regulatory compliance. +* [DQX for Fashion & Retail Accelerator Demo](https://github.com/databrickslabs/dqx/blob/v0.13.0/demos/dqx_demo_industry/dqx_fashion_demo.py) - demonstrates DQX rules tailored for the Fashion and Retail industry, including SKU validation, hex color formats, and pricing compliance. +* [DQX for Manufacturing Industry Accelerator Demo](https://github.com/databrickslabs/dqx/blob/v0.13.0/demos/dqx_demo_industry/dqx_manufacturing_demo.py) - demonstrates how to use DQX to check data quality for Manufacturing Industry datasets, specifically for machine maintenance and sensor data. + ## Use Cases * [DQX for PII Detection Notebook](https://github.com/databrickslabs/dqx/blob/v0.13.0/demos/dqx_demo_pii_detection.py) - demonstrates how to use DQX to check data for Personally Identifiable Information (PII). -* [DQX for Manufacturing Notebook](https://github.com/databrickslabs/dqx/blob/v0.13.0/demos/dqx_manufacturing_demo.py) - demonstrates how to use DQX to check data quality for Manufacturing Industry datasets.
diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index c759f56da..f70649d57 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -26,6 +26,29 @@ def _get_local_git_branch() -> str | None: return None +def _get_remote_url(branch: str) -> str | None: + """Get the git remote URL for the current branch, or None if not available.""" + try: + remote = subprocess.check_output( + ["git", "config", f"branch.{branch}.remote"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + url = subprocess.check_output( + ["git", "config", f"remote.{remote}.url"], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + if url.endswith(".git"): + url = url[:-4] + # Convert SSH URLs to HTTPS so pip can use them (git+git@... is invalid) + if url.startswith("git@github.com:"): + url = url.replace("git@github.com:", "https://github.com/", 1) + return url + except Exception: + return None + + @pytest.fixture def library_ref() -> str: """ @@ -55,6 +78,9 @@ def library_ref() -> str: # Local behavior: use current git branch ref_name = _get_local_git_branch() if ref_name and ref_name != "HEAD": + remote_url = _get_remote_url(ref_name) + if remote_url: + return f"git+{remote_url}.git@{ref_name}" return f"{base_ref}.git@{ref_name}" # Default to main branch diff --git a/tests/e2e/test_run_demos.py b/tests/e2e/test_run_demos.py index c33b12ea0..56133f1cb 100644 --- a/tests/e2e/test_run_demos.py +++ b/tests/e2e/test_run_demos.py @@ -77,10 +77,10 @@ def test_run_intermediate_dqx_demo_library(ws, make_notebook, make_schema, make_ def test_run_dqx_manufacturing_demo(ws, make_notebook, make_directory, make_schema, make_job, library_ref): - path = Path(__file__).parent.parent.parent / "demos" / "dqx_manufacturing_demo.py" + path = Path(__file__).parent.parent.parent / "demos" / "dqx_demo_industry" / "dqx_manufacturing_demo.py" with open(path, "rb") as f: notebook = make_notebook(content=f, format=ImportFormat.SOURCE) - folder = notebook.as_fuse().parent / "quality_rules" + folder = (notebook.as_fuse().parent / "quality_checks").as_posix() make_directory(path=folder) catalog = TEST_CATALOG @@ -101,6 +101,52 @@ def test_run_dqx_manufacturing_demo(ws, make_notebook, make_directory, make_sche logging.info(f"Job run {run.run_id} completed successfully for dqx_manufacturing_demo") +def test_run_dqx_banking_demo(ws, make_notebook, make_schema, make_job, library_ref): + path = Path(__file__).parent.parent.parent / "demos" / "dqx_demo_industry" / "dqx_banking_demo.py" + with open(path, "rb") as f: + notebook = make_notebook(content=f, format=ImportFormat.SOURCE) + + catalog = TEST_CATALOG + schema = make_schema(catalog_name=catalog).name + notebook_path = notebook.as_fuse().as_posix() + notebook_task = NotebookTask( + notebook_path=notebook_path, + base_parameters={"demo_catalog": catalog, "demo_schema": schema, "test_library_ref": library_ref}, + ) + job = make_job(tasks=[Task(task_key="dqx_banking_demo", notebook_task=notebook_task)]) + + waiter = ws.jobs.run_now_and_wait(job.job_id) + run = ws.jobs.wait_get_run_job_terminated_or_skipped( + run_id=waiter.run_id, + timeout=timedelta(minutes=30), + callback=lambda r: validate_run_status(r, ws), + ) + logging.info(f"Job run {run.run_id} completed successfully for dqx_banking_demo") + + +def test_run_dqx_fashion_demo(ws, make_notebook, make_schema, make_job, library_ref): + path = Path(__file__).parent.parent.parent / "demos" / "dqx_demo_industry" / "dqx_fashion_demo.py" + with open(path, "rb") as f: + notebook = make_notebook(content=f, format=ImportFormat.SOURCE) + + catalog = TEST_CATALOG + schema = make_schema(catalog_name=catalog).name + notebook_path = notebook.as_fuse().as_posix() + notebook_task = NotebookTask( + notebook_path=notebook_path, + base_parameters={"demo_catalog": catalog, "demo_schema": schema, "test_library_ref": library_ref}, + ) + job = make_job(tasks=[Task(task_key="dqx_fashion_demo", notebook_task=notebook_task)]) + + waiter = ws.jobs.run_now_and_wait(job.job_id) + run = ws.jobs.wait_get_run_job_terminated_or_skipped( + run_id=waiter.run_id, + timeout=timedelta(minutes=30), + callback=lambda r: validate_run_status(r, ws), + ) + logging.info(f"Job run {run.run_id} completed successfully for dqx_fashion_demo") + + def test_run_dqx_quick_start_demo_library(ws, make_notebook, make_job, library_ref): path = Path(__file__).parent.parent.parent / "demos" / "dqx_quick_start_demo_library.py" with open(path, "rb") as f: