From 642bc49bd0c60a49a9135ea8c3443174928b57df Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Thu, 25 Sep 2025 19:29:08 -0400 Subject: [PATCH] fix --- CHANGELOG.md | 3 ++ .../snowpark/_internal/data_source/utils.py | 3 ++ tests/integ/datasource/test_oracledb.py | 30 +++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fb54cc5c6..1824861fa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,9 @@ - `nvl2` - `regr_valx` +#### Bug Fixes + +- Fixed a bug that caused `DataFrameReader.dbapi` (PuPr) to hang during parquet data ingestion in multithreading mode and a data fetching error occurred. ### Snowpark pandas API Updates diff --git a/src/snowflake/snowpark/_internal/data_source/utils.py b/src/snowflake/snowpark/_internal/data_source/utils.py index f3fbebbd71..58dc17bcaa 100644 --- a/src/snowflake/snowpark/_internal/data_source/utils.py +++ b/src/snowflake/snowpark/_internal/data_source/utils.py @@ -308,6 +308,9 @@ def worker_process( ): """Worker process that fetches data from multiple partitions""" while True: + if stop_event and stop_event.is_set(): + # other worker has set the stop event signalling me to stop, exit gracefully + break try: # Get item from queue with timeout partition_idx, query = partition_queue.get(timeout=1.0) diff --git a/tests/integ/datasource/test_oracledb.py b/tests/integ/datasource/test_oracledb.py index 6b5178590c..ea8457b942 100644 --- a/tests/integ/datasource/test_oracledb.py +++ b/tests/integ/datasource/test_oracledb.py @@ -6,6 +6,7 @@ import math import sys from collections import namedtuple +from unittest.mock import patch import pytest @@ -334,3 +335,32 @@ def test_oracledb_driver_udtf_class_builder(): # Verify we got data with the right structure (2 columns) assert len(column_result_rows) > 0 assert len(column_result_rows[0]) == 2 # Two columns + + +def test_dbapi_no_hang_on_exit_when_worker_error(session): + """ + Test that the dbapi reader does not hang on exit when a worker raises an error + + Ideally the test should be put in test_data_source_api.py, + however, reproducing using SQLite is hard to achieve while pure mocking gets the test code too complex. + Hence, we use Oracledb here which can repro the issue reliably without the fix. + """ + with patch( + "snowflake.snowpark._internal.data_source.drivers.base_driver.BaseDriver.data_source_data_to_pandas_df" + ) as mock_data_source_data_to_pandas_df: + # Mock the data_source_data_to_pandas_df method to raise RuntimeError + mock_data_source_data_to_pandas_df.side_effect = RuntimeError( + "conversion error" + ) + + # Expect the dbapi call to raise a SnowparkDataframeReaderException due to the worker error + with pytest.raises(SnowparkDataframeReaderException, match="conversion error"): + session.read.dbapi( + create_connection_oracledb, + table=ORACLEDB_TABLE_NAME, + column="ID", + lower_bound=0, + upper_bound=100, + num_partitions=10, + max_workers=2, + )