Skip to content

Commit b59743a

Browse files
Fix drop_duplicates issue due to mismatching row positions in dfs based on same data source (#3766)
1 parent 137c7b5 commit b59743a

3 files changed

Lines changed: 38 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848

4949
#### Bug Fixes
5050

51+
- Fixed an issue with `drop_duplicates` where the same data source could be read multiple times in the same query, in a different order each time, resulting in missing rows in the final result. The fix ensures that the data source is read only once.
52+
5153
### Snowpark Local Testing Updates
5254

5355
#### New Features

src/snowflake/snowpark/modin/plugin/extensions/base_overrides.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,6 +1514,17 @@ def drop_duplicates(
15141514
"""
15151515
if keep not in ("first", "last", False):
15161516
raise ValueError('keep must be either "first", "last" or False')
1517+
1518+
# Make sure CTE optimization is enabled.
1519+
# The reason this is required for drop_duplicates is that two dataframes need to be joined
1520+
# on their row position (one is used as a filter for the other) and while not identical,
1521+
# they both originate from the same source.
1522+
# Since read_snowflake can result in assigning row positions differently each time it's run,
1523+
# if we compute the two dataframes independently, their row positions may not match.
1524+
# With the CTE optimization, we are guaranteed that reading the input source will only happen
1525+
# once in the final generated query, and hence no mismatch in row positions will take place.
1526+
pd.session.cte_optimization_enabled = True
1527+
15171528
inplace = validate_bool_kwarg(inplace, "inplace")
15181529
ignore_index = kwargs.get("ignore_index", False)
15191530
subset = kwargs.get("subset", None)

tests/integ/modin/frame/test_drop_duplicates.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import pandas as native_pd
77
import pytest
88

9+
from snowflake.snowpark._internal.utils import TempObjectType
910
import snowflake.snowpark.modin.plugin # noqa: F401
1011
from tests.integ.modin.utils import assert_frame_equal
1112
from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker
13+
from tests.utils import Utils
1214

1315

1416
@pytest.mark.parametrize("subset", ["a", ["a"], ["a", "B"], []])
@@ -79,6 +81,29 @@ def test_drop_duplicates(subset, keep, ignore_index):
7981
)
8082

8183

84+
def test_drop_duplicates_after_read_snowflake(session):
85+
pandas_df = native_pd.DataFrame(
86+
{"A": [0, 1, 1, 2, 0], "B": ["a", "b", "c", "b", "a"]}
87+
)
88+
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
89+
session.create_dataframe(pandas_df).write.save_as_table(
90+
table_name, table_type="temp"
91+
)
92+
# Simulate random order each time read_snowflake is run.
93+
snow_df = pd.read_snowflake(
94+
f"select A, B from (select random() as r, A, B from {table_name}) order by r"
95+
)
96+
query_count = 1
97+
join_count = 2
98+
with SqlCounter(query_count=query_count, join_count=join_count):
99+
assert_frame_equal(
100+
snow_df.drop_duplicates(keep="first"),
101+
pandas_df.drop_duplicates(keep="first"),
102+
check_dtype=False,
103+
check_index_type=False,
104+
)
105+
106+
82107
@pytest.mark.parametrize("subset", ["a", ["a"], ["b"], ["a", "b"]])
83108
@pytest.mark.parametrize("keep", ["first", "last", False])
84109
@sql_count_checker(query_count=1, join_count=2)

0 commit comments

Comments
 (0)