diff --git a/ingestion/src/metadata/pii/india_patterns.py b/ingestion/src/metadata/pii/india_patterns.py
new file mode 100644
--- /dev/null
+++ b/ingestion/src/metadata/pii/india_patterns.py
@@ -0,0 +1,135 @@
+"""
+India specific PII patterns for DPDP Act 2023 compliance.
+Extends OpenMetadata auto PII tagging with locale aware detection.
+"""
+
+import re
+from typing import Any, Callable, Dict, Optional, Sequence
+
+# ``\b`` treats "_" as a word character, so it never fires inside snake_case
+# names such as ``customer_pan_number``. These lookarounds bound a token by
+# "not a letter or digit" instead.
+_TOKEN_START = r"(?<![a-z0-9])"
+_TOKEN_END = r"(?![a-z0-9])"
+
+# Column-name patterns keyed by PII family; apply them with ``.search()``.
+INDIA_COLUMN_PATTERNS = {
+    "aadhaar": re.compile(r"aadha?ar|uidai", re.IGNORECASE),
+    "pan": re.compile(
+        rf"{_TOKEN_START}pan(?:[_\s-]?(?:card|number|no|num))?{_TOKEN_END}"
+        r"|permanent[_\s-]?account",
+        re.IGNORECASE,
+    ),
+    "upi": re.compile(
+        rf"{_TOKEN_START}(?:upi[_\s-]?(?:id|address|vpa)|vpa){_TOKEN_END}",
+        re.IGNORECASE,
+    ),
+}
+
+# Label reported for each pattern key; dict order is the matching priority.
+_PII_LABELS = {"aadhaar": "Aadhaar", "pan": "PAN", "upi": "UPI"}
+
+# 12 ASCII digits; UIDAI never issues a number starting with 0 or 1.
+_AADHAAR_RE = re.compile(r"[2-9][0-9]{11}")
+
+# AAAAA9999A, where the 4th letter is the holder type.
+_PAN_RE = re.compile(r"[A-Z]{3}[ABCFGHJLPT][A-Z][0-9]{4}[A-Z]")
+
+# Verhoeff multiplication table
+_VERHOEFF_D = [
+    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+    [1, 2, 3, 4, 0, 6, 7, 8, 9, 5],
+    [2, 3, 4, 0, 1, 7, 8, 9, 5, 6],
+    [3, 4, 0, 1, 2, 8, 9, 5, 6, 7],
+    [4, 0, 1, 2, 3, 9, 5, 6, 7, 8],
+    [5, 9, 8, 7, 6, 0, 4, 3, 2, 1],
+    [6, 5, 9, 8, 7, 1, 0, 4, 3, 2],
+    [7, 6, 5, 9, 8, 2, 1, 0, 4, 3],
+    [8, 7, 6, 5, 9, 3, 2, 1, 0, 4],
+    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0],
+]
+
+# Verhoeff permutation table
+_VERHOEFF_P = [
+    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+    [1, 5, 7, 6, 2, 8, 3, 0, 9, 4],
+    [5, 8, 0, 3, 7, 9, 6, 1, 4, 2],
+    [8, 9, 1, 6, 0, 4, 3, 5, 2, 7],
+    [9, 4, 5, 3, 1, 2, 6, 8, 7, 0],
+    [4, 2, 8, 6, 5, 7, 3, 9, 0, 1],
+    [2, 7, 9, 3, 8, 0, 6, 4, 1, 5],
+    [7, 0, 4, 6, 9, 1, 3, 2, 5, 8],
+]
+
+
+def validate_aadhaar(number: str) -> bool:
+    """
+    Validate an Aadhaar number using the Verhoeff checksum.
+
+    Returns True only for 12 ASCII digits that do not start with 0 or 1 and
+    pass the checksum, so random 12-digit order IDs are not tagged.
+    """
+    # fullmatch + [0-9] rejects characters such as "²" that satisfy
+    # str.isdigit() but make int() raise.
+    if not isinstance(number, str) or not _AADHAAR_RE.fullmatch(number):
+        return False
+
+    checksum = 0
+    for position, digit in enumerate(reversed(number)):
+        checksum = _VERHOEFF_D[checksum][_VERHOEFF_P[position % 8][int(digit)]]
+    return checksum == 0
+
+
+def validate_pan(number: str) -> bool:
+    """
+    Validate PAN format: 5 uppercase letters, 4 digits, 1 uppercase letter.
+    The 4th character must be one of: A,B,C,F,G,H,J,L,P,T
+    """
+    # fullmatch, unlike match with "$", also rejects a trailing newline.
+    return isinstance(number, str) and _PAN_RE.fullmatch(number) is not None
+
+
+def validate_upi(value: str) -> bool:
+    """
+    Heuristic UPI ID (VPA) check: contains "@" and is longer than 5 characters.
+    """
+    return isinstance(value, str) and "@" in value and len(value) > 5
+
+
+# Value validator for each label returned by is_india_pii_column.
+_VALUE_VALIDATORS: Dict[str, Callable[[str], bool]] = {
+    "Aadhaar": validate_aadhaar,
+    "PAN": validate_pan,
+    "UPI": validate_upi,
+}
+
+
+def is_india_pii_column(column_name: str) -> Optional[str]:
+    """
+    Check if column name matches India PII patterns.
+    Returns the PII type ("Aadhaar", "PAN" or "UPI") or None.
+    """
+    if not column_name:
+        return None
+    for key, label in _PII_LABELS.items():
+        if INDIA_COLUMN_PATTERNS[key].search(column_name):
+            return label
+    return None
+
+
+def samples_match_india_pii(
+    pii_type: str, sample_data: Optional[Sequence[Any]]
+) -> bool:
+    """
+    Return True when more than half of the non-empty samples validate as
+    the given PII type. Unknown types and empty samples return False.
+    """
+    validator = _VALUE_VALIDATORS.get(pii_type)
+    if validator is None or not sample_data:
+        return False
+
+    values = [str(value).strip() for value in sample_data if value]
+    if not values:
+        return False
+    valid_count = sum(1 for value in values if validator(value))
+    return valid_count * 2 > len(values)
diff --git a/ingestion/src/metadata/pii/processor.py b/ingestion/src/metadata/pii/processor.py
--- a/ingestion/src/metadata/pii/processor.py
+++ b/ingestion/src/metadata/pii/processor.py
@@ -55,6 +55,7 @@
 from metadata.pii.constants import PII
+from metadata.pii.india_patterns import is_india_pii_column, samples_match_india_pii
 from metadata.utils import fqn
 from metadata.utils.logger import profiler_logger
 
 logger = profiler_logger()
 
@@ -105,6 +106,16 @@ def create_column_tag_labels(
             if PII in tag.tagFQN.root:
                 return []
 
+        # India specific PII: the column name only nominates a candidate
+        # type; tag it when a majority of the sampled values validate too.
+        india_pii = is_india_pii_column(column.name.root)
+        if india_pii and samples_match_india_pii(india_pii, sample_data):
+            return [
+                self.build_tag_label(
+                    PIISensitivityTag.SENSITIVE, f"India {india_pii} detected"
+                )
+            ]
+
         # Build classifier with the results capturing patcher
         result_capturer = ResultCapturingPatcher()
         classifier: ColumnClassifier[PIISensitivityTag] = PIISensitiveClassifier(
diff --git a/ingestion/tests/unit/pii/test_india_patterns.py b/ingestion/tests/unit/pii/test_india_patterns.py
new file mode 100644
--- /dev/null
+++ b/ingestion/tests/unit/pii/test_india_patterns.py
@@ -0,0 +1,77 @@
+"""Unit tests for the India specific PII helpers."""
+import pytest
+
+from metadata.pii.india_patterns import (
+    is_india_pii_column,
+    samples_match_india_pii,
+    validate_aadhaar,
+    validate_pan,
+    validate_upi,
+)
+
+
+@pytest.mark.parametrize(
+    ("number", "expected"),
+    [
+        ("999999990019", True),  # UIDAI test number, passes Verhoeff
+        ("999999990018", False),  # single digit changed, fails checksum
+        ("123456789012", False),  # must not start with 0 or 1
+        ("12345", False),  # wrong length
+        ("abcdefghijkl", False),  # not digits
+        ("\u00b2" * 12, False),  # str.isdigit() is True, int() would raise
+        ("", False),
+    ],
+)
+def test_aadhaar_validation(number, expected):
+    assert validate_aadhaar(number) is expected
+
+
+@pytest.mark.parametrize(
+    ("number", "expected"),
+    [
+        ("ABCPE1234F", True),
+        ("ABCDE1234F", False),  # "D" is not a valid holder type
+        ("abcpE1234f", False),  # must be uppercase
+        ("ABCP1234F", False),  # too short
+        ("ABCPE12345", False),  # wrong format
+        ("ABCPE1234F\n", False),  # trailing newline
+    ],
+)
+def test_pan_validation(number, expected):
+    assert validate_pan(number) is expected
+
+
+def test_upi_validation():
+    assert validate_upi("alice@okaxis") is True
+    assert validate_upi("a@b") is False  # too short
+    assert validate_upi("no-at-sign") is False
+
+
+@pytest.mark.parametrize(
+    ("column_name", "expected"),
+    [
+        ("aadhaar_number", "Aadhaar"),
+        ("AADHAR_NO", "Aadhaar"),
+        ("customer_pan", "PAN"),
+        ("customer_pan_number", "PAN"),
+        ("permanent_account_number", "PAN"),
+        ("upi_id", "UPI"),
+        ("payer_vpa", "UPI"),
+        ("order_id", None),
+        ("company_name", None),
+        ("panel_id", None),
+        ("", None),
+    ],
+)
+def test_column_name_matching(column_name, expected):
+    assert is_india_pii_column(column_name) == expected
+
+
+def test_samples_match_requires_majority():
+    valid, invalid = "999999990019", "123456789012"
+    assert samples_match_india_pii("Aadhaar", [valid, valid, invalid]) is True
+    assert samples_match_india_pii("Aadhaar", [valid, invalid]) is False
+    assert samples_match_india_pii("Aadhaar", [None, "", valid]) is True
+    assert samples_match_india_pii("PAN", []) is False
+    assert samples_match_india_pii("UPI", ["alice@okaxis", "bob@ybl"]) is True
+    assert samples_match_india_pii("Unknown", [valid]) is False