Skip to content

Commit 870c417

Browse files
Merge branch 'main' into mvashishtha/SNOW-2387227/use-session-scoped-session
2 parents 2b215d1 + d2c6216 commit 870c417

13 files changed

Lines changed: 898 additions & 163 deletions

File tree

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@
9393
- `st_asewkt`
9494
- `st_asgeojson`
9595
- `st_aswkb`
96+
- `st_aswkt`
97+
- `st_azimuth`
98+
- `st_buffer`
99+
- `st_centroid`
100+
- `st_collect`
101+
- `st_contains`
102+
- `st_coveredby`
103+
- `st_covers`
104+
- `st_difference`
105+
- `st_dimension`
96106

97107
#### Bug Fixes
98108

@@ -126,6 +136,7 @@
126136
- Added a session parameter `pandas_hybrid_execution_enabled` to enable/disable hybrid execution as an alternative to using `AutoSwitchBackend`.
127137
- Removed an unnecessary `SHOW OBJECTS` query issued from `read_snowflake` under certain conditions.
128138
- When hybrid execution is enabled, `pd.merge`, `pd.concat`, `DataFrame.merge`, and `DataFrame.join` may now move arguments to backends other than those among the function arguments.
139+
- Improved performance of `DataFrame.to_snowflake` and `pd.to_snowflake(dataframe)` for large data by uploading data via a parquet file. You can control the dataset size at which Snowpark pandas switches to parquet with the variable `modin.config.PandasToSnowflakeParquetThresholdBytes`.
129140

130141
## 1.39.0 (2025-09-17)
131142

docs/source/snowpark/functions.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,16 @@ Functions
440440
st_asewkt
441441
st_asgeojson
442442
st_aswkb
443+
st_aswkt
444+
st_azimuth
445+
st_buffer
446+
st_centroid
447+
st_collect
448+
st_contains
449+
st_coveredby
450+
st_covers
451+
st_difference
452+
st_dimension
443453
substr
444454
substring
445455
sum

scripts/jenkins_regress.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ gpg --quiet --batch --yes --decrypt --passphrase="$GPG_KEY" --output "tests/para
2121
pip install protoc-wheel-0==21.1 mypy-protobuf
2222

2323
# Run linter, Python test and code coverage jobs
24-
exit_code_decorator "python -m tox -c $WORKING_DIR" -e notdoctest-pyarrowcap
24+
exit_code_decorator "python -m tox -c $WORKING_DIR" -e notdoctest-pandascap-pyarrowcap

src/snowflake/snowpark/_functions/scalar_functions.py

Lines changed: 316 additions & 6 deletions
Large diffs are not rendered by default.

src/snowflake/snowpark/modin/config/envvars.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,25 @@ class SnowflakePandasTransferThreshold(EnvironmentVariable, type=int):
116116
default = 100_000
117117

118118

119-
# have to monkey patch this into modin right now to use config contexts
119+
class PandasToSnowflakeParquetThresholdBytes(EnvironmentVariable, type=int):
120+
"""
121+
When a pandas-backend dataframe's shallow memory usage exceeds this
122+
threshold, implement to_snowflake() by writing the dataframe to a parquet
123+
file and loading the parquet file into Snowflake.
124+
"""
125+
126+
varname = "SNOWFLAKE_PANDAS_MAX_TO_SNOWFLAKE_MEMORY_BYTES"
127+
# This default comes from experimentation on integer data. At about this
128+
# point, insertion via parquet appears to be faster on a 3XL warehouse.
129+
default = 3_000_000
130+
131+
132+
# have to monkey patch these variables into modin right now to use config
133+
# contexts
120134
modin_config.SnowflakePandasTransferThreshold = SnowflakePandasTransferThreshold
135+
modin_config.PandasToSnowflakeParquetThresholdBytes = (
136+
PandasToSnowflakeParquetThresholdBytes
137+
)
121138

122139

123140
class EnvWithSibilings(

src/snowflake/snowpark/modin/plugin/_internal/utils.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,38 @@ def extract_all_duplicates(elements: Sequence[Hashable]) -> Sequence[Hashable]:
980980
return unique_duplicated_elements
981981

982982

983+
def validate_column_labels_for_to_snowflake(
984+
index_column_labels: Sequence[Hashable], data_column_labels: Sequence[Hashable]
985+
) -> None:
986+
"""
987+
Validate column labels for to_snowflake.
988+
989+
Check that the column labels are not duplicated, and that the data column
990+
labels are not None.
991+
992+
Args:
993+
index_column_labels: index column labels
994+
data_column_labels: data column labels
995+
996+
Returns:
997+
None
998+
"""
999+
duplicates = extract_all_duplicates((*index_column_labels, *data_column_labels))
1000+
if len(duplicates) > 0:
1001+
raise ValueError(
1002+
f"Duplicated labels {duplicates} found in index columns {index_column_labels} and data columns {data_column_labels}. "
1003+
f"Snowflake does not allow duplicated identifiers, please rename to make sure there is no duplication "
1004+
f"among both index and data columns."
1005+
)
1006+
1007+
if any(is_all_label_components_none(label) for label in data_column_labels):
1008+
raise ValueError(
1009+
f"Label None is found in the data columns {data_column_labels}, which is invalid in Snowflake. "
1010+
"Please give it a name by set the dataframe columns like df.columns=['A', 'B'],"
1011+
" or set the series name if it is a series like series.name='A'."
1012+
)
1013+
1014+
9831015
def is_duplicate_free(names: Sequence[Hashable]) -> bool:
9841016
"""
9851017
check whether names contains duplicates
@@ -2300,3 +2332,52 @@ def new_snow_df(*args: Any, **kwargs: Any) -> pd.DataFrame:
23002332
"""
23012333
with config_context(AutoSwitchBackend=False):
23022334
return pd.DataFrame(*args, **kwargs)
2335+
2336+
2337+
def extract_and_validate_index_labels_for_to_snowflake(
2338+
index_label_param: Any, num_index_columns: int
2339+
) -> list[Hashable]:
2340+
"""
2341+
Extract and validate index labels for read snowflake.
2342+
2343+
Args:
2344+
index_label_param: index_label parameter
2345+
num_index_columns: number of index columns
2346+
Returns:
2347+
list of index column labels
2348+
"""
2349+
index_column_labels = (
2350+
index_label_param
2351+
if isinstance(index_label_param, list)
2352+
else [index_label_param]
2353+
)
2354+
if len(index_column_labels) != num_index_columns:
2355+
raise ValueError(
2356+
f"Length of 'index_label' should match number of levels, which is {num_index_columns}"
2357+
)
2358+
return index_column_labels
2359+
2360+
2361+
def handle_if_exists_for_to_snowflake(
2362+
if_exists: str, name: Union[str, Iterable[str]]
2363+
) -> None:
2364+
"""
2365+
Handle if_exists for to_snowflake.
2366+
2367+
Validate if_exists for to_snowflake and raise an error if the table
2368+
already exists and if_exists == "fail".
2369+
2370+
Args:
2371+
if_exists: if_exists parameter
2372+
name: name parameter
2373+
Returns:
2374+
None
2375+
"""
2376+
if if_exists not in ("fail", "replace", "append"):
2377+
raise ValueError(f"'{if_exists}' is not valid for if_exists")
2378+
if if_exists == "fail" and pd.session._table_exists(
2379+
parse_table_name(name) if isinstance(name, str) else name
2380+
):
2381+
raise ValueError(
2382+
f"Table '{name}' already exists. Set 'if_exists' parameter as 'replace' to override existing table."
2383+
)

src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@
9595
from snowflake.snowpark._internal.type_utils import ColumnOrName
9696
from snowflake.snowpark._internal.utils import (
9797
generate_random_alphanumeric,
98-
parse_table_name,
9998
random_name_for_temp_object,
10099
)
101100
from snowflake.snowpark.column import CaseExpr, Column as SnowparkColumn
@@ -350,7 +349,8 @@
350349
unpivot_empty_df,
351350
)
352351
from snowflake.snowpark.modin.plugin._internal.utils import (
353-
MODIN_IS_AT_LEAST_0_37_0,
352+
extract_and_validate_index_labels_for_to_snowflake,
353+
handle_if_exists_for_to_snowflake,
354354
new_snow_series,
355355
INDEX_LABEL,
356356
ROW_COUNT_COLUMN_LABEL,
@@ -366,7 +366,6 @@
366366
create_frame_with_data_columns,
367367
create_ordered_dataframe_from_pandas,
368368
create_initial_ordered_dataframe,
369-
extract_all_duplicates,
370369
extract_pandas_label_from_snowflake_quoted_identifier,
371370
fill_missing_levels_for_pandas_label,
372371
fill_none_in_index_labels,
@@ -383,6 +382,8 @@
383382
parse_object_construct_snowflake_quoted_identifier_and_extract_pandas_label,
384383
parse_snowflake_object_construct_identifier_to_map,
385384
unquote_name_if_quoted,
385+
validate_column_labels_for_to_snowflake,
386+
MODIN_IS_AT_LEAST_0_37_0,
386387
)
387388
from snowflake.snowpark.modin.plugin._internal.where_utils import (
388389
validate_expected_boolean_data_columns,
@@ -1896,12 +1897,11 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
18961897
# Include index columns
18971898
if index_label:
18981899
index_column_labels = (
1899-
index_label if isinstance(index_label, list) else [index_label]
1900-
)
1901-
if len(index_column_labels) != self._modin_frame.num_index_columns:
1902-
raise ValueError(
1903-
f"Length of 'index_label' should match number of levels, which is {self._modin_frame.num_index_columns}"
1900+
extract_and_validate_index_labels_for_to_snowflake(
1901+
index_label_param=index_label,
1902+
num_index_columns=self._modin_frame.num_index_columns,
19041903
)
1904+
)
19051905
else:
19061906
index_column_labels = frame.index_column_pandas_labels
19071907

@@ -1920,23 +1920,10 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
19201920
# label for the data column, set the label to be None
19211921
data_column_labels = [None]
19221922

1923-
# check if there is any data column label is none
1924-
if any(is_all_label_components_none(label) for label in data_column_labels):
1925-
raise ValueError(
1926-
f"Label None is found in the data columns {data_column_labels}, which is invalid in Snowflake. "
1927-
"Please give it a name by set the dataframe columns like df.columns=['A', 'B'],"
1928-
" or set the series name if it is a series like series.name='A'."
1929-
)
1930-
1931-
# perform a column name duplication check
1932-
index_and_data_columns = data_column_labels + index_column_labels
1933-
duplicates = extract_all_duplicates(index_and_data_columns)
1934-
if duplicates:
1935-
raise ValueError(
1936-
f"Duplicated labels {duplicates} found in index columns {index_column_labels} and data columns {data_column_labels}. "
1937-
f"Snowflake does not allow duplicated identifiers, please rename to make sure there is no duplication "
1938-
f"among both index and data columns."
1939-
)
1923+
validate_column_labels_for_to_snowflake(
1924+
index_column_labels=index_column_labels,
1925+
data_column_labels=data_column_labels,
1926+
)
19401927

19411928
# rename snowflake quoted identifiers for the retained index columns and data columns to
19421929
# be the same as quoted pandas labels.
@@ -2038,24 +2025,15 @@ def to_snowflake(
20382025
table_type: Literal["", "temp", "temporary", "transient"] = "",
20392026
) -> None:
20402027
self._warn_lost_snowpark_pandas_type()
2028+
handle_if_exists_for_to_snowflake(if_exists=if_exists, name=name)
20412029

2042-
if if_exists not in ("fail", "replace", "append"):
2043-
# Same error message as native pandas.
2044-
raise ValueError(f"'{if_exists}' is not valid for if_exists")
20452030
if if_exists == "fail":
20462031
mode = "errorifexists"
20472032
elif if_exists == "replace":
20482033
mode = "overwrite"
20492034
else:
20502035
mode = "append"
20512036

2052-
if mode == "errorifexists" and pd.session._table_exists(
2053-
parse_table_name(name) if isinstance(name, str) else name
2054-
):
2055-
raise ValueError(
2056-
f"Table '{name}' already exists. Set 'if_exists' parameter as 'replace' to override existing table."
2057-
)
2058-
20592037
self._to_snowpark_dataframe_from_snowpark_pandas_dataframe(
20602038
index, index_label
20612039
).write.save_as_table(

0 commit comments

Comments
 (0)