Skip to content

Commit 732d9c8

Browse files
committed
Support flatten with aliases
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 5e2f1f1 commit 732d9c8

7 files changed

Lines changed: 316 additions & 58 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/tree/Flatten.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import com.google.common.collect.ImmutableList;
99
import java.util.List;
10+
import javax.annotation.Nullable;
11+
import lombok.EqualsAndHashCode;
1012
import lombok.Getter;
1113
import lombok.RequiredArgsConstructor;
1214
import lombok.ToString;
@@ -16,10 +18,12 @@
1618
/** AST node representing a {@code flatten <field> [as <aliases>]} operation. */
1719
@ToString
1820
@RequiredArgsConstructor
21+
@EqualsAndHashCode(callSuper = false)
1922
public class Flatten extends UnresolvedPlan {
2023

2124
private UnresolvedPlan child;
2225
@Getter private final Field field;
26+
@Getter @Nullable private final List<String> aliases;
2327

2428
@Override
2529
public Flatten attach(UnresolvedPlan child) {
@@ -29,7 +33,7 @@ public Flatten attach(UnresolvedPlan child) {
2933

3034
@Override
3135
public List<UnresolvedPlan> getChild() {
32-
return ImmutableList.of(child);
36+
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
3337
}
3438

3539
@Override

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.common.base.Strings;
2121
import com.google.common.collect.ImmutableList;
2222
import com.google.common.collect.Iterables;
23+
import com.google.common.collect.Streams;
2324
import java.util.ArrayList;
2425
import java.util.HashSet;
2526
import java.util.List;
@@ -974,23 +975,60 @@ public RelNode visitTableFunction(TableFunction node, CalcitePlanContext context
974975
throw new CalciteUnsupportedException("Table function is unsupported in Calcite");
975976
}
976977

978+
/**
979+
* Visit flatten command.
980+
*
981+
* <p>The flatten command is used to flatten a struct field into multiple fields. This
982+
* implementation simply projects the flattened fields and renames them according to the provided
983+
* aliases or the field names in the struct. This is possible because the struct / object field
984+
* are always read in a flattened manner in OpenSearch.
985+
*
986+
* @param node Flatten command node
987+
* @param context CalcitePlanContext
988+
* @return RelNode representing the visited logical plan
989+
*/
977990
@Override
978991
public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
979992
visitChildren(node, context);
980-
RexInputRef fieldRex = (RexInputRef) rexVisitor.analyze(node.getField(), context);
981993
RelBuilder relBuilder = context.relBuilder;
982994
String fieldName = node.getField().getField().toString();
983-
// Match the field names
984-
List<RexNode> aliasFields =
995+
// Match the sub-field names with "field.*"
996+
List<RelDataTypeField> fieldsToExpand =
985997
relBuilder.peek().getRowType().getFieldList().stream()
986998
.filter(f -> f.getName().startsWith(fieldName + "."))
987-
.map(
988-
f ->
989-
relBuilder.alias(
990-
relBuilder.field(f.getName()),
991-
f.getName().substring(fieldName.length() + 1)))
992999
.toList();
993-
relBuilder.projectPlus(aliasFields);
1000+
1001+
List<String> expandedFieldNames;
1002+
if (node.getAliases() != null) {
1003+
if (node.getAliases().size() != fieldsToExpand.size()) {
1004+
throw new IllegalArgumentException(
1005+
String.format(
1006+
"The number of aliases has to match the number of flattened fields. Expected %d"
1007+
+ " (%s), got %d (%s)",
1008+
fieldsToExpand.size(),
1009+
fieldsToExpand.stream()
1010+
.map(RelDataTypeField::getName)
1011+
.collect(Collectors.joining(", ")),
1012+
node.getAliases().size(),
1013+
String.join(", ", node.getAliases())));
1014+
}
1015+
expandedFieldNames = node.getAliases();
1016+
} else {
1017+
// If no aliases provided, name the flattened fields to the key name in the struct.
1018+
// E.g. message.author --renamed-to--> author
1019+
expandedFieldNames =
1020+
fieldsToExpand.stream()
1021+
.map(RelDataTypeField::getName)
1022+
.map(name -> name.substring(fieldName.length() + 1))
1023+
.collect(Collectors.toList());
1024+
}
1025+
List<RexNode> expandedFields =
1026+
Streams.zip(
1027+
fieldsToExpand.stream(),
1028+
expandedFieldNames.stream(),
1029+
(f, n) -> relBuilder.alias(relBuilder.field(f.getName()), n))
1030+
.collect(Collectors.toList());
1031+
relBuilder.projectPlus(expandedFields);
9941032
return relBuilder.peek();
9951033
}
9961034

docs/user/ppl/cmd/flatten.rst

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
=============
2+
flatten
3+
=============
4+
5+
.. rubric:: Table of contents
6+
7+
.. contents::
8+
:local:
9+
:depth: 2
10+
11+
Description
12+
===========
13+
From 3.1.0
14+
15+
Use the ``flatten`` command to flatten a nested struct field into separate fields in a document.
16+
17+
18+
Syntax
19+
======
20+
21+
flatten <field> [as <alias-list>]
22+

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFlattenCommandIT.java

Lines changed: 90 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import static org.opensearch.sql.util.MatcherUtils.rows;
1313
import static org.opensearch.sql.util.MatcherUtils.schema;
1414
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
15+
import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains;
1516
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
1617

18+
import org.hamcrest.Matcher;
1719
import org.json.JSONArray;
1820
import org.json.JSONObject;
1921
import org.junit.jupiter.api.Test;
@@ -36,6 +38,8 @@ public void testFlattenNestedStruct() throws Exception {
3638
verifySchema(
3739
result,
3840
// Nested fields are retrieved as array of nested structs
41+
// This is because such fields can store either a struct like {"dayOfWeek":1}
42+
// or an array of structs like [{"dayOfWeek":1}, {"dayOfWeek":2}]
3943
schema("comment", "array"),
4044
schema("myNum", "bigint"),
4145
schema("someField", "string"),
@@ -44,52 +48,92 @@ public void testFlattenNestedStruct() throws Exception {
4448
schema("author", "string"),
4549
schema("dayOfWeek", "bigint"),
4650
schema("info", "string"));
47-
verifyDataRows(
51+
verifyDataRows(result, getExpectedRows());
52+
}
53+
54+
@Test
55+
public void testFlattenWithAliases() throws Exception {
56+
JSONObject result =
57+
executeQuery(
58+
String.format(
59+
"source=%s | flatten message as (creator, dow, information)",
60+
TEST_INDEX_NESTED_TYPE_WITHOUT_ARRAYS));
61+
verifySchema(
4862
result,
49-
rows(
50-
new JSONArray().put(new JSONObject().put("data", "ab").put("likes", 3)),
51-
1,
52-
"b",
53-
new JSONArray()
54-
.put(new JSONObject().put("info", "a").put("author", "e").put("dayOfWeek", 1)),
55-
"e",
56-
1,
57-
"a"),
58-
rows(
59-
new JSONArray().put(new JSONObject().put("data", "aa").put("likes", 2)),
60-
2,
61-
"a",
62-
new JSONArray()
63-
.put(new JSONObject().put("info", "b").put("author", "f").put("dayOfWeek", 2)),
64-
"f",
65-
2,
66-
"b"),
67-
rows(
68-
new JSONArray().put(new JSONObject().put("data", "aa").put("likes", 3)),
69-
3,
70-
"a",
71-
new JSONArray()
72-
.put(new JSONObject().put("info", "c").put("author", "g").put("dayOfWeek", 1)),
73-
"g",
74-
1,
75-
"c"),
76-
rows(
77-
new JSONArray().put(new JSONObject().put("data", "ab").put("likes", 1)),
78-
4,
79-
"b",
80-
new JSONArray()
81-
.put(new JSONObject().put("info", "c").put("author", "h").put("dayOfWeek", 4)),
82-
"h",
83-
4,
84-
"c"),
85-
rows(
86-
new JSONArray().put(new JSONObject().put("data", "bb").put("likes", 10)),
87-
3,
88-
"a",
89-
new JSONArray()
90-
.put(new JSONObject().put("info", "zz").put("author", "zz").put("dayOfWeek", 6)),
91-
"zz",
92-
6,
93-
"zz"));
63+
schema("comment", "array"),
64+
schema("myNum", "bigint"),
65+
schema("someField", "string"),
66+
schema("message", "array"),
67+
schema("creator", "string"),
68+
schema("dow", "bigint"),
69+
schema("information", "string"));
70+
verifyDataRows(result, getExpectedRows());
71+
}
72+
73+
@Test
74+
public void testFlattenWithMismatchedNumberOfAliasesShouldThrow() throws Exception {
75+
Throwable t =
76+
expectThrows(
77+
Exception.class,
78+
() ->
79+
executeQuery(
80+
String.format(
81+
"source=%s | flatten message as a, b, c, d",
82+
TEST_INDEX_NESTED_TYPE_WITHOUT_ARRAYS)));
83+
verifyErrorMessageContains(
84+
t,
85+
"The number of aliases has to match the number of flattened fields. Expected 3"
86+
+ " (message.author, message.dayOfWeek, message.info), got 4 (a, b, c, d)");
87+
}
88+
89+
@SuppressWarnings("unchecked")
90+
private static Matcher<JSONArray>[] getExpectedRows() {
91+
return new org.hamcrest.TypeSafeMatcher[] {
92+
rows(
93+
new JSONArray().put(new JSONObject().put("data", "ab").put("likes", 3)),
94+
1,
95+
"b",
96+
new JSONArray()
97+
.put(new JSONObject().put("info", "a").put("author", "e").put("dayOfWeek", 1)),
98+
"e",
99+
1,
100+
"a"),
101+
rows(
102+
new JSONArray().put(new JSONObject().put("data", "aa").put("likes", 2)),
103+
2,
104+
"a",
105+
new JSONArray()
106+
.put(new JSONObject().put("info", "b").put("author", "f").put("dayOfWeek", 2)),
107+
"f",
108+
2,
109+
"b"),
110+
rows(
111+
new JSONArray().put(new JSONObject().put("data", "aa").put("likes", 3)),
112+
3,
113+
"a",
114+
new JSONArray()
115+
.put(new JSONObject().put("info", "c").put("author", "g").put("dayOfWeek", 1)),
116+
"g",
117+
1,
118+
"c"),
119+
rows(
120+
new JSONArray().put(new JSONObject().put("data", "ab").put("likes", 1)),
121+
4,
122+
"b",
123+
new JSONArray()
124+
.put(new JSONObject().put("info", "c").put("author", "h").put("dayOfWeek", 4)),
125+
"h",
126+
4,
127+
"c"),
128+
rows(
129+
new JSONArray().put(new JSONObject().put("data", "bb").put("likes", 10)),
130+
3,
131+
"a",
132+
new JSONArray()
133+
.put(new JSONObject().put("info", "zz").put("author", "zz").put("dayOfWeek", 6)),
134+
"zz",
135+
6,
136+
"zz")
137+
};
94138
}
95139
}

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ appendcolCommand
245245
;
246246

247247
// flatten <field> [AS <aliases>]: the optional alias sequence is exposed as ctx.aliases
flattenCommand
   : FLATTEN fieldExpression (AS aliases = identifierSeq)?
   ;
250250

251251
kmeansCommand
@@ -1038,6 +1038,11 @@ wcQualifiedName
10381038
: wildcard (DOT wildcard)* # identsAsWildcardQualifiedName
10391039
;
10401040

1041+
// Comma-separated list of qualified names, written either bare (a, b, c) or
// parenthesized ((a, b, c)). Both alternatives share one label and therefore
// one generated context class.
identifierSeq
   : qualifiedName (COMMA qualifiedName)*                      # identsAsQualifiedNameSeq
   | LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS    # identsAsQualifiedNameSeq
   ;
1045+
10411046
ident
10421047
: (DOT)? ID
10431048
| BACKTICK ident BACKTICK

ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.AdCommandContext;
9191
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.ByClauseContext;
9292
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldListContext;
93+
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext;
9394
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.KmeansCommandContext;
9495
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupPairContext;
9596
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParserBaseVisitor;
@@ -657,7 +658,16 @@ public UnresolvedPlan visitFillNullUsing(OpenSearchPPLParser.FillNullUsingContex
657658
@Override
658659
public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) {
659660
Field field = (Field) internalVisitExpression(ctx.fieldExpression());
660-
return new Flatten(field);
661+
List<String> aliases =
662+
ctx.aliases == null ? null : getAliasList((IdentsAsQualifiedNameSeqContext) ctx.aliases);
663+
return new Flatten(field, aliases);
664+
}
665+
666+
private List<String> getAliasList(IdentsAsQualifiedNameSeqContext ctx) {
667+
return ctx.qualifiedName().stream()
668+
.map(this::internalVisitExpression)
669+
.map(Object::toString)
670+
.collect(Collectors.toList());
661671
}
662672

663673
/** trendline command. */

0 commit comments

Comments
 (0)