diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index d3688fd92528..a962049abb13 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -106,6 +106,7 @@ ) from metadata.ingestion.source.database.snowflake.utils import ( _current_database_schema, + _get_schema_unique_constraints, get_columns, get_foreign_keys, get_pk_constraint, @@ -176,6 +177,7 @@ def __init__( SnowflakeDialect.get_all_view_definitions = get_all_view_definitions SnowflakeDialect.get_view_definition = get_view_definition SnowflakeDialect.get_unique_constraints = get_unique_constraints +SnowflakeDialect._get_schema_unique_constraints = _get_schema_unique_constraints SnowflakeDialect._get_schema_columns = get_schema_columns Inspector.get_table_names = get_table_names_reflection Inspector.get_view_names = get_view_names_reflection diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index 72ee49956e35..31ce0fc913ab 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -619,6 +619,43 @@ def get_unique_constraints(self, connection, table_name, schema, **kw): ).get(table_name, []) +def _get_schema_unique_constraints(self, connection, schema, **kw): + result = connection.execute( + text( + f"SHOW /* sqlalchemy:_get_schema_unique_constraints */ " + f"UNIQUE KEYS IN SCHEMA {schema}" + ) + ) + unique_constraints = {} + for row in result: + name = self.normalize_name(row._mapping["constraint_name"]) + table_name = self.normalize_name(row._mapping["table_name"]) + + # OpenMetadata Patch: Append the table_name into the uniqueness dictionary + # to support DBs that allow duplicate constraint names across tables + constraint_key = (name, table_name) + + if constraint_key not in unique_constraints: + unique_constraints[constraint_key] = { + "column_names": [self.normalize_name(row._mapping["column_name"])], + "name": name, + "table_name": table_name, + } + else: + unique_constraints[constraint_key]["column_names"].append( + self.normalize_name(row._mapping["column_name"]) + ) + + ans = {} + for constraint in unique_constraints.values(): + t_name = constraint.pop("table_name") + if t_name not in ans: + ans[t_name] = [] + ans[t_name].append(constraint) + + return ans + + @reflection.cache def get_columns(self, connection, table_name, schema=None, **kw): """ diff --git a/ingestion/tests/unit/topology/database/test_snowflake_constraints.py b/ingestion/tests/unit/topology/database/test_snowflake_constraints.py new file mode 100644 index 000000000000..9ca3d0500af1 --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_snowflake_constraints.py @@ -0,0 +1,53 @@ +from unittest.mock import Mock +from metadata.ingestion.source.database.snowflake.utils import _get_schema_unique_constraints + +def test_snowflake_unique_constraint_collision(): + # Mocking self (SnowflakeDialect) + dialect_mock = Mock() + dialect_mock.normalize_name = lambda name: name.lower() if name else name + + # Mocking connection + connection_mock = Mock() + + # Mocking the result of connection.execute(...) + # Simulating two tables 'table_1' and 'table_2' inside the same schema + # both sharing an identical constraint name like "unique_id" + row1 = Mock() + row1._mapping = { + "constraint_name": "unique_id", + "table_name": "table_1", + "column_name": "id" + } + + row2 = Mock() + row2._mapping = { + "constraint_name": "unique_id", + "table_name": "table_2", + "column_name": "id" + } + + # Composite constraint on table_2 (second column of the same unique key) + row3 = Mock() + row3._mapping = { + "constraint_name": "unique_id", + "table_name": "table_2", + "column_name": "email" + } + + result_mock = [row1, row2, row3] + connection_mock.execute.return_value = result_mock + + # Run the patched function + output = _get_schema_unique_constraints(dialect_mock, connection_mock, "public") + + # Output should correctly split the constraints for table_1 and table_2 without collision + assert "table_1" in output + assert "table_2" in output + + assert len(output["table_1"]) == 1 + assert output["table_1"][0]["name"] == "unique_id" + assert output["table_1"][0]["column_names"] == ["id"] + + assert len(output["table_2"]) == 1 + assert output["table_2"][0]["name"] == "unique_id" + assert output["table_2"][0]["column_names"] == ["id", "email"]