Skip to content

Commit f88730f

Browse files
committed
add tests
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent a3e78a8 commit f88730f

5 files changed

Lines changed: 203 additions & 369 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) {
240240
visitChildren(node, context);
241241
UnresolvedPlan subqueryPlan = node.getSubQuery();
242242
UnresolvedPlan childNode = subqueryPlan;
243-
while (childNode.getChild() != null && !childNode.getChild().isEmpty()) {
243+
while (childNode.getChild() != null
244+
&& !childNode.getChild().isEmpty()
245+
&& !(childNode.getChild().getFirst() instanceof Values)) {
244246
childNode = (UnresolvedPlan) childNode.getChild().getFirst();
245247
}
246248
childNode.attach(node.getChild().getFirst());
@@ -250,7 +252,6 @@ public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) {
250252
RelNode subPipelineNode = context.relBuilder.build();
251253
RelNode mainNode = context.relBuilder.build();
252254
return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context);
253-
254255
}
255256

256257
@Override
@@ -1717,11 +1718,12 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
17171718
return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context);
17181719
}
17191720

1720-
private RelNode mergeTableAndResolveColumnConflict(RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) {
1721+
private RelNode mergeTableAndResolveColumnConflict(
1722+
RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) {
17211723
// Use shared schema merging logic that handles type conflicts via field renaming
17221724
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subqueryNode);
17231725
List<RelNode> projectedNodes =
1724-
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);
1726+
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);
17251727

17261728
// 4. Union the projected plans
17271729
for (RelNode projectedNode : projectedNodes) {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java

Lines changed: 78 additions & 224 deletions
Original file line numberDiff line numberDiff line change
@@ -5,236 +5,90 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8-
import org.json.JSONObject;
9-
import org.junit.Test;
10-
import org.opensearch.sql.common.setting.Settings;
11-
import org.opensearch.sql.ppl.PPLIntegTestCase;
12-
13-
import java.io.IOException;
14-
import java.util.Arrays;
15-
import java.util.List;
16-
import java.util.Locale;
17-
188
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
199
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
2010
import static org.opensearch.sql.util.MatcherUtils.rows;
2111
import static org.opensearch.sql.util.MatcherUtils.schema;
2212
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
2313
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;
2414

25-
public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase {
26-
@Override
27-
public void init() throws Exception {
28-
super.init();
29-
enableCalcite();
30-
loadIndex(Index.ACCOUNT);
31-
loadIndex(Index.BANK);
32-
}
33-
34-
@Test
35-
public void testAppendPipe() throws IOException {
36-
JSONObject actual =
37-
executeQuery(
38-
String.format(
39-
Locale.ROOT,
40-
"source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ "
41-
+ " stats sum(age) as sum_age_by_state by state | sort sum_age_by_state ] |"
42-
+ " head 5",
43-
TEST_INDEX_ACCOUNT));
44-
verifySchemaInOrder(
45-
actual,
46-
schema("sum_age_by_gender", "bigint"),
47-
schema("gender", "string"),
48-
schema("sum_age_by_state", "bigint"),
49-
schema("state", "string"));
50-
verifyDataRows(
51-
actual,
52-
rows(14947, "F", null, null),
53-
rows(15224, "M", null, null),
54-
rows(null, null, 369, "NV"),
55-
rows(null, null, 412, "NM"),
56-
rows(null, null, 414, "AZ"));
57-
}
58-
59-
@Test
60-
public void testAppendEmptySearchCommand() throws IOException {
61-
List<String> emptySourcePPLs =
62-
Arrays.asList(
63-
String.format(
64-
Locale.ROOT,
65-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ |"
66-
+ " stats sum(age) as sum_age_by_state by state ]",
67-
TEST_INDEX_ACCOUNT),
68-
String.format(
69-
Locale.ROOT,
70-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ ]",
71-
TEST_INDEX_ACCOUNT),
72-
String.format(
73-
Locale.ROOT,
74-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where age >"
75-
+ " 10 | append [ ] ]",
76-
TEST_INDEX_ACCOUNT),
77-
String.format(
78-
Locale.ROOT,
79-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where age >"
80-
+ " 10 | lookup %s gender as igender ]",
81-
TEST_INDEX_ACCOUNT,
82-
TEST_INDEX_ACCOUNT));
83-
84-
for (String ppl : emptySourcePPLs) {
85-
JSONObject actual = executeQuery(ppl);
86-
verifySchemaInOrder(
87-
actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string"));
88-
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"));
89-
}
90-
}
91-
92-
@Test
93-
public void testAppendEmptySearchWithJoin() throws IOException {
94-
withSettings(
95-
Settings.Key.CALCITE_SUPPORT_ALL_JOIN_TYPES,
96-
"true",
97-
() -> {
98-
List<String> emptySourceWithJoinPPLs =
99-
Arrays.asList(
100-
String.format(
101-
Locale.ROOT,
102-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | "
103-
+ " join left=L right=R on L.gender = R.gender %s ]",
104-
TEST_INDEX_ACCOUNT,
105-
TEST_INDEX_ACCOUNT),
106-
String.format(
107-
Locale.ROOT,
108-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | "
109-
+ " cross join left=L right=R on L.gender = R.gender %s ]",
110-
TEST_INDEX_ACCOUNT,
111-
TEST_INDEX_ACCOUNT),
112-
String.format(
113-
Locale.ROOT,
114-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | "
115-
+ " left join left=L right=R on L.gender = R.gender %s ]",
116-
TEST_INDEX_ACCOUNT,
117-
TEST_INDEX_ACCOUNT),
118-
String.format(
119-
Locale.ROOT,
120-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | "
121-
+ " semi join left=L right=R on L.gender = R.gender %s ]",
122-
TEST_INDEX_ACCOUNT,
123-
TEST_INDEX_ACCOUNT));
124-
125-
for (String ppl : emptySourceWithJoinPPLs) {
126-
JSONObject actual = null;
127-
try {
128-
actual = executeQuery(ppl);
129-
} catch (IOException e) {
130-
throw new RuntimeException(e);
131-
}
132-
verifySchemaInOrder(
133-
actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string"));
134-
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"));
135-
}
136-
137-
List<String> emptySourceWithRightOrFullJoinPPLs =
138-
Arrays.asList(
139-
String.format(
140-
Locale.ROOT,
141-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where"
142-
+ " gender = 'F' | right join on gender = gender [source=%s | stats"
143-
+ " count() as cnt by gender ] ]",
144-
TEST_INDEX_ACCOUNT,
145-
TEST_INDEX_ACCOUNT),
146-
String.format(
147-
Locale.ROOT,
148-
"source=%s | stats sum(age) as sum_age_by_gender by gender | append [ | where"
149-
+ " gender = 'F' | full join on gender = gender [source=%s | stats"
150-
+ " count() as cnt by gender ] ]",
151-
TEST_INDEX_ACCOUNT,
152-
TEST_INDEX_ACCOUNT));
153-
154-
for (String ppl : emptySourceWithRightOrFullJoinPPLs) {
155-
JSONObject actual = null;
156-
try {
157-
actual = executeQuery(ppl);
158-
} catch (IOException e) {
159-
throw new RuntimeException(e);
160-
}
161-
verifySchemaInOrder(
162-
actual,
163-
schema("sum_age_by_gender", "bigint"),
164-
schema("gender", "string"),
165-
schema("cnt", "bigint"));
166-
verifyDataRows(
167-
actual,
168-
rows(14947, "F", null),
169-
rows(15224, "M", null),
170-
rows(null, "F", 493),
171-
rows(null, "M", 507));
172-
}
173-
});
174-
}
175-
176-
@Test
177-
public void testAppendDifferentIndex() throws IOException {
178-
JSONObject actual =
179-
executeQuery(
180-
String.format(
181-
Locale.ROOT,
182-
"source=%s | stats sum(age) as sum by gender | append [ source=%s | stats"
183-
+ " sum(age) as bank_sum_age ]",
184-
TEST_INDEX_ACCOUNT,
185-
TEST_INDEX_BANK));
186-
verifySchemaInOrder(
187-
actual,
188-
schema("sum", "bigint"),
189-
schema("gender", "string"),
190-
schema("bank_sum_age", "bigint"));
191-
verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238));
192-
}
193-
194-
@Test
195-
public void testAppendWithMergedColumn() throws IOException {
196-
JSONObject actual =
197-
executeQuery(
198-
String.format(
199-
Locale.ROOT,
200-
"source=%s | stats sum(age) as sum by gender |"
201-
+ " append [ source=%s | stats sum(age) as sum by state | sort sum ] | head 5",
202-
TEST_INDEX_ACCOUNT,
203-
TEST_INDEX_ACCOUNT));
204-
verifySchemaInOrder(
205-
actual, schema("sum", "bigint"), schema("gender", "string"), schema("state", "string"));
206-
verifyDataRows(
207-
actual,
208-
rows(14947, "F", null),
209-
rows(15224, "M", null),
210-
rows(369, null, "NV"),
211-
rows(412, null, "NM"),
212-
rows(414, null, "AZ"));
213-
}
15+
import java.io.IOException;
16+
import java.util.Locale;
17+
import org.json.JSONObject;
18+
import org.junit.Test;
19+
import org.opensearch.sql.ppl.PPLIntegTestCase;
21420

215-
@Test
216-
public void testAppendWithConflictTypeColumn() throws IOException {
217-
JSONObject actual =
218-
executeQuery(
219-
String.format(
220-
Locale.ROOT,
221-
"source=%s | stats sum(age) as sum by gender | append [ source=%s | stats sum(age)"
222-
+ " as sum by state | sort sum | eval sum = cast(sum as double) ] | head 5",
223-
TEST_INDEX_ACCOUNT,
224-
TEST_INDEX_ACCOUNT));
225-
verifySchemaInOrder(
226-
actual,
227-
schema("sum", "bigint"),
228-
schema("gender", "string"),
229-
schema("state", "string"),
230-
schema("sum0", "double"));
231-
verifyDataRows(
232-
actual,
233-
rows(14947, "F", null, null),
234-
rows(15224, "M", null, null),
235-
rows(null, null, "NV", 369d),
236-
rows(null, null, "NM", 412d),
237-
rows(null, null, "AZ", 414d));
238-
}
21+
public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase {
22+
@Override
23+
public void init() throws Exception {
24+
super.init();
25+
enableCalcite();
26+
loadIndex(Index.ACCOUNT);
27+
loadIndex(Index.BANK);
28+
}
29+
30+
@Test
31+
public void testAppendPipe() throws IOException {
32+
JSONObject actual =
33+
executeQuery(
34+
String.format(
35+
Locale.ROOT,
36+
"source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ "
37+
+ " sort -sum_age_by_gender ] |"
38+
+ " head 5",
39+
TEST_INDEX_ACCOUNT));
40+
verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string"));
41+
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F"));
42+
}
43+
44+
@Test
45+
public void testAppendDifferentIndex() throws IOException {
46+
JSONObject actual =
47+
executeQuery(
48+
String.format(
49+
Locale.ROOT,
50+
"source=%s | stats sum(age) as sum by gender | append [ source=%s | stats"
51+
+ " sum(age) as bank_sum_age ]",
52+
TEST_INDEX_ACCOUNT,
53+
TEST_INDEX_BANK));
54+
verifySchemaInOrder(
55+
actual,
56+
schema("sum", "bigint"),
57+
schema("gender", "string"),
58+
schema("bank_sum_age", "bigint"));
59+
verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238));
60+
}
61+
62+
@Test
63+
public void testAppendpipeWithMergedColumn() throws IOException {
64+
JSONObject actual =
65+
executeQuery(
66+
String.format(
67+
Locale.ROOT,
68+
"source=%s | stats sum(age) as sum by gender |"
69+
+ " appendpipe [ stats sum(sum) as sum ] | head 5",
70+
TEST_INDEX_ACCOUNT,
71+
TEST_INDEX_ACCOUNT));
72+
verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string"));
73+
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null));
74+
}
75+
76+
@Test
77+
public void testAppendpipeWithConflictTypeColumn() throws IOException {
78+
JSONObject actual =
79+
executeQuery(
80+
String.format(
81+
Locale.ROOT,
82+
"source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum = cast(sum as"
83+
+ " double) ] | head 5",
84+
TEST_INDEX_ACCOUNT));
85+
verifySchemaInOrder(
86+
actual, schema("sum", "bigint"), schema("gender", "string"), schema("sum0", "double"));
87+
verifyDataRows(
88+
actual,
89+
rows(14947, "F", null),
90+
rows(15224, "M", null),
91+
rows(null, "F", 14947d),
92+
rows(null, "M", 15224d));
93+
}
23994
}
240-

0 commit comments

Comments
 (0)