Binary file modified .github/workflows/parameters/parameters_dbapi.py.gpg
Binary file not shown.
8 changes: 8 additions & 0 deletions .github/workflows/precommit.yml
@@ -207,6 +207,14 @@ jobs:
SNOWPARK_PYTHON_API_S3_STORAGE_INTEGRATION: ${{ vars.SNOWPARK_PYTHON_API_S3_STORAGE_INTEGRATION }}
TOX_PARALLEL_NO_SPINNER: 1
shell: bash
- name: Install MS ODBC Driver (Ubuntu only)
if: ${{ contains(matrix.os, 'ubuntu') }}
run: |
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18 unixodbc-dev
shell: bash
- name: Run data source tests
# psycopg2 is not supported on macos 3.9
# SNOW-2213578: Re-enable the test for 3.13
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -77,6 +77,16 @@
- `st_asgeojson`
- `st_aswkb`

#### Bug Fixes

- Fixed multiple bugs in `DataFrameReader.dbapi` (PuPr):
  - Fixed UDTF ingestion failure with `pyodbc` driver caused by unprocessed row data.
  - Fixed SQL Server query input failure due to incorrect select query generation.
  - Fixed UDTF ingestion not preserving column nullability in the output schema.

#### Improvements

- Improved `DataFrameReader.dbapi` (PuPr) reading performance by setting the default `fetch_size` parameter value to 100000.

### Snowpark pandas API Updates

@@ -1,8 +1,37 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
from typing import List

from snowflake.snowpark._internal.data_source.dbms_dialects import BaseDialect
from snowflake.snowpark._internal.data_source.dbms_dialects.base_dialect import (
QUERY_TEMPLATE,
)
from snowflake.snowpark._internal.utils import quote_name
from snowflake.snowpark.types import StructType


class SqlServerDialect(BaseDialect):
-    pass
+    def generate_select_query(

Collaborator

Curious about how this is used. Is it related to the bug fixes?

Collaborator Author

It is related to the bug where query input does not work for SQL Server.

The default base dialect adds backticks to column names, which is invalid in SQL Server. The fix is to use double quotes.

Collaborator Author

I updated the changelog; hopefully the bug is easier to understand now.

+        self,
+        table_or_query: str,
+        schema: StructType,
+        raw_schema: List[tuple],
+        is_query: bool,
+        query_input_alias: str,
+    ) -> str:
+        cols = []
+        for _field, raw_field in zip(schema.fields, raw_schema):
+            field_name = (
+                f"{query_input_alias}.{quote_name(raw_field[0], keep_case=True)}"
+                if is_query
+                else f"{quote_name(raw_field[0], keep_case=True)}"
+            )
+            cols.append(f"{field_name} AS {raw_field[0]}") if is_query else cols.append(
+                field_name
+            )
+        return QUERY_TEMPLATE.format(
+            cols=", ".join(cols),
+            table_or_query=f"({table_or_query})" if is_query else table_or_query,
+            query_input_alias=query_input_alias if is_query else "",
+        )
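
For readers following the review thread, here is a minimal standalone sketch of the double-quoting approach used by `generate_select_query`. It does not import Snowpark. `ASSUMED_TEMPLATE`, `double_quote`, and `sketch_select` are hypothetical names introduced for illustration only; the real `QUERY_TEMPLATE` and `quote_name` are not shown in this diff, so their exact behavior is an assumption here.

```python
from typing import List

# Assumption: the real QUERY_TEMPLATE is not shown in this diff, so this shape is a guess.
ASSUMED_TEMPLATE = "SELECT {cols} FROM {table_or_query} {query_input_alias}"


def double_quote(name: str) -> str:
    # Hypothetical stand-in for quote_name(name, keep_case=True):
    # wrap in double quotes and double any embedded double quote.
    return '"' + name.replace('"', '""') + '"'


def sketch_select(
    table_or_query: str, column_names: List[str], is_query: bool, alias: str
) -> str:
    cols = []
    for name in column_names:
        quoted = double_quote(name)
        if is_query:
            # Query input: qualify with the subquery alias and alias back to the raw name.
            cols.append(f"{alias}.{quoted} AS {name}")
        else:
            # Table input: the quoted column name alone is enough.
            cols.append(quoted)
    return ASSUMED_TEMPLATE.format(
        cols=", ".join(cols),
        table_or_query=f"({table_or_query})" if is_query else table_or_query,
        query_input_alias=alias if is_query else "",
    ).strip()


print(sketch_select("dbo.orders", ["id", "Order Date"], False, "q"))
# SELECT "id", "Order Date" FROM dbo.orders
print(sketch_select("SELECT id FROM dbo.orders", ["id"], True, "q"))
# SELECT q."id" AS id FROM (SELECT id FROM dbo.orders) q
```

Double-quoted identifiers are the standard SQL form, which is why they are a safer default for SQL Server than backticks.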
@@ -274,7 +274,10 @@ def to_result_snowpark_df_udtf(
res_df[field.name].cast(field.datatype).alias(field.name)
for field in schema.fields
]
-return res_df.select(cols, _emit_ast=_emit_ast)
+selected_df = res_df.select(cols, _emit_ast=_emit_ast)
+for attr, source_field in zip(selected_df._plan.attributes, schema.fields):
+    attr.nullable = source_field.nullable
+return selected_df

def get_server_cursor_if_supported(self, conn: "Connection") -> "Cursor":
"""
@@ -114,7 +114,7 @@ def process(self, query: str):
rows = cursor.fetchmany(fetch_size)
if not rows:
break
-yield from rows
+yield from map(tuple, rows)

return UDTFIngestion
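
The `map(tuple, rows)` change in the hunk above can be illustrated without `pyodbc`. This is a sketch that assumes the driver returns row objects that are not plain tuples; `sqlite3.Row` stands in for the driver's row type, and `iter_rows` is a hypothetical helper, not part of Snowpark.

```python
import sqlite3
from typing import Iterator


def iter_rows(conn: sqlite3.Connection, query: str, fetch_size: int) -> Iterator[tuple]:
    # Same loop shape as the process() method in the diff: fetch in batches,
    # stop on an empty batch, and convert each driver row to a plain tuple.
    cursor = conn.cursor()
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        yield from map(tuple, rows)


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows are now sqlite3.Row objects, not tuples
conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

raw = conn.execute("SELECT id, name FROM t").fetchone()
print(isinstance(raw, tuple))  # False: the driver-specific row type is not a tuple

result = list(iter_rows(conn, "SELECT id, name FROM t ORDER BY id", fetch_size=2))
print(result)  # [(1, 'a'), (2, 'b'), (3, 'c')]
```

Converting at the yield point keeps the rest of the loop unchanged and gives downstream code the same row type regardless of the driver.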

2 changes: 1 addition & 1 deletion src/snowflake/snowpark/dataframe_reader.py
@@ -1695,7 +1695,7 @@ def dbapi(
num_partitions: Optional[int] = None,
max_workers: Optional[int] = None,
query_timeout: Optional[int] = 0,
-fetch_size: Optional[int] = 1000,
+fetch_size: Optional[int] = 100000,
custom_schema: Optional[Union[str, StructType]] = None,
predicates: Optional[List[str]] = None,
session_init_statement: Optional[Union[str, List[str]]] = None,