Skip to content

Commit e4584e2

Browse files
SNOW-2372686: Write large_pandas_backend_df.to_snowflake() via parquet. (#3820)
To implement to_snowflake() for a Snowpark pandas dataframe on the pandas backend with large enough data, upload data via a parquet file instead of via a Snowpark dataframe. The Snowpark dataframe typically inserts values through parametrized SQL queries. Benchmarking showed that a good threshold to switch to parquet was roughly 3 MB, so I've set that as the configurable default switching threshold. Performance of this approach seems to improve with dataset size. Exporting an 800 MB dataframe took about 55 seconds via parquet versus about 429 seconds via the old method, so we get over 7x speedup. We can take a similar approach to speed up pandas_backend_df.move_to('snowflake'). Here are benchmark results with a 3XL warehouse: <img width="571" height="432" alt="to_snowflake_timing_2" src="https://github.com/user-attachments/assets/0f652f6e-4510-4a94-9a75-028f5e09f2b0" /> Signed-off-by: sfc-gh-mvashishtha <mahesh.vashishtha@snowflake.com> Co-authored-by: Jonathan Shi <149419494+sfc-gh-joshi@users.noreply.github.com>
1 parent 183a392 commit e4584e2

8 files changed

Lines changed: 553 additions & 150 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
- Added a session parameter `pandas_hybrid_execution_enabled` to enable/disable hybrid execution as an alternative to using `AutoSwitchBackend`.
127127
- Removed an unnecessary `SHOW OBJECTS` query issued from `read_snowflake` under certain conditions.
128128
- When hybrid execution is enabled, `pd.merge`, `pd.concat`, `DataFrame.merge`, and `DataFrame.join` may now move arguments to backends other than those among the function arguments.
129+
- Improved performance of `DataFrame.to_snowflake` and `pd.to_snowflake(dataframe)` for large data by uploading data via a parquet file. You can control the dataset size at which Snowpark pandas switches to parquet with the variable `modin.config.PandasToSnowflakeParquetThresholdBytes`.
129130

130131
## 1.39.0 (2025-09-17)
131132

src/snowflake/snowpark/modin/config/envvars.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,25 @@ class SnowflakePandasTransferThreshold(EnvironmentVariable, type=int):
116116
default = 100_000
117117

118118

119-
# have to monkey patch this into modin right now to use config contexts
119+
class PandasToSnowflakeParquetThresholdBytes(EnvironmentVariable, type=int):
    """
    Size cutoff, in bytes, for the parquet upload path of to_snowflake().

    A pandas-backend dataframe whose shallow memory usage is above this value
    is written to Snowflake by first writing it to a parquet file and then
    loading that file into Snowflake.
    """

    # NOTE(review): the environment variable name does not mirror the class
    # name ("MAX_TO_SNOWFLAKE_MEMORY_BYTES" vs. "ParquetThresholdBytes");
    # confirm this is intentional before documenting it for users.
    varname = "SNOWFLAKE_PANDAS_MAX_TO_SNOWFLAKE_MEMORY_BYTES"
    # Chosen empirically from experiments on integer data: around this size,
    # inserting via parquet started to look faster on a 3XL warehouse.
    default = 3_000_000
130+
131+
132+
# have to monkey patch these variables into modin right now to use config
133+
# contexts
120134
modin_config.SnowflakePandasTransferThreshold = SnowflakePandasTransferThreshold
135+
modin_config.PandasToSnowflakeParquetThresholdBytes = (
136+
PandasToSnowflakeParquetThresholdBytes
137+
)
121138

122139

123140
class EnvWithSibilings(

src/snowflake/snowpark/modin/plugin/_internal/utils.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,38 @@ def extract_all_duplicates(elements: Sequence[Hashable]) -> Sequence[Hashable]:
980980
return unique_duplicated_elements
981981

982982

983+
def validate_column_labels_for_to_snowflake(
    index_column_labels: Sequence[Hashable], data_column_labels: Sequence[Hashable]
) -> None:
    """
    Reject label sets that to_snowflake cannot write.

    Two checks run in this order:

    1. No label may occur more than once across the index and data columns
       taken together.
    2. No data column label may be flagged by is_all_label_components_none.

    Args:
        index_column_labels: labels of the index columns to be written
        data_column_labels: labels of the data columns to be written

    Returns:
        None

    Raises:
        ValueError: if a label is duplicated, or if a data column label is
            None.
    """
    # Index labels first, then data labels, as a single tuple.
    combined_labels = tuple(index_column_labels) + tuple(data_column_labels)
    repeated_labels = extract_all_duplicates(combined_labels)
    if len(repeated_labels) > 0:
        raise ValueError(
            f"Duplicated labels {repeated_labels} found in index columns {index_column_labels} and data columns {data_column_labels}. "
            f"Snowflake does not allow duplicated identifiers, please rename to make sure there is no duplication "
            f"among both index and data columns."
        )

    # Only data columns are checked here; a single unnamed data column is
    # enough to fail.
    for data_label in data_column_labels:
        if is_all_label_components_none(data_label):
            raise ValueError(
                f"Label None is found in the data columns {data_column_labels}, which is invalid in Snowflake. "
                "Please give it a name by set the dataframe columns like df.columns=['A', 'B'],"
                " or set the series name if it is a series like series.name='A'."
            )
1013+
1014+
9831015
def is_duplicate_free(names: Sequence[Hashable]) -> bool:
9841016
"""
9851017
check whether names contains duplicates
@@ -2300,3 +2332,52 @@ def new_snow_df(*args: Any, **kwargs: Any) -> pd.DataFrame:
23002332
"""
23012333
with config_context(AutoSwitchBackend=False):
23022334
return pd.DataFrame(*args, **kwargs)
2335+
2336+
2337+
def extract_and_validate_index_labels_for_to_snowflake(
    index_label_param: Any, num_index_columns: int
) -> list[Hashable]:
    """
    Normalize the index_label argument of to_snowflake and check its length.

    A list is used as given (the same list object is returned). Any other
    value, including a tuple, is treated as one label and wrapped in a
    single-element list.

    Args:
        index_label_param: index_label parameter
        num_index_columns: number of index columns
    Returns:
        list of index column labels, one per index column
    Raises:
        ValueError: if the number of labels differs from num_index_columns.
    """
    if isinstance(index_label_param, list):
        labels = index_label_param
    else:
        labels = [index_label_param]

    if len(labels) == num_index_columns:
        return labels
    raise ValueError(
        f"Length of 'index_label' should match number of levels, which is {num_index_columns}"
    )
2359+
2360+
2361+
def handle_if_exists_for_to_snowflake(
    if_exists: str, name: Union[str, Iterable[str]]
) -> None:
    """
    Validate if_exists for to_snowflake and enforce the "fail" policy.

    The session is consulted for table existence only when
    if_exists == "fail"; "replace" and "append" return without any lookup.

    Args:
        if_exists: if_exists parameter; one of "fail", "replace", "append"
        name: name parameter. A string is passed through parse_table_name
            before the existence check; any other value is handed to the
            session unchanged.
    Returns:
        None
    Raises:
        ValueError: if if_exists is not an accepted value, or if it is "fail"
            and the table already exists.
    """
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")

    if if_exists != "fail":
        return

    # Resolve the session method before parsing the name so that evaluation
    # order matches a direct call expression.
    table_exists = pd.session._table_exists
    table_identifier = parse_table_name(name) if isinstance(name, str) else name
    if table_exists(table_identifier):
        raise ValueError(
            f"Table '{name}' already exists. Set 'if_exists' parameter as 'replace' to override existing table."
        )

src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@
9595
from snowflake.snowpark._internal.type_utils import ColumnOrName
9696
from snowflake.snowpark._internal.utils import (
9797
generate_random_alphanumeric,
98-
parse_table_name,
9998
random_name_for_temp_object,
10099
)
101100
from snowflake.snowpark.column import CaseExpr, Column as SnowparkColumn
@@ -350,7 +349,8 @@
350349
unpivot_empty_df,
351350
)
352351
from snowflake.snowpark.modin.plugin._internal.utils import (
353-
MODIN_IS_AT_LEAST_0_37_0,
352+
extract_and_validate_index_labels_for_to_snowflake,
353+
handle_if_exists_for_to_snowflake,
354354
new_snow_series,
355355
INDEX_LABEL,
356356
ROW_COUNT_COLUMN_LABEL,
@@ -366,7 +366,6 @@
366366
create_frame_with_data_columns,
367367
create_ordered_dataframe_from_pandas,
368368
create_initial_ordered_dataframe,
369-
extract_all_duplicates,
370369
extract_pandas_label_from_snowflake_quoted_identifier,
371370
fill_missing_levels_for_pandas_label,
372371
fill_none_in_index_labels,
@@ -383,6 +382,8 @@
383382
parse_object_construct_snowflake_quoted_identifier_and_extract_pandas_label,
384383
parse_snowflake_object_construct_identifier_to_map,
385384
unquote_name_if_quoted,
385+
validate_column_labels_for_to_snowflake,
386+
MODIN_IS_AT_LEAST_0_37_0,
386387
)
387388
from snowflake.snowpark.modin.plugin._internal.where_utils import (
388389
validate_expected_boolean_data_columns,
@@ -1896,12 +1897,11 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
18961897
# Include index columns
18971898
if index_label:
18981899
index_column_labels = (
1899-
index_label if isinstance(index_label, list) else [index_label]
1900-
)
1901-
if len(index_column_labels) != self._modin_frame.num_index_columns:
1902-
raise ValueError(
1903-
f"Length of 'index_label' should match number of levels, which is {self._modin_frame.num_index_columns}"
1900+
extract_and_validate_index_labels_for_to_snowflake(
1901+
index_label_param=index_label,
1902+
num_index_columns=self._modin_frame.num_index_columns,
19041903
)
1904+
)
19051905
else:
19061906
index_column_labels = frame.index_column_pandas_labels
19071907

@@ -1920,23 +1920,10 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
19201920
# label for the data column, set the label to be None
19211921
data_column_labels = [None]
19221922

1923-
# check if there is any data column label is none
1924-
if any(is_all_label_components_none(label) for label in data_column_labels):
1925-
raise ValueError(
1926-
f"Label None is found in the data columns {data_column_labels}, which is invalid in Snowflake. "
1927-
"Please give it a name by set the dataframe columns like df.columns=['A', 'B'],"
1928-
" or set the series name if it is a series like series.name='A'."
1929-
)
1930-
1931-
# perform a column name duplication check
1932-
index_and_data_columns = data_column_labels + index_column_labels
1933-
duplicates = extract_all_duplicates(index_and_data_columns)
1934-
if duplicates:
1935-
raise ValueError(
1936-
f"Duplicated labels {duplicates} found in index columns {index_column_labels} and data columns {data_column_labels}. "
1937-
f"Snowflake does not allow duplicated identifiers, please rename to make sure there is no duplication "
1938-
f"among both index and data columns."
1939-
)
1923+
validate_column_labels_for_to_snowflake(
1924+
index_column_labels=index_column_labels,
1925+
data_column_labels=data_column_labels,
1926+
)
19401927

19411928
# rename snowflake quoted identifiers for the retained index columns and data columns to
19421929
# be the same as quoted pandas labels.
@@ -2038,24 +2025,15 @@ def to_snowflake(
20382025
table_type: Literal["", "temp", "temporary", "transient"] = "",
20392026
) -> None:
20402027
self._warn_lost_snowpark_pandas_type()
2028+
handle_if_exists_for_to_snowflake(if_exists=if_exists, name=name)
20412029

2042-
if if_exists not in ("fail", "replace", "append"):
2043-
# Same error message as native pandas.
2044-
raise ValueError(f"'{if_exists}' is not valid for if_exists")
20452030
if if_exists == "fail":
20462031
mode = "errorifexists"
20472032
elif if_exists == "replace":
20482033
mode = "overwrite"
20492034
else:
20502035
mode = "append"
20512036

2052-
if mode == "errorifexists" and pd.session._table_exists(
2053-
parse_table_name(name) if isinstance(name, str) else name
2054-
):
2055-
raise ValueError(
2056-
f"Table '{name}' already exists. Set 'if_exists' parameter as 'replace' to override existing table."
2057-
)
2058-
20592037
self._to_snowpark_dataframe_from_snowpark_pandas_dataframe(
20602038
index, index_label
20612039
).write.save_as_table(

src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
import functools
1212
from typing import Any, List, Literal, Optional, Union
1313

14+
from snowflake.snowpark.modin.config.envvars import (
15+
PandasToSnowflakeParquetThresholdBytes,
16+
)
17+
from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
18+
1419
import modin.pandas as pd
1520
from modin.pandas.api.extensions import (
1621
register_dataframe_accessor as _register_dataframe_accessor,
@@ -21,6 +26,17 @@
2126
from snowflake.snowpark._internal.type_utils import ColumnOrName
2227
from snowflake.snowpark.async_job import AsyncJob
2328
from snowflake.snowpark.dataframe import DataFrame as SnowparkDataFrame
29+
from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import (
30+
SnowparkPandasType,
31+
)
32+
from snowflake.snowpark.modin.plugin._internal.utils import (
33+
extract_and_validate_index_labels_for_to_snowflake,
34+
handle_if_exists_for_to_snowflake,
35+
is_all_label_components_none,
36+
is_valid_snowflake_quoted_identifier,
37+
unquote_name_if_quoted,
38+
validate_column_labels_for_to_snowflake,
39+
)
2440
from snowflake.snowpark.modin.plugin.extensions.utils import (
2541
add_cache_result_docstring,
2642
register_non_snowflake_accessors,
@@ -37,9 +53,120 @@
3753
register_non_snowflake_accessors(_register_dataframe_accessor, "DataFrame")
3854

3955

56+
def _convert_to_snowflake_table_name_to_write_pandas_table_name(name: str) -> str:
    """
    Translate a to_snowflake() table name into the form write_pandas() needs.

    write_pandas() is called with quote_identifiers=True, so the name handed
    to it must be the bare identifier text: quoted identifiers have their
    quotes stripped, and unquoted identifiers are uppercased.

    Parameters:
        name: The name that the user passed to to_snowflake().

    Returns:
        The name we will pass to write_pandas().
    """
    if not is_valid_snowflake_quoted_identifier(name):
        # Unquoted identifier: uppercase it,
        # e.g. 'ab$ab' -> 'AB$AB', 'customer' -> 'CUSTOMER'.
        return name.upper()
    # Quoted identifier: drop the surrounding quotes,
    # e.g. '"CUSTOMER"' -> 'CUSTOMER'.
    return unquote_name_if_quoted(name)
77+
78+
4079
# Snowflake specific dataframe methods
4180
# We use extensions, as we want to make clear that a Snowpark pandas DataFrame is NOT a
4281
# pandas DataFrame.
82+
@_register_dataframe_accessor("to_snowflake", backend="Pandas")
def pandas_to_snowflake(
    self,
    name: Union[str, Iterable[str]],
    if_exists: Optional[Literal["fail", "replace", "append"]] = "fail",
    index: bool = True,
    index_label: Optional[IndexLabel] = None,
    table_type: Literal["", "temp", "temporary", "transient"] = "",
) -> None:
    """
    Write a pandas-backend dataframe to a Snowflake table.

    A frame whose shallow memory usage is at most
    PandasToSnowflakeParquetThresholdBytes is moved to the "Snowflake" backend
    and written by that backend's to_snowflake(). A larger frame is written
    with pd.session.write_pandas().

    Args:
        name: Table name passed by the user.
        if_exists: "fail", "replace" or "append"; validated by
            handle_if_exists_for_to_snowflake.
        index: Whether to write the index as columns.
        index_label: Label(s) to use for the index columns. When falsy, the
            frame's own index names are used.
        table_type: Forwarded unchanged to the writer.

    Returns:
        None

    Raises:
        ValueError: if if_exists is invalid, if the table exists and
            if_exists is "fail", if index_label has the wrong length, or if
            the resulting column labels are duplicated or None.
    """
    if (
        # Shallow memory usage may underestimate the memory usage of the
        # dataframe, but deep memory usage can be expensive to compute.
        # Since this threshold uses shallow memory usage, it may sometimes
        # not use the parquet method in cases where the parquet method would be
        # faster, especially if this frame contains deep data like Python
        # strings.
        self.memory_usage(deep=False).sum()
        <= PandasToSnowflakeParquetThresholdBytes.get()
    ):
        return self.set_backend("Snowflake").to_snowflake(
            name=name,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            table_type=table_type,
        )

    handle_if_exists_for_to_snowflake(if_exists=if_exists, name=name)

    pandas_frame = self._query_compiler._modin_frame
    num_index_columns = pandas_frame.index.nlevels
    if index:
        if index_label:
            index_column_labels = extract_and_validate_index_labels_for_to_snowflake(
                index_label_param=index_label,
                num_index_columns=num_index_columns,
            )
        else:
            index_column_labels = list(pandas_frame.index.names)

        if any(is_all_label_components_none(label) for label in index_column_labels):
            # It's common to have index level named None, in which case we
            # follow the naming convention that pandas would follow if we
            # were to convert the index to a data column with reset_index()
            # and then skip writing the new index.
            index = False
            pandas_frame = pandas_frame.reset_index(drop=False, names=None)
            index_column_labels = []
    else:
        index_column_labels = []
    validate_column_labels_for_to_snowflake(
        index_column_labels=index_column_labels,
        data_column_labels=list(pandas_frame.columns),
    )
    if index:
        # write_pandas() will always drop the index, so we move the index into
        # the data columns with reset_index().
        pandas_frame = pandas_frame.reset_index(drop=False, names=index_column_labels)

    # Look up the Snowpark pandas type of each column once, and keep the ones
    # that map to a type; those are reported through the lost-type warning.
    snowpark_pandas_types = (
        SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(dtype)
        for dtype in pandas_frame.dtypes
    )
    unsupported_types = [t for t in snowpark_pandas_types if t is not None]
    if unsupported_types:
        WarningMessage.lost_type_warning(
            "to_snowflake", ", ".join(type(t).__name__ for t in unsupported_types)
        )

    # NOTE(review): the signature admits an iterable of name parts, but the
    # table-name converter used below is annotated for str and calls
    # name.upper(); confirm that non-str names are handled or rejected before
    # this point.
    pd.session.write_pandas(
        pandas_frame.rename(str, axis=1),
        # We want to pass table_name as is, but quote column names. This is
        # undocumented behavior of to_snowflake() on the "Snowflake" backend, so
        # we mimic it here.
        # Note that if we try to use quote_identifiers=False and quote the
        # column identifiers ourselves, we get the correct column names and we
        # don't have to modify the table name, but the snowflake connector seems
        # to incorrectly insert null data.
        table_name=_convert_to_snowflake_table_name_to_write_pandas_table_name(name),
        auto_create_table=True,
        # "fail" has already been checked above, so anything other than
        # "append" may overwrite.
        overwrite=if_exists != "append",
        table_type=table_type,
    )

    return None
168+
169+
43170
# Implementation note: Arguments names and types are kept consistent with pandas.DataFrame.to_sql
44171
@register_dataframe_accessor("to_snowflake")
45172
def to_snowflake(

0 commit comments

Comments
 (0)