Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#### Bug Fixes

- Fixed an issue with drop_duplicates where the same data source could be read multiple times in the same query but in a different order each time, resulting in missing rows in the final result. The fix ensures that the data source is read only once.
- Fixed a bug with hybrid execution mode where an `AssertionError` was unexpectedly raised by certain indexing operations.

### Snowpark Local Testing Updates

Expand Down
27 changes: 27 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from packaging import version # noqa: E402,F401

import modin.pandas as pd
from modin.config import context as config_context
import numpy as np
import pandas as native_pd
from pandas._typing import AnyArrayLike, Scalar
Expand Down Expand Up @@ -2259,3 +2260,29 @@ def add_extra_columns_and_select_required_columns(
# explicitly drop the unwanted columns. This also ensures that the columns in the resultant DataFrame are in the
# same order as the columns in the `columns` parameter.
return query_compiler.take_2d_labels(slice(None), columns)


def new_snow_series(*args: Any, **kwargs: Any) -> pd.Series:
    """
    Construct a modin Series that is guaranteed to use the Snowpark pandas backend.

    Internal helper functions that build a modin Series can otherwise trigger an accidental
    backend switch and incorrectly end up with a NativeQueryCompiler. To avoid that, the
    constructor is invoked with the ``AutoSwitchBackend`` config option set to ``False``.

    All positional and keyword arguments are forwarded unchanged to ``pd.Series``.

    See SNOW-2084670 and SNOW-2331021 for examples of such failures.
    """
    # Only the constructor call needs to run under the config override.
    with config_context(AutoSwitchBackend=False):
        snow_series = pd.Series(*args, **kwargs)
    return snow_series


def new_snow_df(*args: Any, **kwargs: Any) -> pd.DataFrame:
    """
    Construct a modin DataFrame that is guaranteed to use the Snowpark pandas backend.

    Internal helper functions that build a modin DataFrame can otherwise trigger an accidental
    backend switch and incorrectly end up with a NativeQueryCompiler. To avoid that, the
    constructor is invoked with the ``AutoSwitchBackend`` config option set to ``False``.

    All positional and keyword arguments are forwarded unchanged to ``pd.DataFrame``.

    See SNOW-2084670 and SNOW-2331021 for examples of such failures.
    """
    # Only the constructor call needs to run under the config override.
    with config_context(AutoSwitchBackend=False):
        snow_df = pd.DataFrame(*args, **kwargs)
    return snow_df
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@
unpivot_empty_df,
)
from snowflake.snowpark.modin.plugin._internal.utils import (
new_snow_series,
INDEX_LABEL,
ROW_COUNT_COLUMN_LABEL,
ROW_POSITION_COLUMN_LABEL,
Expand Down Expand Up @@ -1205,15 +1206,15 @@ def from_date_range(
delta = end.value * 1.0 - start.value # type: ignore[union-attr]
if div == 0:
# Only 1 period, just return the start value
ns_values = pd.Series([start.value])._query_compiler # type: ignore[union-attr]
ns_values = new_snow_series([start.value])._query_compiler # type: ignore[union-attr]
else:
stride = delta / div
# Make sure end is included in this case
e = start.value + delta // stride * stride + stride // 2 + 1 # type: ignore[union-attr]
ns_values = generator_utils.generate_range(start.value, e, stride) # type: ignore[union-attr]
dt_values = ns_values.series_to_datetime()

dt_series = pd.Series(query_compiler=dt_values)
dt_series = new_snow_series(query_compiler=dt_values)
if remove_non_business_days:
dt_series = dt_series[dt_series.dt.dayofweek < 5]
if not left_inclusive or not right_inclusive:
Expand Down Expand Up @@ -3266,7 +3267,7 @@ def _reindex_axis_0(
if isinstance(labels, pd.Index):
new_index_qc = labels.to_series()._query_compiler
else:
new_index_qc = pd.Series(labels)._query_compiler
new_index_qc = new_snow_series(labels)._query_compiler

new_index_modin_frame = new_index_qc._modin_frame
modin_frame = self._modin_frame
Expand Down Expand Up @@ -6857,7 +6858,7 @@ def agg(
)

if len(query_compiler.columns) == 0:
return pd.Series()._query_compiler
return new_snow_series()._query_compiler

internal_frame = query_compiler._modin_frame

Expand Down Expand Up @@ -10663,13 +10664,10 @@ def _take_2d_labels_internal(
if is_scalar(index):
index = (index,)
elif is_scalar(index):
# SNOW-2084670
# Force this query compiler to be an SFQC, since with auto-switch behavior
# it may become a NativeQueryCompiler.
index = pd.Series([index]).set_backend("Snowflake")._query_compiler
index = new_snow_series([index])._query_compiler
# convert list like to series
elif is_list_like(index):
index = pd.Series(index).set_backend("Snowflake")
index = new_snow_series(index)
if index.dtype == "bool":
# boolean list like indexer is always select rows by row position
return SnowflakeQueryCompiler(
Expand Down Expand Up @@ -12070,7 +12068,7 @@ def _fillna_with_masking(
not self_is_series and isinstance(value, pd.DataFrame)
):
if isinstance(value, dict):
value = pd.Series(value)
value = new_snow_series(value)
return self.where(cond=self.notna(), other=value._query_compiler)

# case 2: fillna with a method
Expand Down Expand Up @@ -12370,7 +12368,7 @@ def setitem(
)

# create series out of key and insert
value = pd.Series(value)._query_compiler
value = new_snow_series(value)._query_compiler

return self.insert(loc, key, value, True, replace=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from pandas.core.indexing import IndexingError

import snowflake.snowpark.modin.plugin.extensions.utils as frontend_utils
from snowflake.snowpark.modin.plugin._internal.utils import new_snow_series, new_snow_df
from snowflake.snowpark.modin.plugin._internal.indexing_utils import (
MULTIPLE_ELLIPSIS_INDEXING_ERROR_MESSAGE,
TOO_FEW_INDEXERS_INDEXING_ERROR_MESSAGE,
Expand Down Expand Up @@ -1015,7 +1016,7 @@ def __setitem__(

# If the row key is list-like (Index, list, np.ndarray, etc.), convert it to Series.
if not isinstance(row_loc, pd.Series) and is_list_like(row_loc):
row_loc = pd.Series(row_loc)
row_loc = new_snow_series(row_loc)

matching_item_columns_by_label = self._loc_set_matching_item_columns_by_label(
key, item
Expand All @@ -1040,7 +1041,7 @@ def __setitem__(
else col_loc
)
if item_is_2d_array:
item = pd.DataFrame(item)
item = new_snow_df(item)
frame_is_df_and_item_is_series = isinstance(item, pd.Series) and isinstance(
self.df, pd.DataFrame
)
Expand Down Expand Up @@ -1209,7 +1210,7 @@ def __getitem__(
dtype = float
else:
dtype = None
row_loc = pd.Series(row_loc, dtype=dtype)
row_loc = new_snow_series(row_loc, dtype=dtype)

# Check whether the row and column input is of numeric dtype.
self._validate_numeric_get_key_values(row_loc, original_row_loc)
Expand Down Expand Up @@ -1331,10 +1332,10 @@ def __setitem__(
item = item.flatten()[0]
else:
if item.ndim == 1:
item = pd.Series(item)
item = new_snow_series(item)
is_item_series = True
else:
item = pd.DataFrame(item)
item = new_snow_df(item)

is_row_key_df = isinstance(row_loc, pd.DataFrame)
is_col_key_df = isinstance(col_loc, pd.DataFrame)
Expand Down
11 changes: 7 additions & 4 deletions tests/integ/modin/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ def read_hybrid_known_failures():
pytest tests/integ/modin -n 10
--enable_modin_hybrid_mode
--csv tests/integ/modin/modin_hybrid_integ_results.csv
* (Recommended) Pre-Filtering the results to reduce the file size:
* Pre-filtering and sorting the results to reduce the file and diff size:
import pandas as pd
df = pd.read_csv("tests/integ/modin/modin_hybrid_integ_results.csv")
filtered = df[["module", "name", "message", "status"]][
df["status"].isin(["failed", "xfailed", "error"])
]
filtered.to_csv("tests/integ/modin/modin_hybrid_integ_results.csv")
df["status"].isin(["failed", "xfailed", "error"])
]
filtered = filtered.sort_values(by=["module", "name"])
filtered.to_csv("tests/integ/modin/modin_hybrid_integ_results.csv", index=False)
"""
if not os.path.exists("../modin/modin_hybrid_integ_results.csv"):
return pandas.DataFrame([], columns=["module", "name", "message", "status"])
HYBRID_RESULTS_PATH = os.path.normpath(
os.path.join(
os.path.dirname(__file__), "../modin/modin_hybrid_integ_results.csv"
Expand Down
17 changes: 17 additions & 0 deletions tests/integ/modin/hybrid/test_switch_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
from snowflake.snowpark.modin.plugin.extensions.datetime_index import DatetimeIndex
from tests.integ.utils.sql_counter import sql_count_checker
from tests.integ.modin.utils import assert_snowpark_pandas_equal_to_pandas

# snowflake-ml-python, which provides snowflake.cortex, may not be available in
# the test environment. If it's not available, skip all tests in this module.
Expand Down Expand Up @@ -591,3 +592,19 @@ def test_applying_cortex_function_causes_backend_switch(self, data_class, method
sentiment = method(pandas_backend_data, Sentiment)
assert sentiment.get_backend() == "Snowflake"
sentiment.to_pandas()


@sql_count_checker(query_count=1, join_count=2)
def test_switch_then_iloc():
    """
    Regression test for SNOW-2331021: ``iloc`` must work after a backend switch.

    A small frame is created (the test asserts it starts on the "Pandas" backend), moved to
    the "Snowflake" backend, and indexed with a list-like row key and a scalar column key.
    The result is compared against the same ``iloc`` selection on the unmoved frame. A scalar
    ``iloc`` assignment followed by a read-back is then checked on the original frame.
    """
    # Switching backends then calling iloc should be valid.
    # Prior to fixing SNOW-2331021, discrepancies with the index class caused an AssertionError.
    df = pd.DataFrame([[0] * 10] * 10)
    assert df.get_backend() == "Pandas"
    # Should not error
    assert_snowpark_pandas_equal_to_pandas(
        df.move_to("Snowflake").iloc[[1, 3, 9], 1],
        df.iloc[[1, 3, 9], 1].to_pandas(),
    )
    # Setting should similarly not error
    df.iloc[1, 1] = 100
    assert df.iloc[1, 1] == 100
Loading
Loading