Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions dcs_core/integrations/databases/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,28 @@ def fetch_rows(
return rows, list(column_names)
else:
return rows, None

def fetch_sample_values_from_database(
    self,
    table_name: str,
    column_names: list[str],
    limit: int = 5,
) -> List[Tuple]:
    """
    Fetch sample rows for specific columns from the given table.

    :param table_name: The name of the table.
    :param column_names: List of column names to fetch.
    :param limit: Maximum number of rows to fetch; must be a non-negative integer.
    :return: List of rows as returned by ``fetchall()`` on the query result.
    :raises ValueError: If ``column_names`` is empty or ``limit`` is not a
        non-negative integer.
    """
    # Validate the arguments first so bad input fails before any helper or
    # connection is touched.
    if not column_names:
        raise ValueError("At least one column name must be provided")
    # bool is a subclass of int, so it is rejected explicitly: LIMIT True is
    # never what a caller means.
    if isinstance(limit, bool) or not isinstance(limit, int) or limit < 0:
        raise ValueError("limit must be a non-negative integer")

    qualified_table = self.qualified_table_name(table_name)
    columns = ", ".join(self.quote_column(col) for col in column_names)

    # Identifiers cannot be sent as bound parameters, so they are interpolated
    # after passing through the class's quoting helpers. The row limit is a
    # plain value and is bound instead of being formatted into the SQL text.
    # NOTE(review): the safety of the interpolated identifiers depends on
    # qualified_table_name/quote_column escaping embedded double quotes - confirm.
    stmt = text(f"SELECT {columns} FROM {qualified_table} LIMIT :limit")
    result = self.connection.execute(stmt, {"limit": limit})
    return result.fetchall()
Comment on lines +304 to +311

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate limit and bind it as a parameter; escape identifiers

  • S608: limit is injected directly into the SQL. Bind it via SQLAlchemy and validate it’s a positive int.
  • Escape embedded double quotes in identifiers to avoid breaking out of quoted names.

Apply:

         if not column_names:
             raise ValueError("At least one column name must be provided")
+        if not isinstance(limit, int) or limit <= 0:
+            raise ValueError("limit must be a positive integer")
 
         columns = ", ".join([self.quote_column(col) for col in column_names])
-        query = f"SELECT {columns} FROM {table_name} LIMIT {limit}"
-        result = self.connection.execute(text(query))
+        stmt = text(f"SELECT {columns} FROM {table_name} LIMIT :limit")
+        result = self.connection.execute(stmt, {"limit": limit})
         rows = result.fetchall()
         return rows

Outside this hunk, harden identifier quoting:

 def qualified_table_name(self, table_name: str) -> str:
-        if self.schema_name:
-            return f'"{self.schema_name}"."{table_name}"'
-        return f'"{table_name}"'
+        if self.schema_name:
+            schema = self.schema_name.replace('"', '""')
+            table = table_name.replace('"', '""')
+            return f'"{schema}"."{table}"'
+        table = table_name.replace('"', '""')
+        return f'"{table}"'
 
 def quote_column(self, column: str) -> str:
-        return f'"{column}"'
+        safe = column.replace('"', '""')
+        return f'"{safe}"'

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.13.1)

305-305: Avoid specifying long messages outside the exception class

(TRY003)


308-308: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
In dcs_core/integrations/databases/postgres.py around lines 304 to 311, the code
injects the LIMIT value and identifiers directly into the SQL; validate that
limit is a positive integer and bind it as a SQLAlchemy parameter (e.g., use a
:limit bind) instead of string interpolation, and ensure identifiers are safely
quoted by escaping embedded double quotes inside quote_column (replace any "
with "" before wrapping in quotes) and apply the same quoting to table_name (or
use a centralized quote_identifier helper) so no unescaped quotes can break the
query.

16 changes: 16 additions & 0 deletions dcs_core/integrations/databases/sybase.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,22 @@ def fetch_rows(
else:
return rows, None

def fetch_sample_values_from_database(
    self,
    table_name: str,
    column_names: list[str],
    limit: int = 5,
) -> list[Tuple]:
    """
    Fetch sample rows for specific columns from the given table.

    :param table_name: The name of the table.
    :param column_names: List of column names to fetch.
    :param limit: Maximum number of rows to fetch; must be a non-negative integer.
    :return: List of rows as returned by ``cursor.fetchmany(limit)``.
    :raises ValueError: If ``column_names`` is empty or ``limit`` is not a
        non-negative integer.
    """
    # Validate the arguments first so bad input fails before any helper or
    # connection is touched.
    if not column_names:
        raise ValueError("At least one column name must be provided")
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(limit, bool) or not isinstance(limit, int) or limit < 0:
        raise ValueError("limit must be a non-negative integer")

    qualified_table = self.qualified_table_name(table_name)
    columns = ", ".join(self.quote_column(col) for col in column_names)

    # The TOP count is formatted into the statement; because it was validated
    # above as a plain int it cannot carry SQL text.
    query = f"SELECT TOP {limit} {columns} FROM {qualified_table}"

    cursor = self.connection.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchmany(limit)
    finally:
        # Release the cursor even when execute/fetch raises.
        cursor.close()

def convert_regex_to_sybase_pattern(self, regex_pattern: str) -> str:
"""
Convert a regex pattern into a Sybase-compatible LIKE pattern.
Expand Down
Loading