Skip to content

Commit 48fa93f

Browse files
authored
Support read multi-values from OpenSearch if no codegen triggered (#5015) (#5023)
* Support read multi-values from OpenSearch * Fix tests * remove reflection way --------- (cherry picked from commit 98516d6) Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent bc8d2e4 commit 48fa93f

6 files changed

Lines changed: 115 additions & 190 deletions

File tree

core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ public static ExprValue fromObjectValue(Object o) {
120120
if (null == o) {
121121
return LITERAL_NULL;
122122
}
123-
if (o instanceof Map) {
123+
if (o instanceof ExprValue) {
124+
return (ExprValue) o;
125+
} else if (o instanceof Map) {
124126
return tupleValue((Map) o);
125127
} else if (o instanceof List) {
126128
return collectionValue(((List) o));

core/src/test/java/org/opensearch/sql/data/model/ExprValueUtilsTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,8 @@ public void unSupportedObject() {
221221
Exception exception =
222222
assertThrows(
223223
ExpressionEvaluationException.class,
224-
() -> ExprValueUtils.fromObjectValue(integerValue(1)));
225-
assertEquals(
226-
"unsupported object " + "class org.opensearch.sql.data.model.ExprIntegerValue",
227-
exception.getMessage());
224+
() -> ExprValueUtils.fromObjectValue(new Object()));
225+
assertEquals("unsupported object class java.lang.Object", exception.getMessage());
228226
}
229227

230228
@Test

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLBasicIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -599,8 +599,8 @@ public void testNumericLiteral() throws IOException {
599599
schema("floatLiteral", "float"));
600600
verifyDataRows(
601601
result,
602-
rows("hello", 20, 0.05, 0.049999999999999996, 0.05),
603-
rows("world", 30, 0.05, 0.049999999999999996, 0.05));
602+
rows("hello", 20, 0.05, 0.049999999999999996, 0.049999999999999996),
603+
rows("world", 30, 0.05, 0.049999999999999996, 0.049999999999999996));
604604
}
605605

606606
@Test
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
setup:
2+
- do:
3+
indices.create:
4+
index: array-test
5+
body:
6+
settings:
7+
number_of_shards: 1
8+
mappings:
9+
properties:
10+
array_keyword:
11+
type: keyword
12+
array_text:
13+
type: text
14+
array_num:
15+
type: long
16+
array_boolean:
17+
type: boolean
18+
array_date:
19+
type: date
20+
num:
21+
type: integer
22+
parent:
23+
type: object
24+
properties:
25+
child_array_num:
26+
type: float
27+
child_array_keyword:
28+
type: keyword
29+
child_num:
30+
type: integer
31+
- do:
32+
bulk:
33+
index: array-test
34+
refresh: true
35+
body:
36+
- '{"index": {}}'
37+
- '{"array_text": ["Jane Smith","hello world"],"array_keyword": [ "c++", "java" ],"array_num": [1, 2],"array_boolean": [true, false],"array_date": ["2023-01-02T22:03:34.000Z","2023-01-02T22:03:35.000Z"],"num" : 10,"parent": {"child_array_num": [1.1, 2.2], "child_array_keyword":["a a a", "b b b"], "child_num": 100}}'
38+
- '{"index": {}}'
39+
- '{"array_text": "OpenSearch PPL","array_keyword": "python","array_num": 3,"array_boolean": true,"array_date": "2023-01-02T22:03:36.000Z","num" : 11,"parent": {"child_array_num": 3.3, "child_array_keyword":"c c c", "child_num": 101}}'
40+
- do:
41+
query.settings:
42+
body:
43+
transient:
44+
plugins.calcite.enabled : true
45+
---
46+
teardown:
47+
- do:
48+
query.settings:
49+
body:
50+
transient:
51+
plugins.calcite.enabled : false
52+
53+
---
54+
"Handle multiple values (array) documents":
55+
- skip:
56+
features:
57+
- headers
58+
- allowed_warnings
59+
- do:
60+
allowed_warnings:
61+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
62+
headers:
63+
Content-Type: 'application/json'
64+
ppl:
65+
body:
66+
query: 'source=array-test '
67+
- match: {"total": 2}
68+
- match: {"schema": [{"name": "parent", "type": "struct"},{"name":"array_boolean","type":"boolean"},{"name":"array_num","type":"bigint"},{"name":"array_date","type":"timestamp"},{"name":"num","type":"int"},{"name":"array_text","type":"string"},{"name":"array_keyword","type":"string"}]}
69+
- match: {"datarows": [[{"child_array_keyword":["a a a","b b b"],"child_num":100,"child_array_num":[1.1,2.2]},[true, false],[1,2],["2023-01-02 22:03:34","2023-01-02 22:03:35"],10,["Jane Smith","hello world"],["c++","java"]],[{"child_num": 101,"child_array_keyword":"c c c","child_array_num":3.3},true,3,"2023-01-02 22:03:36",11,"OpenSearch PPL","python"]]}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.sql.ResultSetMetaData;
1414
import java.sql.SQLException;
1515
import java.util.ArrayList;
16+
import java.util.HashMap;
1617
import java.util.LinkedHashMap;
1718
import java.util.List;
1819
import java.util.Map;
@@ -38,6 +39,7 @@
3839
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
3940
import org.apache.logging.log4j.LogManager;
4041
import org.apache.logging.log4j.Logger;
42+
import org.locationtech.jts.geom.Point;
4143
import org.opensearch.sql.ast.statement.Explain.ExplainFormat;
4244
import org.opensearch.sql.calcite.CalcitePlanContext;
4345
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners;
@@ -46,6 +48,7 @@
4648
import org.opensearch.sql.common.response.ResponseListener;
4749
import org.opensearch.sql.data.model.ExprTupleValue;
4850
import org.opensearch.sql.data.model.ExprValue;
51+
import org.opensearch.sql.data.model.ExprValueUtils;
4952
import org.opensearch.sql.data.type.ExprCoreType;
5053
import org.opensearch.sql.data.type.ExprType;
5154
import org.opensearch.sql.executor.ExecutionContext;
@@ -55,14 +58,14 @@
5558
import org.opensearch.sql.executor.pagination.PlanSerializer;
5659
import org.opensearch.sql.expression.function.PPLFuncImpTable;
5760
import org.opensearch.sql.opensearch.client.OpenSearchClient;
61+
import org.opensearch.sql.opensearch.data.value.OpenSearchExprGeoPointValue;
5862
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
5963
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
6064
import org.opensearch.sql.expression.function.BuiltinFunctionName;
6165
import org.opensearch.sql.expression.function.PPLFuncImpTable;
6266
import org.opensearch.sql.opensearch.client.OpenSearchClient;
6367
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
6468
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
65-
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
6669
import org.opensearch.sql.planner.physical.PhysicalPlan;
6770
import org.opensearch.sql.storage.TableScanOperator;
6871
import org.opensearch.client.node.NodeClient;
@@ -231,6 +234,38 @@ public void execute(
231234
}));
232235
}
233236

237+
/**
238+
* Process values recursively, handling geo points and nested maps. Geo points are converted to
239+
* OpenSearchExprGeoPointValue. Maps and lists are recursively processed to handle nested structures.
240+
*/
241+
private static Object processValue(Object value) {
242+
if (value == null) {
243+
return null;
244+
}
245+
if (value instanceof Point) {
246+
Point point = (Point) value;
247+
return new OpenSearchExprGeoPointValue(point.getY(), point.getX());
248+
}
249+
if (value instanceof Map) {
250+
Map<String, Object> map = (Map<String, Object>) value;
251+
Map<String, Object> convertedMap = new HashMap<>();
252+
for (Map.Entry<String, Object> entry : map.entrySet()) {
253+
convertedMap.put(entry.getKey(), processValue(entry.getValue()));
254+
}
255+
return convertedMap;
256+
}
257+
if (value instanceof List) {
258+
List<Object> list = (List<Object>) value;
259+
List<Object> convertedList = new ArrayList<>();
260+
for (Object item : list) {
261+
convertedList.add(processValue(item));
262+
}
263+
return convertedList;
264+
}
265+
// For other types, return as-is
266+
return value;
267+
}
268+
234269
private void buildResultSet(
235270
ResultSet resultSet,
236271
RelDataType rowTypes,
@@ -249,11 +284,9 @@ private void buildResultSet(
249284
// Loop through each column
250285
for (int i = 1; i <= columnCount; i++) {
251286
String columnName = metaData.getColumnName(i);
252-
int sqlType = metaData.getColumnType(i);
253-
RelDataType fieldType = fieldTypes.get(i - 1);
254-
ExprValue exprValue =
255-
JdbcOpenSearchDataTypeConvertor.getExprValueFromSqlType(
256-
resultSet, i, sqlType, fieldType, columnName);
287+
Object value = resultSet.getObject(columnName);
288+
Object converted = processValue(value);
289+
ExprValue exprValue = ExprValueUtils.fromObjectValue(converted);
257290
row.put(columnName, exprValue);
258291
}
259292
values.add(ExprTupleValue.fromExprValueMap(row));
Lines changed: 0 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -1,177 +0,0 @@
1-
/*
2-
* Copyright OpenSearch Contributors
3-
* SPDX-License-Identifier: Apache-2.0
4-
*/
5-
6-
package org.opensearch.sql.opensearch.util;
7-
8-
import java.sql.Array;
9-
import java.sql.ResultSet;
10-
import java.sql.SQLException;
11-
import java.sql.Types;
12-
import java.util.Arrays;
13-
import java.util.HashMap;
14-
import java.util.Map;
15-
import lombok.experimental.UtilityClass;
16-
import org.locationtech.jts.geom.Point;
17-
import org.apache.calcite.avatica.util.ArrayImpl;
18-
import org.apache.calcite.rel.type.RelDataType;
19-
import org.apache.calcite.sql.type.SqlTypeName;
20-
import org.apache.logging.log4j.LogManager;
21-
import org.apache.logging.log4j.Logger;
22-
import org.locationtech.jts.geom.Point;
23-
import org.opensearch.sql.calcite.type.ExprJavaType;
24-
import org.opensearch.sql.data.model.ExprDateValue;
25-
import org.opensearch.sql.data.model.ExprNullValue;
26-
import org.opensearch.sql.data.model.ExprTimeValue;
27-
import org.opensearch.sql.data.model.ExprTimestampValue;
28-
import org.opensearch.sql.data.model.ExprValue;
29-
import org.opensearch.sql.data.model.ExprValueUtils;
30-
import org.opensearch.sql.data.type.ExprCoreType;
31-
import org.opensearch.sql.data.type.ExprType;
32-
import org.opensearch.sql.opensearch.data.value.OpenSearchExprGeoPointValue;
33-
34-
/** This class is used to convert the data type from JDBC to OpenSearch data type. */
35-
@UtilityClass
36-
public class JdbcOpenSearchDataTypeConvertor {
37-
private static final Logger LOG = LogManager.getLogger();
38-
39-
public static ExprType getExprTypeFromSqlType(int sqlType) {
40-
switch (sqlType) {
41-
case Types.INTEGER:
42-
return ExprCoreType.INTEGER;
43-
case Types.BIGINT:
44-
return ExprCoreType.LONG;
45-
case Types.DOUBLE:
46-
case Types.DECIMAL:
47-
case Types.NUMERIC:
48-
return ExprCoreType.DOUBLE;
49-
case Types.FLOAT:
50-
return ExprCoreType.FLOAT;
51-
case Types.DATE:
52-
return ExprCoreType.DATE;
53-
case Types.TIMESTAMP:
54-
return ExprCoreType.TIMESTAMP;
55-
case Types.BOOLEAN:
56-
return ExprCoreType.BOOLEAN;
57-
case Types.VARCHAR:
58-
case Types.CHAR:
59-
case Types.LONGVARCHAR:
60-
return ExprCoreType.STRING;
61-
default:
62-
// TODO unchecked OpenSearchDataType
63-
return ExprCoreType.UNKNOWN;
64-
}
65-
}
66-
67-
public static ExprValue getExprValueFromSqlType(
68-
ResultSet rs, int i, int sqlType, RelDataType fieldType, String fieldName)
69-
throws SQLException {
70-
Object value = rs.getObject(i);
71-
if (value == null) {
72-
return ExprNullValue.of();
73-
}
74-
75-
if (fieldType instanceof ExprJavaType && value instanceof ExprValue) {
76-
return (ExprValue) value;
77-
} else if (fieldType.getSqlTypeName() == SqlTypeName.GEOMETRY) {
78-
// Use getObject by name instead of index to avoid Avatica's transformation on the accessor.
79-
// Otherwise, Avatica will transform Geometry to String.
80-
Point geoPoint = (Point) rs.getObject(fieldName);
81-
return new OpenSearchExprGeoPointValue(geoPoint.getY(), geoPoint.getX());
82-
}
83-
84-
try {
85-
switch (sqlType) {
86-
case Types.VARCHAR:
87-
case Types.CHAR:
88-
case Types.LONGVARCHAR:
89-
return ExprValueUtils.fromObjectValue(rs.getString(i));
90-
91-
case Types.INTEGER:
92-
return ExprValueUtils.fromObjectValue(rs.getInt(i));
93-
94-
case Types.BIGINT:
95-
return ExprValueUtils.fromObjectValue(rs.getLong(i));
96-
97-
case Types.FLOAT:
98-
case Types.REAL:
99-
return ExprValueUtils.fromObjectValue(rs.getFloat(i));
100-
101-
case Types.DECIMAL:
102-
case Types.NUMERIC:
103-
case Types.DOUBLE:
104-
return ExprValueUtils.fromObjectValue(rs.getDouble(i));
105-
106-
case Types.DATE:
107-
String dateStr = rs.getString(i);
108-
return new ExprDateValue(dateStr);
109-
110-
case Types.TIME:
111-
String timeStr = rs.getString(i);
112-
return new ExprTimeValue(timeStr);
113-
114-
case Types.TIMESTAMP:
115-
String timestampStr = rs.getString(i);
116-
return new ExprTimestampValue(timestampStr);
117-
118-
case Types.BOOLEAN:
119-
return ExprValueUtils.fromObjectValue(rs.getBoolean(i));
120-
121-
case Types.ARRAY:
122-
Array array = rs.getArray(i);
123-
if (array instanceof ArrayImpl) {
124-
return ExprValueUtils.fromObjectValue(
125-
Arrays.asList((Object[]) ((ArrayImpl) value).getArray()));
126-
}
127-
return ExprValueUtils.fromObjectValue(array);
128-
129-
default:
130-
LOG.debug(
131-
"Unchecked sql type: {}, return Object type {}",
132-
sqlType,
133-
value.getClass().getTypeName());
134-
return convertComplexValue(value);
135-
}
136-
} catch (SQLException e) {
137-
LOG.error("Error converting SQL type {}: {}", sqlType, e.getMessage());
138-
throw e;
139-
}
140-
}
141-
142-
/**
143-
* Convert complex values like Maps that may contain geo points. This method recursively processes
144-
* Maps to handle nested geo points and converts them to appropriate ExprValue representations.
145-
*/
146-
private static ExprValue convertComplexValue(Object value) {
147-
Object converted = processValue(value);
148-
return ExprValueUtils.fromObjectValue(converted);
149-
}
150-
151-
/**
152-
* Process values recursively, handling geo points and nested maps. Geo points are converted to
153-
* OpenSearchExprGeoPointValue. Maps are recursively processed to handle nested structures.
154-
*/
155-
private static Object processValue(Object value) {
156-
if (value == null) {
157-
return null;
158-
}
159-
160-
if (value instanceof Point) {
161-
Point point = (Point) value;
162-
return new OpenSearchExprGeoPointValue(point.getY(), point.getX());
163-
}
164-
165-
if (value instanceof Map) {
166-
Map<String, Object> map = (Map<String, Object>) value;
167-
Map<String, Object> convertedMap = new HashMap<>();
168-
for (Map.Entry<String, Object> entry : map.entrySet()) {
169-
convertedMap.put(entry.getKey(), processValue(entry.getValue()));
170-
}
171-
return convertedMap;
172-
}
173-
174-
// For other types, return as-is
175-
return value;
176-
}
177-
}

0 commit comments

Comments
 (0)