Skip to content

Commit ed507d7

Browse files
authored
Support merging object-type fields when fetching the schema from the index (opensearch-project#3653)
* merge object/array Signed-off-by: xinyual <xinyual@amazon.com> * simplified code Signed-off-by: xinyual <xinyual@amazon.com> * apply spotless Signed-off-by: xinyual <xinyual@amazon.com> * fix IT by adding fields Signed-off-by: xinyual <xinyual@amazon.com> * revert to hashmap Signed-off-by: xinyual <xinyual@amazon.com> * filter one indices case Signed-off-by: xinyual <xinyual@amazon.com> * add ut and merge rules Signed-off-by: xinyual <xinyual@amazon.com> * add benchmark test Signed-off-by: xinyual <xinyual@amazon.com> * revert change Signed-off-by: xinyual <xinyual@amazon.com> * refactor merge rules Signed-off-by: xinyual <xinyual@amazon.com> * fix IT Signed-off-by: xinyual <xinyual@amazon.com> --------- Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 05aa651 commit ed507d7

19 files changed

Lines changed: 448 additions & 26 deletions

File tree

benchmarks/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ repositories {
1414

1515
dependencies {
1616
implementation project(':core')
17+
implementation project(':opensearch')
1718

1819
// Dependencies required by JMH micro benchmark
1920
api group: 'org.openjdk.jmh', name: 'jmh-core', version: '1.36'
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.operator.predicate;
7+
8+
import java.util.ArrayList;
9+
import java.util.HashMap;
10+
import java.util.LinkedHashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import org.openjdk.jmh.annotations.Benchmark;
14+
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
15+
import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest;
16+
17+
public class MergeArrayAndObjectMapBenchmark {
18+
private static final List<Map<String, OpenSearchDataType>> candidateMaps = prepareListOfMaps(120);
19+
20+
@Benchmark
21+
public void testMerge() {
22+
Map<String, OpenSearchDataType> finalResult = new HashMap<>();
23+
for (Map<String, OpenSearchDataType> map : candidateMaps) {
24+
OpenSearchDescribeIndexRequest.mergeObjectAndArrayInsideMap(finalResult, map);
25+
}
26+
}
27+
28+
private static Map<String, OpenSearchDataType> prepareMap(int recursive, String prefix) {
29+
Map<String, OpenSearchDataType> map = new HashMap<>();
30+
Map<String, Object> innerMap = prepareRecursiveMap(recursive, prefix);
31+
map.put("name", OpenSearchDataType.of(OpenSearchDataType.MappingType.Object, innerMap));
32+
return map;
33+
}
34+
35+
public static Map<String, Object> prepareRecursiveMap(int recursive, String prefix) {
36+
Map<String, Object> innerMap = new LinkedHashMap<>();
37+
if (recursive == 0) {
38+
innerMap.put("type", "string");
39+
} else {
40+
innerMap.put("type", "object");
41+
innerMap.put(
42+
"properties",
43+
Map.of(
44+
prefix + "_" + String.valueOf(recursive),
45+
Map.of("type", "text"),
46+
"recursive",
47+
prepareRecursiveMap(recursive - 1, prefix)));
48+
}
49+
return innerMap;
50+
}
51+
52+
private static List<Map<String, OpenSearchDataType>> prepareListOfMaps(int listNumber) {
53+
List<Map<String, OpenSearchDataType>> list = new ArrayList<>();
54+
for (int i = 0; i < listNumber; i++) {
55+
list.add(prepareMap(15, "prefix" + i));
56+
}
57+
return list;
58+
}
59+
}

benchmarks/src/jmh/java/org/opensearch/sql/expression/operator/predicate/PatternsWindowFunctionBenchmark.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.sql.expression.ReferenceExpression;
3232
import org.opensearch.sql.expression.window.WindowDefinition;
3333
import org.opensearch.sql.expression.window.frame.BufferPatternRowsWindowFrame;
34-
import org.opensearch.sql.expression.window.frame.CurrentRowWindowFrame;
3534
import org.opensearch.sql.expression.window.frame.WindowFrame;
3635

3736
@Warmup(iterations = 1)
@@ -62,14 +61,6 @@ public class PatternsWindowFunctionBenchmark {
6261
new BrainLogParser(),
6362
new NamedArgumentExpression("message", new ReferenceExpression("message", STRING)));
6463

65-
@Benchmark
66-
public void testSimplePattern() {
67-
CurrentRowWindowFrame windowFrame =
68-
new CurrentRowWindowFrame(new WindowDefinition(ImmutableList.of(), ImmutableList.of()));
69-
70-
run(windowFrame, DSL.simple_pattern(DSL.ref("message", STRING)));
71-
}
72-
7364
@Benchmark
7465
public void testBrain() {
7566
BufferPatternRowsWindowFrame windowFrame =

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBasicIT.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55

66
package org.opensearch.sql.calcite.standalone;
77

8-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
9-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
10-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
8+
import static org.opensearch.sql.legacy.TestsConstants.*;
119
import static org.opensearch.sql.util.MatcherUtils.rows;
1210
import static org.opensearch.sql.util.MatcherUtils.schema;
1311
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -38,6 +36,8 @@ public void init() throws IOException {
3836

3937
loadIndex(Index.BANK);
4038
loadIndex(Index.DATA_TYPE_ALIAS);
39+
loadIndex(Index.MERGE_TEST_1);
40+
loadIndex(Index.MERGE_TEST_2);
4141
}
4242

4343
@Test
@@ -566,6 +566,30 @@ public void testMetaFieldAlias() {
566566
}
567567

568568
@Test
569+
public void testFieldsMergedObject() {
570+
JSONObject result =
571+
executeQuery(
572+
String.format(
573+
"source=%s | fields machine.os1, machine.os2, machine_array.os1, "
574+
+ " machine_array.os2, machine_deep.attr1, machine_deep.attr2,"
575+
+ " machine_deep.layer.os1, machine_deep.layer.os2",
576+
TEST_INDEX_MERGE_TEST_WILDCARD));
577+
verifySchema(
578+
result,
579+
schema("machine.os1", "string"),
580+
schema("machine.os2", "string"),
581+
schema("machine_array.os1", "string"),
582+
schema("machine_array.os2", "string"),
583+
schema("machine_deep.attr1", "long"),
584+
schema("machine_deep.attr2", "long"),
585+
schema("machine_deep.layer.os1", "string"),
586+
schema("machine_deep.layer.os2", "string"));
587+
verifyDataRows(
588+
result,
589+
rows("linux", null, "linux", null, 1, null, "os1", null),
590+
rows(null, "linux", null, "linux", null, 2, null, "os2"));
591+
}
592+
569593
public void testNumericLiteral() {
570594
JSONObject result =
571595
executeQuery(

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLCaseFunctionIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void testCaseWhenWithCast() {
6666
cast(response as int) >= 400 AND cast(response as int) < 500, "Client Error",
6767
cast(response as int) >= 500 AND cast(response as int) < 600, "Server Error"
6868
else concat("Incorrect HTTP status code for", url))
69-
| where status != "Success"
69+
| where status != "Success" | fields host, method, message, bytes, response, url, status
7070
""",
7171
TEST_INDEX_WEBLOGS));
7272
verifySchema(
@@ -114,7 +114,7 @@ public void testCaseWhenNoElse() {
114114
cast(response as int) >= 300 AND cast(response as int) < 400, "Redirection",
115115
cast(response as int) >= 400 AND cast(response as int) < 500, "Client Error",
116116
cast(response as int) >= 500 AND cast(response as int) < 600, "Server Error")
117-
| where isnull(status) OR status != "Success"
117+
| where isnull(status) OR status != "Success" | fields host, method, message, bytes, response, url, status
118118
""",
119119
TEST_INDEX_WEBLOGS));
120120
verifySchema(
@@ -156,7 +156,7 @@ response in ('300', '301'), "Redirection",
156156
response in ('400', '403'), "Client Error",
157157
response in ('500', '505'), "Server Error"
158158
else concat("Incorrect HTTP status code for", url))
159-
| where status != "Success"
159+
| where status != "Success" | fields host, method, message, bytes, response, url, status
160160
""",
161161
TEST_INDEX_WEBLOGS));
162162
verifySchema(
@@ -205,6 +205,7 @@ response in ('300', '301'), false,
205205
response in ('400', '403'), false,
206206
response in ('500', '505'), false
207207
else false)
208+
| fields host, method, message, bytes, response, url
208209
""",
209210
TEST_INDEX_WEBLOGS));
210211
verifySchema(
@@ -240,6 +241,7 @@ response in ('500', '505'), "500"
240241
else concat("Incorrect HTTP status code for", url))
241242
| fields new_response
242243
]
244+
| fields host, method, message, bytes, response, url
243245
""",
244246
TEST_INDEX_WEBLOGS, TEST_INDEX_WEBLOGS));
245247
verifySchema(

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,16 @@ public enum Index {
818818
"hobbies",
819819
getHobbiesIndexMapping(),
820820
"src/test/resources/hobbies.json"),
821+
MERGE_TEST_1(
822+
TestsConstants.TEST_INDEX_MERGE_TEST_1,
823+
"merge_test1",
824+
getMappingFile("merge_test_1_mapping.json"),
825+
"src/test/resources/merge_test_1.json"),
826+
MERGE_TEST_2(
827+
TestsConstants.TEST_INDEX_MERGE_TEST_2,
828+
"merge_test2",
829+
getMappingFile("merge_test_2_mapping.json"),
830+
"src/test/resources/merge_test_2.json"),
821831
// It's "people" table in Spark PPL ITs, to avoid conflicts, rename to "worker" here
822832
WORKER(
823833
TestsConstants.TEST_INDEX_WORKER,

integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class TestsConstants {
7474
public static final String TEST_INDEX_WORKER = TEST_INDEX + "_worker";
7575
public static final String TEST_INDEX_WORK_INFORMATION = TEST_INDEX + "_work_information";
7676
public static final String TEST_INDEX_DUPLICATION_NULLABLE = TEST_INDEX + "_duplication_nullable";
77+
public static final String TEST_INDEX_MERGE_TEST_1 = TEST_INDEX + "_merge_test_1";
78+
public static final String TEST_INDEX_MERGE_TEST_2 = TEST_INDEX + "_merge_test_2";
79+
public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*";
7780

7881
public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
7982
public static final String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

integ-test/src/test/java/org/opensearch/sql/ppl/FieldsCommandIT.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55

66
package org.opensearch.sql.ppl;
77

8-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
9-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
8+
import static org.opensearch.sql.legacy.TestsConstants.*;
109
import static org.opensearch.sql.util.MatcherUtils.columnName;
1110
import static org.opensearch.sql.util.MatcherUtils.columnPattern;
1211
import static org.opensearch.sql.util.MatcherUtils.rows;
@@ -28,6 +27,8 @@ public void init() throws Exception {
2827
super.init();
2928
loadIndex(Index.ACCOUNT);
3029
loadIndex(Index.BANK);
30+
loadIndex(Index.MERGE_TEST_1);
31+
loadIndex(Index.MERGE_TEST_2);
3132
}
3233

3334
@Test
@@ -105,4 +106,29 @@ public void testMetadataFieldsWithEvalMetaField() {
105106
"source=%s | eval _id = 1 | fields firstname, _id", TEST_INDEX_ACCOUNT)));
106107
verifyErrorMessageContains(e, "Cannot use metadata field [_id] as the eval field.");
107108
}
109+
110+
@Test
111+
public void testFieldsMergedObject() throws IOException {
112+
JSONObject result =
113+
executeQuery(
114+
String.format(
115+
"source=%s | fields machine.os1, machine.os2, machine_array.os1, "
116+
+ " machine_array.os2, machine_deep.attr1, machine_deep.attr2,"
117+
+ " machine_deep.layer.os1, machine_deep.layer.os2",
118+
TEST_INDEX_MERGE_TEST_WILDCARD));
119+
verifySchema(
120+
result,
121+
schema("machine.os1", "string"),
122+
schema("machine.os2", "string"),
123+
schema("machine_array.os1", "string"),
124+
schema("machine_array.os2", "string"),
125+
schema("machine_deep.attr1", "bigint"),
126+
schema("machine_deep.attr2", "bigint"),
127+
schema("machine_deep.layer.os1", "string"),
128+
schema("machine_deep.layer.os2", "string"));
129+
verifyDataRows(
130+
result,
131+
rows("linux", null, "linux", null, 1, null, "os1", null),
132+
rows(null, "linux", null, "linux", null, 2, null, "os2"));
133+
}
108134
}

integ-test/src/test/java/org/opensearch/sql/ppl/GeoIpFunctionsIT.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void testGeoIpEnrichment() {
8383
JSONObject resultGeoIp =
8484
executeQuery(
8585
String.format(
86-
"search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s)",
86+
"search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s) | fields name, ip,"
87+
+ " enrichmentResult",
8788
TEST_INDEX_GEOIP, "dummycityindex", "ip"));
8889

8990
verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult"));
@@ -100,7 +101,8 @@ public void testGeoIpEnrichmentWithSingleOption() {
100101
JSONObject resultGeoIp =
101102
executeQuery(
102103
String.format(
103-
"search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s,\\\"%s\\\")",
104+
"search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s,\\\"%s\\\") |"
105+
+ " fields name, ip, enrichmentResult",
104106
TEST_INDEX_GEOIP, "dummycityindex", "ip", "city"));
105107

106108
verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult"));
@@ -117,7 +119,8 @@ public void testGeoIpEnrichmentWithSpaceSeparatedMultipleOptions() {
117119
JSONObject resultGeoIp =
118120
executeQuery(
119121
String.format(
120-
"search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s,\\\"%s\\\")",
122+
"search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s,\\\"%s\\\") |"
123+
+ " fields name, ip, enrichmentResult",
121124
TEST_INDEX_GEOIP, "dummycityindex", "ip", "city , country"));
122125

123126
verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult"));
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
{
2+
"mappings": {
3+
"properties": {
4+
"machine": {
5+
"properties": {
6+
"os1": {
7+
"type": "text"
8+
},
9+
"ram1": {
10+
"type": "long"
11+
}
12+
}
13+
},
14+
"machine_deep": {
15+
"properties": {
16+
"attr1": {
17+
"type": "long"
18+
},
19+
"layer": {
20+
"properties": {
21+
"os1": {
22+
"type": "text"
23+
}
24+
}
25+
}
26+
}
27+
},
28+
"machine_array": {
29+
"type": "nested",
30+
"properties": {
31+
"os1": {
32+
"type": "text"
33+
},
34+
"ram1": {
35+
"type": "long"
36+
}
37+
}
38+
}
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)