diff --git a/CHANGELOG.md b/CHANGELOG.md index 8848d22fd9..b44d6aabea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ - The following operations are currently supported and can benefit from the optimization: `read_snowflake`, `repr`, `loc`, `reset_index`, `merge`, and binary operations. - If a lazy object (e.g., DataFrame or Series) depends on a mix of supported and unsupported operations, the optimization will not be used. - Updated the error message for when Snowpark pandas is referenced within apply. +- Added a session parameter `dummy_row_pos_optimization_enabled` to enable/disable dummy row position optimization in faster pandas. #### Dependency Updates diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 095abe5196..612055329c 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -10,7 +10,6 @@ import itertools import json import logging -import os import re from collections import Counter, defaultdict import typing @@ -1455,15 +1454,9 @@ def from_snowflake( See detailed docstring and examples in ``read_snowflake`` in frontend layer: src/snowflake/snowpark/modin/plugin/pd_extensions.py """ - dummy_row_pos_optimization_enabled = ( - os.environ.get( - "SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED", "true" - ).lower() - == "true" - ) relaxed_query_compiler = None if ( - dummy_row_pos_optimization_enabled + pd.session.dummy_row_pos_optimization_enabled and not enforce_ordering and not dummy_row_pos_mode ): diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 25f850cd86..10f3874dd5 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -301,6 +301,9 @@ "PYTHON_SNOWPARK_GENERATE_MULTILINE_QUERIES" ) _PYTHON_SNOWPARK_INTERNAL_TELEMETRY_ENABLED = "ENABLE_SNOWPARK_FIRST_PARTY_TELEMETRY" +_SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED = ( + "SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED" +) # AST encoding. _PYTHON_SNOWPARK_USE_AST = "PYTHON_SNOWPARK_USE_AST" @@ -745,6 +748,12 @@ def __init__( _PYTHON_SNOWPARK_DATAFRAME_JOIN_ALIAS_FIX_VERSION ) + self._dummy_row_pos_optimization_enabled: bool = ( + self._conn._get_client_side_session_parameter( + _SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED, True + ) + ) + self._thread_store = create_thread_local( self._conn._thread_safe_session_enabled ) @@ -1009,6 +1018,13 @@ def reduce_describe_query_enabled(self) -> bool: """ return self._reduce_describe_query_enabled + @property + def dummy_row_pos_optimization_enabled(self) -> bool: + """Set to ``True`` to enable the dummy row position optimization (defaults to ``True``). + The generated SQLs from pandas transformations would potentially have fewer expensive window functions to compute the row position column. + """ + return self._dummy_row_pos_optimization_enabled + @property def custom_package_usage_config(self) -> Dict: """Get or set configuration parameters related to usage of custom Python packages in Snowflake. @@ -1174,6 +1190,16 @@ def reduce_describe_query_enabled(self, value: bool) -> None: "value for reduce_describe_query_enabled must be True or False!" ) + @dummy_row_pos_optimization_enabled.setter + def dummy_row_pos_optimization_enabled(self, value: bool) -> None: + """Set the value for dummy_row_pos_optimization_enabled""" + if value in [True, False]: + self._dummy_row_pos_optimization_enabled = value + else: + raise ValueError( + "value for dummy_row_pos_optimization_enabled must be True or False!" + ) + @custom_package_usage_config.setter @experimental_parameter(version="1.6.0") def custom_package_usage_config(self, config: Dict) -> None: diff --git a/tests/integ/modin/test_faster_pandas.py b/tests/integ/modin/test_faster_pandas.py index 885b2101e3..b465a3bb84 100644 --- a/tests/integ/modin/test_faster_pandas.py +++ b/tests/integ/modin/test_faster_pandas.py @@ -2,11 +2,16 @@ # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. # +import copy import modin.pandas as pd import pandas as native_pd from snowflake.snowpark._internal.utils import TempObjectType import snowflake.snowpark.modin.plugin # noqa: F401 +from snowflake.snowpark.session import ( + _SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED, + Session, +) from tests.integ.modin.utils import assert_frame_equal, assert_index_equal from tests.integ.utils.sql_counter import sql_count_checker from tests.utils import Utils @@ -129,3 +134,60 @@ def test_read_filter_groupby_agg(session): # compare results assert_frame_equal(snow_result, native_result) + + +@sql_count_checker(query_count=5, join_count=1) +def test_read_filter_join_flag_disabled(session): + # test a chain of operations that are fully supported in faster pandas + # but with the dummy_row_pos_optimization_enabled flag turned off + session.dummy_row_pos_optimization_enabled = False + + # create tables + table_name1 = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame([[1, 11], [2, 12], [3, 13]], columns=["A", "B"]) + ).write.save_as_table(table_name1, table_type="temp") + table_name2 = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame([[1, 21], [2, 22], [3, 23]], columns=["C", "D"]) + ).write.save_as_table(table_name2, table_type="temp") + + # create snow dataframes + df1 = pd.read_snowflake(table_name1) + df2 = pd.read_snowflake(table_name2) + snow_result = df1[df1["B"] > 11].merge( + df2[df2["D"] == 22], left_on="A", right_on="C" + ) + + # verify that the input dataframes have an empty relaxed query compiler + assert df1._query_compiler._relaxed_query_compiler is None + assert df2._query_compiler._relaxed_query_compiler is None + # verify that the output dataframe also has an empty relaxed query compiler + assert snow_result._query_compiler._relaxed_query_compiler is None + + # create pandas dataframes + native_df1 = df1.to_pandas() + native_df2 = df2.to_pandas() + native_result = native_df1[native_df1["B"] > 11].merge( + native_df2[native_df2["D"] == 22], left_on="A", right_on="C" + ) + + # compare results + assert_frame_equal(snow_result, native_result) + + +@sql_count_checker(query_count=0) +def test_dummy_row_pos_optimization_enabled_on_session(db_parameters): + with Session.builder.configs(db_parameters).create() as new_session: + default_value = new_session.dummy_row_pos_optimization_enabled + new_session.dummy_row_pos_optimization_enabled = not default_value + assert new_session.dummy_row_pos_optimization_enabled is not default_value + new_session.dummy_row_pos_optimization_enabled = default_value + assert new_session.dummy_row_pos_optimization_enabled is default_value + + parameters = copy.deepcopy(db_parameters) + parameters["session_parameters"] = { + _SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED: not default_value + } + with Session.builder.configs(parameters).create() as new_session2: + assert new_session2.dummy_row_pos_optimization_enabled is not default_value