Skip to content

Commit 48bbc4b

Browse files
committed
Add regression tests for chained appendpipe (double and triple)
Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent ce997d9 commit 48bbc4b

2 files changed

Lines changed: 209 additions & 0 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.opensearch.sql.util.MatcherUtils.rows;
1111
import static org.opensearch.sql.util.MatcherUtils.schema;
1212
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
13+
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
1314
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;
1415

1516
import java.io.IOException;
@@ -87,4 +88,64 @@ public void testAppendpipeWithConflictTypeColumn() throws IOException {
8788
TEST_INDEX_ACCOUNT)));
8889
assertTrue(exception.getMessage().contains("due to incompatible types"));
8990
}
91+
92+
/** Regression test: double appendpipe with different aggregations (issue #5173). */
93+
@Test
94+
public void testDoubleAppendPipe() throws IOException {
95+
// stats by gender gives 2 rows (F, M), then two appendpipe aggregations add 1 row each
96+
JSONObject actual =
97+
executeQuery(
98+
String.format(
99+
Locale.ROOT,
100+
"source=%s | stats sum(age) as sum_age by gender"
101+
+ " | appendpipe [ stats avg(sum_age) as avg_sum_age ]"
102+
+ " | appendpipe [ stats max(sum_age) as max_sum_age ]",
103+
TEST_INDEX_ACCOUNT));
104+
// 2 original + 1 avg + 1 max = 4 rows
105+
verifyNumOfRows(actual, 4);
106+
verifySchemaInOrder(
107+
actual,
108+
schema("sum_age", "bigint"),
109+
schema("gender", "string"),
110+
schema("avg_sum_age", "double"),
111+
schema("max_sum_age", "bigint"));
112+
}
113+
114+
/** Regression test: triple appendpipe with different aggregations (issue #5173). */
115+
@Test
116+
public void testTripleAppendPipe() throws IOException {
117+
JSONObject actual =
118+
executeQuery(
119+
String.format(
120+
Locale.ROOT,
121+
"source=%s | stats sum(age) as sum_age by gender"
122+
+ " | appendpipe [ stats avg(sum_age) as avg_sum_age ]"
123+
+ " | appendpipe [ stats max(sum_age) as max_sum_age ]"
124+
+ " | appendpipe [ stats min(sum_age) as min_sum_age ]",
125+
TEST_INDEX_ACCOUNT));
126+
// 2 original + 1 avg + 1 max + 1 min = 5 rows
127+
verifyNumOfRows(actual, 5);
128+
verifySchemaInOrder(
129+
actual,
130+
schema("sum_age", "bigint"),
131+
schema("gender", "string"),
132+
schema("avg_sum_age", "double"),
133+
schema("max_sum_age", "bigint"),
134+
schema("min_sum_age", "bigint"));
135+
}
136+
137+
/** Regression test: double appendpipe with non-aggregation (filter) subpipeline. */
138+
@Test
139+
public void testDoubleAppendPipeWithFilter() throws IOException {
140+
JSONObject actual =
141+
executeQuery(
142+
String.format(
143+
Locale.ROOT,
144+
"source=%s | stats sum(age) as sum_age by gender"
145+
+ " | appendpipe [ where gender = 'F' ]"
146+
+ " | appendpipe [ where gender = 'M' ]",
147+
TEST_INDEX_ACCOUNT));
148+
// 2 original + 1 (F filter) + 1 (M filter from cumulative 3 rows) = 4 rows
149+
verifyNumOfRows(actual, 4);
150+
}
90151
}

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,152 @@ public void testAppendPipeWithMergedColumns() {
5959
+ "FROM `scott`.`EMP`";
6060
verifyPPLToSparkSQL(root, expectedSparkSql);
6161
}
62+
63+
/**
64+
* Regression test: double appendpipe with different aggregations. Result count (16 = 14 + 1 avg +
65+
* 1 max) is verified in integration tests only because RelRunners.run() creates a new planner
66+
* that conflicts with shared RelNode subtrees — a test framework limitation that does not affect
67+
* the production path.
68+
*/
69+
@Test
70+
public void testDoubleAppendPipe() {
71+
String ppl =
72+
"source=EMP | appendpipe [stats avg(SAL) as avg_sal] | appendpipe [stats max(SAL) as"
73+
+ " max_sal]";
74+
RelNode root = getRelNode(ppl);
75+
String expectedLogical =
76+
"LogicalUnion(all=[true])\n"
77+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
78+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8], max_sal=[null:DECIMAL(7, 2)])\n"
79+
+ " LogicalUnion(all=[true])\n"
80+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
81+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[null:DECIMAL(11, 6)])\n"
82+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
83+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
84+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
85+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
86+
+ " avg_sal=[$0])\n"
87+
+ " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n"
88+
+ " LogicalProject(SAL=[$5])\n"
89+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
90+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
91+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
92+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
93+
+ " avg_sal=[null:DECIMAL(11, 6)], max_sal=[$0])\n"
94+
+ " LogicalAggregate(group=[{}], max_sal=[MAX($0)])\n"
95+
+ " LogicalProject(SAL=[$5])\n"
96+
+ " LogicalUnion(all=[true])\n"
97+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3],"
98+
+ " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7],"
99+
+ " avg_sal=[null:DECIMAL(11, 6)])\n"
100+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
101+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
102+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
103+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
104+
+ " avg_sal=[$0])\n"
105+
+ " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n"
106+
+ " LogicalProject(SAL=[$5])\n"
107+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
108+
verifyLogical(root, expectedLogical);
109+
}
110+
111+
/**
112+
* Regression test: triple appendpipe with different aggregations. Result count (17 = 14 + 1 avg +
113+
* 1 max + 1 min) is verified in integration tests only — see testDoubleAppendPipe for rationale.
114+
*/
115+
@Test
116+
public void testTripleAppendPipe() {
117+
String ppl =
118+
"source=EMP | appendpipe [stats avg(SAL) as avg_sal] | appendpipe [stats max(SAL) as"
119+
+ " max_sal] | appendpipe [stats min(SAL) as min_sal]";
120+
RelNode root = getRelNode(ppl);
121+
String expectedLogical =
122+
"LogicalUnion(all=[true])\n"
123+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
124+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8], max_sal=[$9],"
125+
+ " min_sal=[null:DECIMAL(7, 2)])\n"
126+
+ " LogicalUnion(all=[true])\n"
127+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
128+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8],"
129+
+ " max_sal=[null:DECIMAL(7, 2)])\n"
130+
+ " LogicalUnion(all=[true])\n"
131+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3],"
132+
+ " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7],"
133+
+ " avg_sal=[null:DECIMAL(11, 6)])\n"
134+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
135+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
136+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
137+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
138+
+ " avg_sal=[$0])\n"
139+
+ " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n"
140+
+ " LogicalProject(SAL=[$5])\n"
141+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
142+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
143+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
144+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
145+
+ " avg_sal=[null:DECIMAL(11, 6)], max_sal=[$0])\n"
146+
+ " LogicalAggregate(group=[{}], max_sal=[MAX($0)])\n"
147+
+ " LogicalProject(SAL=[$5])\n"
148+
+ " LogicalUnion(all=[true])\n"
149+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3],"
150+
+ " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7],"
151+
+ " avg_sal=[null:DECIMAL(11, 6)])\n"
152+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
153+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
154+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
155+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
156+
+ " avg_sal=[$0])\n"
157+
+ " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n"
158+
+ " LogicalProject(SAL=[$5])\n"
159+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
160+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
161+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
162+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
163+
+ " avg_sal=[null:DECIMAL(11, 6)], max_sal=[null:DECIMAL(7, 2)], min_sal=[$0])\n"
164+
+ " LogicalAggregate(group=[{}], min_sal=[MIN($0)])\n"
165+
+ " LogicalProject(SAL=[$5])\n"
166+
+ " LogicalUnion(all=[true])\n"
167+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3],"
168+
+ " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8],"
169+
+ " max_sal=[null:DECIMAL(7, 2)])\n"
170+
+ " LogicalUnion(all=[true])\n"
171+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3],"
172+
+ " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7],"
173+
+ " avg_sal=[null:DECIMAL(11, 6)])\n"
174+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
175+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
176+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
177+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
178+
+ " avg_sal=[$0])\n"
179+
+ " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n"
180+
+ " LogicalProject(SAL=[$5])\n"
181+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
182+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
183+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
184+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
185+
+ " avg_sal=[null:DECIMAL(11, 6)], max_sal=[$0])\n"
186+
+ " LogicalAggregate(group=[{}], max_sal=[MAX($0)])\n"
187+
+ " LogicalProject(SAL=[$5])\n"
188+
+ " LogicalUnion(all=[true])\n"
189+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3],"
190+
+ " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7],"
191+
+ " avg_sal=[null:DECIMAL(11, 6)])\n"
192+
+ " LogicalTableScan(table=[[scott, EMP]])\n"
193+
+ " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)],"
194+
+ " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE],"
195+
+ " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT],"
196+
+ " avg_sal=[$0])\n"
197+
+ " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n"
198+
+ " LogicalProject(SAL=[$5])\n"
199+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
200+
verifyLogical(root, expectedLogical);
201+
}
202+
203+
/** Regression test: double appendpipe with non-aggregation (filter) subpipeline. */
204+
@Test
205+
public void testDoubleAppendPipeWithFilter() {
206+
String ppl = "source=EMP | appendpipe [where DEPTNO = 20] | appendpipe [where DEPTNO = 30]";
207+
RelNode root = getRelNode(ppl);
208+
verifyResultCount(root, 25); // 14 original + 5 (dept 20) + 6 (dept 30)
209+
}
62210
}

0 commit comments

Comments
 (0)