Skip to content

Commit 9cd1f96

Browse files
authored
Implement Append command with Calcite (opensearch-project#4123)
* Implement Append Command Signed-off-by: Songkan Tang <songkant@amazon.com> * Fix spotless check Signed-off-by: Songkan Tang <songkant@amazon.com> * Rephrase append.rst Signed-off-by: Songkan Tang <songkant@amazon.com> * Support subsearch different index for append command Signed-off-by: Songkan Tang <songkant@amazon.com> * Fix some tests and add cross cluster IT Signed-off-by: Songkan Tang <songkant@amazon.com> * Not support empty subsearch input for now Signed-off-by: Songkan Tang <songkant@amazon.com> * Fix doctest Signed-off-by: Songkan Tang <songkant@amazon.com> * Support empty source edge case Signed-off-by: Songkan Tang <songkant@amazon.com> * Fix anonymizer tests Signed-off-by: Songkan Tang <songkant@amazon.com> * Add missing test cases for nested join or lookup command in appended subsearch Signed-off-by: Songkan Tang <songkant@amazon.com> * Fix compile issue Signed-off-by: Songkan Tang <songkant@amazon.com> --------- Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent ca4d6c1 commit 9cd1f96

20 files changed

Lines changed: 942 additions & 2 deletions

File tree

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.sql.ast.expression.WindowFunction;
5959
import org.opensearch.sql.ast.tree.AD;
6060
import org.opensearch.sql.ast.tree.Aggregation;
61+
import org.opensearch.sql.ast.tree.Append;
6162
import org.opensearch.sql.ast.tree.AppendCol;
6263
import org.opensearch.sql.ast.tree.Bin;
6364
import org.opensearch.sql.ast.tree.CloseCursor;
@@ -783,6 +784,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) {
783784
throw getOnlyForCalciteException("Appendcol");
784785
}
785786

787+
/**
 * Handles the Append command in this analyzer by rejecting it.
 *
 * <p>No logical plan is built: the method always throws the exception created by {@code
 * getOnlyForCalciteException("Append")} (presumably signalling that the command is available only
 * with the Calcite engine - confirm against that helper).
 */
@Override
788+
public LogicalPlan visitAppend(Append node, AnalysisContext context) {
789+
throw getOnlyForCalciteException("Append");
790+
}
791+
786792
private LogicalSort buildSort(
787793
LogicalPlan child, AnalysisContext context, Integer count, List<Field> sortFields) {
788794
ExpressionReferenceOptimizer optimizer =

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.sql.ast.statement.Statement;
4747
import org.opensearch.sql.ast.tree.AD;
4848
import org.opensearch.sql.ast.tree.Aggregation;
49+
import org.opensearch.sql.ast.tree.Append;
4950
import org.opensearch.sql.ast.tree.AppendCol;
5051
import org.opensearch.sql.ast.tree.Bin;
5152
import org.opensearch.sql.ast.tree.CloseCursor;
@@ -416,4 +417,8 @@ public T visitExistsSubquery(ExistsSubquery node, C context) {
416417
/** Visits an {@link AppendCol} node; the default implementation delegates to visitChildren. */
public T visitAppendCol(AppendCol node, C context) {
417418
return visitChildren(node, context);
418419
}
420+
421+
/** Visits an {@link Append} node; the default implementation delegates to visitChildren. */
public T visitAppend(Append node, C context) {
422+
return visitChildren(node, context);
423+
}
419424
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast;
7+
8+
import java.util.Collections;
9+
import java.util.List;
10+
import org.opensearch.sql.ast.tree.Append;
11+
import org.opensearch.sql.ast.tree.AppendCol;
12+
import org.opensearch.sql.ast.tree.Join;
13+
import org.opensearch.sql.ast.tree.Lookup;
14+
import org.opensearch.sql.ast.tree.Relation;
15+
import org.opensearch.sql.ast.tree.TableFunction;
16+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
17+
import org.opensearch.sql.ast.tree.Values;
18+
19+
/** AST nodes visitor simplifies subsearch ast tree with empty source input. */
20+
public class EmptySourcePropagateVisitor extends AbstractNodeVisitor<UnresolvedPlan, Void> {
21+
22+
public static final UnresolvedPlan EMPTY_SOURCE = new Values(Collections.emptyList());
23+
24+
@Override
25+
public UnresolvedPlan visitValues(Values node, Void context) {
26+
return node;
27+
}
28+
29+
@Override
30+
public UnresolvedPlan visitRelation(Relation node, Void context) {
31+
return node;
32+
}
33+
34+
// Assume future table functions like inputLookup, makeresult command will use this unresolved
35+
// plan
36+
@Override
37+
public UnresolvedPlan visitTableFunction(TableFunction node, Void context) {
38+
return node;
39+
}
40+
41+
@Override
42+
public UnresolvedPlan visitChildren(Node node, Void context) {
43+
assert node instanceof UnresolvedPlan;
44+
UnresolvedPlan unresolvedPlan = (UnresolvedPlan) node;
45+
46+
if (unresolvedPlan.getChild().size() == 1) {
47+
return isEmptySource(((List<UnresolvedPlan>) unresolvedPlan.getChild()).get(0))
48+
? EMPTY_SOURCE
49+
: unresolvedPlan;
50+
}
51+
return super.visitChildren(node, context);
52+
}
53+
54+
@Override
55+
public UnresolvedPlan visitAppend(Append node, Void context) {
56+
UnresolvedPlan subSearch = node.getSubSearch().accept(this, context);
57+
UnresolvedPlan child = node.getChild().get(0).accept(this, context);
58+
return new Append(subSearch).attach(child);
59+
}
60+
61+
@Override
62+
public UnresolvedPlan visitAppendCol(AppendCol node, Void context) {
63+
UnresolvedPlan subSearch = node.getSubSearch().accept(this, context);
64+
UnresolvedPlan child = node.getChild().get(0).accept(this, context);
65+
return new AppendCol(node.isOverride(), subSearch).attach(child);
66+
}
67+
68+
// TODO: Revisit lookup logic here but for now we don't see use case yet
69+
@Override
70+
public UnresolvedPlan visitLookup(Lookup node, Void context) {
71+
UnresolvedPlan lookupRelation = node.getLookupRelation().accept(this, context);
72+
UnresolvedPlan child = node.getChild().get(0).accept(this, context);
73+
// Lookup is a LEFT join.
74+
// If left child is expected to be 0 row, it outputs 0 row
75+
// If right child is expected to be 0 row, the output is the left child;
76+
if (isEmptySource(child)) {
77+
return EMPTY_SOURCE;
78+
}
79+
return isEmptySource(lookupRelation) ? child : node;
80+
}
81+
82+
// Not see use case yet
83+
@Override
84+
public UnresolvedPlan visitJoin(Join node, Void context) {
85+
UnresolvedPlan left = node.getLeft().accept(this, context);
86+
UnresolvedPlan right = node.getRight().accept(this, context);
87+
88+
boolean leftEmpty = isEmptySource(left);
89+
boolean rightEmpty = isEmptySource(right);
90+
91+
switch (node.getJoinType()) {
92+
case INNER:
93+
case CROSS:
94+
return leftEmpty || rightEmpty ? EMPTY_SOURCE : node;
95+
case LEFT:
96+
case SEMI:
97+
case ANTI:
98+
if (leftEmpty) {
99+
return EMPTY_SOURCE;
100+
}
101+
return rightEmpty ? left : node;
102+
case RIGHT:
103+
if (rightEmpty) {
104+
return EMPTY_SOURCE;
105+
}
106+
return leftEmpty ? right : node;
107+
case FULL:
108+
if (leftEmpty) {
109+
return right;
110+
}
111+
return rightEmpty ? left : node;
112+
default:
113+
return node;
114+
}
115+
}
116+
117+
private boolean isEmptySource(UnresolvedPlan plan) {
118+
return plan instanceof Values
119+
&& (((Values) plan).getValues() == null || ((Values) plan).getValues().isEmpty());
120+
}
121+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.Setter;
14+
import lombok.ToString;
15+
import org.opensearch.sql.ast.AbstractNodeVisitor;
16+
import org.opensearch.sql.ast.Node;
17+
18+
/** Logical plan node of Append, the interface for union all columns in queries. */
19+
@Getter
20+
@Setter
21+
@ToString
22+
@EqualsAndHashCode(callSuper = false)
23+
@RequiredArgsConstructor
24+
public class Append extends UnresolvedPlan {
25+
26+
private final UnresolvedPlan subSearch;
27+
28+
private UnresolvedPlan child;
29+
30+
@Override
31+
public UnresolvedPlan attach(UnresolvedPlan child) {
32+
this.child = child;
33+
return this;
34+
}
35+
36+
@Override
37+
public List<? extends Node> getChild() {
38+
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
39+
}
40+
41+
@Override
42+
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
43+
return visitor.visitAppend(this, context);
44+
}
45+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@
5252
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
5353
import org.apache.calcite.sql.type.SqlTypeFamily;
5454
import org.apache.calcite.sql.type.SqlTypeName;
55+
import org.apache.calcite.sql.validate.SqlValidatorUtil;
5556
import org.apache.calcite.tools.RelBuilder;
5657
import org.apache.calcite.tools.RelBuilder.AggCall;
5758
import org.apache.calcite.util.Holder;
5859
import org.apache.commons.lang3.ArrayUtils;
5960
import org.apache.commons.lang3.tuple.Pair;
6061
import org.checkerframework.checker.nullness.qual.Nullable;
6162
import org.opensearch.sql.ast.AbstractNodeVisitor;
63+
import org.opensearch.sql.ast.EmptySourcePropagateVisitor;
6264
import org.opensearch.sql.ast.Node;
6365
import org.opensearch.sql.ast.dsl.AstDSL;
6466
import org.opensearch.sql.ast.expression.AggregateFunction;
@@ -81,6 +83,7 @@
8183
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
8284
import org.opensearch.sql.ast.tree.AD;
8385
import org.opensearch.sql.ast.tree.Aggregation;
86+
import org.opensearch.sql.ast.tree.Append;
8487
import org.opensearch.sql.ast.tree.AppendCol;
8588
import org.opensearch.sql.ast.tree.Bin;
8689
import org.opensearch.sql.ast.tree.CloseCursor;
@@ -113,6 +116,7 @@
113116
import org.opensearch.sql.ast.tree.Trendline;
114117
import org.opensearch.sql.ast.tree.Trendline.TrendlineType;
115118
import org.opensearch.sql.ast.tree.UnresolvedPlan;
119+
import org.opensearch.sql.ast.tree.Values;
116120
import org.opensearch.sql.ast.tree.Window;
117121
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
118122
import org.opensearch.sql.calcite.utils.BinUtils;
@@ -1214,6 +1218,75 @@ public RelNode visitAppendCol(AppendCol node, CalcitePlanContext context) {
12141218
}
12151219
}
12161220

1221+
@Override
1222+
public RelNode visitAppend(Append node, CalcitePlanContext context) {
1223+
// 1. Resolve main plan
1224+
visitChildren(node, context);
1225+
1226+
// 2. Resolve subsearch plan
1227+
UnresolvedPlan prunedSubSearch =
1228+
node.getSubSearch().accept(new EmptySourcePropagateVisitor(), null);
1229+
prunedSubSearch.accept(this, context);
1230+
1231+
// 3. Merge two query schemas
1232+
RelNode subsearchNode = context.relBuilder.build();
1233+
RelNode mainNode = context.relBuilder.build();
1234+
List<RelDataTypeField> mainFields = mainNode.getRowType().getFieldList();
1235+
List<RelDataTypeField> subsearchFields = subsearchNode.getRowType().getFieldList();
1236+
Map<String, RelDataTypeField> subsearchFieldMap =
1237+
subsearchFields.stream()
1238+
.map(typeField -> Pair.of(typeField.getName(), typeField))
1239+
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
1240+
boolean[] isSelected = new boolean[subsearchFields.size()];
1241+
List<String> names = new ArrayList<>();
1242+
List<RexNode> mainUnionProjects = new ArrayList<>();
1243+
List<RexNode> subsearchUnionProjects = new ArrayList<>();
1244+
1245+
// 3.1 Start with main query's schema. If subsearch plan doesn't have matched column,
1246+
// add same type column in place with NULL literal
1247+
for (int i = 0; i < mainFields.size(); i++) {
1248+
mainUnionProjects.add(context.rexBuilder.makeInputRef(mainNode, i));
1249+
RelDataTypeField mainField = mainFields.get(i);
1250+
RelDataTypeField subsearchField = subsearchFieldMap.get(mainField.getName());
1251+
names.add(mainField.getName());
1252+
if (subsearchFieldMap.containsKey(mainField.getName())
1253+
&& subsearchField != null
1254+
&& subsearchField.getType().equals(mainField.getType())) {
1255+
subsearchUnionProjects.add(
1256+
context.rexBuilder.makeInputRef(subsearchNode, subsearchField.getIndex()));
1257+
isSelected[subsearchField.getIndex()] = true;
1258+
} else {
1259+
subsearchUnionProjects.add(context.rexBuilder.makeNullLiteral(mainField.getType()));
1260+
}
1261+
}
1262+
1263+
// 3.2 Add remaining subsearch columns to the merged schema
1264+
for (int j = 0; j < subsearchFields.size(); j++) {
1265+
RelDataTypeField subsearchField = subsearchFields.get(j);
1266+
if (!isSelected[j]) {
1267+
mainUnionProjects.add(context.rexBuilder.makeNullLiteral(subsearchField.getType()));
1268+
subsearchUnionProjects.add(context.rexBuilder.makeInputRef(subsearchNode, j));
1269+
names.add(subsearchField.getName());
1270+
}
1271+
}
1272+
1273+
// 3.3 Uniquify names in case the merged names have duplicates
1274+
List<String> uniqNames =
1275+
SqlValidatorUtil.uniquify(names, SqlValidatorUtil.EXPR_SUGGESTER, true);
1276+
1277+
// 4. Apply new schema over two query plans
1278+
RelNode projectedMainNode =
1279+
context.relBuilder.push(mainNode).project(mainUnionProjects, uniqNames).build();
1280+
RelNode projectedSubsearchNode =
1281+
context.relBuilder.push(subsearchNode).project(subsearchUnionProjects, uniqNames).build();
1282+
1283+
// 5. Union all two projected plans
1284+
context.relBuilder.push(projectedMainNode);
1285+
context.relBuilder.push(projectedSubsearchNode);
1286+
context.relBuilder.union(true);
1287+
return context.relBuilder.peek();
1288+
}
1289+
12171290
/*
12181291
* Unsupported Commands of PPL with Calcite for OpenSearch 3.0.0-beta
12191292
*/
@@ -1844,6 +1917,16 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
18441917
return context.relBuilder.peek();
18451918
}
18461919

1920+
@Override
1921+
public RelNode visitValues(Values values, CalcitePlanContext context) {
1922+
if (values.getValues() == null || values.getValues().isEmpty()) {
1923+
context.relBuilder.values(context.relBuilder.getTypeFactory().builder().build());
1924+
return context.relBuilder.peek();
1925+
} else {
1926+
throw new CalciteUnsupportedException("Explicit values node is unsupported in Calcite");
1927+
}
1928+
}
1929+
18471930
private void buildParseRelNode(Parse node, CalcitePlanContext context) {
18481931
RexNode sourceField = rexVisitor.analyze(node.getSourceField(), context);
18491932
ParseMethod parseMethod = node.getParseMethod();

docs/category.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
"user/dql/metadata.rst"
5757
],
5858
"ppl_cli_calcite": [
59-
"user/ppl/cmd/stats.rst",
59+
"user/ppl/cmd/append.rst",
6060
"user/ppl/cmd/fields.rst",
6161
"user/ppl/cmd/regex.rst",
6262
"user/ppl/cmd/stats.rst",

0 commit comments

Comments
 (0)