Skip to content

Commit 094e05e

Browse files
committed
fix(core): make appendcol row ordering deterministic on parallel engines
appendcol lowers to a FULL JOIN of two ROW_NUMBER() OVER () windows (empty PARTITION BY / ORDER BY) on _row_number_main_ = _row_number_subsearch_, with no trailing sort. That positional zip is only correct on a serial, order-preserving executor: a bare ROW_NUMBER() OVER () assigns sequence numbers in input order and the join preserves it. On a parallel/distributed backend the row-number assignment is arbitrary and the hash join drops ordering, so columns get zipped onto the wrong rows and downstream `head` slices a non-deterministic subset. Fix visitAppendCol to not depend on implicit input-order preservation: - derive an explicit window ORDER BY from each child's collation (deriveCollationOrderKeys), so ROW_NUMBER assignment follows the upstream sort; falls back to the prior bare OVER () when the input has no collation (positional correspondence is undefined without a sort). - add a trailing sort by the row-number columns after the join (NULLS LAST, same pattern as streamstats) so output order is deterministic regardless of how the backend executes the join. No behavior change on the serial v2/Calcite engine; makes the lowering correct on parallel backends. Updates CalcitePPLAppendcolTest expected plans/SparkSQL. Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent bcbbc59 commit 094e05e

2 files changed

Lines changed: 129 additions & 57 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.calcite.plan.RelOptTable;
5151
import org.apache.calcite.plan.ViewExpanders;
5252
import org.apache.calcite.rel.RelCollation;
53+
import org.apache.calcite.rel.RelFieldCollation;
5354
import org.apache.calcite.rel.RelHomogeneousShuttle;
5455
import org.apache.calcite.rel.RelNode;
5556
import org.apache.calcite.rel.core.Aggregate;
@@ -2769,19 +2770,53 @@ public RelNode visitFillNull(FillNull node, CalcitePlanContext context) {
27692770
return context.relBuilder.peek();
27702771
}
27712772

2773+
/**
2774+
* Derives window {@code ORDER BY} keys from the collation of the current top RelNode, so a {@code
2775+
* ROW_NUMBER()} window assigns sequence numbers in the same order the upstream pipeline produced
2776+
* rows. Returns an empty list when the input carries no collation (e.g. no preceding sort);
2777+
* appendcol's positional correspondence is itself unspecified without an upstream sort, so
2778+
* leaving {@code ROW_NUMBER() OVER ()} unordered in that case is acceptable.
2779+
*/
2780+
private static List<RexNode> deriveCollationOrderKeys(CalcitePlanContext context) {
2781+
RelBuilder relBuilder = context.relBuilder;
2782+
List<RelCollation> collations =
2783+
relBuilder.getCluster().getMetadataQuery().collations(relBuilder.peek());
2784+
if (collations == null || collations.isEmpty()) {
2785+
return List.of();
2786+
}
2787+
List<RexNode> orderKeys = new ArrayList<>();
2788+
for (RelFieldCollation fieldCollation : collations.get(0).getFieldCollations()) {
2789+
RexNode key = relBuilder.field(fieldCollation.getFieldIndex());
2790+
if (fieldCollation.direction.isDescending()) {
2791+
key = relBuilder.desc(key);
2792+
}
2793+
if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) {
2794+
key = relBuilder.nullsLast(key);
2795+
} else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) {
2796+
key = relBuilder.nullsFirst(key);
2797+
}
2798+
orderKeys.add(key);
2799+
}
2800+
return orderKeys;
2801+
}
2802+
27722803
@Override
27732804
public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
27742805
// 1. resolve main plan
27752806
visitChildren(node, context);
2776-
// 2. add row_number() column to main
2807+
// 2. add row_number() column to main, ordered by the main pipeline's existing collation. A bare
2808+
// ROW_NUMBER() OVER () is only assigned in input order on a serial executor; parallel
2809+
// backends (e.g. the analytics engine) assign it arbitrarily, which would mismatch the
2810+
// positional zip below. Threading the upstream sort into the window keeps it deterministic.
2811+
List<RexNode> mainOrderKeys = deriveCollationOrderKeys(context);
27772812
RexNode mainRowNumber =
27782813
PlanUtils.makeOver(
27792814
context,
27802815
BuiltinFunctionName.ROW_NUMBER,
27812816
null,
27822817
List.of(),
27832818
List.of(),
2784-
List.of(),
2819+
mainOrderKeys,
27852820
WindowFrame.toCurrentRow());
27862821
context.relBuilder.projectPlus(
27872822
context.relBuilder.alias(mainRowNumber, ROW_NUMBER_COLUMN_FOR_MAIN));
@@ -2791,15 +2826,17 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
27912826
transformPlanToAttachChild(node.getSubSearch(), relation);
27922827
// 4. resolve subsearch plan
27932828
node.getSubSearch().accept(this, context);
2794-
// 5. add row_number() column to subsearch
2829+
// 5. add row_number() column to subsearch, ordered by the subsearch's existing collation (same
2830+
// determinism rationale as the main side above).
2831+
List<RexNode> subsearchOrderKeys = deriveCollationOrderKeys(context);
27952832
RexNode subsearchRowNumber =
27962833
PlanUtils.makeOver(
27972834
context,
27982835
BuiltinFunctionName.ROW_NUMBER,
27992836
null,
28002837
List.of(),
28012838
List.of(),
2802-
List.of(),
2839+
subsearchOrderKeys,
28032840
WindowFrame.toCurrentRow());
28042841
context.relBuilder.projectPlus(
28052842
context.relBuilder.alias(subsearchRowNumber, ROW_NUMBER_COLUMN_FOR_SUBSEARCH));
@@ -2821,6 +2858,15 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
28212858
context.relBuilder.join(
28222859
JoinAndLookupUtils.translateJoinType(Join.JoinType.FULL), joinCondition);
28232860

2861+
// The FULL JOIN does not guarantee output order, and downstream commands (e.g. `head`) rely on
2862+
// appendcol preserving the main search's row order. Sort by the main row number so the result
2863+
// order is deterministic regardless of how the backend executes the join. Extra subsearch-only
2864+
// rows have a null main row number and sort last (ordered among themselves by subsearch row
2865+
// number). This runs before the row-number columns are projected away below.
2866+
context.relBuilder.sort(
2867+
context.relBuilder.nullsLast(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_MAIN)),
2868+
context.relBuilder.nullsLast(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_SUBSEARCH)));
2869+
28242870
if (!node.isOverride()) {
28252871
// 8. if override = false, drop both _row_number_ columns
28262872
context.relBuilder.projectExcept(

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendcolTest.java

Lines changed: 79 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,32 @@ public void testAppendcol() {
2222
String expectedLogical =
2323
"LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5],"
2424
+ " COMM=[$6], DEPTNO=[$7])\n"
25-
+ " LogicalJoin(condition=[=($8, $9)], joinType=[full])\n"
26-
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
27-
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], _row_number_main_=[ROW_NUMBER() OVER ()])\n"
28-
+ " LogicalTableScan(table=[[scott, EMP]])\n"
29-
+ " LogicalProject(_row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
30-
+ " LogicalFilter(condition=[=($7, 20)])\n"
31-
+ " LogicalTableScan(table=[[scott, EMP]])\n";
25+
+ " LogicalSort(sort0=[$8], sort1=[$9], dir0=[ASC], dir1=[ASC])\n"
26+
+ " LogicalJoin(condition=[=($8, $9)], joinType=[full])\n"
27+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
28+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], _row_number_main_=[ROW_NUMBER() OVER (ORDER BY $0"
29+
+ " NULLS LAST)])\n"
30+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
31+
+ " LogicalProject(_row_number_subsearch_=[ROW_NUMBER() OVER (ORDER BY $0 NULLS"
32+
+ " LAST)])\n"
33+
+ " LogicalFilter(condition=[=($7, 20)])\n"
34+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
3235
verifyLogical(root, expectedLogical);
3336
verifyResultCount(root, 14);
3437

3538
String expectedSparkSql =
3639
"SELECT `t`.`EMPNO`, `t`.`ENAME`, `t`.`JOB`, `t`.`MGR`, `t`.`HIREDATE`, `t`.`SAL`,"
3740
+ " `t`.`COMM`, `t`.`DEPTNO`\n"
3841
+ "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`,"
39-
+ " ROW_NUMBER() OVER () `_row_number_main_`\n"
42+
+ " ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST) `_row_number_main_`\n"
4043
+ "FROM `scott`.`EMP`) `t`\n"
41-
+ "FULL JOIN (SELECT ROW_NUMBER() OVER () `_row_number_subsearch_`\n"
44+
+ "FULL JOIN (SELECT ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST)"
45+
+ " `_row_number_subsearch_`\n"
4246
+ "FROM `scott`.`EMP`\n"
4347
+ "WHERE `DEPTNO` = 20) `t1` ON `t`.`_row_number_main_` ="
44-
+ " `t1`.`_row_number_subsearch_`";
48+
+ " `t1`.`_row_number_subsearch_`\n"
49+
+ "ORDER BY `t`.`_row_number_main_` NULLS LAST, `t1`.`_row_number_subsearch_` NULLS"
50+
+ " LAST";
4551
verifyPPLToSparkSQL(root, expectedSparkSql);
4652
}
4753

@@ -54,31 +60,37 @@ public void testAppendcol2() {
5460
String expectedLogical =
5561
"LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5],"
5662
+ " COMM=[$6], DEPTNO=[$7], left_col=[$8], right_col=[$10])\n"
57-
+ " LogicalJoin(condition=[=($9, $11)], joinType=[full])\n"
58-
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
63+
+ " LogicalSort(sort0=[$9], sort1=[$11], dir0=[ASC], dir1=[ASC])\n"
64+
+ " LogicalJoin(condition=[=($9, $11)], joinType=[full])\n"
65+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
5966
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], left_col=[$7], _row_number_main_=[ROW_NUMBER()"
60-
+ " OVER ()])\n"
61-
+ " LogicalTableScan(table=[[scott, EMP]])\n"
62-
+ " LogicalProject(right_col=[$8], _row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
63-
+ " LogicalFilter(condition=[=($7, 20)])\n"
64-
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
67+
+ " OVER (ORDER BY $0 NULLS LAST)])\n"
68+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
69+
+ " LogicalProject(right_col=[$8], _row_number_subsearch_=[ROW_NUMBER() OVER"
70+
+ " (ORDER BY $0 NULLS LAST)])\n"
71+
+ " LogicalFilter(condition=[=($7, 20)])\n"
72+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
6573
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], right_col=[$7])\n"
66-
+ " LogicalTableScan(table=[[scott, EMP]])\n";
74+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
6775
verifyLogical(root, expectedLogical);
6876
verifyResultCount(root, 14);
6977

7078
String expectedSparkSql =
7179
"SELECT `t`.`EMPNO`, `t`.`ENAME`, `t`.`JOB`, `t`.`MGR`, `t`.`HIREDATE`, `t`.`SAL`,"
7280
+ " `t`.`COMM`, `t`.`DEPTNO`, `t`.`left_col`, `t2`.`right_col`\n"
7381
+ "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`,"
74-
+ " `DEPTNO` `left_col`, ROW_NUMBER() OVER () `_row_number_main_`\n"
82+
+ " `DEPTNO` `left_col`, ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST)"
83+
+ " `_row_number_main_`\n"
7584
+ "FROM `scott`.`EMP`) `t`\n"
76-
+ "FULL JOIN (SELECT `right_col`, ROW_NUMBER() OVER () `_row_number_subsearch_`\n"
85+
+ "FULL JOIN (SELECT `right_col`, ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST)"
86+
+ " `_row_number_subsearch_`\n"
7787
+ "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`,"
7888
+ " `DEPTNO` `right_col`\n"
7989
+ "FROM `scott`.`EMP`) `t0`\n"
8090
+ "WHERE `DEPTNO` = 20) `t2` ON `t`.`_row_number_main_` ="
81-
+ " `t2`.`_row_number_subsearch_`";
91+
+ " `t2`.`_row_number_subsearch_`\n"
92+
+ "ORDER BY `t`.`_row_number_main_` NULLS LAST, `t2`.`_row_number_subsearch_` NULLS"
93+
+ " LAST";
8294
verifyPPLToSparkSQL(root, expectedSparkSql);
8395
}
8496

@@ -91,14 +103,17 @@ public void testAppendcolOverride() {
91103
+ " JOB=[CASE(=($8, $17), $11, $2)], MGR=[CASE(=($8, $17), $12, $3)],"
92104
+ " HIREDATE=[CASE(=($8, $17), $13, $4)], SAL=[CASE(=($8, $17), $14, $5)],"
93105
+ " COMM=[CASE(=($8, $17), $15, $6)], DEPTNO=[CASE(=($8, $17), $16, $7)])\n"
94-
+ " LogicalJoin(condition=[=($8, $17)], joinType=[full])\n"
95-
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
96-
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], _row_number_main_=[ROW_NUMBER() OVER ()])\n"
97-
+ " LogicalTableScan(table=[[scott, EMP]])\n"
98-
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
99-
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], _row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
100-
+ " LogicalFilter(condition=[=($7, 20)])\n"
101-
+ " LogicalTableScan(table=[[scott, EMP]])\n";
106+
+ " LogicalSort(sort0=[$8], sort1=[$17], dir0=[ASC], dir1=[ASC])\n"
107+
+ " LogicalJoin(condition=[=($8, $17)], joinType=[full])\n"
108+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
109+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], _row_number_main_=[ROW_NUMBER() OVER (ORDER BY $0"
110+
+ " NULLS LAST)])\n"
111+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
112+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
113+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], _row_number_subsearch_=[ROW_NUMBER() OVER (ORDER"
114+
+ " BY $0 NULLS LAST)])\n"
115+
+ " LogicalFilter(condition=[=($7, 20)])\n"
116+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
102117
verifyLogical(root, expectedLogical);
103118
verifyResultCount(root, 14);
104119

@@ -116,13 +131,16 @@ public void testAppendcolOverride() {
116131
+ " `t`.`COMM` END `COMM`, CASE WHEN `t`.`_row_number_main_` ="
117132
+ " `t1`.`_row_number_subsearch_` THEN `t1`.`DEPTNO` ELSE `t`.`DEPTNO` END `DEPTNO`\n"
118133
+ "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`,"
119-
+ " ROW_NUMBER() OVER () `_row_number_main_`\n"
134+
+ " ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST) `_row_number_main_`\n"
120135
+ "FROM `scott`.`EMP`) `t`\n"
121136
+ "FULL JOIN (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`,"
122-
+ " `DEPTNO`, ROW_NUMBER() OVER () `_row_number_subsearch_`\n"
137+
+ " `DEPTNO`, ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST)"
138+
+ " `_row_number_subsearch_`\n"
123139
+ "FROM `scott`.`EMP`\n"
124140
+ "WHERE `DEPTNO` = 20) `t1` ON `t`.`_row_number_main_` ="
125-
+ " `t1`.`_row_number_subsearch_`";
141+
+ " `t1`.`_row_number_subsearch_`\n"
142+
+ "ORDER BY `t`.`_row_number_main_` NULLS LAST, `t1`.`_row_number_subsearch_` NULLS"
143+
+ " LAST";
126144
verifyPPLToSparkSQL(root, expectedSparkSql);
127145
}
128146

@@ -132,16 +150,17 @@ public void testAppendcolStats() {
132150
RelNode root = getRelNode(ppl);
133151
String expectedLogical =
134152
"LogicalProject(count()=[$0], DEPTNO=[$1], avg(SAL)=[$3])\n"
135-
+ " LogicalJoin(condition=[=($2, $4)], joinType=[full])\n"
136-
+ " LogicalProject(count()=[$1], DEPTNO=[$0], _row_number_main_=[ROW_NUMBER() OVER"
153+
+ " LogicalSort(sort0=[$2], sort1=[$4], dir0=[ASC], dir1=[ASC])\n"
154+
+ " LogicalJoin(condition=[=($2, $4)], joinType=[full])\n"
155+
+ " LogicalProject(count()=[$1], DEPTNO=[$0], _row_number_main_=[ROW_NUMBER() OVER"
137156
+ " ()])\n"
138-
+ " LogicalAggregate(group=[{0}], count()=[COUNT()])\n"
139-
+ " LogicalProject(DEPTNO=[$7])\n"
140-
+ " LogicalTableScan(table=[[scott, EMP]])\n"
141-
+ " LogicalProject(avg(SAL)=[$1], _row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
142-
+ " LogicalAggregate(group=[{0}], avg(SAL)=[AVG($1)])\n"
143-
+ " LogicalProject(DEPTNO=[$7], SAL=[$5])\n"
144-
+ " LogicalTableScan(table=[[scott, EMP]])\n";
157+
+ " LogicalAggregate(group=[{0}], count()=[COUNT()])\n"
158+
+ " LogicalProject(DEPTNO=[$7])\n"
159+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
160+
+ " LogicalProject(avg(SAL)=[$1], _row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
161+
+ " LogicalAggregate(group=[{0}], avg(SAL)=[AVG($1)])\n"
162+
+ " LogicalProject(DEPTNO=[$7], SAL=[$5])\n"
163+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
145164
verifyLogical(root, expectedLogical);
146165
String expectedResult =
147166
""
@@ -159,7 +178,10 @@ public void testAppendcolStats() {
159178
+ "FULL JOIN (SELECT AVG(`SAL`) `avg(SAL)`, ROW_NUMBER() OVER ()"
160179
+ " `_row_number_subsearch_`\n"
161180
+ "FROM `scott`.`EMP`\n"
162-
+ "GROUP BY `DEPTNO`) `t4` ON `t1`.`_row_number_main_` = `t4`.`_row_number_subsearch_`";
181+
+ "GROUP BY `DEPTNO`) `t4` ON `t1`.`_row_number_main_` ="
182+
+ " `t4`.`_row_number_subsearch_`\n"
183+
+ "ORDER BY `t1`.`_row_number_main_` NULLS LAST, `t4`.`_row_number_subsearch_` NULLS"
184+
+ " LAST";
163185
verifyPPLToSparkSQL(root, expectedSparkSql);
164186
}
165187

@@ -171,17 +193,18 @@ public void testAppendcolStatsOverride() {
171193
RelNode root = getRelNode(ppl);
172194
String expectedLogical =
173195
"LogicalProject(count()=[$0], DEPTNO=[CASE(=($2, $5), $4, $1)], avg(SAL)=[$3])\n"
174-
+ " LogicalJoin(condition=[=($2, $5)], joinType=[full])\n"
175-
+ " LogicalProject(count()=[$1], DEPTNO=[$0], _row_number_main_=[ROW_NUMBER() OVER"
196+
+ " LogicalSort(sort0=[$2], sort1=[$5], dir0=[ASC], dir1=[ASC])\n"
197+
+ " LogicalJoin(condition=[=($2, $5)], joinType=[full])\n"
198+
+ " LogicalProject(count()=[$1], DEPTNO=[$0], _row_number_main_=[ROW_NUMBER() OVER"
176199
+ " ()])\n"
177-
+ " LogicalAggregate(group=[{0}], count()=[COUNT()])\n"
178-
+ " LogicalProject(DEPTNO=[$7])\n"
179-
+ " LogicalTableScan(table=[[scott, EMP]])\n"
180-
+ " LogicalProject(avg(SAL)=[$1], DEPTNO=[$0], _row_number_subsearch_=[ROW_NUMBER()"
181-
+ " OVER ()])\n"
182-
+ " LogicalAggregate(group=[{0}], avg(SAL)=[AVG($1)])\n"
183-
+ " LogicalProject(DEPTNO=[$7], SAL=[$5])\n"
184-
+ " LogicalTableScan(table=[[scott, EMP]])\n";
200+
+ " LogicalAggregate(group=[{0}], count()=[COUNT()])\n"
201+
+ " LogicalProject(DEPTNO=[$7])\n"
202+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
203+
+ " LogicalProject(avg(SAL)=[$1], DEPTNO=[$0],"
204+
+ " _row_number_subsearch_=[ROW_NUMBER() OVER ()])\n"
205+
+ " LogicalAggregate(group=[{0}], avg(SAL)=[AVG($1)])\n"
206+
+ " LogicalProject(DEPTNO=[$7], SAL=[$5])\n"
207+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
185208
verifyLogical(root, expectedLogical);
186209
String expectedResult =
187210
""
@@ -200,7 +223,10 @@ public void testAppendcolStatsOverride() {
200223
+ "FULL JOIN (SELECT AVG(`SAL`) `avg(SAL)`, `DEPTNO`, ROW_NUMBER() OVER ()"
201224
+ " `_row_number_subsearch_`\n"
202225
+ "FROM `scott`.`EMP`\n"
203-
+ "GROUP BY `DEPTNO`) `t4` ON `t1`.`_row_number_main_` = `t4`.`_row_number_subsearch_`";
226+
+ "GROUP BY `DEPTNO`) `t4` ON `t1`.`_row_number_main_` ="
227+
+ " `t4`.`_row_number_subsearch_`\n"
228+
+ "ORDER BY `t1`.`_row_number_main_` NULLS LAST, `t4`.`_row_number_subsearch_` NULLS"
229+
+ " LAST";
204230
verifyPPLToSparkSQL(root, expectedSparkSql);
205231
}
206232
}

0 commit comments

Comments
 (0)