SNOW-3485471: cte dedup for self-join case #4245
Merged
10 commits
bf8dbd0 poc fix (sfc-gh-aling)
dd89363 fix (sfc-gh-aling)
556a19e Merge remote-tracking branch 'origin/main' into aling-poc-cte-optimiz… (sfc-gh-aling)
1580af4 update changelog (sfc-gh-aling)
79d095b merge (sfc-gh-aling)
a726afe PR comment (sfc-gh-aling)
16dbb7e merge (sfc-gh-aling)
4b15904 update (sfc-gh-aling)
5172f7f Merge remote-tracking branch 'origin/main' into aling-poc-cte-optimiz… (sfc-gh-aling)
c29f910 minor changelog update (sfc-gh-aling)
@@ -2,6 +2,7 @@
 # Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
 #
 
+import copy
 import re
 import tracemalloc
 from contextlib import contextmanager
@@ -30,6 +31,8 @@
     uuid_string,
     when_matched,
     to_timestamp,
+    stddev_samp,
+    when,
 )
 from snowflake.snowpark.types import (
     StructType,
@@ -699,9 +702,10 @@ def test_cte_preserves_join_suffix_aliases(session, use_different_df):
     # the second one is incorrect join condition as we have rsuffix for join alias
     assert 'ON ("AD_GROUP_ID_WITH_AD_GROUP" = "AD_GROUP_ID")' in union_sql
     assert 'ON ("AD_GROUP_ID" = "AD_GROUP_ID")' not in union_sql
-    # when using different df_ad_group with disambiguation, because rsuffix in join,
-    # they have different alias map (expr_to_alias), so they are considered different and we can't convert them to a CTE
-    # However there is still a CTE for create_dataframe call
+    # Both cases produce 1 CTE: the disambiguated rhs_remapped wrapper nodes hash
+    # identically (same SQL + same alias values, different UUID keys), so they're
+    # merged into a single CTE via the expr_to_alias merge fix. The raw VALUES
+    # table is absorbed inline into that CTE body rather than becoming its own CTE.
     assert count_number_of_ctes(Utils.normalize_sql(union_sql)) == 1
 
 
@@ -874,12 +878,15 @@ def test_sql_simplifier(session):
         join_count=2,
     )
     with SqlCounter(query_count=0, describe_count=0):
-        # When adding a lsuffix, expr alias map will be updated, so df2 and df3 are considered
-        # different and have different ids. So only df1 and df will be converted to a CTE
+        # With value-sort hashing, df1/df2/df3 now hash identically (same SQL +
+        # same alias values, different UUID keys). df2 and df3 are replaced with a
+        # shared CTE, but df1's left-join position remains inline. That gives 2
+        # CTEs (base VALUES + filtered df1) and the filter appears twice (once in
+        # the CTE body, once inline for the left-join position).
         assert (
-            count_number_of_ctes(Utils.normalize_sql(df6.queries["queries"][-1])) == 1
+            count_number_of_ctes(Utils.normalize_sql(df6.queries["queries"][-1])) == 2
         )
-        assert Utils.normalize_sql(df6.queries["queries"][-1]).count(filter_clause) == 3
+        assert Utils.normalize_sql(df6.queries["queries"][-1]).count(filter_clause) == 2

Comment on lines -877 to +889 (Collaborator, Author):
For clarity: more CTEs is better in this case. Before the change, the generated SQL had 1 CTE and the filter appeared 3 times; after the change, it has 2 CTEs and the filter appears 2 times.

 
     df7 = df1.with_column("c", lit(1))
     df8 = df1.with_column("c", lit(1)).with_column("d", lit(1))
@@ -1949,3 +1956,113 @@ def test_uniform_cte_optimization_depends_on_gen(session, use_bare_random, expec
 
     vals = [row["VAL"] for row in result_df.collect()]
     assert (vals[:5] == vals[5:]) == expect_cte
+
+
+def test_cte_tpcds_q39_style_self_join_deduplication(session):
+    """TPCDS_Q39-style self-join: filtered aggregation df aliased twice and self-joined.
+
+    Verifies that the shared `inv` computation (group-by + agg + cov filter) is
+    pushed into a single CTE rather than being inlined once per alias branch.
+    The CTE body should contain stddev_samp/avg exactly once; the outer query
+    references it twice (once for inv1, once for inv2).
+    """
+    if not session._sql_simplifier_enabled:
+        pytest.skip("SQL simplifier is not enabled")
+
+    # Synthetic data shaped like the Q39 inventory result after the inner join:
+    # (item_sk, warehouse_sk, month, quantity). High-variance values so that
+    # BOTH months pass cov > 1 for each item/warehouse pair, making an incorrect
+    # cross-join (4 rows) detectable vs. the correct equi-join (2 rows).
+    raw = session.create_dataframe(
+        [
+            (10, 1, 1, 10),
+            (
+                10,
+                1,
+                1,
+                390,
+            ),  # item 10, wh 1, month 1: mean=200, stdev≈268.7, cov≈1.34 > 1
+            (10, 1, 2, 20),
+            (
+                10,
+                1,
+                2,
+                380,
+            ),  # item 10, wh 1, month 2: mean=200, stdev≈254.6, cov≈1.27 > 1
+            (20, 2, 1, 5),
+            (
+                20,
+                2,
+                1,
+                395,
+            ),  # item 20, wh 2, month 1: mean=200, stdev≈275.8, cov≈1.38 > 1
+            (20, 2, 2, 30),
+            (
+                20,
+                2,
+                2,
+                370,
+            ),  # item 20, wh 2, month 2: mean=200, stdev≈240.4, cov≈1.20 > 1
+        ],
+        schema=["i_item_sk", "w_warehouse_sk", "d_moy", "qty"],
+    )
+
+    # Mirrors Q39's inner "foo" aggregation subquery.
+    agg = raw.group_by("i_item_sk", "w_warehouse_sk", "d_moy").agg(
+        stddev_samp("qty").alias("stdev"),
+        avg("qty").cast("double").alias("mean"),
+    )
+
+    # Mirrors Q39's outer "inv" CTE: compute cov and filter on cov > 1.
+    # All four (item, warehouse, month) combinations pass cov > 1.
+    inv = agg.with_column(
+        "cov",
+        when(col("mean") == 0, lit(None)).otherwise(col("stdev") / col("mean")),
+    ).filter(when(col("mean") == 0, lit(0)).otherwise(col("stdev") / col("mean")) > 1)
+
+    inv_r = copy.copy(inv)
+    result = (
+        inv.join(inv_r, on=["i_item_sk", "w_warehouse_sk"], rsuffix="_r")
+        .filter(col("d_moy") == 1)
+        .filter(col("d_moy_r") == 2)
+    )
+
+    sql = result.queries["queries"][-1]
+    normalized = Utils.normalize_sql(sql)
+
+    with SqlCounter(query_count=0, describe_count=0):
+        # The shared `inv` computation should be deduplicated into exactly one CTE.
+        assert count_number_of_ctes(normalized) == 1
+
+        # The CTE should appear at least 3 times: once in the WITH definition
+        # and at least twice in the body (one per alias branch).
+        cte_name_match = re.search(r"WITH\s+(\w+)\s+AS", normalized)
+        assert cte_name_match is not None, "expected a WITH CTE in the generated SQL"
+        cte_name = cte_name_match.group(1)
+        assert (
+            normalized.count(cte_name) >= 3
+        ), f"CTE '{cte_name}' should appear in the definition and both join branches"
+
+        # The aggregation (stddev_samp / GROUP BY) must appear exactly once —
+        # inside the CTE body. Two occurrences would mean `inv` is inlined
+        # separately for each alias branch instead of being shared.
+        assert (
+            normalized.lower().count("stddev_samp") == 1
+        ), "stddev_samp should appear once (in the CTE), not once per alias branch"
+        assert (
+            normalized.upper().count("GROUP BY") == 1
+        ), "GROUP BY should appear once (in the CTE), not once per alias branch"
+
+    # Correctness: the CTE-optimized result must match the non-optimized result.
+    # Correct equi-join on (i_item_sk, w_warehouse_sk) produces 2 rows (item 10
+    # and item 20, each pairing their month-1 and month-2 stats). A wrong
+    # cross-join would produce 4 rows, so this check_result is meaningful.
+    check_result(
+        session,
+        result,
+        expect_cte_optimized=True,
+        query_count=1,
+        describe_count=0,
+        union_count=0,
+        join_count=1,
+    )
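
The figures in the new test's data comments, and its 2-row versus 4-row claim, can be checked without a Snowflake session. The following is a minimal sketch in plain Python and is not part of the PR: it assumes `statistics.stdev` (sample standard deviation) as a stand-in for `stddev_samp`, and the helper names `cov_by_group` and `self_join_rows` are hypothetical. It recomputes cov for each (item, warehouse, month) group, then compares the row counts of the equi-join and of an unconditioned cross-join after the two `d_moy` filters.

```python
from collections import defaultdict
from itertools import product
from statistics import mean, stdev

# Same synthetic rows as the test: (i_item_sk, w_warehouse_sk, d_moy, qty).
ROWS = [
    (10, 1, 1, 10), (10, 1, 1, 390),
    (10, 1, 2, 20), (10, 1, 2, 380),
    (20, 2, 1, 5), (20, 2, 1, 395),
    (20, 2, 2, 30), (20, 2, 2, 370),
]


def cov_by_group(rows):
    """Return {(item, warehouse, month): cov}, keeping only groups with cov > 1."""
    groups = defaultdict(list)
    for item, wh, moy, qty in rows:
        groups[(item, wh, moy)].append(qty)
    result = {}
    for key, qtys in groups.items():
        m = mean(qtys)
        # statistics.stdev uses the n - 1 denominator (sample standard deviation).
        cov = None if m == 0 else stdev(qtys) / m
        if cov is not None and cov > 1:
            result[key] = cov
    return result


def self_join_rows(inv, equi_join=True):
    """Pair inv with itself, then keep left month 1 and right month 2."""
    out = []
    for (l_key, _), (r_key, _) in product(inv.items(), repeat=2):
        same_keys = l_key[:2] == r_key[:2]  # (item, warehouse) match
        if (same_keys or not equi_join) and l_key[2] == 1 and r_key[2] == 2:
            out.append((l_key, r_key))
    return out


inv = cov_by_group(ROWS)
for key, cov in sorted(inv.items()):
    print(key, round(cov, 2))
print(len(self_join_rows(inv)), len(self_join_rows(inv, equi_join=False)))
```

All four groups pass the cov > 1 filter in this sketch, which is what makes the row-count difference between the two join shapes observable.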
In what kinds of cases will this produce the same hash keys? And when that is wrong, what kind of risk are we dealing with?
(Hash keys for `expr_to_alias` are generated from `uuid`, so two UUIDs can collide, but only rarely.)
Is there any chance of duplicate value entries here that would be silently swallowed by the `sorted(set(...))` call?
Yes, duplicate aliases can happen, but consolidating them is harmless in our case: it is not reflected in the newly generated CTE-optimized node, and the generated SQL in the hash is enough to detect duplicate nodes in the self-join case.
`expr_to_alias` is mostly used for internal column-name resolution in the downstream compilation stage. In theory, I think this information should be excluded from a node's hash computation, but for now I am keeping it as a defensive layer to distinguish nodes.
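
The thread above turns on how `expr_to_alias` feeds into a node's hash. The following is a minimal sketch of the idea as described in the test comments ("same SQL + same alias values, different UUID keys") and in the `sorted(set(...))` question; it is not the Snowpark implementation. The function name `node_hash`, the choice of SHA-256, and the sample SQL and alias strings are all assumptions made for illustration.

```python
import hashlib
import uuid


def node_hash(sql: str, expr_to_alias: dict) -> str:
    """Hypothetical hash: SQL text plus the sorted, de-duplicated alias values.

    The UUID keys of expr_to_alias are deliberately ignored, so two nodes that
    differ only in those keys hash identically.
    """
    alias_values = sorted(set(expr_to_alias.values()))
    payload = sql + "\x00" + "\x00".join(alias_values)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


sql = 'SELECT "A" AS "A_R" FROM t'  # made-up SQL for illustration
left = {uuid.uuid4(): '"A_R"'}
right = {uuid.uuid4(): '"A_R"'}  # different key, same alias value
dup = {uuid.uuid4(): '"A_R"', uuid.uuid4(): '"A_R"'}  # duplicate alias values
other = {uuid.uuid4(): '"A_L"'}  # different alias value

print(node_hash(sql, left) == node_hash(sql, right))
print(node_hash(sql, left) == node_hash(sql, dup))
print(node_hash(sql, left) == node_hash(sql, other))
```

In this sketch, duplicate alias values collapse under `set(...)` without changing the hash, which is consistent with the reply's point that consolidating them does not affect duplicate-node detection as long as the SQL text is part of the hash input.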