Skip to content

Commit 7b9fbaf

Browse files
committed
WIP: Implementing flatten
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 5eb2e5b commit 7b9fbaf

9 files changed

Lines changed: 139 additions & 0 deletions

File tree

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.opensearch.sql.ast.tree.FetchCursor;
5454
import org.opensearch.sql.ast.tree.FillNull;
5555
import org.opensearch.sql.ast.tree.Filter;
56+
import org.opensearch.sql.ast.tree.Flatten;
5657
import org.opensearch.sql.ast.tree.Head;
5758
import org.opensearch.sql.ast.tree.Join;
5859
import org.opensearch.sql.ast.tree.Kmeans;
@@ -672,6 +673,12 @@ public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
672673
computationsAndTypes.build());
673674
}
674675

676+
@Override
677+
public LogicalPlan visitFlatten(Flatten node, AnalysisContext context) {
678+
throw new UnsupportedOperationException(
679+
"FLATTEN is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
680+
}
681+
675682
@Override
676683
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
677684
LogicalPlan child = paginate.getChild().get(0).accept(this, context);

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.sql.ast.tree.FetchCursor;
5252
import org.opensearch.sql.ast.tree.FillNull;
5353
import org.opensearch.sql.ast.tree.Filter;
54+
import org.opensearch.sql.ast.tree.Flatten;
5455
import org.opensearch.sql.ast.tree.Head;
5556
import org.opensearch.sql.ast.tree.Join;
5657
import org.opensearch.sql.ast.tree.Kmeans;
@@ -120,6 +121,10 @@ public T visitFilter(Filter node, C context) {
120121
return visitChildren(node, context);
121122
}
122123

124+
public T visitFlatten(Flatten node, C context) {
125+
return visitChildren(node, context);
126+
}
127+
123128
public T visitTrendline(Trendline node, C context) {
124129
return visitChildren(node, context);
125130
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.Getter;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.ToString;
13+
import org.opensearch.sql.ast.AbstractNodeVisitor;
14+
import org.opensearch.sql.ast.expression.Field;
15+
16+
/** AST node representing a {@code flatten <field>} operation. */
17+
@ToString
18+
@RequiredArgsConstructor
19+
public class Flatten extends UnresolvedPlan {
20+
21+
private UnresolvedPlan child;
22+
@Getter private final Field field;
23+
24+
@Override
25+
public Flatten attach(UnresolvedPlan child) {
26+
this.child = child;
27+
return this;
28+
}
29+
30+
@Override
31+
public List<UnresolvedPlan> getChild() {
32+
return ImmutableList.of(child);
33+
}
34+
35+
@Override
36+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
37+
return nodeVisitor.visitFlatten(this, context);
38+
}
39+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.opensearch.sql.ast.tree.FetchCursor;
6565
import org.opensearch.sql.ast.tree.FillNull;
6666
import org.opensearch.sql.ast.tree.Filter;
67+
import org.opensearch.sql.ast.tree.Flatten;
6768
import org.opensearch.sql.ast.tree.Head;
6869
import org.opensearch.sql.ast.tree.Join;
6970
import org.opensearch.sql.ast.tree.Kmeans;
@@ -843,6 +844,26 @@ public RelNode visitTableFunction(TableFunction node, CalcitePlanContext context
843844
throw new CalciteUnsupportedException("Table function is unsupported in Calcite");
844845
}
845846

847+
@Override
848+
public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
849+
visitChildren(node, context);
850+
RexInputRef fieldRex = (RexInputRef) rexVisitor.analyze(node.getField(), context);
851+
RelBuilder relBuilder = context.relBuilder;
852+
String fieldName = node.getField().getField().toString();
853+
// Match the field names
854+
List<RexNode> aliasFields =
855+
relBuilder.peek().getRowType().getFieldList().stream()
856+
.filter(f -> f.getName().startsWith(fieldName + "."))
857+
.map(
858+
f ->
859+
relBuilder.alias(
860+
relBuilder.field(f.getName()),
861+
f.getName().substring(fieldName.length() + 1)))
862+
.toList();
863+
relBuilder.projectPlus(aliasFields);
864+
return relBuilder.peek();
865+
}
866+
846867
@Override
847868
public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
848869
throw new CalciteUnsupportedException("Trendline command is unsupported in Calcite");
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
*
3+
* * Copyright OpenSearch Contributors
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package org.opensearch.sql.calcite.remote;
9+
10+
import static org.opensearch.sql.legacy.SQLIntegTestCase.Index.NESTED_WITHOUT_ARRAYS;
11+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_TYPE_WITHOUT_ARRAYS;
12+
import static org.opensearch.sql.util.MatcherUtils.schema;
13+
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
14+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
15+
16+
import org.json.JSONObject;
17+
import org.junit.jupiter.api.Test;
18+
import org.opensearch.sql.ppl.PPLIntegTestCase;
19+
20+
public class CalciteFlattenCommandIT extends PPLIntegTestCase {
21+
@Override
22+
public void init() throws Exception {
23+
super.init();
24+
loadIndex(NESTED_WITHOUT_ARRAYS);
25+
enableCalcite();
26+
disallowCalciteFallback();
27+
}
28+
29+
@Test
30+
public void testFlattenStruct() throws Exception {
31+
JSONObject result =
32+
executeQuery(
33+
String.format("source=%s | flatten message", TEST_INDEX_NESTED_TYPE_WITHOUT_ARRAYS));
34+
verifySchema(
35+
result,
36+
schema("message", "struct"),
37+
schema("info", "string"),
38+
schema("author", "string"),
39+
schema("dayOfWeek", "integer"),
40+
schema("comment", "array"),
41+
schema("myNum", "integer"),
42+
schema("someField", "string"));
43+
verifyNumOfRows(result, 5);
44+
}
45+
}

ppl/src/main/antlr/OpenSearchPPLLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ KMEANS: 'KMEANS';
3737
AD: 'AD';
3838
ML: 'ML';
3939
FILLNULL: 'FILLNULL';
40+
FLATTEN: 'FLATTEN';
4041
TRENDLINE: 'TRENDLINE';
4142
SIMPLE_PATTERN: 'SIMPLE_PATTERN';
4243
BRAIN: 'BRAIN';

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ commands
7070
| mlCommand
7171
| fillnullCommand
7272
| trendlineCommand
73+
| flattenCommand
7374
;
7475

7576
commandName
@@ -96,6 +97,7 @@ commandName
9697
| AD
9798
| ML
9899
| FILLNULL
100+
| FLATTEN
99101
| TRENDLINE
100102
| EXPLAIN
101103
;
@@ -237,6 +239,10 @@ trendlineType
237239
: SMA
238240
;
239241

242+
flattenCommand
243+
: FLATTEN fieldExpression
244+
;
245+
240246
kmeansCommand
241247
: KMEANS (kmeansParameter)*
242248
;

ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.opensearch.sql.ast.tree.Eval;
6363
import org.opensearch.sql.ast.tree.FillNull;
6464
import org.opensearch.sql.ast.tree.Filter;
65+
import org.opensearch.sql.ast.tree.Flatten;
6566
import org.opensearch.sql.ast.tree.Head;
6667
import org.opensearch.sql.ast.tree.Join;
6768
import org.opensearch.sql.ast.tree.Kmeans;
@@ -651,6 +652,12 @@ public UnresolvedPlan visitFillNullUsing(OpenSearchPPLParser.FillNullUsingContex
651652
return FillNull.ofVariousValue(replacementsBuilder.build());
652653
}
653654

655+
@Override
656+
public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) {
657+
Field field = (Field) internalVisitExpression(ctx.fieldExpression());
658+
return new Flatten(field);
659+
}
660+
654661
/** trendline command. */
655662
@Override
656663
public UnresolvedPlan visitTrendlineCommand(OpenSearchPPLParser.TrendlineCommandContext ctx) {

ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.sql.ast.tree.Eval;
5656
import org.opensearch.sql.ast.tree.FillNull;
5757
import org.opensearch.sql.ast.tree.Filter;
58+
import org.opensearch.sql.ast.tree.Flatten;
5859
import org.opensearch.sql.ast.tree.Head;
5960
import org.opensearch.sql.ast.tree.Join;
6061
import org.opensearch.sql.ast.tree.Lookup;
@@ -358,6 +359,13 @@ public String visitParse(Parse node, String context) {
358359
: StringUtils.format("%s | %s %s '%s'", child, commandName, source, regex);
359360
}
360361

362+
@Override
363+
public String visitFlatten(Flatten node, String context) {
364+
String child = node.getChild().getFirst().accept(this, context);
365+
String field = visitExpression(node.getField());
366+
return StringUtils.format("%s | flatten %s", child, field);
367+
}
368+
361369
@Override
362370
public String visitTrendline(Trendline node, String context) {
363371
String child = node.getChild().get(0).accept(this, context);

0 commit comments

Comments
 (0)