|
7 | 7 |
|
8 | 8 | package org.opensearch.sql.ppl.calcite; |
9 | 9 |
|
| 10 | +import com.google.common.collect.ImmutableList; |
| 11 | +import java.util.List; |
| 12 | +import org.apache.calcite.config.CalciteConnectionConfig; |
| 13 | +import org.apache.calcite.plan.RelTraitDef; |
| 14 | +import org.apache.calcite.rel.RelCollations; |
| 15 | +import org.apache.calcite.rel.RelNode; |
| 16 | +import org.apache.calcite.rel.type.RelDataType; |
| 17 | +import org.apache.calcite.rel.type.RelDataTypeFactory; |
| 18 | +import org.apache.calcite.rel.type.RelProtoDataType; |
| 19 | +import org.apache.calcite.schema.Schema; |
| 20 | +import org.apache.calcite.schema.SchemaPlus; |
| 21 | +import org.apache.calcite.schema.Statistic; |
| 22 | +import org.apache.calcite.schema.Statistics; |
| 23 | +import org.apache.calcite.schema.Table; |
| 24 | +import org.apache.calcite.sql.SqlCall; |
| 25 | +import org.apache.calcite.sql.SqlNode; |
| 26 | +import org.apache.calcite.sql.parser.SqlParser; |
| 27 | +import org.apache.calcite.sql.type.SqlTypeName; |
10 | 28 | import org.apache.calcite.test.CalciteAssert; |
| 29 | +import org.apache.calcite.tools.Frameworks; |
| 30 | +import org.apache.calcite.tools.Programs; |
| 31 | +import org.checkerframework.checker.nullness.qual.Nullable; |
11 | 32 | import org.junit.Test; |
12 | 33 |
|
13 | 34 | public class CalcitePPLExpandTest extends CalcitePPLAbstractTest { |
| 35 | + |
14 | 36 | public CalcitePPLExpandTest() { |
15 | 37 | super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); |
16 | 38 | } |
17 | 39 |
|
| 40 | + // There is no existing table with arrays. We create one for test purpose. |
| 41 | + public static class TableWithArray implements Table { |
| 42 | + protected final RelProtoDataType protoRowType = |
| 43 | + factory -> |
| 44 | + factory |
| 45 | + .builder() |
| 46 | + .add("DEPTNO", SqlTypeName.INTEGER) |
| 47 | + .add( |
| 48 | + "EMPNOS", |
| 49 | + factory.createArrayType(factory.createSqlType(SqlTypeName.INTEGER), -1)) |
| 50 | + .build(); |
| 51 | + |
| 52 | + @Override |
| 53 | + public RelDataType getRowType(RelDataTypeFactory typeFactory) { |
| 54 | + return protoRowType.apply(typeFactory); |
| 55 | + } |
| 56 | + |
| 57 | + @Override |
| 58 | + public Statistic getStatistic() { |
| 59 | + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); |
| 60 | + } |
| 61 | + |
| 62 | + @Override |
| 63 | + public Schema.TableType getJdbcTableType() { |
| 64 | + return Schema.TableType.TABLE; |
| 65 | + } |
| 66 | + |
| 67 | + @Override |
| 68 | + public boolean isRolledUp(String column) { |
| 69 | + return false; |
| 70 | + } |
| 71 | + |
| 72 | + @Override |
| 73 | + public boolean rolledUpColumnValidInsideAgg( |
| 74 | + String column, |
| 75 | + SqlCall call, |
| 76 | + @Nullable SqlNode parent, |
| 77 | + @Nullable CalciteConnectionConfig config) { |
| 78 | + return false; |
| 79 | + } |
| 80 | + } |
| 81 | + |
| 82 | + @Override |
| 83 | + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { |
| 84 | + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); |
| 85 | + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); |
| 86 | + // Add an empty table with name DEPT for test purpose |
| 87 | + schema.add("DEPT", new TableWithArray()); |
| 88 | + return Frameworks.newConfigBuilder() |
| 89 | + .parserConfig(SqlParser.Config.DEFAULT) |
| 90 | + .defaultSchema(schema) |
| 91 | + .traitDefs((List<RelTraitDef>) null) |
| 92 | + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); |
| 93 | + } |
| 94 | + |
18 | 95 | @Test |
19 | 96 | public void testExpand() { |
20 | | - String ppl = "source=EMP | expand JOB"; |
| 97 | + String ppl = "source=DEPT | expand EMPNOS"; |
| 98 | + RelNode root = getRelNode(ppl); |
| 99 | + String expectedLogical = |
| 100 | + "LogicalProject(DEPTNO=[$0], EMPNOS=[$2])\n" |
| 101 | + + " LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}])\n" |
| 102 | + + " LogicalTableScan(table=[[scott, DEPT]])\n" |
| 103 | + + " Uncollect\n" |
| 104 | + + " LogicalProject(EMPNOS=[$cor0.EMPNOS])\n" |
| 105 | + + " LogicalFilter(condition=[=($cor0.EMPNOS, $1)])\n" |
| 106 | + + " LogicalTableScan(table=[[scott, DEPT]])\n"; |
| 107 | + verifyLogical(root, expectedLogical); |
| 108 | + String expectedSparkSql = |
| 109 | + "SELECT `$cor0`.`DEPTNO`, `t00`.`EMPNOS`\n" |
| 110 | + + "FROM `scott`.`DEPT` `$cor0`,\n" |
| 111 | + + "LATERAL UNNEST (SELECT `$cor0`.`EMPNOS`\n" |
| 112 | + + "FROM `scott`.`DEPT`\n" |
| 113 | + + "WHERE `$cor0`.`EMPNOS` = `EMPNOS`) `t0` (`EMPNOS`) `t00`"; |
| 114 | + verifyPPLToSparkSQL(root, expectedSparkSql); |
| 115 | + } |
| 116 | + |
| 117 | + @Test |
| 118 | + public void testExpandWithEval() { |
| 119 | + String ppl = "source=DEPT | eval employee_no = EMPNOS | expand employee_no"; |
| 120 | + RelNode root = getRelNode(ppl); |
| 121 | + String expectedLogical = |
| 122 | + "LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$3])\n" |
| 123 | + + " LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{2}])\n" |
| 124 | + + " LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$1])\n" |
| 125 | + + " LogicalTableScan(table=[[scott, DEPT]])\n" |
| 126 | + + " Uncollect\n" |
| 127 | + + " LogicalProject(employee_no=[$cor0.employee_no])\n" |
| 128 | + + " LogicalFilter(condition=[=($cor0.employee_no, $2)])\n" |
| 129 | + + " LogicalProject(DEPTNO=[$0], EMPNOS=[$1], employee_no=[$1])\n" |
| 130 | + + " LogicalTableScan(table=[[scott, DEPT]])\n"; |
| 131 | + verifyLogical(root, expectedLogical); |
| 132 | + String expectedSparkSql = |
| 133 | + "SELECT `$cor0`.`DEPTNO`, `$cor0`.`EMPNOS`, `t20`.`employee_no`\n" |
| 134 | + + "FROM (SELECT `DEPTNO`, `EMPNOS`, `EMPNOS` `employee_no`\n" |
| 135 | + + "FROM `scott`.`DEPT`) `$cor0`,\n" |
| 136 | + + "LATERAL UNNEST (SELECT `$cor0`.`employee_no`\n" |
| 137 | + + "FROM (SELECT `DEPTNO`, `EMPNOS`, `EMPNOS` `employee_no`\n" |
| 138 | + + "FROM `scott`.`DEPT`) `t0`\n" |
| 139 | + + "WHERE `$cor0`.`employee_no` = `employee_no`) `t2` (`employee_no`) `t20`"; |
| 140 | + verifyPPLToSparkSQL(root, expectedSparkSql); |
21 | 141 | } |
22 | 142 | } |
0 commit comments