Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
)
from metadata.ingestion.source.database.snowflake.utils import (
_current_database_schema,
_get_schema_unique_constraints,
get_columns,
get_foreign_keys,
get_pk_constraint,
Expand Down Expand Up @@ -176,6 +177,7 @@ def __init__(
SnowflakeDialect.get_all_view_definitions = get_all_view_definitions
SnowflakeDialect.get_view_definition = get_view_definition
SnowflakeDialect.get_unique_constraints = get_unique_constraints
SnowflakeDialect._get_schema_unique_constraints = _get_schema_unique_constraints
SnowflakeDialect._get_schema_columns = get_schema_columns
Inspector.get_table_names = get_table_names_reflection
Inspector.get_view_names = get_view_names_reflection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,43 @@ def get_unique_constraints(self, connection, table_name, schema, **kw):
).get(table_name, [])


def _get_schema_unique_constraints(self, connection, schema, **kw):
result = connection.execute(
Comment on lines +622 to +623
text(
f"SHOW /* sqlalchemy:_get_schema_unique_constraints */ "
f"UNIQUE KEYS IN SCHEMA {schema}"
Comment on lines +622 to +626
)
Comment on lines +622 to +627
)
Comment on lines +622 to +628
Comment on lines +622 to +628
unique_constraints = {}
for row in result:
name = self.normalize_name(row._mapping["constraint_name"])
table_name = self.normalize_name(row._mapping["table_name"])

# OpenMetadata Patch: Append the table_name into the uniqueness dictionary
# to support DBs that allow duplicate constraint names across tables
constraint_key = (name, table_name)

if constraint_key not in unique_constraints:
unique_constraints[constraint_key] = {
"column_names": [self.normalize_name(row._mapping["column_name"])],
"name": name,
"table_name": table_name,
}
else:
unique_constraints[constraint_key]["column_names"].append(
self.normalize_name(row._mapping["column_name"])
)

ans = {}
for constraint in unique_constraints.values():
t_name = constraint.pop("table_name")
if t_name not in ans:
ans[t_name] = []
ans[t_name].append(constraint)

return ans
Comment on lines +622 to +656


@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from unittest.mock import Mock
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Quality: UTF-8 BOM added to test file

The latest commit introduces a UTF-8 BOM (byte order mark, \xEF\xBB\xBF) at the start of test_snowflake_constraints.py. While Python's parser tolerates this, it is unconventional for Python source files and no other test file in the repository uses a BOM. The commit message says "set proper UTF-8 encoding" but a BOM is not needed for UTF-8 Python files and is discouraged by PEP 263. This should be removed to stay consistent with the rest of the codebase.

Suggested fix:

Remove the BOM character from the file. In most editors, save as "UTF-8 without BOM". Or:
  sed -i '1s/^\xEF\xBB\xBF//' ingestion/tests/unit/topology/database/test_snowflake_constraints.py

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

from metadata.ingestion.source.database.snowflake.utils import _get_schema_unique_constraints

def test_snowflake_unique_constraint_collision():
# Mocking self (SnowflakeDialect)
Comment on lines +1 to +5
dialect_mock = Mock()
dialect_mock.normalize_name = lambda name: name.lower() if name else name

# Mocking connection
connection_mock = Mock()

# Mocking the result of connection.execute(...)
# Simulating two tables 'table_1' and 'table_2' inside the same schema
# both sharing an identical constraint name like "unique_id"
row1 = Mock()
row1._mapping = {
"constraint_name": "unique_id",
"table_name": "table_1",
"column_name": "id"
}

row2 = Mock()
row2._mapping = {
"constraint_name": "unique_id",
"table_name": "table_2",
"column_name": "id"
}

# Composite constraint on table_2 (second column of the same unique key)
row3 = Mock()
row3._mapping = {
"constraint_name": "unique_id",
"table_name": "table_2",
"column_name": "email"
}

result_mock = [row1, row2, row3]
connection_mock.execute.return_value = result_mock

# Run the patched function
output = _get_schema_unique_constraints(dialect_mock, connection_mock, "public")

# Output should correctly split the constraints for table_1 and table_2 without collision
assert "table_1" in output
assert "table_2" in output

assert len(output["table_1"]) == 1
assert output["table_1"][0]["name"] == "unique_id"
assert output["table_1"][0]["column_names"] == ["id"]

assert len(output["table_2"]) == 1
assert output["table_2"][0]["name"] == "unique_id"
assert output["table_2"][0]["column_names"] == ["id", "email"]
Loading