Skip to content

Commit 9896166

Browse files
committed
Support merging object-type fields when fetching the schema from the index (opensearch-project#3653)
* merge object/array Signed-off-by: xinyual <xinyual@amazon.com> * simplified code Signed-off-by: xinyual <xinyual@amazon.com> * apply spotless Signed-off-by: xinyual <xinyual@amazon.com> * fix IT by adding fields Signed-off-by: xinyual <xinyual@amazon.com> * revert to hashmap Signed-off-by: xinyual <xinyual@amazon.com> * filter one indices case Signed-off-by: xinyual <xinyual@amazon.com> * add ut and merge rules Signed-off-by: xinyual <xinyual@amazon.com> * add benchmark test Signed-off-by: xinyual <xinyual@amazon.com> * revert change Signed-off-by: xinyual <xinyual@amazon.com> * refactor merge rules Signed-off-by: xinyual <xinyual@amazon.com> * fix IT Signed-off-by: xinyual <xinyual@amazon.com> --------- Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 5705bd3 commit 9896166

19 files changed

Lines changed: 443 additions & 24 deletions

File tree

benchmarks/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ repositories {
1414

1515
dependencies {
1616
implementation project(':core')
17+
implementation project(':opensearch')
1718

1819
// Dependencies required by JMH micro benchmark
1920
api group: 'org.openjdk.jmh', name: 'jmh-core', version: '1.36'
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.operator.predicate;
7+
8+
import java.util.ArrayList;
9+
import java.util.HashMap;
10+
import java.util.LinkedHashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import org.openjdk.jmh.annotations.Benchmark;
14+
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
15+
import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest;
16+
17+
public class MergeArrayAndObjectMapBenchmark {
18+
private static final List<Map<String, OpenSearchDataType>> candidateMaps = prepareListOfMaps(120);
19+
20+
@Benchmark
21+
public void testMerge() {
22+
Map<String, OpenSearchDataType> finalResult = new HashMap<>();
23+
for (Map<String, OpenSearchDataType> map : candidateMaps) {
24+
OpenSearchDescribeIndexRequest.mergeObjectAndArrayInsideMap(finalResult, map);
25+
}
26+
}
27+
28+
private static Map<String, OpenSearchDataType> prepareMap(int recursive, String prefix) {
29+
Map<String, OpenSearchDataType> map = new HashMap<>();
30+
Map<String, Object> innerMap = prepareRecursiveMap(recursive, prefix);
31+
map.put("name", OpenSearchDataType.of(OpenSearchDataType.MappingType.Object, innerMap));
32+
return map;
33+
}
34+
35+
public static Map<String, Object> prepareRecursiveMap(int recursive, String prefix) {
36+
Map<String, Object> innerMap = new LinkedHashMap<>();
37+
if (recursive == 0) {
38+
innerMap.put("type", "string");
39+
} else {
40+
innerMap.put("type", "object");
41+
innerMap.put(
42+
"properties",
43+
Map.of(
44+
prefix + "_" + String.valueOf(recursive),
45+
Map.of("type", "text"),
46+
"recursive",
47+
prepareRecursiveMap(recursive - 1, prefix)));
48+
}
49+
return innerMap;
50+
}
51+
52+
private static List<Map<String, OpenSearchDataType>> prepareListOfMaps(int listNumber) {
53+
List<Map<String, OpenSearchDataType>> list = new ArrayList<>();
54+
for (int i = 0; i < listNumber; i++) {
55+
list.add(prepareMap(15, "prefix" + i));
56+
}
57+
return list;
58+
}
59+
}

benchmarks/src/jmh/java/org/opensearch/sql/expression/operator/predicate/PatternsWindowFunctionBenchmark.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.sql.expression.ReferenceExpression;
3232
import org.opensearch.sql.expression.window.WindowDefinition;
3333
import org.opensearch.sql.expression.window.frame.BufferPatternRowsWindowFrame;
34-
import org.opensearch.sql.expression.window.frame.CurrentRowWindowFrame;
3534
import org.opensearch.sql.expression.window.frame.WindowFrame;
3635

3736
@Warmup(iterations = 1)
@@ -62,14 +61,6 @@ public class PatternsWindowFunctionBenchmark {
6261
new BrainLogParser(),
6362
new NamedArgumentExpression("message", new ReferenceExpression("message", STRING)));
6463

65-
@Benchmark
66-
public void testSimplePattern() {
67-
CurrentRowWindowFrame windowFrame =
68-
new CurrentRowWindowFrame(new WindowDefinition(ImmutableList.of(), ImmutableList.of()));
69-
70-
run(windowFrame, DSL.simple_pattern(DSL.ref("message", STRING)));
71-
}
72-
7364
@Benchmark
7465
public void testBrain() {
7566
BufferPatternRowsWindowFrame windowFrame =

core/src/main/java/org/opensearch/sql/expression/function/jsonUDF/JsonExtractFunctionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static Object eval(Object... args) {
7979
results.add(queryResult != null ? queryResult : valueResult);
8080
}
8181
if (jsonPaths.size() == 1) {
82-
return doJsonize(results.getFirst());
82+
return doJsonize(results.get(0));
8383
}
8484
return doJsonize(results);
8585
}

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBasicIT.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55

66
package org.opensearch.sql.calcite.standalone;
77

8-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
9-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
10-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
8+
import static org.opensearch.sql.legacy.TestsConstants.*;
119
import static org.opensearch.sql.util.MatcherUtils.rows;
1210
import static org.opensearch.sql.util.MatcherUtils.schema;
1311
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -38,6 +36,8 @@ public void init() throws IOException {
3836

3937
loadIndex(Index.BANK);
4038
loadIndex(Index.DATA_TYPE_ALIAS);
39+
loadIndex(Index.MERGE_TEST_1);
40+
loadIndex(Index.MERGE_TEST_2);
4141
}
4242

4343
@Test
@@ -562,6 +562,30 @@ public void testMetaFieldAlias() {
562562
}
563563

564564
@Test
565+
public void testFieldsMergedObject() {
566+
JSONObject result =
567+
executeQuery(
568+
String.format(
569+
"source=%s | fields machine.os1, machine.os2, machine_array.os1, "
570+
+ " machine_array.os2, machine_deep.attr1, machine_deep.attr2,"
571+
+ " machine_deep.layer.os1, machine_deep.layer.os2",
572+
TEST_INDEX_MERGE_TEST_WILDCARD));
573+
verifySchema(
574+
result,
575+
schema("machine.os1", "string"),
576+
schema("machine.os2", "string"),
577+
schema("machine_array.os1", "string"),
578+
schema("machine_array.os2", "string"),
579+
schema("machine_deep.attr1", "long"),
580+
schema("machine_deep.attr2", "long"),
581+
schema("machine_deep.layer.os1", "string"),
582+
schema("machine_deep.layer.os2", "string"));
583+
verifyDataRows(
584+
result,
585+
rows("linux", null, "linux", null, 1, null, "os1", null),
586+
rows(null, "linux", null, "linux", null, 2, null, "os2"));
587+
}
588+
565589
public void testNumericLiteral() {
566590
JSONObject result =
567591
executeQuery(

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLCaseFunctionIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void testCaseWhenWithCast() {
6565
" cast(response as int) >= 400 AND cast(response as int) < 500, \"Client Error\",\n" +
6666
" cast(response as int) >= 500 AND cast(response as int) < 600, \"Server Error\"\n" +
6767
" else concat(\"Incorrect HTTP status code for\", url))\n" +
68-
"| where status != \"Success\"\n | fields host, method, message, bytes, response, url, status",
68+
"| where status != \"Success\" | fields host, method, message, bytes, response, url, status\n",
6969
TEST_INDEX_WEBLOGS));
7070
verifySchema(
7171
actual,
@@ -197,7 +197,8 @@ public void testCaseWhenInFilter() {
197197
" response in ('300', '301'), false,\n" +
198198
" response in ('400', '403'), false,\n" +
199199
" response in ('500', '505'), false\n" +
200-
" else false) | fields host, method, message, bytes, response, url\n",
200+
" else false)\n" +
201+
"| fields host, method, message, bytes, response, url\n",
201202
TEST_INDEX_WEBLOGS));
202203
verifySchema(
203204
actual,
@@ -230,7 +231,8 @@ public void testCaseWhenInSubquery() {
230231
" response in ('500', '505'), \"500\"\n" +
231232
" else concat(\"Incorrect HTTP status code for\", url))\n" +
232233
" | fields new_response\n" +
233-
" ] | fields host, method, message, bytes, response, url\n",
234+
" ]\n" +
235+
"| fields host, method, message, bytes, response, url\n",
234236
TEST_INDEX_WEBLOGS, TEST_INDEX_WEBLOGS));
235237
verifySchema(
236238
actual,

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,16 @@ public enum Index {
795795
"hobbies",
796796
getHobbiesIndexMapping(),
797797
"src/test/resources/hobbies.json"),
798+
MERGE_TEST_1(
799+
TestsConstants.TEST_INDEX_MERGE_TEST_1,
800+
"merge_test1",
801+
getMappingFile("merge_test_1_mapping.json"),
802+
"src/test/resources/merge_test_1.json"),
803+
MERGE_TEST_2(
804+
TestsConstants.TEST_INDEX_MERGE_TEST_2,
805+
"merge_test2",
806+
getMappingFile("merge_test_2_mapping.json"),
807+
"src/test/resources/merge_test_2.json"),
798808
// It's "people" table in Spark PPL ITs, to avoid conflicts, rename to "worker" here
799809
WORKER(
800810
TestsConstants.TEST_INDEX_WORKER,

integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class TestsConstants {
7474
public static final String TEST_INDEX_WORKER = TEST_INDEX + "_worker";
7575
public static final String TEST_INDEX_WORK_INFORMATION = TEST_INDEX + "_work_information";
7676
public static final String TEST_INDEX_DUPLICATION_NULLABLE = TEST_INDEX + "_duplication_nullable";
77+
public static final String TEST_INDEX_MERGE_TEST_1 = TEST_INDEX + "_merge_test_1";
78+
public static final String TEST_INDEX_MERGE_TEST_2 = TEST_INDEX + "_merge_test_2";
79+
public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*";
7780

7881
public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
7982
public static final String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

integ-test/src/test/java/org/opensearch/sql/ppl/FieldsCommandIT.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55

66
package org.opensearch.sql.ppl;
77

8-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
9-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
8+
import static org.opensearch.sql.legacy.TestsConstants.*;
109
import static org.opensearch.sql.util.MatcherUtils.columnName;
1110
import static org.opensearch.sql.util.MatcherUtils.columnPattern;
1211
import static org.opensearch.sql.util.MatcherUtils.rows;
@@ -28,6 +27,8 @@ public void init() throws Exception {
2827
super.init();
2928
loadIndex(Index.ACCOUNT);
3029
loadIndex(Index.BANK);
30+
loadIndex(Index.MERGE_TEST_1);
31+
loadIndex(Index.MERGE_TEST_2);
3132
}
3233

3334
@Test
@@ -105,4 +106,29 @@ public void testMetadataFieldsWithEvalMetaField() {
105106
"source=%s | eval _id = 1 | fields firstname, _id", TEST_INDEX_ACCOUNT)));
106107
verifyErrorMessageContains(e, "Cannot use metadata field [_id] as the eval field.");
107108
}
109+
110+
@Test
111+
public void testFieldsMergedObject() throws IOException {
112+
JSONObject result =
113+
executeQuery(
114+
String.format(
115+
"source=%s | fields machine.os1, machine.os2, machine_array.os1, "
116+
+ " machine_array.os2, machine_deep.attr1, machine_deep.attr2,"
117+
+ " machine_deep.layer.os1, machine_deep.layer.os2",
118+
TEST_INDEX_MERGE_TEST_WILDCARD));
119+
verifySchema(
120+
result,
121+
schema("machine.os1", "string"),
122+
schema("machine.os2", "string"),
123+
schema("machine_array.os1", "string"),
124+
schema("machine_array.os2", "string"),
125+
schema("machine_deep.attr1", "bigint"),
126+
schema("machine_deep.attr2", "bigint"),
127+
schema("machine_deep.layer.os1", "string"),
128+
schema("machine_deep.layer.os2", "string"));
129+
verifyDataRows(
130+
result,
131+
rows("linux", null, "linux", null, 1, null, "os1", null),
132+
rows(null, "linux", null, "linux", null, 2, null, "os2"));
133+
}
108134
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
{
2+
"mappings": {
3+
"properties": {
4+
"machine": {
5+
"properties": {
6+
"os1": {
7+
"type": "text"
8+
},
9+
"ram1": {
10+
"type": "long"
11+
}
12+
}
13+
},
14+
"machine_deep": {
15+
"properties": {
16+
"attr1": {
17+
"type": "long"
18+
},
19+
"layer": {
20+
"properties": {
21+
"os1": {
22+
"type": "text"
23+
}
24+
}
25+
}
26+
}
27+
},
28+
"machine_array": {
29+
"type": "nested",
30+
"properties": {
31+
"os1": {
32+
"type": "text"
33+
},
34+
"ram1": {
35+
"type": "long"
36+
}
37+
}
38+
}
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)