diff --git a/dcs_core/integrations/databases/mysql.py b/dcs_core/integrations/databases/mysql.py index fcff5dc..cda0183 100644 --- a/dcs_core/integrations/databases/mysql.py +++ b/dcs_core/integrations/databases/mysql.py @@ -15,6 +15,7 @@ from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Union +from loguru import logger from sqlalchemy import create_engine, text from sqlalchemy.engine import URL @@ -397,3 +398,40 @@ def query_get_time_diff(self, table: str, field: str) -> int: updated_time = datetime.strptime(updated_time, "%Y-%m-%d %H:%M:%S.%f") return int((datetime.utcnow() - updated_time).total_seconds()) return 0 + + def get_table_foreign_key_info(self, table_name: str, schema: str | None = None): + schema = schema or self.schema_name + + query = f""" + SELECT + kcu.CONSTRAINT_NAME AS constraint_name, + kcu.TABLE_NAME AS table_name, + kcu.COLUMN_NAME AS fk_column, + kcu.REFERENCED_TABLE_NAME AS referenced_table, + kcu.REFERENCED_COLUMN_NAME AS referenced_column + FROM information_schema.TABLE_CONSTRAINTS tc + JOIN information_schema.KEY_COLUMN_USAGE kcu + ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME + AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA + WHERE tc.CONSTRAINT_TYPE = 'FOREIGN KEY' + AND tc.TABLE_NAME = '{table_name}' + AND tc.TABLE_SCHEMA = '{schema}'; + """ + + try: + rows = self.fetchall(query) + except Exception as e: + logger.error(f"Failed to fetch fk info for dataset: {table_name} ({e})") + return [] + + data = [ + { + "constraint_name": row[0], + "table_name": row[1], + "fk_column": row[2], + "referenced_table": row[3], + "referenced_column": row[4], + } + for row in rows + ] + return data diff --git a/dcs_core/integrations/databases/oracle.py b/dcs_core/integrations/databases/oracle.py index 6860b7b..aea3a1e 100644 --- a/dcs_core/integrations/databases/oracle.py +++ b/dcs_core/integrations/databases/oracle.py @@ -702,3 +702,47 @@ def generate_view_name(self, view_name: str | None = None) -> str: ) timestamp = int(time.time()) return f"dcs_view_{timestamp}_{random_string.lower()}".upper() + + def get_table_foreign_key_info(self, table_name: str, schema: str | None = None): + schema = schema or self.schema_name + + query = f""" + SELECT + ac.CONSTRAINT_NAME AS constraint_name, + ac.TABLE_NAME AS table_name, + acc.COLUMN_NAME AS fk_column, + r_ac.TABLE_NAME AS referenced_table, + r_acc.COLUMN_NAME AS referenced_column + FROM ALL_CONSTRAINTS ac + JOIN ALL_CONS_COLUMNS acc + ON ac.CONSTRAINT_NAME = acc.CONSTRAINT_NAME + AND ac.OWNER = acc.OWNER + JOIN ALL_CONSTRAINTS r_ac + ON ac.R_CONSTRAINT_NAME = r_ac.CONSTRAINT_NAME + AND ac.R_OWNER = r_ac.OWNER + JOIN ALL_CONS_COLUMNS r_acc + ON r_ac.CONSTRAINT_NAME = r_acc.CONSTRAINT_NAME + AND r_ac.OWNER = r_acc.OWNER + AND acc.POSITION = r_acc.POSITION + WHERE ac.CONSTRAINT_TYPE = 'R' + AND ac.TABLE_NAME = '{table_name.upper()}' + AND ac.OWNER = '{schema.upper()}'; + """ + + try: + rows = self.fetchall(query) + except Exception as e: + logger.error(f"Failed to fetch fk info for dataset: {table_name} ({e})") + return [] + + data = [ + { + "constraint_name": row[0], + "table_name": row[1], + "fk_column": row[2], + "referenced_table": row[3], + "referenced_column": row[4], + } + for row in rows + ] + return data