From 230851408d9d7e4f54d4c63bcc62924733d00fcf Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Wed, 24 Sep 2025 19:19:30 -0400 Subject: [PATCH 1/6] pyodbc fixes --- .../parameters/parameters_dbapi.py.gpg | Bin 644 -> 771 bytes .github/workflows/precommit.yml | 8 + CHANGELOG.md | 15 +- .../dbms_dialects/sqlserver_dialect.py | 31 +- .../data_source/drivers/base_driver.py | 5 +- .../data_source/drivers/pyodbc_driver.py | 2 +- src/snowflake/snowpark/dataframe_reader.py | 2 +- tests/integ/datasource/test_pyodbc.py | 190 ++++++++++++ .../test_data_source_dir/test_pyodbc_data.py | 285 ++++++++++++++++++ tests/unit/scala/test_utils_suite.py | 1 + tox.ini | 1 + 11 files changed, 532 insertions(+), 8 deletions(-) create mode 100644 tests/integ/datasource/test_pyodbc.py create mode 100644 tests/resources/test_data_source_dir/test_pyodbc_data.py diff --git a/.github/workflows/parameters/parameters_dbapi.py.gpg b/.github/workflows/parameters/parameters_dbapi.py.gpg index 977140aa92bcefb2a34d5b2d0c4d1354da46d2c8..2b9bb69c5f83d586e5cbb9f98851fea03971ad31 100644 GIT binary patch literal 771 zcmV+e1N{7q4Fm}T2(b~(%uU#l(Erlu0g5skF#6Glt5@A_Obu(TX@ON~SoTP**mvo_ z4>v@iv$tp0y@o;!0?nq}k{HE>=6iinm%@Mw?edHDKRpZX4qyxkm$i>xEYirYM8N&F z>;Lz8m0IcT6|)8UN4SN|W5>9EU7eU4^XnI8{8OndKedO(7tqzs!zCeU-ZH(?_SDZiq}qSic6ZchbN4t@w(c|(aI;T2YVz`93|4+pW!<*5i~au_aqw()Gdt&eomxLqxb7C(<;m35olQ^NSKkwQo-0YN{Atq@~VHqDi~I01oVzGv5-m49)C$r5WXktJ3O BcftSw literal 644 zcmV-~0(Fsjei?(v(vT0QpHn6S|}p3fhodwd57M z27xM{)D9%0hbbsA<{&&J^k=u*&DNMj{ zg`9R>=0a#sl=ikF%N-y^sr5O<9pua8ly7R7Zez71)pd_(U5u6!BLs5;Zl@VWJOQcr z@AmaYhJw9TyOA9`+UvzQO*c0Zw2$E>wxAt@CEV;Cn-d5>*0o}l<2?P}xN+B%n) zpRT&xQ&p)I@34<9`*9@E!~_Tv3s=BybZ!LTUvGGvKp&N+&zJL=4;^d_)G3O>aFmvv zl)1yz!|Zi)(^j3f4__jum%3R#|K|Tg)47Ess%n}m&^l%eftd}!4pCJHcp%s~WN3ah zYUSYiH6q&8a&TWzHPR#?P!-ZTkLeG8j@&XojMDSaqx(sf-H~6p>QC$fr@M5qlesSV z5COq&-61b+FH}3?>+7$aP6#;f7{9><&BP65WS6OW7C8_)x;=b9Z}w@=zzI9fFk+{9 zr8cWqF(t`Cc6Z7_s>>VpEW&(C0#M-Yg@iW?^TR?21J*mglmLTyV!m_J^aF;@4TVQv z&%?9i(!ydhVu!u0c8dLAU@|q%?qWN25R^LIhOWLpoJ3wno9#%oyK%6ZMz)a3N@^Q- zy5j9O_Sz2{Om=+CJ{Yyea@bpSwQLAI`k&1en!yjCpvqFk3nOHpRUhws8UH$hnZ4NW z%+RbFR`nm}lcUMZ3C!Wa6Ai362~0w4^3u?kNyGN4Xs4X@2C^LeC8boO1L5T*7J&ms eh^)D2Q%r diff --git a/.github/workflows/precommit.yml b/.github/workflows/precommit.yml index d453116157..7e7b1ccadc 100644 --- a/.github/workflows/precommit.yml +++ b/.github/workflows/precommit.yml @@ -207,6 +207,14 @@ jobs: SNOWPARK_PYTHON_API_S3_STORAGE_INTEGRATION: ${{ vars.SNOWPARK_PYTHON_API_S3_STORAGE_INTEGRATION }} TOX_PARALLEL_NO_SPINNER: 1 shell: bash + - name: Install MS ODBC Driver (Ubuntu only) + if: ${{ contains(matrix.os, 'ubuntu') }} + run: | + curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add - + curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list + sudo apt-get update + sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18 unixodbc-dev + shell: bash - name: Run data source tests # psycopg2 is not supported on macos 3.9 # SNOW-2213578: Re-enable the test for 3.13 diff --git a/CHANGELOG.md b/CHANGELOG.md index c32372fe83..f4313c1ba0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,6 @@ ### Snowpark Python API Updates -#### Bug Fixes - -- Fixed a bug that `DataFrame.limit()` fail if there is parameter binding in the executed SQL. - #### New Features - Added a new module `snowflake.snowpark.secrets` that provides Python wrappers for accessing Snowflake Secrets within Python UDFs and stored procedures that execute inside Snowflake. @@ -56,6 +52,17 @@ - `nvl2` - `regr_valx` +#### Bug Fixes + +- Fixed a bug that `DataFrame.limit()` fail if there is parameter binding in the executed SQL. +- Fixed the following bugs in `DataFrameReader.dbapi`(PuPr): + - udtf ingestion does not work for `pyodbc` driver. + - query input does not work for SQL Server. + - nullability of output schema of udft ingestion is not preserved. + +#### Improvements + +- Improved `DataFrameReader.dbapi` (PuPr) reading performance by setting the default `fetch_size` parameter value to 10000. ### Snowpark pandas API Updates diff --git a/src/snowflake/snowpark/_internal/data_source/dbms_dialects/sqlserver_dialect.py b/src/snowflake/snowpark/_internal/data_source/dbms_dialects/sqlserver_dialect.py index 38daa35b33..2bc4f95ff5 100644 --- a/src/snowflake/snowpark/_internal/data_source/dbms_dialects/sqlserver_dialect.py +++ b/src/snowflake/snowpark/_internal/data_source/dbms_dialects/sqlserver_dialect.py @@ -1,8 +1,37 @@ # # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. # +from typing import List + from snowflake.snowpark._internal.data_source.dbms_dialects import BaseDialect +from snowflake.snowpark._internal.data_source.dbms_dialects.base_dialect import ( + QUERY_TEMPLATE, +) +from snowflake.snowpark._internal.utils import quote_name +from snowflake.snowpark.types import StructType class SqlServerDialect(BaseDialect): - pass + def generate_select_query( + self, + table_or_query: str, + schema: StructType, + raw_schema: List[tuple], + is_query: bool, + query_input_alias: str, + ) -> str: + cols = [] + for _field, raw_field in zip(schema.fields, raw_schema): + field_name = ( + f"{query_input_alias}.{quote_name(raw_field[0], keep_case=True)}" + if is_query + else f"{quote_name(raw_field[0], keep_case=True)}" + ) + cols.append(f"{field_name} AS {raw_field[0]}") if is_query else cols.append( + field_name + ) + return QUERY_TEMPLATE.format( + cols=", ".join(cols), + table_or_query=f"({table_or_query})" if is_query else table_or_query, + query_input_alias=query_input_alias if is_query else "", + ) diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py index a9de3af7b7..01bf566034 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py @@ -259,7 +259,10 @@ def to_result_snowpark_df_udtf( res_df[field.name].cast(field.datatype).alias(field.name) for field in schema.fields ] - return res_df.select(cols, _emit_ast=_emit_ast) + selected_df = res_df.select(cols, _emit_ast=_emit_ast) + for attr, source_field in zip(selected_df._plan.attributes, schema.fields): + attr.nullable = source_field.nullable + return selected_df def get_server_cursor_if_supported(self, conn: "Connection") -> "Cursor": """ diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py index 9ecbac9ea9..9ecdd54348 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py @@ -106,7 +106,7 @@ def process(self, query: str): rows = cursor.fetchmany(fetch_size) if not rows: break - yield from rows + yield from map(tuple, rows) return UDTFIngestion diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 5533ebb246..bdd2bf458a 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -1695,7 +1695,7 @@ def dbapi( num_partitions: Optional[int] = None, max_workers: Optional[int] = None, query_timeout: Optional[int] = 0, - fetch_size: Optional[int] = 1000, + fetch_size: Optional[int] = 10000, custom_schema: Optional[Union[str, StructType]] = None, predicates: Optional[List[str]] = None, session_init_statement: Optional[Union[str, List[str]]] = None, diff --git a/tests/integ/datasource/test_pyodbc.py b/tests/integ/datasource/test_pyodbc.py new file mode 100644 index 0000000000..d820d55fce --- /dev/null +++ b/tests/integ/datasource/test_pyodbc.py @@ -0,0 +1,190 @@ +# +# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. +# + +import pytest + +from snowflake.snowpark.types import StringType + +from tests.parameters import SQL_SERVER_CONNECTION_PARAMETERS +from tests.utils import IS_IN_STORED_PROC, Utils +from tests.resources.test_data_source_dir.test_pyodbc_data import ( + SQL_SERVER_TABLE_NAME, + EXPECTED_TEST_DATA, + SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, + EXPECTED_UNICODE_TEST_DATA, + SQL_SERVER_SCHEMA, + SQL_SERVER_UNICODE_SCHEMA, + SQL_SEVER_UNICODE_TABLE_NAME, +) + +DEPENDENCIES_PACKAGE_UNAVAILABLE = True +try: + import pyodbc + import pandas # noqa: F401 + + DEPENDENCIES_PACKAGE_UNAVAILABLE = False +except ImportError: + pass + + +pytestmark = [ + pytest.mark.skipif(DEPENDENCIES_PACKAGE_UNAVAILABLE, reason="Missing 'pyodbc'"), + pytest.mark.skipif(IS_IN_STORED_PROC, reason="Need External Access Integration"), +] + + +def construct_input_dict(input_type, table_name): + return { + input_type: table_name + if input_type == "table" + else f"SELECT * FROM {table_name}" + } + + +def verify_save_table_result( + session, df, expected_data, expected_schema, apply_order, ignore_string_size=False +): + + if apply_order: + df = df.order_by("ID") + + Utils.check_answer(df, expected_data) + + def verify_schemas(df, expected_schema, ignore_string_size): + # TODO: SNOW-2362041 + # - UDTF ingestion returning StringType 128 MB (due to variant default to 128MB) + # - parquet based ingestion returning StringType 16 MB + # we should align the two + for field, expected_field in zip(df.schema.fields, expected_schema.fields): + if isinstance(field.datatype, StringType): + assert isinstance(field.datatype, type(expected_field.datatype)) + if ignore_string_size: + assert ( + field.datatype.length == expected_field.datatype.length + or field.datatype.length == 134217728 + ) + else: + assert field.datatype.length == expected_field.datatype.length + else: + assert field.datatype == expected_field.datatype + assert field.name == expected_field.name + assert field.nullable == expected_field.nullable + + verify_schemas(df, expected_schema, ignore_string_size) + # after the fix SNOW-2362041, we should be able to enable this assertion + # assert df.schema == expected_schema + + table_name = Utils.random_table_name() + # save and read + df.write.mode("overwrite").save_as_table(table_name, table_type="temp") + read_table = session.table(table_name) + + if apply_order: + read_table = read_table.order_by("ID") + + Utils.check_answer(read_table, expected_data) + verify_schemas(read_table, expected_schema, ignore_string_size) + # after the fix SNOW-2362041, we should be able to enable this assertion + # assert read_table.schema == expected_schema + + +def create_connection_sql_server(): + return pyodbc.connect( + "DRIVER=" + SQL_SERVER_CONNECTION_PARAMETERS["DRIVER"] + ";" + "SERVER=" + SQL_SERVER_CONNECTION_PARAMETERS["SERVER"] + ";" + "UID=" + SQL_SERVER_CONNECTION_PARAMETERS["UID"] + ";" + "PWD=" + SQL_SERVER_CONNECTION_PARAMETERS["PWD"] + ";" + "TrustServerCertificate=" + + SQL_SERVER_CONNECTION_PARAMETERS["TrustServerCertificate"] + + ";" + "Encrypt=" + SQL_SERVER_CONNECTION_PARAMETERS["Encrypt"] + ";" + ) + + +@pytest.mark.parametrize( + "input_type, table_name, expected_data, expected_schema, apply_order", + [ + ("table", SQL_SERVER_TABLE_NAME, EXPECTED_TEST_DATA, SQL_SERVER_SCHEMA, True), + ("query", SQL_SERVER_TABLE_NAME, EXPECTED_TEST_DATA, SQL_SERVER_SCHEMA, True), + ( + "table", + SQL_SEVER_UNICODE_TABLE_NAME, + EXPECTED_UNICODE_TEST_DATA, + SQL_SERVER_UNICODE_SCHEMA, + False, + ), + ( + "query", + SQL_SEVER_UNICODE_TABLE_NAME, + EXPECTED_UNICODE_TEST_DATA, + SQL_SERVER_UNICODE_SCHEMA, + False, + ), + ], +) +def test_sql_server_ingestion( + session, input_type, table_name, expected_data, expected_schema, apply_order +): + if "ODBC Driver 18 for SQL Server" not in pyodbc.drivers(): + pytest.skip("Microsoft ODBC Driver 18 for SQL Server is not installed") + + df = session.read.dbapi( + create_connection_sql_server, + **construct_input_dict(input_type, table_name), + ) + + verify_save_table_result(session, df, expected_data, expected_schema, apply_order) + + +@pytest.mark.parametrize( + "input_type, table_name, expected_data, expected_schema, apply_order", + [ + ("table", SQL_SERVER_TABLE_NAME, EXPECTED_TEST_DATA, SQL_SERVER_SCHEMA, True), + ("query", SQL_SERVER_TABLE_NAME, EXPECTED_TEST_DATA, SQL_SERVER_SCHEMA, True), + ( + "table", + SQL_SEVER_UNICODE_TABLE_NAME, + EXPECTED_UNICODE_TEST_DATA, + SQL_SERVER_UNICODE_SCHEMA, + False, + ), + ( + "query", + SQL_SEVER_UNICODE_TABLE_NAME, + EXPECTED_UNICODE_TEST_DATA, + SQL_SERVER_UNICODE_SCHEMA, + False, + ), + ], +) +def test_sql_server_udtf_ingestion( + session, input_type, table_name, expected_data, expected_schema, apply_order +): + local_parameters = SQL_SERVER_CONNECTION_PARAMETERS.copy() + + def local_create_connection_sql_server(): + return pyodbc.connect( + "DRIVER=" + local_parameters["DRIVER"] + ";" + "SERVER=" + local_parameters["SERVER"] + ";" + "UID=" + local_parameters["UID"] + ";" + "PWD=" + local_parameters["PWD"] + ";" + "TrustServerCertificate=" + local_parameters["TrustServerCertificate"] + ";" + "Encrypt=" + local_parameters["Encrypt"] + ";" + ) + + # sql server pyodbc required microsoft odbc driver installed on the machine + df = session.read.dbapi( + local_create_connection_sql_server, + **construct_input_dict(input_type, table_name), + udtf_configs=SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, + ) + + verify_save_table_result( + session, + df, + expected_data, + expected_schema, + apply_order, + ignore_string_size=True, + ) diff --git a/tests/resources/test_data_source_dir/test_pyodbc_data.py b/tests/resources/test_data_source_dir/test_pyodbc_data.py new file mode 100644 index 0000000000..43459d31ad --- /dev/null +++ b/tests/resources/test_data_source_dir/test_pyodbc_data.py @@ -0,0 +1,285 @@ +from decimal import Decimal +import datetime +from snowflake.snowpark import Row +from snowflake.snowpark.types import ( + StructType, + StructField, + LongType, + BooleanType, + DecimalType, + DoubleType, + DateType, + TimeType, + TimestampType, + StringType, + BinaryType, + TimestampTimeZone, +) + +# XML, GEOMETRY, GEOGRAPHY types are not supported in ODBC driver + +SQL_SERVER_TABLE_NAME = "test_db.dbo.ALL_TYPE_TABLE" +SQL_SEVER_UNICODE_TABLE_NAME = "test_db.dbo.用户資料" +SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION = { + "external_access_integration": "snowpark_dbapi_sql_server_test_integration" +} +SQL_SERVER_SCHEMA = StructType( + [ + StructField("ID", LongType(), False), + StructField("BIGINT_COL", LongType(), True), + StructField("INT_COL", LongType(), True), + StructField("SMALLINT_COL", LongType(), True), + StructField("TINYINT_COL", LongType(), True), + StructField("BIT_COL", BooleanType(), True), + StructField("DECIMAL_COL", DecimalType(18, 4), True), + StructField("NUMERIC_COL", DecimalType(10, 2), True), + StructField("MONEY_COL", DecimalType(19, 4), True), + StructField("SMALLMONEY_COL", DecimalType(10, 4), True), + StructField("FLOAT_COL", DoubleType(), True), + StructField("REAL_COL", DoubleType(), True), + StructField("DATE_COL", DateType(), True), + StructField("TIME_COL", TimeType(), True), + StructField("DATETIME_COL", TimestampType(TimestampTimeZone("ntz")), True), + StructField("DATETIME2_COL", TimestampType(TimestampTimeZone("ntz")), True), + StructField("SMALLDATETIME_COL", TimestampType(TimestampTimeZone("ntz")), True), + StructField("CHAR_COL", StringType(16777216), True), + StructField("VARCHAR_COL", StringType(16777216), True), + StructField("VARCHAR_MAX_COL", StringType(16777216), True), + StructField("TEXT_COL", StringType(16777216), True), + StructField("NCHAR_COL", StringType(16777216), True), + StructField("NVARCHAR_COL", StringType(16777216), True), + StructField("NVARCHAR_MAX_COL", StringType(16777216), True), + StructField("NTEXT_COL", StringType(16777216), True), + StructField("BINARY_COL", BinaryType(), True), + StructField("VARBINARY_COL", BinaryType(), True), + StructField("VARBINARY_MAX_COL", BinaryType(), True), + StructField("IMAGE_COL", BinaryType(), True), + StructField("UNIQUEIDENTIFIER_COL", StringType(16777216), True), + StructField("SQL_VARIANT_COL", BinaryType(), True), + StructField("TIMESTAMP_COL", BinaryType(), False), + ] +) + +SQL_SERVER_UNICODE_SCHEMA = StructType( + [ + StructField('"編號"', LongType(), nullable=False), + StructField('"姓名"', StringType(16777216), nullable=True), + StructField('"國家"', StringType(16777216), nullable=True), + StructField('"備註"', StringType(16777216), nullable=True), + ] +) + + +EXPECTED_TEST_DATA = [ + Row( + ID=1, + BIGINT_COL=9223372036854775807, + INT_COL=2147483647, + SMALLINT_COL=32767, + TINYINT_COL=255, + BIT_COL=True, + DECIMAL_COL=Decimal("123456.7890"), + NUMERIC_COL=Decimal("9999.99"), + MONEY_COL=Decimal("922337203685477.5000"), + SMALLMONEY_COL=Decimal("214748.3600"), + FLOAT_COL=1.79e100, + REAL_COL=3.3999999274014946e20, + DATE_COL=datetime.date(2023, 1, 15), + TIME_COL=datetime.time(14, 30, 25), + DATETIME_COL=datetime.datetime(2023, 1, 15, 14, 30, 25, 123000), + DATETIME2_COL=datetime.datetime(2023, 1, 15, 14, 30, 25), + SMALLDATETIME_COL=datetime.datetime(2023, 1, 15, 14, 30), + CHAR_COL="Sample1 ", + VARCHAR_COL="Sample VARCHAR 1", + VARCHAR_MAX_COL="This is a long VARCHAR(MAX) sample text for row 1", + TEXT_COL="Sample TEXT 1", + NCHAR_COL="Unicode1 ", + NVARCHAR_COL="Unicode VARCHAR 1", + NVARCHAR_MAX_COL="This is a long NVARCHAR(MAX) unicode text for row 1", + NTEXT_COL="Unicode TEXT 1", + BINARY_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + VARBINARY_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + VARBINARY_MAX_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef\x01#Eg\x89\xab\xcd\xef"), + IMAGE_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + UNIQUEIDENTIFIER_COL="3AE71804-5408-450E-B393-4EA521D7F425", + SQL_VARIANT_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + TIMESTAMP_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x08k"), + ), + Row( + ID=2, + BIGINT_COL=-9223372036854775808, + INT_COL=-2147483648, + SMALLINT_COL=-32768, + TINYINT_COL=0, + BIT_COL=False, + DECIMAL_COL=Decimal("-123456.7890"), + NUMERIC_COL=Decimal("-9999.99"), + MONEY_COL=Decimal("-922337203685477.5000"), + SMALLMONEY_COL=Decimal("-214748.3600"), + FLOAT_COL=2.23e-100, + REAL_COL=1.1799999625533166e-20, + DATE_COL=datetime.date(2023, 2, 20), + TIME_COL=datetime.time(9, 15, 45), + DATETIME_COL=datetime.datetime(2023, 2, 20, 9, 15, 45, 457000), + DATETIME2_COL=datetime.datetime(2023, 2, 20, 9, 15, 45), + SMALLDATETIME_COL=datetime.datetime(2023, 2, 20, 9, 16), + CHAR_COL="Test2 ", + VARCHAR_COL="Test VARCHAR 2", + VARCHAR_MAX_COL="Another long VARCHAR(MAX) sample text for row 2", + TEXT_COL="Test TEXT 2", + NCHAR_COL="测试2 ", + NVARCHAR_COL="测试 VARCHAR 2", + NVARCHAR_MAX_COL="Another long NVARCHAR(MAX) unicode text for row 2 with Chinese characters", + NTEXT_COL="测试 TEXT 2", + BINARY_COL=bytearray(b"\xfe\xdc\xba\x98vT2\x10"), + VARBINARY_COL=bytearray(b"\xfe\xdc\xba\x98vT2\x10"), + VARBINARY_MAX_COL=bytearray(b"\xfe\xdc\xba\x98vT2\x10\xfe\xdc\xba\x98vT2\x10"), + IMAGE_COL=bytearray(b"\xfe\xdc\xba\x98vT2\x10"), + UNIQUEIDENTIFIER_COL="9043B413-B91E-4F24-A82E-58D10E1CB1EF", + SQL_VARIANT_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + TIMESTAMP_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x08o"), + ), + Row( + ID=3, + BIGINT_COL=1000000000000000, + INT_COL=1000000, + SMALLINT_COL=1000, + TINYINT_COL=128, + BIT_COL=True, + DECIMAL_COL=Decimal("0.0001"), + NUMERIC_COL=Decimal("0.01"), + MONEY_COL=Decimal("100.5000"), + SMALLMONEY_COL=Decimal("50.2500"), + FLOAT_COL=3.14159265, + REAL_COL=2.718280076980591, + DATE_COL=datetime.date(2023, 3, 10), + TIME_COL=datetime.time(18, 45, 30), + DATETIME_COL=datetime.datetime(2023, 3, 10, 18, 45, 30, 790000), + DATETIME2_COL=datetime.datetime(2023, 3, 10, 18, 45, 30), + SMALLDATETIME_COL=datetime.datetime(2023, 3, 10, 18, 45), + CHAR_COL="Data3 ", + VARCHAR_COL="Data VARCHAR 3", + VARCHAR_MAX_COL="Third VARCHAR(MAX) sample with special chars: !@#$%^&*()", + TEXT_COL="Data TEXT 3", + NCHAR_COL="データ3 ", + NVARCHAR_COL="データ VARCHAR 3", + NVARCHAR_MAX_COL="Third NVARCHAR(MAX) with Japanese: こんにちは世界", + NTEXT_COL="データ TEXT 3", + BINARY_COL=bytearray(b"\x11\x11\x11\x11\x11\x11\x11\x11"), + VARBINARY_COL=bytearray(b'""""""""'), + VARBINARY_MAX_COL=bytearray(b"33333333DDDDDDDD"), + IMAGE_COL=bytearray(b"UUUUUUUU"), + UNIQUEIDENTIFIER_COL="C7F28568-6F34-41B1-97E4-3CF55C57D43C", + SQL_VARIANT_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + TIMESTAMP_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x08s"), + ), + Row( + ID=4, + BIGINT_COL=-500000000000000, + INT_COL=-500000, + SMALLINT_COL=-500, + TINYINT_COL=64, + BIT_COL=False, + DECIMAL_COL=Decimal("999.9999"), + NUMERIC_COL=Decimal("999.99"), + MONEY_COL=Decimal("1000000.0000"), + SMALLMONEY_COL=Decimal("10000.0000"), + FLOAT_COL=1.23e50, + REAL_COL=98700001280.0, + DATE_COL=datetime.date(2023, 4, 25), + TIME_COL=datetime.time(12, 0), + DATETIME_COL=datetime.datetime(2023, 4, 25, 12, 0), + DATETIME2_COL=datetime.datetime(2023, 4, 25, 12, 0), + SMALLDATETIME_COL=datetime.datetime(2023, 4, 25, 12, 0), + CHAR_COL="Record4 ", + VARCHAR_COL="Record VARCHAR 4", + VARCHAR_MAX_COL="Fourth record with emojis", + TEXT_COL="Record TEXT 4", + NCHAR_COL="Запись4 ", + NVARCHAR_COL="Запись VARCHAR 4", + NVARCHAR_MAX_COL="Fourth record with Cyrillic: Привет мир!", + NTEXT_COL="Запись TEXT 4", + BINARY_COL=bytearray(b"\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"), + VARBINARY_COL=bytearray(b"\xbb\xbb\xbb\xbb\xbb\xbb\xbb\xbb"), + VARBINARY_MAX_COL=bytearray( + b"\xcc\xcc\xcc\xcc\xcc\xcc\xcc\xcc\xdd\xdd\xdd\xdd\xdd\xdd\xdd\xdd" + ), + IMAGE_COL=bytearray(b"\xee\xee\xee\xee\xee\xee\xee\xee"), + UNIQUEIDENTIFIER_COL="32E648D8-BFBE-4419-A478-07C7AE897FC4", + SQL_VARIANT_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + TIMESTAMP_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x08w"), + ), + Row( + ID=5, + BIGINT_COL=7777777777777777, + INT_COL=777777, + SMALLINT_COL=7777, + TINYINT_COL=200, + BIT_COL=True, + DECIMAL_COL=Decimal("0.0000"), + NUMERIC_COL=Decimal("0.00"), + MONEY_COL=Decimal("-50.7500"), + SMALLMONEY_COL=Decimal("-25.2500"), + FLOAT_COL=0.0, + REAL_COL=0.0, + DATE_COL=datetime.date(2023, 5, 30), + TIME_COL=datetime.time(23, 59, 59), + DATETIME_COL=datetime.datetime(2023, 5, 31, 0, 0), + DATETIME2_COL=datetime.datetime(2023, 5, 30, 23, 59, 59), + SMALLDATETIME_COL=datetime.datetime(2023, 5, 30, 23, 59), + CHAR_COL="Final5 ", + VARCHAR_COL="Final VARCHAR 5", + VARCHAR_MAX_COL="Last record with symbols", + TEXT_COL="Final TEXT 5", + NCHAR_COL="最終5 ", + NVARCHAR_COL="最終 VARCHAR 5", + NVARCHAR_MAX_COL="Last record with mixed scripts", + NTEXT_COL="最終 TEXT 5", + BINARY_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x00\x00"), + VARBINARY_COL=bytearray(b"\xff\xff\xff\xff\xff\xff\xff\xff"), + VARBINARY_MAX_COL=bytearray( + b"\x0f\x0f\x0f\x0f\x0f\x0f\x0f\x0f\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e" + ), + IMAGE_COL=bytearray(b"\x99\x99\x99\x99\x99\x99\x99\x99"), + UNIQUEIDENTIFIER_COL="384A86A8-5DF5-4AEC-8688-4C9BCBA26592", + SQL_VARIANT_COL=bytearray(b"\x01#Eg\x89\xab\xcd\xef"), + TIMESTAMP_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x08{"), + ), + Row( + ID=6, + BIGINT_COL=None, + INT_COL=None, + SMALLINT_COL=None, + TINYINT_COL=None, + BIT_COL=None, + DECIMAL_COL=None, + NUMERIC_COL=None, + MONEY_COL=None, + SMALLMONEY_COL=None, + FLOAT_COL=None, + REAL_COL=None, + DATE_COL=None, + TIME_COL=None, + DATETIME_COL=None, + DATETIME2_COL=None, + SMALLDATETIME_COL=None, + CHAR_COL=None, + VARCHAR_COL=None, + VARCHAR_MAX_COL=None, + TEXT_COL=None, + NCHAR_COL=None, + NVARCHAR_COL=None, + NVARCHAR_MAX_COL=None, + NTEXT_COL=None, + BINARY_COL=None, + VARBINARY_COL=None, + VARBINARY_MAX_COL=None, + IMAGE_COL=None, + UNIQUEIDENTIFIER_COL=None, + SQL_VARIANT_COL=bytearray(b""), + TIMESTAMP_COL=bytearray(b"\x00\x00\x00\x00\x00\x00\x08\x7f"), + ), # TIMESTAMP_COL is system generated, NOT NULL +] + + +EXPECTED_UNICODE_TEST_DATA = [Row(編號=1, 姓名="山田太郎", 國家="日本", 備註="これはUnicodeテストです")] diff --git a/tests/unit/scala/test_utils_suite.py b/tests/unit/scala/test_utils_suite.py index 47e6164d77..a72a81c9ec 100644 --- a/tests/unit/scala/test_utils_suite.py +++ b/tests/unit/scala/test_utils_suite.py @@ -365,6 +365,7 @@ def check_zip_files_and_close_stream(input_stream, expected_files): "resources/test_data_source_dir/test_databricks_data.py", "resources/test_data_source_dir/test_jdbc_data.py", "resources/test_data_source_dir/test_mysql_data.py", + "resources/test_data_source_dir/test_pyodbc_data.py", "resources/test_debug_utils_dir/", "resources/test_debug_utils_dir/dataframe_generator1.py", "resources/test_debug_utils_dir/dataframe_generator2.py", diff --git a/tox.ini b/tox.ini index 5514898a7e..a6a9e7dab9 100644 --- a/tox.ini +++ b/tox.ini @@ -248,6 +248,7 @@ deps = oracledb psycopg2-binary pymysql + pyodbc commands = {env:SNOWFLAKE_PYTEST_CMD} -m "{env:SNOWFLAKE_TEST_TYPE}" {posargs:} tests/integ/datasource -n 8 From 0d6edd29ebb4866c2f5f5e451f54bd131443e293 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Wed, 24 Sep 2025 20:10:17 -0400 Subject: [PATCH 2/6] fix test failure --- tests/integ/test_data_source_api.py | 64 ++++++++++++++--------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/tests/integ/test_data_source_api.py b/tests/integ/test_data_source_api.py index 7de529432e..f2093064c3 100644 --- a/tests/integ/test_data_source_api.py +++ b/tests/integ/test_data_source_api.py @@ -253,10 +253,10 @@ def test_parallel(session, upper_bound, expected_upload_cnt, fetch_with_process) 15, 4, [ - "SELECT ID FROM fake_table WHERE ID < '8' OR ID is null", - "SELECT ID FROM fake_table WHERE ID >= '8' AND ID < '10'", - "SELECT ID FROM fake_table WHERE ID >= '10' AND ID < '12'", - "SELECT ID FROM fake_table WHERE ID >= '12'", + "SELECT \"ID\" FROM fake_table WHERE ID < '8' OR ID is null", + "SELECT \"ID\" FROM fake_table WHERE ID >= '8' AND ID < '10'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '10' AND ID < '12'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '12'", ], ), ( @@ -267,10 +267,10 @@ def test_parallel(session, upper_bound, expected_upload_cnt, fetch_with_process) 5, 4, [ - "SELECT ID FROM fake_table WHERE ID < '-2' OR ID is null", - "SELECT ID FROM fake_table WHERE ID >= '-2' AND ID < '0'", - "SELECT ID FROM fake_table WHERE ID >= '0' AND ID < '2'", - "SELECT ID FROM fake_table WHERE ID >= '2'", + "SELECT \"ID\" FROM fake_table WHERE ID < '-2' OR ID is null", + "SELECT \"ID\" FROM fake_table WHERE ID >= '-2' AND ID < '0'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '0' AND ID < '2'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '2'", ], ), ( @@ -281,16 +281,16 @@ def test_parallel(session, upper_bound, expected_upload_cnt, fetch_with_process) 15, 10, [ - "SELECT ID FROM fake_table WHERE ID < '6' OR ID is null", - "SELECT ID FROM fake_table WHERE ID >= '6' AND ID < '7'", - "SELECT ID FROM fake_table WHERE ID >= '7' AND ID < '8'", - "SELECT ID FROM fake_table WHERE ID >= '8' AND ID < '9'", - "SELECT ID FROM fake_table WHERE ID >= '9' AND ID < '10'", - "SELECT ID FROM fake_table WHERE ID >= '10' AND ID < '11'", - "SELECT ID FROM fake_table WHERE ID >= '11' AND ID < '12'", - "SELECT ID FROM fake_table WHERE ID >= '12' AND ID < '13'", - "SELECT ID FROM fake_table WHERE ID >= '13' AND ID < '14'", - "SELECT ID FROM fake_table WHERE ID >= '14'", + "SELECT \"ID\" FROM fake_table WHERE ID < '6' OR ID is null", + "SELECT \"ID\" FROM fake_table WHERE ID >= '6' AND ID < '7'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '7' AND ID < '8'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '8' AND ID < '9'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '9' AND ID < '10'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '10' AND ID < '11'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '11' AND ID < '12'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '12' AND ID < '13'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '13' AND ID < '14'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '14'", ], ), ( @@ -301,9 +301,9 @@ def test_parallel(session, upper_bound, expected_upload_cnt, fetch_with_process) 15, 3, [ - "SELECT ID FROM fake_table WHERE ID < '8' OR ID is null", - "SELECT ID FROM fake_table WHERE ID >= '8' AND ID < '11'", - "SELECT ID FROM fake_table WHERE ID >= '11'", + "SELECT \"ID\" FROM fake_table WHERE ID < '8' OR ID is null", + "SELECT \"ID\" FROM fake_table WHERE ID >= '8' AND ID < '11'", + "SELECT \"ID\" FROM fake_table WHERE ID >= '11'", ], ), ( @@ -314,10 +314,10 @@ def test_parallel(session, upper_bound, expected_upload_cnt, fetch_with_process) str(datetime.date(2020, 12, 15)), 4, [ - "SELECT DATE FROM fake_table WHERE DATE < '2020-07-30 18:00:00+00:00' OR DATE is null", - "SELECT DATE FROM fake_table WHERE DATE >= '2020-07-30 18:00:00+00:00' AND DATE < '2020-09-14 12:00:00+00:00'", - "SELECT DATE FROM fake_table WHERE DATE >= '2020-09-14 12:00:00+00:00' AND DATE < '2020-10-30 06:00:00+00:00'", - "SELECT DATE FROM fake_table WHERE DATE >= '2020-10-30 06:00:00+00:00'", + "SELECT \"DATE\" FROM fake_table WHERE DATE < '2020-07-30 18:00:00+00:00' OR DATE is null", + "SELECT \"DATE\" FROM fake_table WHERE DATE >= '2020-07-30 18:00:00+00:00' AND DATE < '2020-09-14 12:00:00+00:00'", + "SELECT \"DATE\" FROM fake_table WHERE DATE >= '2020-09-14 12:00:00+00:00' AND DATE < '2020-10-30 06:00:00+00:00'", + "SELECT \"DATE\" FROM fake_table WHERE DATE >= '2020-10-30 06:00:00+00:00'", ], ), ( @@ -328,10 +328,10 @@ def test_parallel(session, upper_bound, expected_upload_cnt, fetch_with_process) str(datetime.datetime(2020, 12, 15, 7, 8, 20)), 4, [ - "SELECT DATE FROM fake_table WHERE DATE < '2020-07-31 05:06:13+00:00' OR DATE is null", - "SELECT DATE FROM fake_table WHERE DATE >= '2020-07-31 05:06:13+00:00' AND DATE < '2020-09-14 21:46:55+00:00'", - "SELECT DATE FROM fake_table WHERE DATE >= '2020-09-14 21:46:55+00:00' AND DATE < '2020-10-30 14:27:37+00:00'", - "SELECT DATE FROM fake_table WHERE DATE >= '2020-10-30 14:27:37+00:00'", + "SELECT \"DATE\" FROM fake_table WHERE DATE < '2020-07-31 05:06:13+00:00' OR DATE is null", + "SELECT \"DATE\" FROM fake_table WHERE DATE >= '2020-07-31 05:06:13+00:00' AND DATE < '2020-09-14 21:46:55+00:00'", + "SELECT \"DATE\" FROM fake_table WHERE DATE >= '2020-09-14 21:46:55+00:00' AND DATE < '2020-10-30 14:27:37+00:00'", + "SELECT \"DATE\" FROM fake_table WHERE DATE >= '2020-10-30 14:27:37+00:00'", ], ), ], @@ -516,9 +516,9 @@ def test_predicates(): mock_schema.return_value = StructType([StructField("ID", IntegerType(), False)]) queries = partitioner.partitions expected_result = [ - "SELECT ID FROM fake_table WHERE id > 1 AND id <= 1000", - "SELECT ID FROM fake_table WHERE id > 1001 AND id <= 2000", - "SELECT ID FROM fake_table WHERE id > 2001", + 'SELECT "ID" FROM fake_table WHERE id > 1 AND id <= 1000', + 'SELECT "ID" FROM fake_table WHERE id > 1001 AND id <= 2000', + 'SELECT "ID" FROM fake_table WHERE id > 2001', ] assert queries == expected_result From c0176576b9d9c6b91ef0c302b2eeb4d9b2c26d8a Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 25 Sep 2025 13:03:36 -0400 Subject: [PATCH 3/6] fix tests --- .../{test_pyodbc.py => test_sql_server.py} | 187 +++++++++++++++++- ...pyodbc_data.py => test_sql_server_data.py} | 0 tests/unit/scala/test_utils_suite.py | 2 +- 3 files changed, 187 insertions(+), 2 deletions(-) rename tests/integ/datasource/{test_pyodbc.py => test_sql_server.py} (51%) rename tests/resources/test_data_source_dir/{test_pyodbc_data.py => test_sql_server_data.py} (100%) diff --git a/tests/integ/datasource/test_pyodbc.py b/tests/integ/datasource/test_sql_server.py similarity index 51% rename from tests/integ/datasource/test_pyodbc.py rename to tests/integ/datasource/test_sql_server.py index d820d55fce..4e7841f418 100644 --- a/tests/integ/datasource/test_pyodbc.py +++ b/tests/integ/datasource/test_sql_server.py @@ -4,11 +4,12 @@ import pytest +from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE from snowflake.snowpark.types import StringType from tests.parameters import SQL_SERVER_CONNECTION_PARAMETERS from tests.utils import IS_IN_STORED_PROC, Utils -from tests.resources.test_data_source_dir.test_pyodbc_data import ( +from tests.resources.test_data_source_dir.test_sql_server_data import ( SQL_SERVER_TABLE_NAME, EXPECTED_TEST_DATA, SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, @@ -17,6 +18,10 @@ SQL_SERVER_UNICODE_SCHEMA, SQL_SEVER_UNICODE_TABLE_NAME, ) +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkSQLException, +) DEPENDENCIES_PACKAGE_UNAVAILABLE = True try: @@ -188,3 +193,183 @@ def local_create_connection_sql_server(): apply_order, ignore_string_size=True, ) + + +@pytest.mark.parametrize( + "input_type, input_value, error_message, udtf_configs", + [ + ("table", "NONEXISTTABLE", "Invalid object name", None), + ("query", "SELEC ** FORM TABLE", "Incorrect syntax near", None), + ( + "table", + "NONEXISTTABLE", + "Invalid object name", + SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, + ), + ( + "query", + "SELEC ** FORM TABLE", + "Incorrect syntax near", + SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, + ), + ], +) +def test_error_case(session, input_type, input_value, error_message, udtf_configs): + # Use local connection function when udtf_configs is provided + if udtf_configs: + local_parameters = SQL_SERVER_CONNECTION_PARAMETERS.copy() + + def connection_func(): + return pyodbc.connect( + "DRIVER=" + local_parameters["DRIVER"] + ";" + "SERVER=" + local_parameters["SERVER"] + ";" + "UID=" + local_parameters["UID"] + ";" + "PWD=" + local_parameters["PWD"] + ";" + "TrustServerCertificate=" + + local_parameters["TrustServerCertificate"] + + ";" + "Encrypt=" + local_parameters["Encrypt"] + ";" + ) + + else: + connection_func = create_connection_sql_server + + # Prepare kwargs for dbapi call + dbapi_kwargs = construct_input_dict(input_type, input_value) + if udtf_configs: + dbapi_kwargs["udtf_configs"] = udtf_configs + + with pytest.raises(SnowparkDataframeReaderException, match=error_message): + session.read.dbapi(connection_func, **dbapi_kwargs) + + +@pytest.mark.parametrize( + "udtf_configs", + [ + None, + SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, + ], +) +def test_partitions_and_predicates(session, udtf_configs): + # Use local connection function when udtf_configs is provided + if udtf_configs: + local_parameters = SQL_SERVER_CONNECTION_PARAMETERS.copy() + + def connection_func(): + return pyodbc.connect( + "DRIVER=" + local_parameters["DRIVER"] + ";" + "SERVER=" + local_parameters["SERVER"] + ";" + "UID=" + local_parameters["UID"] + ";" + "PWD=" + local_parameters["PWD"] + ";" + "TrustServerCertificate=" + + local_parameters["TrustServerCertificate"] + + ";" + "Encrypt=" + local_parameters["Encrypt"] + ";" + ) + + else: + connection_func = create_connection_sql_server + + # Prepare kwargs for dbapi call + dbapi_kwargs = { + "table": SQL_SERVER_TABLE_NAME, + "column": "ID", + "num_partitions": 3, + "upper_bound": 10, + "lower_bound": 0, + } + if udtf_configs: + dbapi_kwargs["udtf_configs"] = udtf_configs + + df = session.read.dbapi(connection_func, **dbapi_kwargs) + + # Use ignore_string_size=True for UDTF scenarios like in other tests + verify_save_table_result( + session, + df, + EXPECTED_TEST_DATA, + SQL_SERVER_SCHEMA, + True, + ignore_string_size=bool(udtf_configs), + ) + + dbapi_kwargs = { + "table": SQL_SERVER_TABLE_NAME, + "predicates": ["ID < 6", "ID >= 6"], + } + if udtf_configs: + dbapi_kwargs["udtf_configs"] = udtf_configs + + df = session.read.dbapi(connection_func, **dbapi_kwargs) + + verify_save_table_result( + session, + df, + EXPECTED_TEST_DATA, + SQL_SERVER_SCHEMA, + True, + ignore_string_size=bool(udtf_configs), + ) + + +@pytest.mark.parametrize( + "udtf_configs", + [ + None, + SQL_SERVER_TEST_EXTERNAL_ACCESS_INTEGRATION, + ], +) +def test_session_init_statement(session, udtf_configs): + # Use local connection function when udtf_configs is provided + if udtf_configs: + local_parameters = SQL_SERVER_CONNECTION_PARAMETERS.copy() + + def connection_func(): + return pyodbc.connect( + "DRIVER=" + local_parameters["DRIVER"] + ";" + "SERVER=" + local_parameters["SERVER"] + ";" + "UID=" + local_parameters["UID"] + ";" + "PWD=" + local_parameters["PWD"] + ";" + "TrustServerCertificate=" + + local_parameters["TrustServerCertificate"] + + ";" + "Encrypt=" + local_parameters["Encrypt"] + ";" + ) + + else: + connection_func = create_connection_sql_server + + # here we use a statement that will fail to verify the session init statement is executed + statements = [ + "DECLARE @VAR1 INT;", + "DECLARE @VAR2 INT;", + "SET @VAR_NON_EXIST = 12345;", + ] + + # Prepare kwargs for dbapi call + dbapi_kwargs = { + "table": SQL_SERVER_TABLE_NAME, + "session_init_statement": statements, + } + if udtf_configs: + dbapi_kwargs["udtf_configs"] = udtf_configs + + with pytest.raises(SnowparkSQLException, match="Must declare the scalar variable"): + # TODO: 2362041, UDTF error experience is different from parquet ingestion + # 1. UDTF needs .collect() to trigger the error while parquet ingestion triggers on .dbapi() + # 2. error exception is different + session.read.dbapi(connection_func, **dbapi_kwargs).collect() + + +def test_pyodbc_driver_class_builder(): + from snowflake.snowpark._internal.data_source.drivers.pyodbc_driver import ( + PyodbcDriver, + ) + + driver = PyodbcDriver(create_connection_sql_server, DBMS_TYPE.SQL_SERVER_DB) + udtf_class = driver.udtf_class_builder( + fetch_size=2, + ) + ingestion = udtf_class() + results = list(ingestion.process(f"SELECT * FROM {SQL_SERVER_TABLE_NAME}")) + assert len(results) == len(EXPECTED_TEST_DATA) diff --git a/tests/resources/test_data_source_dir/test_pyodbc_data.py b/tests/resources/test_data_source_dir/test_sql_server_data.py similarity index 100% rename from tests/resources/test_data_source_dir/test_pyodbc_data.py rename to tests/resources/test_data_source_dir/test_sql_server_data.py diff --git a/tests/unit/scala/test_utils_suite.py b/tests/unit/scala/test_utils_suite.py index a72a81c9ec..62d5a2d14c 100644 --- a/tests/unit/scala/test_utils_suite.py +++ b/tests/unit/scala/test_utils_suite.py @@ -365,7 +365,7 @@ def check_zip_files_and_close_stream(input_stream, expected_files): "resources/test_data_source_dir/test_databricks_data.py", "resources/test_data_source_dir/test_jdbc_data.py", "resources/test_data_source_dir/test_mysql_data.py", - "resources/test_data_source_dir/test_pyodbc_data.py", + "resources/test_data_source_dir/test_sql_server_data.py", "resources/test_debug_utils_dir/", "resources/test_debug_utils_dir/dataframe_generator1.py", "resources/test_debug_utils_dir/dataframe_generator2.py", From 77aa250ee1b52ae40c1ff6dd95ba666fe411c6f8 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 25 Sep 2025 13:42:51 -0400 Subject: [PATCH 4/6] fix exception class --- tests/integ/datasource/test_sql_server.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integ/datasource/test_sql_server.py b/tests/integ/datasource/test_sql_server.py index 4e7841f418..9f546a8b41 100644 --- a/tests/integ/datasource/test_sql_server.py +++ b/tests/integ/datasource/test_sql_server.py @@ -20,7 +20,7 @@ ) from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - SnowparkSQLException, + SnowparkClientException, ) DEPENDENCIES_PACKAGE_UNAVAILABLE = True @@ -354,7 +354,9 @@ def connection_func(): if udtf_configs: dbapi_kwargs["udtf_configs"] = udtf_configs - with pytest.raises(SnowparkSQLException, match="Must declare the scalar variable"): + with pytest.raises( + SnowparkClientException, match="Must declare the scalar variable" + ): # TODO: 2362041, UDTF error experience is different from parquet ingestion # 1. UDTF needs .collect() to trigger the error while parquet ingestion triggers on .dbapi() # 2. error exception is different From 464dc3a107897e7f5cd3b702b95de5571212b1ae Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 25 Sep 2025 19:48:44 -0400 Subject: [PATCH 5/6] fix --- CHANGELOG.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95a89a63a4..40de5e352a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,10 +56,10 @@ - Fixed a bug that `DataFrame.limit()` fail if there is parameter binding in the executed SQL. - Added an experimental fix for a bug in schema query generation that could cause invalid sql to be genrated when using nested structured types. -- Fixed the following bugs in `DataFrameReader.dbapi`(PuPr): - - udtf ingestion does not work for `pyodbc` driver. - - query input does not work for SQL Server. - - nullability of output schema of udft ingestion is not preserved. +- Fixed multiple bugs in `DataFrameReader.dbapi` (PuPr): + - Fixed UDTF ingestion failure with `pyodbc` driver caused by unprocessed row data. + - Fixed SQL Server query input failure due to incorrect select query generation. + - Fixed UDTF ingestion not preserving column nullability in the output schema. #### Improvements From b2f4d9b1f3299ab421705d92e606b6d36837643b Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Fri, 26 Sep 2025 14:04:43 -0400 Subject: [PATCH 6/6] update fetch size --- CHANGELOG.md | 2 +- src/snowflake/snowpark/dataframe_reader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 40de5e352a..92a0502914 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,7 +63,7 @@ #### Improvements -- Improved `DataFrameReader.dbapi` (PuPr) reading performance by setting the default `fetch_size` parameter value to 10000. +- Improved `DataFrameReader.dbapi` (PuPr) reading performance by setting the default `fetch_size` parameter value to 100000. ### Snowpark pandas API Updates diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 35298627d0..f1e5ed57b4 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -1695,7 +1695,7 @@ def dbapi( num_partitions: Optional[int] = None, max_workers: Optional[int] = None, query_timeout: Optional[int] = 0, - fetch_size: Optional[int] = 10000, + fetch_size: Optional[int] = 100000, custom_schema: Optional[Union[str, StructType]] = None, predicates: Optional[List[str]] = None, session_init_statement: Optional[Union[str, List[str]]] = None,