From eb90d6fbaf555d0d7ada5286ae861488e8792c19 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Thu, 11 Sep 2025 14:03:16 -0700 Subject: [PATCH 01/15] place holder --- .../data_source/datasource_reader.py | 10 ++++++++- .../data_source/drivers/base_driver.py | 7 ++++++- .../snowpark/_internal/data_source/utils.py | 9 ++++++-- src/snowflake/snowpark/exceptions.py | 21 +++++++++++++++++++ tests/integ/datasource/test_oracledb.py | 6 ++++++ 5 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py index 507141b9fa..96bedb746b 100644 --- a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py +++ b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py @@ -9,7 +9,10 @@ from snowflake.snowpark._internal.data_source.datasource_typing import Connection from snowflake.snowpark._internal.data_source.drivers.base_driver import BaseDriver -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, +) from snowflake.snowpark.types import StructType from snowflake.connector.options import pandas as pd import logging @@ -85,6 +88,11 @@ def read(self, partition: str) -> Iterator[List[Any]]: batch = [] else: raise ValueError("fetch size cannot be smaller than 0") + except Exception as exc: + if self.driver.non_retryable_error and isinstance( + exc, self.driver.non_retryable_error + ): + raise SnowparkDataSourceNonRetryableException(exc) finally: try: cursor.close() diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py index a9de3af7b7..29a094feff 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py +++ 
b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py @@ -3,7 +3,8 @@ # from enum import Enum import datetime -from typing import List, Callable, Any, Optional, TYPE_CHECKING +from functools import cached_property +from typing import List, Callable, Any, Optional, TYPE_CHECKING, Tuple, Type from snowflake.connector.options import pandas as pd from snowflake.snowpark._internal.analyzer.analyzer_utils import unquote_if_quoted @@ -55,6 +56,10 @@ def to_snow_type(self, schema: List[Any]) -> StructType: f"{self.__class__.__name__} has not implemented to_snow_type function" ) + @cached_property + def non_retryable_error(self) -> Tuple[Type[BaseException], ...]: + return () + @staticmethod def prepare_connection( conn: "Connection", diff --git a/src/snowflake/snowpark/_internal/data_source/utils.py b/src/snowflake/snowpark/_internal/data_source/utils.py index f3fbebbd71..8a352aaebd 100644 --- a/src/snowflake/snowpark/_internal/data_source/utils.py +++ b/src/snowflake/snowpark/_internal/data_source/utils.py @@ -32,8 +32,10 @@ ) import snowflake from snowflake.snowpark._internal.data_source import DataSourceReader -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException - +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, +) logger = logging.getLogger(__name__) @@ -282,6 +284,9 @@ def _retry_run(func: Callable, *args, **kwargs) -> Any: except SnowparkDataframeReaderException: # SnowparkDataframeReaderException is a non-retryable exception raise + except SnowparkDataSourceNonRetryableException: + # SnowparkDataSourceNonRetryableException is a non-retryable exception + raise except Exception as e: last_error = e error_trace = traceback.format_exc() diff --git a/src/snowflake/snowpark/exceptions.py b/src/snowflake/snowpark/exceptions.py index 1142e9545e..5cec835b13 100644 --- a/src/snowflake/snowpark/exceptions.py +++ b/src/snowflake/snowpark/exceptions.py @@ -283,3 
+283,24 @@ class SnowparkInvalidObjectNameException(SnowparkGeneralException): """ pass + + +class SnowparkDataSourceNonRetryableException(SnowparkGeneralException): + """Exception for data source non-retryable error.""" + + def __init__( + self, + error: Exception, + ) -> None: + self.error: Exception = error + + self._pretty_msg = f"{self.__class__.__name__}({self.error})" + + def __repr__(self): + return f"{self.__class__.__name__}({self.error})" + + def __str__(self): + return self._pretty_msg + + def __reduce__(self): + return (self.__class__, (self.error,)) diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 7360f8a74c..8c9515ff7a 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -248,3 +248,9 @@ def test_unsupported_type(): create_connection_oracledb, DBMS_TYPE.ORACLE_DB ).to_snow_type([MockDescription("test_col", invalid_type, 0, 0, True)]) assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) + + +def test_oracledb_non_retryable_error(session): + # input_dict = {"query": "invalid syntax", "custom_schema": oracledb_real_schema} + # session.read.dbapi(create_connection_oracledb, **input_dict).collect() + pass From fd1934894b463170864985f5c33ecde431e32485 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Thu, 11 Sep 2025 15:54:09 -0700 Subject: [PATCH 02/15] add test --- .../data_source/datasource_reader.py | 6 ++-- .../data_source/drivers/base_driver.py | 8 ++---- .../data_source/drivers/oracledb_driver.py | 24 ++++++++++++++++ src/snowflake/snowpark/dataframe_reader.py | 9 +++++- tests/integ/datasource/test_oracledb.py | 14 +++++++--- tests/integ/test_data_source_api.py | 28 ++++++++++++++++++- 6 files changed, 75 insertions(+), 14 deletions(-) diff --git a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py index 96bedb746b..5fdd42f452 100644 --- 
a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py +++ b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py @@ -89,10 +89,10 @@ def read(self, partition: str) -> Iterator[List[Any]]: else: raise ValueError("fetch size cannot be smaller than 0") except Exception as exc: - if self.driver.non_retryable_error and isinstance( - exc, self.driver.non_retryable_error - ): + if self.driver.non_retryable_error_checker(exc): raise SnowparkDataSourceNonRetryableException(exc) + else: + raise finally: try: cursor.close() diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py index 29a094feff..cdd23e04bc 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py @@ -3,8 +3,7 @@ # from enum import Enum import datetime -from functools import cached_property -from typing import List, Callable, Any, Optional, TYPE_CHECKING, Tuple, Type +from typing import List, Callable, Any, Optional, TYPE_CHECKING from snowflake.connector.options import pandas as pd from snowflake.snowpark._internal.analyzer.analyzer_utils import unquote_if_quoted @@ -56,9 +55,8 @@ def to_snow_type(self, schema: List[Any]) -> StructType: f"{self.__class__.__name__} has not implemented to_snow_type function" ) - @cached_property - def non_retryable_error(self) -> Tuple[Type[BaseException], ...]: - return () + def non_retryable_error_checker(self, error: Exception) -> bool: + return False @staticmethod def prepare_connection( diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py index 11d7b9ec07..4ed344aba0 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py @@ -110,6 +110,30 @@ def 
prepare_connection( conn.outputtypehandler = output_type_handler return conn + def non_retryable_error_checker(self, error: Exception) -> bool: + import oracledb + + if isinstance(error, oracledb.DatabaseError): + syntax_error_codes = [ + "ORA-00900", # invalid SQL statement + "ORA-00901", # invalid CREATE command + "ORA-00904", # invalid identifier + "ORA-00905", # missing keyword + "ORA-00906", # missing left parenthesis + "ORA-00907", # missing right parenthesis + "ORA-00911", # invalid character + "ORA-00920", # invalid relational operator + "ORA-00921", # unexpected end of SQL command + "ORA-00923", # FROM keyword not found where expected + "ORA-00933", # SQL command not properly ended + "ORA-00936", # missing expression + "ORA-00942", # table or view does not exist + ] + for error_code in syntax_error_codes: + if error_code in str(error): + return True + return False + def udtf_class_builder( self, fetch_size: int = 1000, schema: StructType = None ) -> type: diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 5533ebb246..0bb18c8ac7 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -83,6 +83,7 @@ from snowflake.snowpark.exceptions import ( SnowparkSessionException, SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, ) from snowflake.snowpark.functions import sql_expr, col, concat, lit, to_file from snowflake.snowpark.mock._connection import MockServerConnection @@ -2001,7 +2002,13 @@ def create_oracledb_connection(): f"Cancelled a remaining data fetching future {future} due to error in another thread." 
) - if isinstance(exc, SnowparkDataframeReaderException): + if isinstance( + exc, + ( + SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, + ), + ): raise exc raise SnowparkDataframeReaderException( diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 8c9515ff7a..dfab5a342b 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -6,7 +6,6 @@ import math import sys from collections import namedtuple - import pytest from snowflake.snowpark import Row @@ -20,6 +19,7 @@ DBMS_TYPE, ) from snowflake.snowpark.types import StructType, StructField, StringType +from snowflake.snowpark.exceptions import SnowparkDataSourceNonRetryableException from tests.parameters import ORACLEDB_CONNECTION_PARAMETERS from tests.resources.test_data_source_dir.test_data_source_data import ( OracleDBType, @@ -251,6 +251,12 @@ def test_unsupported_type(): def test_oracledb_non_retryable_error(session): - # input_dict = {"query": "invalid syntax", "custom_schema": oracledb_real_schema} - # session.read.dbapi(create_connection_oracledb, **input_dict).collect() - pass + with pytest.raises( + SnowparkDataSourceNonRetryableException, + match="ORA-00920: invalid relational operator", + ): + session.read.dbapi( + create_connection_oracledb, + table=ORACLEDB_TABLE_NAME, + predicates=["invalid syntax"], + ).collect() diff --git a/tests/integ/test_data_source_api.py b/tests/integ/test_data_source_api.py index 7de529432e..a63a6009f3 100644 --- a/tests/integ/test_data_source_api.py +++ b/tests/integ/test_data_source_api.py @@ -51,7 +51,10 @@ random_name_for_temp_object, ) from snowflake.snowpark.dataframe_reader import _MAX_RETRY_TIME -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, +) from snowflake.snowpark.types import ( StructType, 
StructField, @@ -210,6 +213,29 @@ def test_dbapi_retry(session, fetch_with_process): assert mock_task.call_count == _MAX_RETRY_TIME +@pytest.mark.parametrize("fetch_with_process", [True, False]) +def test_dbapi_non_retryable_error(session, fetch_with_process): + with mock.patch( + "snowflake.snowpark._internal.data_source.utils._task_fetch_data_from_source", + side_effect=SnowparkDataSourceNonRetryableException(Exception("mock error")), + ) as mock_task: + mock_task.__name__ = "_task_fetch_from_data_source" + parquet_queue = multiprocessing.Queue() if fetch_with_process else queue.Queue() + with pytest.raises(SnowparkDataSourceNonRetryableException, match="mock error"): + _task_fetch_data_from_source_with_retry( + worker=DataSourceReader( + PyodbcDriver, + sql_server_create_connection, + StructType([StructField("col1", IntegerType(), False)]), + DBMS_TYPE.SQL_SERVER_DB, + ), + partition="SELECT * FROM test_table", + partition_idx=0, + parquet_queue=parquet_queue, + ) + assert mock_task.call_count == 1 + + @pytest.mark.skipif( IS_WINDOWS, reason="sqlite3 file can not be shared across processes on windows", From ff5734ae4507bc8c235eaab0e1251bf159c8d260 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Thu, 11 Sep 2025 16:24:48 -0700 Subject: [PATCH 03/15] add test --- .../data_source/drivers/databricks_driver.py | 12 ++++++++++++ .../data_source/drivers/psycopg2_driver.py | 12 ++++++++++++ .../data_source/drivers/pymsql_driver.py | 12 ++++++++++++ tests/integ/datasource/test_databricks.py | 17 ++++++++++++++++- tests/integ/datasource/test_mysql.py | 13 +++++++++++++ tests/integ/datasource/test_postgres.py | 17 ++++++++++++++++- 6 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py index 226b45236d..34dd6bc4cc 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py +++ 
b/src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py @@ -68,6 +68,18 @@ def to_snow_type(self, schema: List[Any]) -> StructType: all_columns.append(StructField(column_name, data_type, True)) return StructType(all_columns) + def non_retryable_error_checker(self, error: Exception) -> bool: + import databricks.sql + + if isinstance(error, databricks.sql.ServerOperationError): + syntax_error_codes = [ + "PARSE_SYNTAX_ERROR", # syntax error + ] + for error_code in syntax_error_codes: + if error_code in str(error): + return True + return False + def udtf_class_builder( self, fetch_size: int = 1000, schema: StructType = None ) -> type: diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py index 8bfa734f92..71bcd52a4f 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py @@ -211,6 +211,18 @@ def to_snow_type(self, schema: List[Any]) -> StructType: fields.append(StructField(name, data_type, True)) return StructType(fields) + def non_retryable_error_checker(self, error: Exception) -> bool: + import psycopg2 + + if isinstance(error, psycopg2.errors.SyntaxError): + syntax_error_codes = [ + "42601", # syntax error + ] + for error_code in syntax_error_codes: + if error_code == str(error.pgcode): + return True + return False + @staticmethod def to_result_snowpark_df( session: "Session", table_name, schema, _emit_ast: bool = True diff --git a/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py b/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py index 268a9145ae..7bfc1f9709 100644 --- a/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py +++ b/src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py @@ -183,6 +183,18 @@ def to_snow_type(self, schema: List[Any]) -> StructType: 
fields.append(StructField(name, data_type, null_ok)) return StructType(fields) + def non_retryable_error_checker(self, error: Exception) -> bool: + import pymysql + + if isinstance(error, pymysql.err.ProgrammingError): + syntax_error_codes = [ + "1064", # syntax error + ] + for error_code in syntax_error_codes: + if error_code in str(error): + return True + return False + def udtf_class_builder( self, fetch_size: int = 1000, schema: StructType = None ) -> type: diff --git a/tests/integ/datasource/test_databricks.py b/tests/integ/datasource/test_databricks.py index 954dec122d..616af8a435 100644 --- a/tests/integ/datasource/test_databricks.py +++ b/tests/integ/datasource/test_databricks.py @@ -17,7 +17,10 @@ random_name_for_temp_object, TempObjectType, ) -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, +) from snowflake.snowpark.types import ( StructType, StructField, @@ -258,3 +261,15 @@ def test_unsupported_type(): create_databricks_connection, DBMS_TYPE.DATABRICKS_DB ).to_snow_type([("test_col", "unsupported_type", True)]) assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) + + +def test_oracledb_non_retryable_error(session): + with pytest.raises( + SnowparkDataSourceNonRetryableException, + match="PARSE_SYNTAX_ERROR", + ): + session.read.dbapi( + create_databricks_connection, + table=TEST_TABLE_NAME, + predicates=["invalid syntax"], + ).collect() diff --git a/tests/integ/datasource/test_mysql.py b/tests/integ/datasource/test_mysql.py index 6878437a68..d761672a01 100644 --- a/tests/integ/datasource/test_mysql.py +++ b/tests/integ/datasource/test_mysql.py @@ -14,6 +14,7 @@ PymysqlTypeCode, ) from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE +from snowflake.snowpark.exceptions import SnowparkDataSourceNonRetryableException from snowflake.snowpark.types import 
StructType, StructField, StringType from tests.resources.test_data_source_dir.test_mysql_data import ( mysql_real_data, @@ -297,3 +298,15 @@ def test_unsupported_type(): [("test_col", "unsupported_type", None, None, 0, 0, True)] ) assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) + + +def test_oracledb_non_retryable_error(session): + with pytest.raises( + SnowparkDataSourceNonRetryableException, + match="You have an error in your SQL syntax", + ): + session.read.dbapi( + create_connection_mysql, + table=TEST_TABLE_NAME, + predicates=["invalid syntax"], + ).collect() diff --git a/tests/integ/datasource/test_postgres.py b/tests/integ/datasource/test_postgres.py index 82e1b13a8f..ebdbbd5d43 100644 --- a/tests/integ/datasource/test_postgres.py +++ b/tests/integ/datasource/test_postgres.py @@ -11,7 +11,10 @@ Psycopg2TypeCode, ) from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE -from snowflake.snowpark.exceptions import SnowparkDataframeReaderException +from snowflake.snowpark.exceptions import ( + SnowparkDataframeReaderException, + SnowparkDataSourceNonRetryableException, +) from snowflake.snowpark.types import ( DecimalType, BinaryType, @@ -481,3 +484,15 @@ def test_server_side_cursor(session): assert cursor.name is not None # Server-side cursor should have a name cursor.close() conn.close() + + +def test_oracledb_non_retryable_error(session): + with pytest.raises( + SnowparkDataSourceNonRetryableException, + match="syntax error", + ): + session.read.dbapi( + create_postgres_connection, + table=POSTGRES_TABLE_NAME, + predicates=["invalid syntax"], + ).collect() From 08cb331ada73cbbd203a72b9ced59fccb5b7575e Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Fri, 12 Sep 2025 09:48:40 -0700 Subject: [PATCH 04/15] fix test name --- tests/integ/datasource/test_databricks.py | 2 +- tests/integ/datasource/test_mysql.py | 2 +- tests/integ/datasource/test_postgres.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff 
--git a/tests/integ/datasource/test_databricks.py b/tests/integ/datasource/test_databricks.py index 616af8a435..1f8c2a02af 100644 --- a/tests/integ/datasource/test_databricks.py +++ b/tests/integ/datasource/test_databricks.py @@ -263,7 +263,7 @@ def test_unsupported_type(): assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) -def test_oracledb_non_retryable_error(session): +def test_databricks_non_retryable_error(session): with pytest.raises( SnowparkDataSourceNonRetryableException, match="PARSE_SYNTAX_ERROR", diff --git a/tests/integ/datasource/test_mysql.py b/tests/integ/datasource/test_mysql.py index d761672a01..bec3335632 100644 --- a/tests/integ/datasource/test_mysql.py +++ b/tests/integ/datasource/test_mysql.py @@ -300,7 +300,7 @@ def test_unsupported_type(): assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) -def test_oracledb_non_retryable_error(session): +def test_mysql_non_retryable_error(session): with pytest.raises( SnowparkDataSourceNonRetryableException, match="You have an error in your SQL syntax", diff --git a/tests/integ/datasource/test_postgres.py b/tests/integ/datasource/test_postgres.py index ebdbbd5d43..6d0f6b5ecb 100644 --- a/tests/integ/datasource/test_postgres.py +++ b/tests/integ/datasource/test_postgres.py @@ -486,7 +486,7 @@ def test_server_side_cursor(session): conn.close() -def test_oracledb_non_retryable_error(session): +def test_postgres_non_retryable_error(session): with pytest.raises( SnowparkDataSourceNonRetryableException, match="syntax error", From 38cc17261bbf23473ef8b6ccb5708eabfedee066 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Mon, 22 Sep 2025 17:10:49 -0700 Subject: [PATCH 05/15] fix merge error --- tests/integ/datasource/test_mysql.py | 7 ++++--- tests/integ/datasource/test_oracledb.py | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integ/datasource/test_mysql.py b/tests/integ/datasource/test_mysql.py index 
95a1240cdd..eda747595e 100644 --- a/tests/integ/datasource/test_mysql.py +++ b/tests/integ/datasource/test_mysql.py @@ -310,18 +310,19 @@ def test_mysql_non_retryable_error(session): with pytest.raises( SnowparkDataSourceNonRetryableException, match="You have an error in your SQL syntax", - ): + ): session.read.dbapi( create_connection_mysql, table=TEST_TABLE_NAME, predicates=["invalid syntax"], - ) + ) + - def test_session_init(session): with pytest.raises( SnowparkDataframeReaderException, match="Mock error to test init_statement", + ): session.read.dbapi( create_connection_mysql, table=TEST_TABLE_NAME, diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 84c5d801e2..629205f180 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -6,6 +6,7 @@ import math import sys from collections import namedtuple + import pytest from snowflake.snowpark import Row From 2289c94d2edf8d9ec3392b05db43eabe0bb0e5e6 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Tue, 23 Sep 2025 10:15:36 -0700 Subject: [PATCH 06/15] fix lint and changelog --- CHANGELOG.md | 2 +- tests/integ/datasource/test_databricks.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b0740db7fd..436d338d7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,7 +67,7 @@ - Hybrid execution mode is now enabled by default. Certain operations on smaller data will now automatically execute in native pandas in-memory. Use `from modin.config import AutoSwitchBackend; AutoSwitchBackend.disable()` to turn this off and force all execution to occur in Snowflake. - Added a session parameter `pandas_hybrid_execution_enabled` to enable/disable hybrid execution as an alternative to using `AutoSwitchBackend`. - Removed an unnecessary `SHOW OBJECTS` query issued from `read_snowflake` under certain conditions. 
- +- Add non-retryable failure in `DataFrameReader.dbapi`(PuPr), such as syntax error in external data source SQL. ## 1.39.0 (2025-09-17) ### Snowpark Python API Updates diff --git a/tests/integ/datasource/test_databricks.py b/tests/integ/datasource/test_databricks.py index 6b277faa19..f3e179c185 100644 --- a/tests/integ/datasource/test_databricks.py +++ b/tests/integ/datasource/test_databricks.py @@ -266,17 +266,16 @@ def test_unsupported_type(): assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)]) - def test_databricks_non_retryable_error(session): with pytest.raises( SnowparkDataSourceNonRetryableException, match="PARSE_SYNTAX_ERROR", - ): + ): session.read.dbapi( create_databricks_connection, table=TEST_TABLE_NAME, predicates=["invalid syntax"], - ) + ) def test_session_init(session): From e7023c72364a5256b4f4038782e7e2dda49553c8 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Tue, 23 Sep 2025 11:07:57 -0700 Subject: [PATCH 07/15] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 436d338d7e..ba77a1fb88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ - Added a session parameter `pandas_hybrid_execution_enabled` to enable/disable hybrid execution as an alternative to using `AutoSwitchBackend`. - Removed an unnecessary `SHOW OBJECTS` query issued from `read_snowflake` under certain conditions. - Add non-retryable failure in `DataFrameReader.dbapi`(PuPr), such as syntax error in external data source SQL. 
+ ## 1.39.0 (2025-09-17) ### Snowpark Python API Updates From 355d5964443400e295df97377ba4690b8fe6129a Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Tue, 23 Sep 2025 14:44:22 -0700 Subject: [PATCH 08/15] fix test --- tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/tox.ini b/tox.ini index 5514898a7e..4e4fcdb1c9 100644 --- a/tox.ini +++ b/tox.ini @@ -244,6 +244,7 @@ commands = python -m pytest tests/integ/modin/test_old_numpy_aliases.py description = run the datasource tests that requires DBMS drivers deps = {[testenv]deps} + .[pandas] databricks-sql-connector oracledb psycopg2-binary From 083dce7c2322b6726f34f55858df6cf26403fad0 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Tue, 23 Sep 2025 15:34:21 -0700 Subject: [PATCH 09/15] fix test --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 4e4fcdb1c9..46f874be60 100644 --- a/tox.ini +++ b/tox.ini @@ -243,7 +243,7 @@ commands = python -m pytest tests/integ/modin/test_old_numpy_aliases.py [testenv:datasource] description = run the datasource tests that requires DBMS drivers deps = - {[testenv]deps} + .[development] .[pandas] databricks-sql-connector oracledb From 5bad3718b716d331a1fa98fe958a7f9bf4bf6dfa Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Tue, 23 Sep 2025 16:43:02 -0700 Subject: [PATCH 10/15] fix --- tox.ini | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tox.ini b/tox.ini index 46f874be60..418f5c4dd5 100644 --- a/tox.ini +++ b/tox.ini @@ -243,9 +243,8 @@ commands = python -m pytest tests/integ/modin/test_old_numpy_aliases.py [testenv:datasource] description = run the datasource tests that requires DBMS drivers deps = - .[development] - .[pandas] - databricks-sql-connector + {[testenv]deps} + databricks-sql-connector <= 4.0.3 oracledb psycopg2-binary pymysql From d8e3d4a6f639d3450b603d67063ebbe4660fe6ab Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Tue, 23 Sep 2025 16:46:18 -0700 Subject: [PATCH 11/15] fix --- 
tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 418f5c4dd5..90bfab8cb7 100644 --- a/tox.ini +++ b/tox.ini @@ -244,7 +244,7 @@ commands = python -m pytest tests/integ/modin/test_old_numpy_aliases.py description = run the datasource tests that requires DBMS drivers deps = {[testenv]deps} - databricks-sql-connector <= 4.0.3 + databricks-sql-connector > 4.0.0 oracledb psycopg2-binary pymysql From f3ba610488b4e667328362bd8ac47ede5e34a667 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Wed, 24 Sep 2025 13:54:59 -0700 Subject: [PATCH 12/15] rename --- CHANGELOG.md | 4 +++- .../snowpark/_internal/data_source/datasource_reader.py | 4 ++-- src/snowflake/snowpark/_internal/data_source/utils.py | 4 ++-- src/snowflake/snowpark/dataframe_reader.py | 4 ++-- src/snowflake/snowpark/exceptions.py | 2 +- tests/integ/datasource/test_databricks.py | 4 ++-- tests/integ/datasource/test_mysql.py | 4 ++-- tests/integ/datasource/test_oracledb.py | 4 ++-- tests/integ/datasource/test_postgres.py | 4 ++-- tests/integ/test_data_source_api.py | 8 +++++--- 10 files changed, 23 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba77a1fb88..3705f777d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,9 @@ - `nvl2` - `regr_valx` +#### Improvements + +- Improve `DataFrameReader.dbapi`(PuPr) that dbapi will not retry on non-retryable error such as SQL syntax error on external data source query. ### Snowpark pandas API Updates @@ -67,7 +70,6 @@ - Hybrid execution mode is now enabled by default. Certain operations on smaller data will now automatically execute in native pandas in-memory. Use `from modin.config import AutoSwitchBackend; AutoSwitchBackend.disable()` to turn this off and force all execution to occur in Snowflake. - Added a session parameter `pandas_hybrid_execution_enabled` to enable/disable hybrid execution as an alternative to using `AutoSwitchBackend`. 
- Removed an unnecessary `SHOW OBJECTS` query issued from `read_snowflake` under certain conditions. -- Add non-retryable failure in `DataFrameReader.dbapi`(PuPr), such as syntax error in external data source SQL. ## 1.39.0 (2025-09-17) diff --git a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py index 02a6f62e22..43ad78f422 100644 --- a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py +++ b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py @@ -11,7 +11,7 @@ from snowflake.snowpark._internal.data_source.drivers.base_driver import BaseDriver from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, ) from snowflake.snowpark.types import StructType from snowflake.connector.options import pandas as pd @@ -90,7 +90,7 @@ def read(self, partition: str) -> Iterator[List[Any]]: raise ValueError("fetch size cannot be smaller than 0") except Exception as exc: if self.driver.non_retryable_error_checker(exc): - raise SnowparkDataSourceNonRetryableException(exc) + raise _SnowparkDataSourceNonRetryableException(exc) else: raise finally: diff --git a/src/snowflake/snowpark/_internal/data_source/utils.py b/src/snowflake/snowpark/_internal/data_source/utils.py index 8a352aaebd..48fd56c494 100644 --- a/src/snowflake/snowpark/_internal/data_source/utils.py +++ b/src/snowflake/snowpark/_internal/data_source/utils.py @@ -34,7 +34,7 @@ from snowflake.snowpark._internal.data_source import DataSourceReader from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, ) logger = logging.getLogger(__name__) @@ -284,7 +284,7 @@ def _retry_run(func: Callable, *args, **kwargs) -> Any: except SnowparkDataframeReaderException: # SnowparkDataframeReaderException is a 
non-retryable exception raise - except SnowparkDataSourceNonRetryableException: + except _SnowparkDataSourceNonRetryableException: # SnowparkDataSourceNonRetryableException is a non-retryable exception raise except Exception as e: diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index ebd5ff32cc..0416076598 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -83,7 +83,7 @@ from snowflake.snowpark.exceptions import ( SnowparkSessionException, SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, ) from snowflake.snowpark.functions import sql_expr, col, concat, lit, to_file from snowflake.snowpark.mock._connection import MockServerConnection @@ -2008,7 +2008,7 @@ def create_oracledb_connection(): exc, ( SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, ), ): raise exc diff --git a/src/snowflake/snowpark/exceptions.py b/src/snowflake/snowpark/exceptions.py index 5cec835b13..c8df344180 100644 --- a/src/snowflake/snowpark/exceptions.py +++ b/src/snowflake/snowpark/exceptions.py @@ -285,7 +285,7 @@ class SnowparkInvalidObjectNameException(SnowparkGeneralException): pass -class SnowparkDataSourceNonRetryableException(SnowparkGeneralException): +class _SnowparkDataSourceNonRetryableException(SnowparkGeneralException): """Exception for data source non-retryable error.""" def __init__( diff --git a/tests/integ/datasource/test_databricks.py b/tests/integ/datasource/test_databricks.py index f3e179c185..80e6614aa5 100644 --- a/tests/integ/datasource/test_databricks.py +++ b/tests/integ/datasource/test_databricks.py @@ -19,7 +19,7 @@ ) from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, SnowparkSQLException, ) from 
snowflake.snowpark.types import ( @@ -268,7 +268,7 @@ def test_unsupported_type(): def test_databricks_non_retryable_error(session): with pytest.raises( - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, match="PARSE_SYNTAX_ERROR", ): session.read.dbapi( diff --git a/tests/integ/datasource/test_mysql.py b/tests/integ/datasource/test_mysql.py index eda747595e..ded75e0474 100644 --- a/tests/integ/datasource/test_mysql.py +++ b/tests/integ/datasource/test_mysql.py @@ -14,7 +14,7 @@ PymysqlTypeCode, ) from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE -from snowflake.snowpark.exceptions import SnowparkDataSourceNonRetryableException +from snowflake.snowpark.exceptions import _SnowparkDataSourceNonRetryableException from snowflake.snowpark.types import StructType, StructField, StringType from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, @@ -308,7 +308,7 @@ def test_unsupported_type(): def test_mysql_non_retryable_error(session): with pytest.raises( - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, match="You have an error in your SQL syntax", ): session.read.dbapi( diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 629205f180..3734757760 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -21,7 +21,7 @@ ) from snowflake.snowpark.types import StructType, StructField, StringType from snowflake.snowpark.exceptions import ( - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, SnowparkDataframeReaderException, SnowparkSQLException, ) @@ -257,7 +257,7 @@ def test_unsupported_type(): def test_oracledb_non_retryable_error(session): with pytest.raises( - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, match="ORA-00920: invalid relational operator", ): session.read.dbapi( diff --git 
a/tests/integ/datasource/test_postgres.py b/tests/integ/datasource/test_postgres.py index 36a5012d33..a4d2f4f4ba 100644 --- a/tests/integ/datasource/test_postgres.py +++ b/tests/integ/datasource/test_postgres.py @@ -13,7 +13,7 @@ from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, SnowparkSQLException, ) from snowflake.snowpark.types import ( @@ -512,7 +512,7 @@ def test_server_side_cursor(session): def test_postgres_non_retryable_error(session): with pytest.raises( - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, match="syntax error", ): session.read.dbapi( diff --git a/tests/integ/test_data_source_api.py b/tests/integ/test_data_source_api.py index 4f6b54f28a..26c3133282 100644 --- a/tests/integ/test_data_source_api.py +++ b/tests/integ/test_data_source_api.py @@ -53,7 +53,7 @@ from snowflake.snowpark.dataframe_reader import _MAX_RETRY_TIME from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - SnowparkDataSourceNonRetryableException, + _SnowparkDataSourceNonRetryableException, ) from snowflake.snowpark.types import ( StructType, @@ -217,11 +217,13 @@ def test_dbapi_retry(session, fetch_with_process): def test_dbapi_non_retryable_error(session, fetch_with_process): with mock.patch( "snowflake.snowpark._internal.data_source.utils._task_fetch_data_from_source", - side_effect=SnowparkDataSourceNonRetryableException(Exception("mock error")), + side_effect=_SnowparkDataSourceNonRetryableException(Exception("mock error")), ) as mock_task: mock_task.__name__ = "_task_fetch_from_data_source" parquet_queue = multiprocessing.Queue() if fetch_with_process else queue.Queue() - with pytest.raises(SnowparkDataSourceNonRetryableException, match="mock error"): + with pytest.raises( + _SnowparkDataSourceNonRetryableException, match="mock 
error" + ): _task_fetch_data_from_source_with_retry( worker=DataSourceReader( PyodbcDriver, From 947894cfb01ca2779b3d65e33a24fd0722ba92e6 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Fri, 26 Sep 2025 12:12:32 -0700 Subject: [PATCH 13/15] reuse existing error --- .../data_source/datasource_reader.py | 7 ++----- .../snowpark/_internal/data_source/utils.py | 8 +------ src/snowflake/snowpark/exceptions.py | 21 ------------------- tests/integ/datasource/test_databricks.py | 3 +-- tests/integ/datasource/test_mysql.py | 3 +-- tests/integ/datasource/test_oracledb.py | 3 +-- tests/integ/datasource/test_postgres.py | 3 +-- 7 files changed, 7 insertions(+), 41 deletions(-) diff --git a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py index 43ad78f422..24f1eefb0d 100644 --- a/src/snowflake/snowpark/_internal/data_source/datasource_reader.py +++ b/src/snowflake/snowpark/_internal/data_source/datasource_reader.py @@ -9,10 +9,7 @@ from snowflake.snowpark._internal.data_source.datasource_typing import Connection from snowflake.snowpark._internal.data_source.drivers.base_driver import BaseDriver -from snowflake.snowpark.exceptions import ( - SnowparkDataframeReaderException, - _SnowparkDataSourceNonRetryableException, -) +from snowflake.snowpark.exceptions import SnowparkDataframeReaderException from snowflake.snowpark.types import StructType from snowflake.connector.options import pandas as pd import logging @@ -90,7 +87,7 @@ def read(self, partition: str) -> Iterator[List[Any]]: raise ValueError("fetch size cannot be smaller than 0") except Exception as exc: if self.driver.non_retryable_error_checker(exc): - raise _SnowparkDataSourceNonRetryableException(exc) + raise SnowparkDataframeReaderException(message=str(exc)) else: raise finally: diff --git a/src/snowflake/snowpark/_internal/data_source/utils.py b/src/snowflake/snowpark/_internal/data_source/utils.py index 48fd56c494..3ad319343d
100644 --- a/src/snowflake/snowpark/_internal/data_source/utils.py +++ b/src/snowflake/snowpark/_internal/data_source/utils.py @@ -32,10 +32,7 @@ ) import snowflake from snowflake.snowpark._internal.data_source import DataSourceReader -from snowflake.snowpark.exceptions import ( - SnowparkDataframeReaderException, - _SnowparkDataSourceNonRetryableException, -) +from snowflake.snowpark.exceptions import SnowparkDataframeReaderException logger = logging.getLogger(__name__) @@ -284,9 +281,6 @@ def _retry_run(func: Callable, *args, **kwargs) -> Any: except SnowparkDataframeReaderException: # SnowparkDataframeReaderException is a non-retryable exception raise - except _SnowparkDataSourceNonRetryableException: - # SnowparkDataSourceNonRetryableException is a non-retryable exception - raise except Exception as e: last_error = e error_trace = traceback.format_exc() diff --git a/src/snowflake/snowpark/exceptions.py b/src/snowflake/snowpark/exceptions.py index c8df344180..1142e9545e 100644 --- a/src/snowflake/snowpark/exceptions.py +++ b/src/snowflake/snowpark/exceptions.py @@ -283,24 +283,3 @@ class SnowparkInvalidObjectNameException(SnowparkGeneralException): """ pass - - -class _SnowparkDataSourceNonRetryableException(SnowparkGeneralException): - """Exception for data source non-retryable error.""" - - def __init__( - self, - error: Exception, - ) -> None: - self.error: Exception = error - - self._pretty_msg = f"{self.__class__.__name__}({self.error})" - - def __repr__(self): - return f"{self.__class__.__name__}({self.error})" - - def __str__(self): - return self._pretty_msg - - def __reduce__(self): - return (self.__class__, (self.error,)) diff --git a/tests/integ/datasource/test_databricks.py b/tests/integ/datasource/test_databricks.py index 80e6614aa5..9c643322da 100644 --- a/tests/integ/datasource/test_databricks.py +++ b/tests/integ/datasource/test_databricks.py @@ -19,7 +19,6 @@ ) from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - 
_SnowparkDataSourceNonRetryableException, SnowparkSQLException, ) from snowflake.snowpark.types import ( @@ -268,7 +267,7 @@ def test_unsupported_type(): def test_databricks_non_retryable_error(session): with pytest.raises( - _SnowparkDataSourceNonRetryableException, + SnowparkDataframeReaderException, match="PARSE_SYNTAX_ERROR", ): session.read.dbapi( diff --git a/tests/integ/datasource/test_mysql.py b/tests/integ/datasource/test_mysql.py index ded75e0474..1e1a927ea3 100644 --- a/tests/integ/datasource/test_mysql.py +++ b/tests/integ/datasource/test_mysql.py @@ -14,7 +14,6 @@ PymysqlTypeCode, ) from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE -from snowflake.snowpark.exceptions import _SnowparkDataSourceNonRetryableException from snowflake.snowpark.types import StructType, StructField, StringType from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, @@ -308,7 +307,7 @@ def test_unsupported_type(): def test_mysql_non_retryable_error(session): with pytest.raises( - _SnowparkDataSourceNonRetryableException, + SnowparkDataframeReaderException, match="You have an error in your SQL syntax", ): session.read.dbapi( diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 3734757760..453515e3d4 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -21,7 +21,6 @@ ) from snowflake.snowpark.types import StructType, StructField, StringType from snowflake.snowpark.exceptions import ( - _SnowparkDataSourceNonRetryableException, SnowparkDataframeReaderException, SnowparkSQLException, ) @@ -257,7 +256,7 @@ def test_unsupported_type(): def test_oracledb_non_retryable_error(session): with pytest.raises( - _SnowparkDataSourceNonRetryableException, + SnowparkDataframeReaderException, match="ORA-00920: invalid relational operator", ): session.read.dbapi( diff --git a/tests/integ/datasource/test_postgres.py b/tests/integ/datasource/test_postgres.py 
index a4d2f4f4ba..464ae7d1f0 100644 --- a/tests/integ/datasource/test_postgres.py +++ b/tests/integ/datasource/test_postgres.py @@ -13,7 +13,6 @@ from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE from snowflake.snowpark.exceptions import ( SnowparkDataframeReaderException, - _SnowparkDataSourceNonRetryableException, SnowparkSQLException, ) from snowflake.snowpark.types import ( @@ -512,7 +511,7 @@ def test_server_side_cursor(session): def test_postgres_non_retryable_error(session): with pytest.raises( - _SnowparkDataSourceNonRetryableException, + SnowparkDataframeReaderException, match="syntax error", ): session.read.dbapi( From dea1369d40b3f2bf6d6d83a76e328b066a2ec034 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Fri, 26 Sep 2025 12:14:15 -0700 Subject: [PATCH 14/15] revert change --- src/snowflake/snowpark/_internal/data_source/utils.py | 1 + src/snowflake/snowpark/dataframe_reader.py | 9 +-------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/snowflake/snowpark/_internal/data_source/utils.py b/src/snowflake/snowpark/_internal/data_source/utils.py index 3ad319343d..f3fbebbd71 100644 --- a/src/snowflake/snowpark/_internal/data_source/utils.py +++ b/src/snowflake/snowpark/_internal/data_source/utils.py @@ -34,6 +34,7 @@ from snowflake.snowpark._internal.data_source import DataSourceReader from snowflake.snowpark.exceptions import SnowparkDataframeReaderException + logger = logging.getLogger(__name__) _MAX_RETRY_TIME = 3 diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 0416076598..d376751d8b 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -83,7 +83,6 @@ from snowflake.snowpark.exceptions import ( SnowparkSessionException, SnowparkDataframeReaderException, - _SnowparkDataSourceNonRetryableException, ) from snowflake.snowpark.functions import sql_expr, col, concat, lit, to_file from
snowflake.snowpark.mock._connection import MockServerConnection @@ -2004,13 +2003,7 @@ def create_oracledb_connection(): f"Cancelled a remaining data fetching future {future} due to error in another thread." ) - if isinstance( - exc, - ( - SnowparkDataframeReaderException, - _SnowparkDataSourceNonRetryableException, - ), - ): + if isinstance(exc, SnowparkDataframeReaderException): raise exc raise SnowparkDataframeReaderException( From f969cd6b5d6f82740050e29081eb72bccc8e1d09 Mon Sep 17 00:00:00 2001 From: Yuyang Wang Date: Mon, 29 Sep 2025 11:06:04 -0700 Subject: [PATCH 15/15] remove deleted exception --- tests/integ/test_data_source_api.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tests/integ/test_data_source_api.py b/tests/integ/test_data_source_api.py index cd96db74c3..7ec189ac96 100644 --- a/tests/integ/test_data_source_api.py +++ b/tests/integ/test_data_source_api.py @@ -51,10 +51,7 @@ random_name_for_temp_object, ) from snowflake.snowpark.dataframe_reader import _MAX_RETRY_TIME -from snowflake.snowpark.exceptions import ( - SnowparkDataframeReaderException, - _SnowparkDataSourceNonRetryableException, -) +from snowflake.snowpark.exceptions import SnowparkDataframeReaderException from snowflake.snowpark.types import ( StructType, StructField, @@ -217,13 +214,11 @@ def test_dbapi_retry(session, fetch_with_process): def test_dbapi_non_retryable_error(session, fetch_with_process): with mock.patch( "snowflake.snowpark._internal.data_source.utils._task_fetch_data_from_source", - side_effect=_SnowparkDataSourceNonRetryableException(Exception("mock error")), + side_effect=SnowparkDataframeReaderException("mock error"), ) as mock_task: mock_task.__name__ = "_task_fetch_from_data_source" parquet_queue = multiprocessing.Queue() if fetch_with_process else queue.Queue() - with pytest.raises( - _SnowparkDataSourceNonRetryableException, match="mock error" - ): + with pytest.raises(SnowparkDataframeReaderException, match="mock error"):
_task_fetch_data_from_source_with_retry( worker=DataSourceReader( PyodbcDriver,