Skip to content

Commit 74e0153

Browse files
committed
WIP: Implementing expand command
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent c537b92 commit 74e0153

6 files changed

Lines changed: 39 additions & 12 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -844,12 +844,10 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
844844

845845
// 2. Get the field to expand
846846
Field arrayField = expand.getField();
847-
848847
// 3. Unnest the array field
849848
// Analyze the array field to get its RexNode
850849
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);
851850
// Push the original table to the RelBuilder stack
852-
RelNode originalTable = relBuilder.peek();
853851
// No alias is provided in the expand command, so we remove the original array field,
854852
// then replace it with the unnest result.
855853
// relBuilder.projectExcept(arrayFieldRex);
@@ -858,11 +856,12 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
858856
Holder<RexCorrelVariable> correlVariable = Holder.empty();
859857
relBuilder.variable(correlVariable::set);
860858

861-
relBuilder.push(originalTable);
862-
859+
// Push a copy of the original table to the RelBuilder stack as right
860+
// side of the join.
861+
relBuilder.push(relBuilder.peek());
863862
RexNode correlArrayField =
864863
relBuilder.field(
865-
context.rexBuilder.makeCorrel(originalTable.getRowType(), correlVariable.get().id),
864+
context.rexBuilder.makeCorrel(relBuilder.peek().getRowType(), correlVariable.get().id),
866865
arrayFieldRex.getIndex());
867866

868867
relBuilder.project(
@@ -875,7 +874,7 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
875874
relBuilder.uncollect(List.of(), false);
876875

877876
// ImmutableBitSet requiredFields = originalTable.getRowType().getFieldList()
878-
relBuilder.correlate(JoinRelType.INNER, correlVariable.get().id, arrayFieldRex);
877+
relBuilder.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(arrayFieldRex));
879878

880879
return relBuilder.peek();
881880
}

core/src/main/java/org/opensearch/sql/exception/CalciteUnsupportedException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@ public class CalciteUnsupportedException extends QueryEngineException {
1010
public CalciteUnsupportedException(String message) {
1111
super(message);
1212
}
13+
14+
public CalciteUnsupportedException(String message, Throwable cause) {
15+
super(message, cause);
16+
}
1317
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void executeWithCalcite(
113113
} else {
114114
if (t instanceof Error) {
115115
// Calcite may throw AssertError during query execution.
116-
listener.onFailure(new CalciteUnsupportedException(t.getMessage()));
116+
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
117117
} else {
118118
listener.onFailure((Exception) t);
119119
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import org.opensearch.sql.ppl.ExpandCommandIT;
1111

12-
public class CalciteExpandIT extends ExpandCommandIT {
12+
public class CalciteExpandCommandIT extends ExpandCommandIT {
1313
@Override
1414
public void init() throws Exception {
1515
super.init();

integ-test/src/test/java/org/opensearch/sql/ppl/ExpandCommandIT.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.opensearch.sql.ppl;
99

10+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ARRAY;
1011
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
1112
import static org.opensearch.sql.util.MatcherUtils.schema;
1213
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
@@ -21,21 +22,31 @@ public class ExpandCommandIT extends PPLIntegTestCase {
2122
public void init() throws Exception {
2223
super.init();
2324
loadIndex(Index.NESTED_SIMPLE);
25+
loadIndex(Index.ARRAY);
2426
}
2527

2628
@Test
27-
public void testExpand() throws Exception {
29+
public void testExpandOnNested() throws Exception {
2830
JSONObject response =
2931
executeQuery(String.format("source=%s | expand address", TEST_INDEX_NESTED_SIMPLE));
3032
verifySchema(
3133
response,
3234
schema("name", "string"),
3335
schema("age", "integer"),
3436
schema("id", "integer"),
35-
schema("address", "object"));
37+
schema("address", "struct"));
3638
verifyNumOfRows(response, 11);
3739
}
3840

41+
@Ignore
42+
@Test
43+
public void testExpandOnArray() throws Exception {
44+
JSONObject response =
45+
executeQuery(String.format("source=%s | expand strings", TEST_INDEX_ARRAY));
46+
verifySchema(response, schema("numbers", "array"), schema("strings", "string"));
47+
verifyNumOfRows(response, 5);
48+
}
49+
3950
// TODO: double check if expand with alias is supported
4051
@Ignore
4152
@Test
@@ -48,7 +59,7 @@ public void testExpandWithAlias() throws Exception {
4859
schema("age", "integer"),
4960
schema("id", "integer"),
5061
schema("address", "array"),
51-
schema("addr", "object"));
62+
schema("addr", "struct"));
5263
verifyNumOfRows(response, 11);
5364
}
5465
}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import org.apache.calcite.rel.type.RelDataTypeField;
2626
import org.apache.calcite.runtime.Hook;
2727
import org.apache.calcite.sql.SqlExplainLevel;
28+
import org.apache.calcite.sql.type.SqlTypeName;
2829
import org.opensearch.sql.ast.statement.Explain.ExplainFormat;
2930
import org.opensearch.sql.calcite.CalcitePlanContext;
3031
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners;
3132
import org.opensearch.sql.common.response.ResponseListener;
3233
import org.opensearch.sql.data.model.ExprTupleValue;
3334
import org.opensearch.sql.data.model.ExprValue;
35+
import org.opensearch.sql.data.type.ExprCoreType;
3436
import org.opensearch.sql.data.type.ExprType;
3537
import org.opensearch.sql.executor.ExecutionContext;
3638
import org.opensearch.sql.executor.ExecutionEngine;
@@ -232,7 +234,18 @@ private void buildResultSet(
232234
for (int i = 1; i <= columnCount; ++i) {
233235
String columnName = metaData.getColumnName(i);
234236
RelDataType fieldType = fieldTypes.get(i - 1);
235-
ExprType exprType = convertRelDataTypeToExprType(fieldType);
237+
// The element type of struct and array is currently set to ANY.
238+
// We set them using the runtime type as a workaround.
239+
ExprType exprType;
240+
if (fieldType.getSqlTypeName() == SqlTypeName.ANY) {
241+
if (!values.isEmpty()) {
242+
exprType = values.getFirst().tupleValue().get(columnName).type();
243+
} else {
244+
exprType = ExprCoreType.UNDEFINED;
245+
}
246+
} else {
247+
exprType = convertRelDataTypeToExprType(fieldType);
248+
}
236249
columns.add(new Column(columnName, null, exprType));
237250
}
238251
Schema schema = new Schema(columns);

0 commit comments

Comments
 (0)