99
1010import com .google .common .collect .ImmutableList ;
1111import java .util .List ;
12+ import lombok .RequiredArgsConstructor ;
13+ import org .apache .calcite .DataContext ;
1214import org .apache .calcite .config .CalciteConnectionConfig ;
15+ import org .apache .calcite .linq4j .Enumerable ;
16+ import org .apache .calcite .linq4j .Linq4j ;
1317import org .apache .calcite .plan .RelTraitDef ;
1418import org .apache .calcite .rel .RelCollations ;
1519import org .apache .calcite .rel .RelNode ;
1620import org .apache .calcite .rel .type .RelDataType ;
1721import org .apache .calcite .rel .type .RelDataTypeFactory ;
1822import org .apache .calcite .rel .type .RelProtoDataType ;
23+ import org .apache .calcite .schema .ScannableTable ;
1924import org .apache .calcite .schema .Schema ;
2025import org .apache .calcite .schema .SchemaPlus ;
2126import org .apache .calcite .schema .Statistic ;
2227import org .apache .calcite .schema .Statistics ;
23- import org .apache .calcite .schema .Table ;
2428import org .apache .calcite .sql .SqlCall ;
2529import org .apache .calcite .sql .SqlNode ;
2630import org .apache .calcite .sql .parser .SqlParser ;
3236import org .junit .Assert ;
3337import org .junit .Test ;
3438
35- /**
36- * Unit tests for {@code flatten} command in PPL.
37- */
39+ /** Unit tests for {@code flatten} command in PPL. */
3840public class CalcitePPLFlattenTest extends CalcitePPLAbstractTest {
3941 public CalcitePPLFlattenTest () {
4042 super (CalciteAssert .SchemaSpec .SCOTT_WITH_TEMPORAL );
@@ -45,7 +47,12 @@ protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpec
4547 final SchemaPlus rootSchema = Frameworks .createRootSchema (true );
4648 final SchemaPlus schema = CalciteAssert .addSchema (rootSchema , schemaSpecs );
4749 // Add an empty table with name DEPT for test purpose
48- schema .add ("DEPT" , new TableWithStruct ());
50+ ImmutableList <Object []> rows =
51+ ImmutableList .of (
52+ new Object [] {10 , ImmutableList .of (7369 , "ALLEN" ), "SMITH" , 7369 },
53+ new Object [] {20 , ImmutableList .of (7499 , "ALLEN" ), "ALLEN" , 7499 },
54+ new Object [] {30 , ImmutableList .of (7521 , "WARD" ), "WARD" , 7521 });
55+ schema .add ("DEPT" , new TableWithStruct (rows ));
4956 return Frameworks .newConfigBuilder ()
5057 .parserConfig (SqlParser .Config .DEFAULT )
5158 .defaultSchema (schema )
@@ -58,20 +65,54 @@ public void testFlatten() {
5865 String ppl = "source=DEPT | flatten EMP" ;
5966 RelNode root = getRelNode (ppl );
6067 // Regarded as an identity scan. See RelBuilder#L2801
61- String expectedLogical = "LogicalTableScan(table=[[scott, DEPT]])\n " ;
68+ String expectedLogical =
69+ "LogicalProject(DEPTNO=[$0], EMP=[$1], EMPNAME=[$2], EMPNO=[$3])\n "
70+ + " LogicalTableScan(table=[[scott, DEPT]])\n " ;
6271 verifyLogical (root , expectedLogical );
63- String expectedSparkSql = "SELECT *\n FROM `scott`.`DEPT`" ;
72+ String expectedSparkSql =
73+ "SELECT `DEPTNO`, `EMP`, `EMP.EMPNAME` `EMPNAME`, `EMP.EMPNO` `EMPNO`\n FROM `scott`.`DEPT`" ;
6474 verifyPPLToSparkSQL (root , expectedSparkSql );
75+ String expectedResult =
76+ "DEPTNO=10; EMP={7369, ALLEN}; EMPNAME=SMITH; EMPNO=7369\n "
77+ + "DEPTNO=20; EMP={7499, ALLEN}; EMPNAME=ALLEN; EMPNO=7499\n "
78+ + "DEPTNO=30; EMP={7521, WARD}; EMPNAME=WARD; EMPNO=7521\n " ;
79+ verifyResult (root , expectedResult );
6580 }
6681
6782 @ Test
6883 public void testFlattenWithAliases () {
6984 String ppl = "source=DEPT | flatten EMP as name, number" ;
7085 RelNode root = getRelNode (ppl );
71- String expectedLogical = "LogicalTableScan(table=[[scott, DEPT]])\n " ;
86+ String expectedLogical =
87+ "LogicalProject(DEPTNO=[$0], EMP=[$1], name=[$2], number=[$3])\n "
88+ + " LogicalTableScan(table=[[scott, DEPT]])\n " ;
7289 verifyLogical (root , expectedLogical );
73- String expectedSparkSql = "SELECT *\n FROM `scott`.`DEPT`" ;
90+ String expectedSparkSql =
91+ "SELECT `DEPTNO`, `EMP`, `EMP.EMPNAME` `name`, `EMP.EMPNO` `number`\n FROM `scott`.`DEPT`" ;
7492 verifyPPLToSparkSQL (root , expectedSparkSql );
93+ String expectedResult =
94+ "DEPTNO=10; EMP={7369, ALLEN}; name=SMITH; number=7369\n "
95+ + "DEPTNO=20; EMP={7499, ALLEN}; name=ALLEN; number=7499\n "
96+ + "DEPTNO=30; EMP={7521, WARD}; name=WARD; number=7521\n " ;
97+ verifyResult (root , expectedResult );
98+ }
99+
100+ /**
101+ * This validates that the created table is scannable and the nested fields are removed from the
102+ * result.
103+ */
104+ @ Test
105+ public void testProject () {
106+ String ppl = "source=DEPT" ;
107+ RelNode root = getRelNode (ppl );
108+ String expectedLogical =
109+ "LogicalProject(DEPTNO=[$0], EMP=[$1])\n LogicalTableScan(table=[[scott, DEPT]])\n " ;
110+ verifyLogical (root , expectedLogical );
111+ String expectedResult =
112+ "DEPTNO=10; EMP={7369, ALLEN}\n "
113+ + "DEPTNO=20; EMP={7499, ALLEN}\n "
114+ + "DEPTNO=30; EMP={7521, WARD}\n " ;
115+ verifyResult (root , expectedResult );
75116 }
76117
77118 @ Test
@@ -80,12 +121,15 @@ public void testFlattenWithMismatchedNumberOfAliasesShouldThrow() {
80121 Throwable t = Assert .assertThrows (IllegalArgumentException .class , () -> getRelNode (ppl ));
81122 verifyErrorMessageContains (
82123 t ,
83- "The number of aliases has to match the number of flattened fields. Expected 2 (EMP.EMPNO, "
84- + " EMP.EMPNAME), got 1 (name)" );
124+ "The number of aliases has to match the number of flattened fields. Expected 2"
125+ + " ( EMP.EMPNAME, EMP.EMPNO ), got 1 (name)" );
85126 }
86127
87128 // There is no existing table with arrays. We create one for test purpose.
88- public static class TableWithStruct implements Table {
129+ @ RequiredArgsConstructor
130+ public static class TableWithStruct implements ScannableTable {
131+ private final ImmutableList <Object []> rows ;
132+
89133 protected final RelProtoDataType protoRowType =
90134 factory ->
91135 factory
@@ -102,10 +146,15 @@ public static class TableWithStruct implements Table {
102146 // E.g. struct emp will always hava emp.empno and emp.empname in its
103147 // logical projection. We add these two fields to simulate this behavior
104148 // in opensearch.
105- .add ("EMP.EMPNO" , SqlTypeName .INTEGER )
106149 .add ("EMP.EMPNAME" , SqlTypeName .VARCHAR )
150+ .add ("EMP.EMPNO" , SqlTypeName .INTEGER )
107151 .build ();
108152
153+ @ Override
154+ public Enumerable <@ Nullable Object []> scan (DataContext root ) {
155+ return Linq4j .asEnumerable (rows );
156+ }
157+
109158 @ Override
110159 public RelDataType getRowType (RelDataTypeFactory typeFactory ) {
111160 return protoRowType .apply (typeFactory );
0 commit comments