feat: add fetch_sample_values_from_database method#332
Conversation
WalkthroughAdds a new public method fetch_sample_values_from_database to PostgresDataSource and SybaseDataSource that builds and executes a SELECT for specified columns with a configurable limit and returns fetched rows; Postgres validates non-empty column list and resolves fully qualified table names. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Caller
participant P as PostgresDataSource
participant S as SybaseDataSource
participant DB as Database
rect rgba(200,230,255,0.25)
note over C,P: Postgres sampling flow (new)
C->>P: fetch_sample_values_from_database(table, columns, limit)
P->>P: Validate columns non-empty (raise ValueError if empty)
P->>P: Resolve qualified table name
P->>P: Build SELECT columns FROM table LIMIT n
P->>DB: Execute query
DB-->>P: rows (list of tuples)
P-->>C: rows
end
rect rgba(220,255,220,0.25)
note over C,S: Sybase sampling flow (new)
C->>S: fetch_sample_values_from_database(table, columns, limit)
S->>S: Resolve/qualify table name
S->>S: Build SELECT TOP n columns FROM table
S->>DB: Execute query
DB-->>S: rows (list of tuples)
S-->>C: rows
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@AnshumanX304 |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
dcs_core/integrations/databases/postgres.py (1)
288-312: Consider aligning the public API with Sybase for consistencyPostgres returns List[Tuple] while Sybase initially declared Tuple[List, Optional[List[str]]]. After fixing Sybase, ensure both return the same type (prefer List[Tuple] as implemented here) to simplify multi-DB callers.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
dcs_core/integrations/databases/postgres.py(1 hunks)dcs_core/integrations/databases/sybase.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
dcs_core/integrations/databases/postgres.py (1)
dcs_core/core/datasource/sql_datasource.py (3)
qualified_table_name(130-138)quote_column(148-154)fetchall(117-120)
dcs_core/integrations/databases/sybase.py (3)
dcs_core/integrations/databases/postgres.py (3)
fetch_sample_values_from_database(288-311)qualified_table_name(56-64)quote_column(66-72)dcs_core/integrations/databases/mssql.py (2)
qualified_table_name(110-118)quote_column(120-126)dcs_core/integrations/databases/spark_df.py (2)
cursor(80-81)fetchmany(45-53)
🪛 Ruff (0.13.1)
dcs_core/integrations/databases/postgres.py
305-305: Avoid specifying long messages outside the exception class
(TRY003)
308-308: Possible SQL injection vector through string-based query construction
(S608)
dcs_core/integrations/databases/sybase.py
614-614: Possible SQL injection vector through string-based query construction
(S608)
| if not column_names: | ||
| raise ValueError("At least one column name must be provided") | ||
|
|
||
| columns = ", ".join([self.quote_column(col) for col in column_names]) | ||
| query = f"SELECT {columns} FROM {table_name} LIMIT {limit}" | ||
| result = self.connection.execute(text(query)) | ||
| rows = result.fetchall() | ||
| return rows |
There was a problem hiding this comment.
Validate limit and bind it as a parameter; escape identifiers
- S608: limit is injected directly into the SQL. Bind it via SQLAlchemy and validate it’s a positive int.
- Escape embedded double quotes in identifiers to avoid breaking out of quoted names.
Apply:
if not column_names:
raise ValueError("At least one column name must be provided")
+ if not isinstance(limit, int) or limit <= 0:
+ raise ValueError("limit must be a positive integer")
columns = ", ".join([self.quote_column(col) for col in column_names])
- query = f"SELECT {columns} FROM {table_name} LIMIT {limit}"
- result = self.connection.execute(text(query))
+ stmt = text(f"SELECT {columns} FROM {table_name} LIMIT :limit")
+ result = self.connection.execute(stmt, {"limit": limit})
rows = result.fetchall()
return rowsOutside this hunk, harden identifier quoting:
def qualified_table_name(self, table_name: str) -> str:
- if self.schema_name:
- return f'"{self.schema_name}"."{table_name}"'
- return f'"{table_name}"'
+ if self.schema_name:
+ schema = self.schema_name.replace('"', '""')
+ table = table_name.replace('"', '""')
+ return f'"{schema}"."{table}"'
+ table = table_name.replace('"', '""')
+ return f'"{table}"'
def quote_column(self, column: str) -> str:
- return f'"{column}"'
+ safe = column.replace('"', '""')
+ return f'"{safe}"'Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.13.1)
305-305: Avoid specifying long messages outside the exception class
(TRY003)
308-308: Possible SQL injection vector through string-based query construction
(S608)
🤖 Prompt for AI Agents
In dcs_core/integrations/databases/postgres.py around lines 304 to 311, the code
injects the LIMIT value and identifiers directly into the SQL; validate that
limit is a positive integer and bind it as a SQLAlchemy parameter (e.g., use a
:limit bind) instead of string interpolation, and ensure identifiers are safely
quoted by escaping embedded double quotes inside quote_column (replace any "
with "" before wrapping in quotes) and apply the same quoting to table_name (or
use a centralized quote_identifier helper) so no unescaped quotes can break the
query.
| def fetch_sample_values_from_database( | ||
| self, | ||
| table_name: str, | ||
| column_names: list[str], | ||
| limit: int = 5, | ||
| ) -> Tuple[List, Optional[List[str]]]: | ||
| table_name = self.qualified_table_name(table_name) | ||
| columns = ", ".join([self.quote_column(col) for col in column_names]) | ||
| query = f"SELECT TOP {limit} {columns} FROM {table_name}" | ||
|
|
||
| cursor = self.connection.cursor() | ||
| cursor.execute(query) | ||
| rows = cursor.fetchmany(limit) | ||
| return rows | ||
|
|
There was a problem hiding this comment.
Fix return-type mismatch, add validation, and reuse existing fetch_rows
- The annotation says Tuple[List, Optional[List[str]]] but the function returns only rows. This will surprise callers and violates the signature.
- Missing validation for empty column_names and non-positive limit leads to runtime errors and potential misuse.
- Reuse fetch_rows for consistency and to avoid duplicate cursor logic.
Apply:
- def fetch_sample_values_from_database(
- self,
- table_name: str,
- column_names: list[str],
- limit: int = 5,
- ) -> Tuple[List, Optional[List[str]]]:
- table_name = self.qualified_table_name(table_name)
- columns = ", ".join([self.quote_column(col) for col in column_names])
- query = f"SELECT TOP {limit} {columns} FROM {table_name}"
-
- cursor = self.connection.cursor()
- cursor.execute(query)
- rows = cursor.fetchmany(limit)
- return rows
+ def fetch_sample_values_from_database(
+ self,
+ table_name: str,
+ column_names: list[str],
+ limit: int = 5,
+ ) -> List[Tuple]:
+ table_name = self.qualified_table_name(table_name)
+ if not column_names:
+ raise ValueError("At least one column name must be provided")
+ if not isinstance(limit, int) or limit <= 0:
+ raise ValueError("limit must be a positive integer")
+ columns = ", ".join([self.quote_column(col) for col in column_names])
+ base_query = f"SELECT {columns} FROM {table_name}"
+ rows, _ = self.fetch_rows(query=base_query, limit=limit, with_column_names=False)
+ return rowsAdditionally, escape identifier delimiters to harden against injection via crafted names:
Outside this hunk, update quote_column:
def quote_column(self, column: str) -> str:
- return f"[{column}]"
+ safe = column.replace("]", "]]")
+ return f"[{safe}]"And qualified_table_name:
def qualified_table_name(self, table_name: str) -> str:
- if self.schema_name:
- return f"[{self.schema_name}].[{table_name}]"
- return f"[{table_name}]"
+ if self.schema_name:
+ schema_safe = self.schema_name.replace("]", "]]")
+ table_safe = table_name.replace("]", "]]")
+ return f"[{schema_safe}].[{table_safe}]"
+ table_safe = table_name.replace("]", "]]")
+ return f"[{table_safe}]"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def fetch_sample_values_from_database( | |
| self, | |
| table_name: str, | |
| column_names: list[str], | |
| limit: int = 5, | |
| ) -> Tuple[List, Optional[List[str]]]: | |
| table_name = self.qualified_table_name(table_name) | |
| columns = ", ".join([self.quote_column(col) for col in column_names]) | |
| query = f"SELECT TOP {limit} {columns} FROM {table_name}" | |
| cursor = self.connection.cursor() | |
| cursor.execute(query) | |
| rows = cursor.fetchmany(limit) | |
| return rows | |
| def fetch_sample_values_from_database( | |
| self, | |
| table_name: str, | |
| column_names: list[str], | |
| limit: int = 5, | |
| ) -> List[Tuple]: | |
| table_name = self.qualified_table_name(table_name) | |
| if not column_names: | |
| raise ValueError("At least one column name must be provided") | |
| if not isinstance(limit, int) or limit <= 0: | |
| raise ValueError("limit must be a positive integer") | |
| columns = ", ".join([self.quote_column(col) for col in column_names]) | |
| base_query = f"SELECT {columns} FROM {table_name}" | |
| rows, _ = self.fetch_rows(query=base_query, limit=limit, with_column_names=False) | |
| return rows |
| def fetch_sample_values_from_database( | |
| self, | |
| table_name: str, | |
| column_names: list[str], | |
| limit: int = 5, | |
| ) -> Tuple[List, Optional[List[str]]]: | |
| table_name = self.qualified_table_name(table_name) | |
| columns = ", ".join([self.quote_column(col) for col in column_names]) | |
| query = f"SELECT TOP {limit} {columns} FROM {table_name}" | |
| cursor = self.connection.cursor() | |
| cursor.execute(query) | |
| rows = cursor.fetchmany(limit) | |
| return rows | |
| def quote_column(self, column: str) -> str: | |
| safe = column.replace("]", "]]") | |
| return f"[{safe}]" |
| def fetch_sample_values_from_database( | |
| self, | |
| table_name: str, | |
| column_names: list[str], | |
| limit: int = 5, | |
| ) -> Tuple[List, Optional[List[str]]]: | |
| table_name = self.qualified_table_name(table_name) | |
| columns = ", ".join([self.quote_column(col) for col in column_names]) | |
| query = f"SELECT TOP {limit} {columns} FROM {table_name}" | |
| cursor = self.connection.cursor() | |
| cursor.execute(query) | |
| rows = cursor.fetchmany(limit) | |
| return rows | |
| def qualified_table_name(self, table_name: str) -> str: | |
| if self.schema_name: | |
| schema_safe = self.schema_name.replace("]", "]]") | |
| table_safe = table_name.replace("]", "]]") | |
| return f"[{schema_safe}].[{table_safe}]" | |
| table_safe = table_name.replace("]", "]]") | |
| return f"[{table_safe}]" |
🧰 Tools
🪛 Ruff (0.13.1)
614-614: Possible SQL injection vector through string-based query construction
(S608)
🤖 Prompt for AI Agents
In dcs_core/integrations/databases/sybase.py around lines 606-620, the function
fetch_sample_values_from_database currently returns only rows despite being
annotated as Tuple[List, Optional[List[str]]]; it also misses validation for
empty column_names and non-positive limit and reimplements cursor logic. Change
the function to validate that column_names is non-empty and limit is a positive
integer (raise ValueError on invalid input), build the SELECT TOP {limit} query
using quoted columns and the qualified table name, then call and return the
existing self.fetch_rows(query, limit) (so both rows and optional column names
are returned and cursor handling is reused). Additionally, update quote_column
and qualified_table_name elsewhere to escape identifier delimiters properly to
harden against crafted names (ensure delimiters inside identifiers are
doubled/escaped according to Sybase rules).
| columns = ", ".join([self.quote_column(col) for col in column_names]) | ||
| query = f"SELECT TOP {limit} {columns} FROM {table_name}" | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
Mitigate SQL injection and ensure consistent API across data sources
- S608: string-built SQL; although identifiers are quoted, limit is injected as-is and identifiers aren’t escaped. The above refactor addresses this by escaping bracket delimiters and validating limit. Please also confirm the return type now matches Postgres’ method (List[Tuple]) for cross-DB consistency.
Run to find usages and ensure callers expect a List[Tuple] from both Postgres and Sybase:
🏁 Script executed:
#!/bin/bash
# Definitions
rg -nP '^\s*def\s+fetch_sample_values_from_database\b' -C2 --type=py
# Call-sites (exclude defs)
rg -nP '(?<!def\s+)\bfetch_sample_values_from_database\s*\(' -C2 --type=pyLength of output: 767
🏁 Script executed:
#!/bin/bash
# Search for all call-sites of fetch_sample_values_from_database
rg -n "fetch_sample_values_from_database\(" -C2 --type=pyLength of output: 698
🏁 Script executed:
#!/bin/bash
# Inspect SQL construction in Sybase integration
rg -n "SELECT TOP" -C5 --type=py dcs_core/integrations/databases/sybase.py
# Show fetch_sample_values_from_database signature and return in Postgres
rg -nP "def fetch_sample_values_from_database" -C5 --type=py dcs_core/integrations/databases/postgres.py
# Show fetch_sample_values_from_database signature and return in Sybase
rg -nP "def fetch_sample_values_from_database" -C5 --type=py dcs_core/integrations/databases/sybase.pyLength of output: 2113
Validate limit and align Sybase return type with Postgres
- Coerce/validate
limitto an int before interpolating into the SQL (dcs_core/integrations/databases/sybase.py:613-614) to eliminate injection risk. - Change the signature from
-> Tuple[List, Optional[List[str]]]to-> List[Tuple]and return onlyrows, matching Postgres.
🧰 Tools
🪛 Ruff (0.13.1)
614-614: Possible SQL injection vector through string-based query construction
(S608)
🤖 Prompt for AI Agents
In dcs_core/integrations/databases/sybase.py around lines 613-615, coerce and
validate the incoming limit to an int (e.g., try: limit = int(limit); raise a
ValueError or use a safe default if conversion fails, and ensure limit is
non-negative) before interpolating it into the SQL string to avoid injection,
and then change the function signature and return value to match Postgres by
returning only rows with type List[Tuple] (remove the Optional[List[str]] from
the return and return rows only); update any related type hints/imports and
callers accordingly.
…adjust return type
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dcs_core/integrations/databases/sybase.py (1)
210-217: Enhance identifier escaping to prevent injection via crafted namesThe current
quote_columnandqualified_table_namemethods don't escape square bracket delimiters within identifiers, which could potentially allow injection through crafted column or table names containing]characters.Apply these diffs to properly escape identifiers:
def quote_column(self, column: str) -> str: """ Quote the column name :param column: name of the column :return: quoted column name """ - return f"[{column}]" + safe = column.replace("]", "]]") + return f"[{safe}]"def qualified_table_name(self, table_name: str) -> str: """ Get the qualified table name :param table_name: name of the table :return: qualified table name """ if self.schema_name: - return f"[{self.schema_name}].[{table_name}]" - return f"[{table_name}]" + schema_safe = self.schema_name.replace("]", "]]") + table_safe = table_name.replace("]", "]]") + return f"[{schema_safe}].[{table_safe}]" + table_safe = table_name.replace("]", "]]") + return f"[{table_safe}]"This follows SQL Server/Sybase identifier escaping rules where square brackets within identifiers must be doubled to be treated literally.
🧹 Nitpick comments (1)
dcs_core/integrations/databases/sybase.py (1)
612-620: Consider reusing existing fetch_rows method for consistencyThe method duplicates cursor logic that already exists in the
fetch_rowsmethod. This could lead to maintenance issues and inconsistent behavior across the codebase.Apply this diff to leverage the existing
fetch_rowsmethod:- cursor = self.connection.cursor() - cursor.execute(query) - rows = cursor.fetchmany(limit) - return rows + base_query = f"SELECT {columns} FROM {table_name}" + rows, _ = self.fetch_rows(query=base_query, limit=limit, with_column_names=False) + return rowsThis approach:
- Reuses existing, tested cursor handling logic
- Maintains consistency with other database methods
- Reduces code duplication
- Leverages the existing TOP clause handling in
fetch_rows
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
dcs_core/integrations/databases/sybase.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
dcs_core/integrations/databases/sybase.py (3)
dcs_core/integrations/databases/postgres.py (3)
fetch_sample_values_from_database(288-311)qualified_table_name(56-64)quote_column(66-72)dcs_core/integrations/databases/mssql.py (2)
qualified_table_name(110-118)quote_column(120-126)dcs_core/integrations/databases/spark_df.py (2)
cursor(80-81)fetchmany(45-53)
🪛 Ruff (0.13.1)
dcs_core/integrations/databases/sybase.py
614-614: Avoid specifying long messages outside the exception class
(TRY003)
616-616: Possible SQL injection vector through string-based query construction
(S608)
🔇 Additional comments (1)
dcs_core/integrations/databases/sybase.py (1)
606-621: Fix SQL injection vulnerability and validation issuesThis implementation has multiple critical issues that need immediate attention:
- SQL injection vulnerability: The
limitparameter is directly interpolated into the query without validation, making it vulnerable to injection attacks.- Inconsistent return type: The function returns
list[Tuple]but doesn't match the Postgres implementation signature.- Missing input validation: No validation for empty column names or invalid limit values.
Apply this diff to fix all issues:
- def fetch_sample_values_from_database( - self, - table_name: str, - column_names: list[str], - limit: int = 5, - ) -> list[Tuple]: - table_name = self.qualified_table_name(table_name) - if not column_names: - raise ValueError("At least one column name must be provided") - columns = ", ".join([self.quote_column(col) for col in column_names]) - query = f"SELECT TOP {limit} {columns} FROM {table_name}" - cursor = self.connection.cursor() - cursor.execute(query) - rows = cursor.fetchmany(limit) - return rows + def fetch_sample_values_from_database( + self, + table_name: str, + column_names: list[str], + limit: int = 5, + ) -> List[Tuple]: + table_name = self.qualified_table_name(table_name) + + if not column_names: + raise ValueError("At least one column name must be provided") + + # Validate and sanitize limit to prevent SQL injection + if not isinstance(limit, int) or limit <= 0: + raise ValueError("limit must be a positive integer") + + columns = ", ".join([self.quote_column(col) for col in column_names]) + query = f"SELECT TOP {limit} {columns} FROM {table_name}" + + cursor = self.connection.cursor() + cursor.execute(query) + rows = cursor.fetchmany(limit) + return rowsBased on OWASP guidance, prepared statements force a strong distinction between code and data, eliminating SQL injection problems. The current implementation validates the limit parameter to ensure it's a positive integer before interpolation, which aligns with Sybase's TOP clause requirements that the argument must evaluate to a value greater than or equal to 0.
Fixes/Implements
Description
Summary Goes here.
Type of change
Delete irrelevant options.
How Has This Been Tested?
Summary by CodeRabbit