Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- The following operations are currently supported and can benefit from the optimization: `read_snowflake`, `repr`, `loc`, `reset_index`, `merge`, and binary operations.
- If a lazy object (e.g., DataFrame or Series) depends on a mix of supported and unsupported operations, the optimization will not be used.
- Updated the error message for when Snowpark pandas is referenced within apply.
- Added a session parameter `dummy_row_pos_optimization_enabled` to enable/disable dummy row position optimization in faster pandas.

#### Dependency Updates

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import itertools
import json
import logging
import os
import re
from collections import Counter, defaultdict
import typing
Expand Down Expand Up @@ -1455,15 +1454,9 @@ def from_snowflake(
See detailed docstring and examples in ``read_snowflake`` in frontend layer:
src/snowflake/snowpark/modin/plugin/pd_extensions.py
"""
dummy_row_pos_optimization_enabled = (
os.environ.get(
"SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED", "true"
).lower()
== "true"
)
relaxed_query_compiler = None
if (
dummy_row_pos_optimization_enabled
pd.session.dummy_row_pos_optimization_enabled
and not enforce_ordering
and not dummy_row_pos_mode
):
Expand Down
26 changes: 26 additions & 0 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@
"PYTHON_SNOWPARK_GENERATE_MULTILINE_QUERIES"
)
_PYTHON_SNOWPARK_INTERNAL_TELEMETRY_ENABLED = "ENABLE_SNOWPARK_FIRST_PARTY_TELEMETRY"
_SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED = (
"SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED"
)

# AST encoding.
_PYTHON_SNOWPARK_USE_AST = "PYTHON_SNOWPARK_USE_AST"
Expand Down Expand Up @@ -745,6 +748,12 @@ def __init__(
_PYTHON_SNOWPARK_DATAFRAME_JOIN_ALIAS_FIX_VERSION
)

self._dummy_row_pos_optimization_enabled: bool = (
self._conn._get_client_side_session_parameter(
_SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED, True
)
)

self._thread_store = create_thread_local(
self._conn._thread_safe_session_enabled
)
Expand Down Expand Up @@ -1009,6 +1018,13 @@ def reduce_describe_query_enabled(self) -> bool:
"""
return self._reduce_describe_query_enabled

@property
def dummy_row_pos_optimization_enabled(self) -> bool:
"""Set to ``True`` to enable the dummy row position optimization (defaults to ``True``).
The generated SQLs from pandas transformations would potentially have fewer expensive window functions to compute the row position column.
"""
return self._dummy_row_pos_optimization_enabled

@property
def custom_package_usage_config(self) -> Dict:
"""Get or set configuration parameters related to usage of custom Python packages in Snowflake.
Expand Down Expand Up @@ -1174,6 +1190,16 @@ def reduce_describe_query_enabled(self, value: bool) -> None:
"value for reduce_describe_query_enabled must be True or False!"
)

@dummy_row_pos_optimization_enabled.setter
def dummy_row_pos_optimization_enabled(self, value: bool) -> None:
"""Set the value for dummy_row_pos_optimization_enabled"""
if value in [True, False]:
self._dummy_row_pos_optimization_enabled = value
else:
raise ValueError(
"value for dummy_row_pos_optimization_enabled must be True or False!"
)

@custom_package_usage_config.setter
@experimental_parameter(version="1.6.0")
def custom_package_usage_config(self, config: Dict) -> None:
Expand Down
62 changes: 62 additions & 0 deletions tests/integ/modin/test_faster_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#

import copy
import modin.pandas as pd
import pandas as native_pd

from snowflake.snowpark._internal.utils import TempObjectType
import snowflake.snowpark.modin.plugin # noqa: F401
from snowflake.snowpark.session import (
_SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED,
Session,
)
from tests.integ.modin.utils import assert_frame_equal, assert_index_equal
from tests.integ.utils.sql_counter import sql_count_checker
from tests.utils import Utils
Expand Down Expand Up @@ -129,3 +134,60 @@ def test_read_filter_groupby_agg(session):

# compare results
assert_frame_equal(snow_result, native_result)


@sql_count_checker(query_count=5, join_count=1)
def test_read_filter_join_flag_disabled(session):
# test a chain of operations that are fully supported in faster pandas
# but with the dummy_row_pos_optimization_enabled flag turned off
session.dummy_row_pos_optimization_enabled = False

# create tables
table_name1 = Utils.random_name_for_temp_object(TempObjectType.TABLE)
session.create_dataframe(
native_pd.DataFrame([[1, 11], [2, 12], [3, 13]], columns=["A", "B"])
).write.save_as_table(table_name1, table_type="temp")
table_name2 = Utils.random_name_for_temp_object(TempObjectType.TABLE)
session.create_dataframe(
native_pd.DataFrame([[1, 21], [2, 22], [3, 23]], columns=["C", "D"])
).write.save_as_table(table_name2, table_type="temp")

# create snow dataframes
df1 = pd.read_snowflake(table_name1)
df2 = pd.read_snowflake(table_name2)
snow_result = df1[df1["B"] > 11].merge(
df2[df2["D"] == 22], left_on="A", right_on="C"
)

# verify that the input dataframes have an empty relaxed query compiler
assert df1._query_compiler._relaxed_query_compiler is None
assert df2._query_compiler._relaxed_query_compiler is None
# verify that the output dataframe also has an empty relaxed query compiler
assert snow_result._query_compiler._relaxed_query_compiler is None

# create pandas dataframes
native_df1 = df1.to_pandas()
native_df2 = df2.to_pandas()
native_result = native_df1[native_df1["B"] > 11].merge(
native_df2[native_df2["D"] == 22], left_on="A", right_on="C"
)

# compare results
assert_frame_equal(snow_result, native_result)


@sql_count_checker(query_count=0)
def test_dummy_row_pos_optimization_enabled_on_session(db_parameters):
with Session.builder.configs(db_parameters).create() as new_session:
default_value = new_session.dummy_row_pos_optimization_enabled
new_session.dummy_row_pos_optimization_enabled = not default_value
assert new_session.dummy_row_pos_optimization_enabled is not default_value
new_session.dummy_row_pos_optimization_enabled = default_value
assert new_session.dummy_row_pos_optimization_enabled is default_value

parameters = copy.deepcopy(db_parameters)
parameters["session_parameters"] = {
_SNOWPARK_PANDAS_DUMMY_ROW_POS_OPTIMIZATION_ENABLED: not default_value
}
with Session.builder.configs(parameters).create() as new_session2:
assert new_session2.dummy_row_pos_optimization_enabled is not default_value
Loading