Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
- Added a new option `cacheResult` to `DataFrameReader.xml` that allows users to cache the result of the XML reader to a temporary table after calling `xml`. It helps improve performance when subsequent operations are performed on the same DataFrame.

### Snowpark pandas API Updates
- Added support for `DataFrame.eval()` for dataframes with single-level indexes.

#### New Features

Expand Down
2 changes: 1 addition & 1 deletion docs/source/modin/supported/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``equals`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``eval`` | N | | |
| ``eval`` | P | | No support for dataframes with a row MultiIndex. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``ewm`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
5 changes: 5 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@

# Module-level logger named after this module.
_logger = logging.getLogger(__name__)

# Flag guarding certain features available only in newer modin versions.
# Snowpark pandas supports the newest two released versions of modin; update this flag and remove legacy
# code as needed when we bump dependency versions.
# NOTE(review): the comparison reads `pd.__version__`; presumably `pd` is `modin.pandas` in this module and
# its `__version__` reports the modin version rather than the pandas version -- confirm, otherwise the flag
# would not reflect the installed modin release.
MODIN_IS_AT_LEAST_0_36_0 = version.parse(pd.__version__) >= version.parse("0.36.0")
Comment thread
sfc-gh-mvashishtha marked this conversation as resolved.
Comment thread
sfc-gh-mvashishtha marked this conversation as resolved.


# These are the default statement parameters for queries from the Snowpark pandas API. They provide fine-grained
# metrics for the server to track all pandas API usage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from modin.pandas.base import BasePandasDataset
from modin.pandas.io import from_pandas
from modin.pandas.utils import is_scalar
from modin.core.computation.eval import eval as _eval
from modin.core.storage_formats.pandas.query_compiler_caster import (
register_function_for_pre_op_switch,
)
Expand Down Expand Up @@ -226,9 +227,63 @@ def dot(self, other): # noqa: PR01, RT01, D200
pass # pragma: no cover


# Override eval because
# 1) we treat the "engine" parameter differently
# 2) We have to update the level parameter to reflect this method's place in
#    the function stack. Modin can't do that for us since it doesn't know our
#    place in the stack.
@register_dataframe_accessor("eval")
def eval(self, expr, inplace=False, **kwargs):  # noqa: PR01, RT01, D200
    """
    Evaluate a string describing operations on ``DataFrame`` columns.

    Parameters
    ----------
    expr : str
        The expression to evaluate; forwarded unchanged to ``_eval``.
    inplace : bool, default False
        Validated with ``validate_bool_kwarg`` and forwarded to ``_eval``.
    **kwargs
        Extra keyword arguments forwarded to ``_eval`` after the following
        adjustments: ``engine`` is forced to ``"python"``, ``level`` is
        increased by 4, the index and column resolvers of this frame are
        appended to any caller-supplied ``resolvers``, and ``target``
        defaults to this frame.

    Returns
    -------
    Whatever ``_eval`` returns for the given expression.

    Notes
    -----
    Frames whose rows have more than one index level are rejected through
    ``ErrorMessage.not_implemented``.
    """
    if self._query_compiler.nlevels() > 1:
        # If the rows of this dataframe have a multi-index, we store the index
        # as a native_pd.MultiIndex, and the usual method of getting index
        # resolvers with _get_index_resolvers() does not work.
        ErrorMessage.not_implemented("eval() does not support a multi-level index.")

    inplace = validate_bool_kwarg(inplace, "inplace")

    # The numexpr engine is useful for chained operations on numpy-backed
    # arrays. It doesn't support all the syntax that the python engine
    # does, and the Snowpark backend doesn't store data in numpy, so the
    # numexpr performance optimizations are not useful. Ignore the "engine"
    # argument, and warn the user if they explicitly selected
    # engine="numexpr", because we will not honor that preference.
    if kwargs.get("engine", None) == "numexpr":
        WarningMessage.ignored_argument(
            operation="eval",
            argument="engine",
            message="Snowpark pandas always uses the python engine in "
            + "favor of the numexpr engine, even if the numexpr engine is "
            + "available",
        )
    # Always evaluate with the python engine, whatever the caller requested.
    kwargs["engine"] = "python"

    # eval() lets the user reference variables according to dynamic scope
    # at `level` stack frames below the stack frame that called eval(). The
    # user's call to eval() is 4 stack frames away from the frame where the
    # _eval() implementation executes:
    # 1) query_compiler_caster wrapper dispatches to snowflake implementation
    # 2) telemetry wrapper 1
    # 3) telemetry wrapper 2 calls this implementation.
    # 4) This method implementation calls the _eval() implementation
    # so we add 4 to the `level` param.
    kwargs["level"] = kwargs.get("level", 0) + 4

    # Caller-supplied resolvers come first, followed by this frame's index
    # and column resolvers.
    index_resolvers = self._get_index_resolvers()
    column_resolvers = self._get_cleaned_column_resolvers()
    kwargs["resolvers"] = (
        *kwargs.get("resolvers", ()),
        index_resolvers,
        column_resolvers,
    )

    # Use this frame as the assignment target unless the caller chose one.
    if "target" not in kwargs:
        kwargs["target"] = self

    return _eval(expr, inplace=inplace, **kwargs)


@register_dataframe_not_implemented()
Expand Down
10 changes: 10 additions & 0 deletions tests/integ/modin/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from snowflake.snowpark.modin.plugin._internal.apply_utils import (
clear_session_udf_and_udtf_caches,
)
from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
from tests.integ.modin.pandas_api_coverage import PandasAPICoverageGenerator
from tests.integ.utils.sql_counter import (
SqlCounter,
Expand Down Expand Up @@ -857,3 +858,12 @@ def testing_dfs_from_read_snowflake(
auto_create_table=True,
)
return pd.read_snowflake(test_table_name), pandas_df


@pytest.fixture(autouse=True)
def clear_printed_warnings() -> Generator[None, None, None]:
    """
    Run each test with an empty ``WarningMessage.printed_warnings``.

    A snapshot of the warnings recorded before the test is taken, the shared
    collection is emptied for the duration of the test, and the snapshot is
    assigned back once the test has finished.
    """
    # Snapshot first: clear() below empties the shared collection in place.
    warnings_before_test = set(WarningMessage.printed_warnings)
    WarningMessage.printed_warnings.clear()

    yield

    # Put back what had been recorded before this test ran.
    WarningMessage.printed_warnings = warnings_before_test
Loading