File tree Expand file tree Collapse file tree
src/snowflake/snowpark/_internal/data_source Expand file tree Collapse file tree Original file line number Diff line number Diff line change 8383 - Fixed UDTF ingestion failure with ` pyodbc ` driver caused by unprocessed row data.
8484 - Fixed SQL Server query input failure due to incorrect select query generation.
8585 - Fixed UDTF ingestion not preserving column nullability in the output schema.
86+ - Fixed an issue that caused the program to hang during multithreaded Parquet based ingestion when a data fetching error occurred.
8687
8788#### Improvements
8889
Original file line number Diff line number Diff line change @@ -308,6 +308,9 @@ def worker_process(
308308):
309309 """Worker process that fetches data from multiple partitions"""
310310 while True :
311+ if stop_event and stop_event .is_set ():
312+ # other worker has set the stop event signalling me to stop, exit gracefully
313+ break
311314 try :
312315 # Get item from queue with timeout
313316 partition_idx , query = partition_queue .get (timeout = 1.0 )
Original file line number Diff line number Diff line change 66import math
77import sys
88from collections import namedtuple
9+ from unittest .mock import patch
910
1011import pytest
1112
@@ -334,3 +335,32 @@ def test_oracledb_driver_udtf_class_builder():
334335 # Verify we got data with the right structure (2 columns)
335336 assert len (column_result_rows ) > 0
336337 assert len (column_result_rows [0 ]) == 2 # Two columns
338+
339+
340+ def test_dbapi_no_hang_on_exit_when_worker_error (session ):
341+ """
342+ Test that the dbapi reader does not hang on exit when a worker raises an error
343+
344+ Ideally the test should be put in test_data_source_api.py,
345+ however, reproducing using SQLite is hard to achieve while pure mocking gets the test code too complex.
346+ Hence, we use Oracledb here which can repro the issue reliably without the fix.
347+ """
348+ with patch (
349+ "snowflake.snowpark._internal.data_source.drivers.base_driver.BaseDriver.data_source_data_to_pandas_df"
350+ ) as mock_data_source_data_to_pandas_df :
351+ # Mock the data_source_data_to_pandas_df method to raise RuntimeError
352+ mock_data_source_data_to_pandas_df .side_effect = RuntimeError (
353+ "conversion error"
354+ )
355+
356+ # Expect the dbapi call to raise a SnowparkDataframeReaderException due to the worker error
357+ with pytest .raises (SnowparkDataframeReaderException , match = "conversion error" ):
358+ session .read .dbapi (
359+ create_connection_oracledb ,
360+ table = ORACLEDB_TABLE_NAME ,
361+ column = "ID" ,
362+ lower_bound = 0 ,
363+ upper_bound = 100 ,
364+ num_partitions = 10 ,
365+ max_workers = 2 ,
366+ )
You can’t perform that action at this time.
0 commit comments