Skip to content

Commit a0ab339

Browse files
authored
Support flatten command with Calcite (opensearch-project#3747)
* WIP: Implementing flatten Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update flatten IT Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Support flatten with aliases Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Document flatten command Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Improve docs for flatten Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unix test for flatten command Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove redundant *from 3.1.0* tag in appendcol's doc Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unify command doc's language Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Revert "Unify command doc's language" This reverts commit 8750e2f. Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove since from flatten's doc Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update doc of flatten command Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Test flatten null fields Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add an ignored test case and limitation explainations for flatten after fields Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Rephrase flatten limitations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent e6ab4fb commit a0ab339

11 files changed

Lines changed: 705 additions & 1 deletion

File tree

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.opensearch.sql.ast.tree.FetchCursor;
6666
import org.opensearch.sql.ast.tree.FillNull;
6767
import org.opensearch.sql.ast.tree.Filter;
68+
import org.opensearch.sql.ast.tree.Flatten;
6869
import org.opensearch.sql.ast.tree.Head;
6970
import org.opensearch.sql.ast.tree.Join;
7071
import org.opensearch.sql.ast.tree.Kmeans;
@@ -675,6 +676,12 @@ public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
675676
computationsAndTypes.build());
676677
}
677678

679+
/**
 * Rejects the {@code flatten} command in this analyzer.
 *
 * <p>This visitor never produces a plan: it always throws, and the message tells the user to set
 * the key reported by {@code CALCITE_ENGINE_ENABLED.getKeyValue()} to {@code true}.
 *
 * @param node the flatten AST node (not inspected)
 * @param context the analysis context (not inspected)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
680+
public LogicalPlan visitFlatten(Flatten node, AnalysisContext context) {
681+
throw new UnsupportedOperationException(
682+
"FLATTEN is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
683+
}
684+
678685
@Override
679686
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
680687
LogicalPlan child = paginate.getChild().get(0).accept(this, context);

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.opensearch.sql.ast.tree.FetchCursor;
5555
import org.opensearch.sql.ast.tree.FillNull;
5656
import org.opensearch.sql.ast.tree.Filter;
57+
import org.opensearch.sql.ast.tree.Flatten;
5758
import org.opensearch.sql.ast.tree.Head;
5859
import org.opensearch.sql.ast.tree.Join;
5960
import org.opensearch.sql.ast.tree.Kmeans;
@@ -127,6 +128,10 @@ public T visitFilter(Filter node, C context) {
127128
return visitChildren(node, context);
128129
}
129130

131+
/**
 * Visits a {@link Flatten} node.
 *
 * <p>The default implementation does nothing specific to flatten and simply delegates to {@code
 * visitChildren}; concrete visitors override it to give the command a meaning.
 *
 * @param node the flatten AST node
 * @param context visitor-specific context passed through to the children
 * @return whatever {@code visitChildren} returns for this node
 */
public T visitFlatten(Flatten node, C context) {
132+
return visitChildren(node, context);
133+
}
134+
130135
public T visitTrendline(Trendline node, C context) {
131136
return visitChildren(node, context);
132137
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import javax.annotation.Nullable;
11+
import lombok.EqualsAndHashCode;
12+
import lombok.Getter;
13+
import lombok.RequiredArgsConstructor;
14+
import lombok.ToString;
15+
import org.opensearch.sql.ast.AbstractNodeVisitor;
16+
import org.opensearch.sql.ast.expression.Field;
17+
18+
/**
 * AST node representing a {@code flatten <field>} operation.
 *
 * <p>The node carries the field to flatten and an optional list of alias names for the flattened
 * sub-fields. The input plan is not a constructor argument; it is attached afterwards through
 * {@link #attach(UnresolvedPlan)}.
 */
19+
@ToString
20+
@RequiredArgsConstructor
21+
@EqualsAndHashCode(callSuper = false)
22+
public class Flatten extends UnresolvedPlan {
23+
24+
// Input plan of this command; stays null until attach(...) is called.
private UnresolvedPlan child;
25+
// The field whose sub-fields are to be flattened.
@Getter private final Field field;
26+
// Optional names for the flattened sub-fields; null when the query gives no aliases.
@Getter @Nullable private final List<String> aliases;
27+
28+
/**
 * Sets the input plan of this node.
 *
 * @param child the plan that feeds this command
 * @return this node, so calls can be chained
 */
@Override
29+
public Flatten attach(UnresolvedPlan child) {
30+
this.child = child;
31+
return this;
32+
}
33+
34+
/**
 * Returns the input plan wrapped in a list.
 *
 * @return an empty list when no child has been attached, otherwise a single-element list
 */
@Override
35+
public List<UnresolvedPlan> getChild() {
36+
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
37+
}
38+
39+
/**
 * Dispatches to {@link AbstractNodeVisitor#visitFlatten}.
 *
 * @param nodeVisitor the visitor to dispatch to
 * @param context visitor-specific context
 * @return the visitor's result for this node
 */
@Override
40+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
41+
return nodeVisitor.visitFlatten(this, context);
42+
}
43+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.common.base.Strings;
2121
import com.google.common.collect.ImmutableList;
2222
import com.google.common.collect.Iterables;
23+
import com.google.common.collect.Streams;
2324
import java.util.ArrayList;
2425
import java.util.Comparator;
2526
import java.util.HashSet;
@@ -82,6 +83,7 @@
8283
import org.opensearch.sql.ast.tree.FetchCursor;
8384
import org.opensearch.sql.ast.tree.FillNull;
8485
import org.opensearch.sql.ast.tree.Filter;
86+
import org.opensearch.sql.ast.tree.Flatten;
8587
import org.opensearch.sql.ast.tree.Head;
8688
import org.opensearch.sql.ast.tree.Join;
8789
import org.opensearch.sql.ast.tree.Kmeans;
@@ -223,10 +225,41 @@ private static void tryToRemoveNestedFields(CalcitePlanContext context) {
223225
.map(field -> (RexNode) context.relBuilder.field(field))
224226
.toList();
225227
if (!duplicatedNestedFields.isEmpty()) {
226-
context.relBuilder.projectExcept(duplicatedNestedFields);
228+
// This is a workaround to avoid the bug in Calcite:
229+
// In {@link RelBuilder#project_(Iterable, Iterable, Iterable, boolean, Iterable)},
230+
// the check `RexUtil.isIdentity(nodeList, inputRowType)` will pass when the input
231+
// and the output nodeList refer to the same fields, even if the field name list
232+
// is different. As a result, renaming operation will not be applied. This makes
233+
// the logical plan for the flatten command incorrect, where the operation is
234+
// equivalent to renaming the flattened sub-fields. E.g. emp.name -> name.
235+
forceProjectExcept(context.relBuilder, duplicatedNestedFields);
227236
}
228237
}
229238

239+
/**
240+
* Projects all fields of the builder's current input except the given expressions, with the
* {@code force} flag of {@code project} set to true.
241+
*
242+
* <p>This method is copied from {@link RelBuilder#projectExcept(Iterable)} and modified with the
243+
* force flag in project set to true. It is subject to future changes in Calcite.
244+
*
245+
* @param relBuilder RelBuilder whose current fields are projected
246+
* @param expressions Expressions to exclude from the project
* @throws IllegalArgumentException if {@code expressions} contains the same expression twice, or
*     contains an expression that is not among the builder's current fields
247+
*/
248+
private static void forceProjectExcept(RelBuilder relBuilder, Iterable<RexNode> expressions) {
249+
// Mutable copy of the current fields; excluded expressions are removed from it one by one.
List<RexNode> allExpressions = new ArrayList<>(relBuilder.fields());
250+
// Tracks what has already been excluded so duplicates in the input can be detected.
Set<RexNode> excludeExpressions = new HashSet<>();
251+
for (RexNode excludeExp : expressions) {
252+
// Set.add returns false when the expression was already seen.
if (!excludeExpressions.add(excludeExp)) {
253+
throw new IllegalArgumentException(
254+
"Input list contains duplicates. Expression " + excludeExp + " exists multiple times.");
255+
}
256+
// List.remove returns false when the expression is not one of the current fields.
if (!allExpressions.remove(excludeExp)) {
257+
throw new IllegalArgumentException("Expression " + excludeExp.toString() + " not found.");
258+
}
259+
}
260+
// Third argument is the force flag; this is the only intended difference from projectExcept.
relBuilder.project(allExpressions, ImmutableList.of(), true);
261+
}
262+
230263
/**
231264
* Try to remove metadata fields in two cases:
232265
*
@@ -1060,6 +1093,63 @@ public RelNode visitTableFunction(TableFunction node, CalcitePlanContext context
10601093
throw new CalciteUnsupportedException("Table function is unsupported in Calcite");
10611094
}
10621095

1096+
/**
1097+
* Visit flatten command.
1098+
*
1099+
* <p>The flatten command is used to flatten a struct field into multiple fields. This
1100+
* implementation simply projects the flattened fields and renames them according to the provided
1101+
* aliases or the field names in the struct. This is possible because struct / object fields
1102+
* are always read in a flattened manner in OpenSearch.
*
* <p>Sub-fields are found by name: every field of the current row type whose name starts with
* {@code <field>.} is selected. Aliases, when given, are paired with those sub-fields by
* position in row-type order.
1103+
*
1104+
* @param node Flatten command node
1105+
* @param context CalcitePlanContext
1106+
* @return RelNode representing the visited logical plan
* @throws IllegalArgumentException if aliases are given and their count differs from the number
*     of matched sub-fields
1107+
*/
1108+
@Override
1109+
public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
1110+
// Visit the input first; the row type inspected below is read from relBuilder.peek().
visitChildren(node, context);
1111+
RelBuilder relBuilder = context.relBuilder;
1112+
String fieldName = node.getField().getField().toString();
1113+
// Match the sub-field names with "field.*"
// This is a plain prefix test, so deeper descendants (e.g. "a.b.c" for field "a") match too.
1114+
List<RelDataTypeField> fieldsToExpand =
1115+
relBuilder.peek().getRowType().getFieldList().stream()
1116+
.filter(f -> f.getName().startsWith(fieldName + "."))
1117+
.toList();
1118+
1119+
List<String> expandedFieldNames;
1120+
if (node.getAliases() != null) {
1121+
// Aliases are applied positionally, so their count must equal the number of matches.
if (node.getAliases().size() != fieldsToExpand.size()) {
1122+
throw new IllegalArgumentException(
1123+
String.format(
1124+
"The number of aliases has to match the number of flattened fields. Expected %d"
1125+
+ " (%s), got %d (%s)",
1126+
fieldsToExpand.size(),
1127+
fieldsToExpand.stream()
1128+
.map(RelDataTypeField::getName)
1129+
.collect(Collectors.joining(", ")),
1130+
node.getAliases().size(),
1131+
String.join(", ", node.getAliases())));
1132+
}
1133+
expandedFieldNames = node.getAliases();
1134+
} else {
1135+
// If no aliases are provided, name each flattened field after its key name in the struct.
1136+
// E.g. message.author --renamed-to--> author
// substring(fieldName.length() + 1) drops the "<field>." prefix, including the dot.
1137+
expandedFieldNames =
1138+
fieldsToExpand.stream()
1139+
.map(RelDataTypeField::getName)
1140+
.map(name -> name.substring(fieldName.length() + 1))
1141+
.collect(Collectors.toList());
1142+
}
1143+
// Pair each matched sub-field with its output name and build an aliased field reference.
List<RexNode> expandedFields =
1144+
Streams.zip(
1145+
fieldsToExpand.stream(),
1146+
expandedFieldNames.stream(),
1147+
(f, n) -> relBuilder.alias(relBuilder.field(f.getName()), n))
1148+
.collect(Collectors.toList());
1149+
// NOTE(review): if nothing matched and no aliases were given, this list is empty; presumably
// projectPlus is then a no-op rather than an error -- confirm.
relBuilder.projectPlus(expandedFields);
1150+
return relBuilder.peek();
1151+
}
1152+
10631153
@Override
10641154
public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
10651155
visitChildren(node, context);

docs/user/ppl/cmd/flatten.rst

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
=============
2+
flatten
3+
=============
4+
5+
.. rubric:: Table of contents
6+
7+
.. contents::
8+
:local:
9+
:depth: 2
10+
11+
Description
12+
===========
13+
14+
Use the ``flatten`` command to flatten a struct or an object field into separate
15+
fields in a document.
16+
17+
The flattened fields will be ordered **lexicographically** by their original
18+
key names in the struct. I.e. if the struct has keys ``b``, ``c`` and ``Z``,
19+
the flattened fields will be ordered as ``Z``, ``b``, ``c``.
20+
21+
Note that ``flatten`` should not be applied to arrays. Please use ``expand``
22+
command to expand an array field into multiple rows instead. However, since
23+
an array can be stored in a non-array field in OpenSearch, when flattening a
24+
field storing a nested array, only the first element of the array will be
25+
flattened.
26+
27+
Version
28+
=======
29+
3.1.0
30+
31+
Syntax
32+
======
33+
34+
flatten <field> [as (<alias-list>)]
35+
36+
* field: The field to be flattened. Only object and nested fields are
37+
supported.
38+
* alias-list: (Optional) The names to use instead of the original key names.
39+
Names are separated by commas. It is advised to put the alias-list in
40+
parentheses if there is more than one alias. E.g. both
41+
``country, state, city`` and ``(country, state, city)`` are supported,
42+
but the latter is advised. Its length must match the number of keys in the
43+
struct field. Please note that the provided alias names **must** follow
44+
the lexicographical order of the corresponding original keys in the struct.
45+
46+
Example: flatten an object field with aliases
47+
=============================================
48+
49+
Given the following index ``my-index``
50+
51+
.. code-block::
52+
53+
{"message":{"info":"a","author":"e","dayOfWeek":1},"myNum":1}
54+
{"message":{"info":"b","author":"f","dayOfWeek":2},"myNum":2}
55+
56+
with the following mapping:
57+
58+
.. code-block:: json
59+
60+
{
61+
"mappings": {
62+
"properties": {
63+
"message": {
64+
"type": "object",
65+
"properties": {
66+
"info": {
67+
"type": "keyword",
68+
"index": "true"
69+
},
70+
"author": {
71+
"type": "keyword",
72+
"fields": {
73+
"keyword": {
74+
"type": "keyword",
75+
"ignore_above": 256
76+
}
77+
},
78+
"index": "true"
79+
},
80+
"dayOfWeek": {
81+
"type": "long"
82+
}
83+
}
84+
},
85+
"myNum": {
86+
"type": "long"
87+
}
88+
}
89+
}
90+
}
91+
92+
93+
The following query flattens the ``message`` field and renames the keys to
94+
``creator, dow, info``:
95+
96+
PPL query::
97+
98+
PPL> source=my-index | flatten message as (creator, dow, info);
99+
fetched rows / total rows = 2/2
100+
+-----------------------------------------+--------+---------+-----+------+
101+
| message | myNum | creator | dow | info |
102+
|-----------------------------------------|--------|---------|-----|------|
103+
| {"info":"a","author":"e","dayOfWeek":1} | 1 | e | 1 | a |
104+
| {"info":"b","author":"f","dayOfWeek":2} | 2 | f | 2 | b |
105+
+-----------------------------------------+--------+---------+-----+------+
106+
107+
Limitations
108+
===========
109+
* The ``flatten`` command may not work as expected when the sub-fields to be
110+
flattened are not visible to it.
111+
112+
For example in query
113+
``source=my-index | fields message | flatten message``, the
114+
``flatten message`` command doesn't work because sub-fields such as
115+
``message.info`` and ``message.author`` are no longer visible after the
116+
``fields message`` command.
117+
118+
As an alternative, you can change the query to ``source=my-index | flatten message``.
119+
120+
* The command works only with Calcite enabled. This can be set with the
121+
following command:
122+
123+
.. code-block::
124+
125+
PUT /_cluster/settings
126+
{
127+
"persistent":{
128+
"plugins.calcite.enabled": true
129+
}
130+
}

0 commit comments

Comments
 (0)