-
Notifications
You must be signed in to change notification settings - Fork 23
feat: add get_table_foreign_key_info method for MySQL and Oracle database #351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -702,3 +702,47 @@ def generate_view_name(self, view_name: str | None = None) -> str: | |
| ) | ||
| timestamp = int(time.time()) | ||
| return f"dcs_view_{timestamp}_{random_string.lower()}".upper() | ||
|
|
||
| def get_table_foreign_key_info(self, table_name: str, schema: str | None = None): | ||
| schema = schema or self.schema_name | ||
|
|
||
| query = f""" | ||
| SELECT | ||
| ac.CONSTRAINT_NAME AS constraint_name, | ||
| ac.TABLE_NAME AS table_name, | ||
| acc.COLUMN_NAME AS fk_column, | ||
| r_ac.TABLE_NAME AS referenced_table, | ||
| r_acc.COLUMN_NAME AS referenced_column | ||
| FROM ALL_CONSTRAINTS ac | ||
| JOIN ALL_CONS_COLUMNS acc | ||
| ON ac.CONSTRAINT_NAME = acc.CONSTRAINT_NAME | ||
| AND ac.OWNER = acc.OWNER | ||
| JOIN ALL_CONSTRAINTS r_ac | ||
| ON ac.R_CONSTRAINT_NAME = r_ac.CONSTRAINT_NAME | ||
| AND ac.R_OWNER = r_ac.OWNER | ||
| JOIN ALL_CONS_COLUMNS r_acc | ||
| ON r_ac.CONSTRAINT_NAME = r_acc.CONSTRAINT_NAME | ||
| AND r_ac.OWNER = r_acc.OWNER | ||
| AND acc.POSITION = r_acc.POSITION | ||
| WHERE ac.CONSTRAINT_TYPE = 'R' | ||
| AND ac.TABLE_NAME = '{table_name.upper()}' | ||
| AND ac.OWNER = '{schema.upper()}'; | ||
| """ | ||
|
|
||
| try: | ||
| rows = self.fetchall(query) | ||
| except Exception as e: | ||
| logger.error(f"Failed to fetch fk info for dataset: {table_name} ({e})") | ||
| return [] | ||
|
|
||
| data = [ | ||
| { | ||
| "constraint_name": row[0], | ||
| "table_name": row[1], | ||
| "fk_column": row[2], | ||
| "referenced_table": row[3], | ||
| "referenced_column": row[4], | ||
| } | ||
| for row in rows | ||
| ] | ||
| return data | ||
|
Comment on lines
+706
to
+748
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection vulnerability: use parameterized queries. The Apply this diff to use parameterized queries with SQLAlchemy: def get_table_foreign_key_info(self, table_name: str, schema: str | None = None):
schema = schema or self.schema_name
query = f"""
SELECT
ac.CONSTRAINT_NAME AS constraint_name,
ac.TABLE_NAME AS table_name,
acc.COLUMN_NAME AS fk_column,
r_ac.TABLE_NAME AS referenced_table,
r_acc.COLUMN_NAME AS referenced_column
FROM ALL_CONSTRAINTS ac
JOIN ALL_CONS_COLUMNS acc
ON ac.CONSTRAINT_NAME = acc.CONSTRAINT_NAME
AND ac.OWNER = acc.OWNER
JOIN ALL_CONSTRAINTS r_ac
ON ac.R_CONSTRAINT_NAME = r_ac.CONSTRAINT_NAME
AND ac.R_OWNER = r_ac.OWNER
JOIN ALL_CONS_COLUMNS r_acc
ON r_ac.CONSTRAINT_NAME = r_acc.CONSTRAINT_NAME
AND r_ac.OWNER = r_acc.OWNER
AND acc.POSITION = r_acc.POSITION
WHERE ac.CONSTRAINT_TYPE = 'R'
- AND ac.TABLE_NAME = '{table_name.upper()}'
- AND ac.OWNER = '{schema.upper()}';
+ AND ac.TABLE_NAME = :table_name
+ AND ac.OWNER = :schema
"""
try:
- rows = self.fetchall(query)
+ from sqlalchemy import text
+ result = self.connection.execute(
+ text(query),
+ {"table_name": table_name.upper(), "schema": schema.upper()}
+ )
+ rows = result.fetchall()
except Exception as e:
logger.error(f"Failed to fetch fk info for dataset: {table_name} ({e})")
return []
data = [
{
"constraint_name": row[0],
"table_name": row[1],
"fk_column": row[2],
"referenced_table": row[3],
"referenced_column": row[4],
}
for row in rows
]
return dataNote: Similar SQL injection vulnerabilities exist in other methods throughout the codebase (e.g.,
🧰 Tools🪛 Ruff (0.14.6)709-730: Possible SQL injection vector through string-based query construction (S608) 734-734: Do not catch blind exception: (BLE001) |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SQL injection vulnerability: use parameterized queries.
The
table_nameandschemaparameters are directly interpolated into the SQL query using f-strings, creating a SQL injection vulnerability. A malicious input liketable_name="users' OR '1'='1"could manipulate the query logic or extract unauthorized data.Apply this diff to use parameterized queries with SQLAlchemy:
def get_table_foreign_key_info(self, table_name: str, schema: str | None = None): schema = schema or self.schema_name query = f""" SELECT kcu.CONSTRAINT_NAME AS constraint_name, kcu.TABLE_NAME AS table_name, kcu.COLUMN_NAME AS fk_column, kcu.REFERENCED_TABLE_NAME AS referenced_table, kcu.REFERENCED_COLUMN_NAME AS referenced_column FROM information_schema.TABLE_CONSTRAINTS tc JOIN information_schema.KEY_COLUMN_USAGE kcu ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA WHERE tc.CONSTRAINT_TYPE = 'FOREIGN KEY' - AND tc.TABLE_NAME = '{table_name}' - AND tc.TABLE_SCHEMA = '{schema}'; + AND tc.TABLE_NAME = :table_name + AND tc.TABLE_SCHEMA = :schema """ try: - rows = self.fetchall(query) + result = self.connection.execute( + text(query), + {"table_name": table_name, "schema": schema} + ) + rows = result.fetchall() except Exception as e: logger.error(f"Failed to fetch fk info for dataset: {table_name} ({e})") return [] data = [ { "constraint_name": row[0], "table_name": row[1], "fk_column": row[2], "referenced_table": row[3], "referenced_column": row[4], } for row in rows ] return dataNote: This SQL injection pattern appears throughout the codebase in similar query methods. Consider addressing this systematically across all database datasource classes.
🧰 Tools
🪛 Ruff (0.14.6)
405-419: Possible SQL injection vector through string-based query construction
(S608)
423-423: Do not catch blind exception:
Exception(BLE001)