Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
- Fixed UDTF ingestion failure with `pyodbc` driver caused by unprocessed row data.
- Fixed SQL Server query input failure due to incorrect select query generation.
- Fixed UDTF ingestion not preserving column nullability in the output schema.
- Fixed an issue that caused the program to hang during multithreaded Parquet-based ingestion when a data fetching error occurred.

#### Improvements

Expand Down
3 changes: 3 additions & 0 deletions src/snowflake/snowpark/_internal/data_source/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ def worker_process(
):
"""Worker process that fetches data from multiple partitions"""
while True:
if stop_event and stop_event.is_set():
# other worker has set the stop event signalling me to stop, exit gracefully
break
try:
# Get item from queue with timeout
partition_idx, query = partition_queue.get(timeout=1.0)
Expand Down
30 changes: 30 additions & 0 deletions tests/integ/datasource/test_oracledb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import math
import sys
from collections import namedtuple
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -334,3 +335,32 @@ def test_oracledb_driver_udtf_class_builder():
# Verify we got data with the right structure (2 columns)
assert len(column_result_rows) > 0
assert len(column_result_rows[0]) == 2 # Two columns


def test_dbapi_no_hang_on_exit_when_worker_error(session):
    """
    Verify that a worker-side failure surfaces as an exception instead of a hang.

    The driver's pandas conversion step is patched to raise a RuntimeError, and a
    partitioned dbapi read (10 partitions, 2 workers) is then expected to fail
    with a SnowparkDataframeReaderException carrying that error message.

    This test would ideally live in test_data_source_api.py, but the hang is hard
    to reproduce with SQLite and a purely mocked setup makes the test code too
    complex. Oracledb reproduces the issue reliably when the fix is absent, so
    the test is kept here.
    """
    # Every call to the conversion step raises, simulating a worker error.
    failing_conversion = patch(
        "snowflake.snowpark._internal.data_source.drivers.base_driver.BaseDriver.data_source_data_to_pandas_df",
        side_effect=RuntimeError("conversion error"),
    )
    # The reader is expected to wrap/propagate the worker error to the caller.
    expect_reader_error = pytest.raises(
        SnowparkDataframeReaderException, match="conversion error"
    )

    with failing_conversion, expect_reader_error:
        session.read.dbapi(
            create_connection_oracledb,
            table=ORACLEDB_TABLE_NAME,
            column="ID",
            lower_bound=0,
            upper_bound=100,
            num_partitions=10,
            max_workers=2,
        )