Skip to content

Commit 83040d8

Browse files
committed
Support read multi-values from OpenSearch
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 77633ef commit 83040d8

4 files changed

Lines changed: 139 additions & 183 deletions

File tree

core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ public static ExprValue fromObjectValue(Object o) {
116116
if (null == o) {
117117
return LITERAL_NULL;
118118
}
119-
if (o instanceof Map) {
119+
if (o instanceof ExprValue) {
120+
return (ExprValue) o;
121+
} else if (o instanceof Map) {
120122
return tupleValue((Map) o);
121123
} else if (o instanceof List) {
122124
return collectionValue(((List) o));
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
setup:
2+
- do:
3+
indices.create:
4+
index: array-test
5+
body:
6+
settings:
7+
number_of_shards: 1
8+
mappings:
9+
properties:
10+
array_keyword:
11+
type: keyword
12+
array_text:
13+
type: text
14+
array_num:
15+
type: long
16+
array_boolean:
17+
type: boolean
18+
array_date:
19+
type: date
20+
num:
21+
type: integer
22+
parent:
23+
type: object
24+
properties:
25+
child_array_num:
26+
type: float
27+
child_array_keyword:
28+
type: keyword
29+
child_num:
30+
type: integer
31+
- do:
32+
query.settings:
33+
body:
34+
transient:
35+
plugins.calcite.enabled : true
36+
37+
---
38+
teardown:
39+
- do:
40+
query.settings:
41+
body:
42+
transient:
43+
plugins.calcite.enabled : false
44+
45+
---
46+
"Handle multiple values (array) documents":
47+
- skip:
48+
features:
49+
- headers
50+
- allowed_warnings
51+
- do:
52+
bulk:
53+
index: array-test
54+
refresh: true
55+
body:
56+
- '{"index": {}}'
57+
- '{"array_text": ["Jane Smith","hello world"],"array_keyword": [ "c++", "java" ],"array_num": [1, 2],"array_boolean": [true, false],"array_date": ["2023-01-02T22:03:34.000Z","2023-01-02T22:03:35.000Z"],"num" : 10,"parent": {"child_array_num": [1.1, 2.2], "child_array_keyword":["a a a", "b b b"], "child_num": 100}}'
58+
- '{"index": {}}'
59+
- '{"array_text": "OpenSearch PPL","array_keyword": "python","array_num": 3,"array_boolean": true,"array_date": "2023-01-02T22:03:36.000Z","num" : 11,"parent": {"child_array_num": 3.3, "child_array_keyword":"c c c", "child_num": 101}}'
60+
- do:
61+
allowed_warnings:
62+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
63+
headers:
64+
Content-Type: 'application/json'
65+
ppl:
66+
body:
67+
query: 'source=array-test '
68+
- match: {"total": 2}
69+
- match: {"schema": [{"name": "parent", "type": "struct"},{"name":"array_boolean","type":"boolean"},{"name":"array_num","type":"bigint"},{"name":"array_date","type":"timestamp"},{"name":"num","type":"int"},{"name":"array_text","type":"string"},{"name":"array_keyword","type":"string"}]}
70+
- match: {"datarows": [[{"child_array_keyword":["a a a","b b b"],"child_num":100,"child_array_num":[1.1,2.2]},[true, false],[1,2],["2023-01-02 22:03:34","2023-01-02 22:03:35"],10,["Jane Smith","hello world"],["c++","java"]],[{"child_num": 101,"child_array_keyword":"c c c","child_array_num":3.3},true,3,"2023-01-02 22:03:36",11,"OpenSearch PPL","python"]]}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,22 @@
66
package org.opensearch.sql.opensearch.executor;
77

88
import com.google.common.base.Suppliers;
9+
import java.lang.reflect.Method;
910
import java.sql.PreparedStatement;
1011
import java.sql.ResultSet;
1112
import java.sql.ResultSetMetaData;
1213
import java.sql.SQLException;
1314
import java.util.ArrayList;
15+
import java.util.HashMap;
1416
import java.util.LinkedHashMap;
1517
import java.util.List;
1618
import java.util.Map;
1719
import java.util.Optional;
1820
import java.util.concurrent.ConcurrentHashMap;
1921
import java.util.concurrent.atomic.AtomicReference;
2022
import java.util.function.Supplier;
23+
import org.apache.calcite.avatica.AvaticaResultSet;
24+
import org.apache.calcite.avatica.util.Cursor;
2125
import org.apache.calcite.plan.RelOptUtil;
2226
import org.apache.calcite.rel.RelNode;
2327
import org.apache.calcite.rel.RelRoot;
@@ -34,6 +38,7 @@
3438
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
3539
import org.apache.logging.log4j.LogManager;
3640
import org.apache.logging.log4j.Logger;
41+
import org.locationtech.jts.geom.Point;
3742
import org.opensearch.sql.ast.statement.Explain.ExplainFormat;
3843
import org.opensearch.sql.calcite.CalcitePlanContext;
3944
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners;
@@ -42,6 +47,7 @@
4247
import org.opensearch.sql.common.response.ResponseListener;
4348
import org.opensearch.sql.data.model.ExprTupleValue;
4449
import org.opensearch.sql.data.model.ExprValue;
50+
import org.opensearch.sql.data.model.ExprValueUtils;
4551
import org.opensearch.sql.data.type.ExprCoreType;
4652
import org.opensearch.sql.data.type.ExprType;
4753
import org.opensearch.sql.executor.ExecutionContext;
@@ -52,10 +58,10 @@
5258
import org.opensearch.sql.expression.function.BuiltinFunctionName;
5359
import org.opensearch.sql.expression.function.PPLFuncImpTable;
5460
import org.opensearch.sql.opensearch.client.OpenSearchClient;
61+
import org.opensearch.sql.opensearch.data.value.OpenSearchExprGeoPointValue;
5562
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
5663
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
5764
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
58-
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
5965
import org.opensearch.sql.planner.physical.PhysicalPlan;
6066
import org.opensearch.sql.storage.TableScanOperator;
6167
import org.opensearch.transport.client.node.NodeClient;
@@ -212,6 +218,61 @@ public void execute(
212218
});
213219
}
214220

221+
/**
222+
* Retrieves column accessors from AvaticaResultSet using reflection. This method accesses the
223+
* private getAccessor method to obtain direct access to column data.
224+
*
225+
* @param rs the ResultSet to get accessors from
226+
* @param columnCount the number of columns in the ResultSet
227+
* @return list of Cursor.Accessor objects for each column
228+
* @throws SQLException if reflection fails or column access is invalid
229+
*/
230+
private List<Cursor.Accessor> getAccessors(ResultSet rs, int columnCount) throws SQLException {
231+
List<Cursor.Accessor> accessorList = new ArrayList<>();
232+
try {
233+
Method method = AvaticaResultSet.class.getDeclaredMethod("getAccessor", int.class);
234+
method.setAccessible(true);
235+
for (int i = 1; i <= columnCount; i++) {
236+
accessorList.add((Cursor.Accessor) method.invoke(rs, i));
237+
}
238+
} catch (Exception e) {
239+
throw new SQLException("Unable to get accessors", e);
240+
}
241+
return accessorList;
242+
}
243+
244+
/**
245+
* Process values recursively, handling geo points and nested maps. Geo points are converted to
246+
* OpenSearchExprGeoPointValue. Maps are recursively processed to handle nested structures.
247+
*/
248+
private static Object processValue(Object value) {
249+
if (value == null) {
250+
return null;
251+
}
252+
if (value instanceof Point) {
253+
Point point = (Point) value;
254+
return new OpenSearchExprGeoPointValue(point.getY(), point.getX());
255+
}
256+
if (value instanceof Map) {
257+
Map<String, Object> map = (Map<String, Object>) value;
258+
Map<String, Object> convertedMap = new HashMap<>();
259+
for (Map.Entry<String, Object> entry : map.entrySet()) {
260+
convertedMap.put(entry.getKey(), processValue(entry.getValue()));
261+
}
262+
return convertedMap;
263+
}
264+
if (value instanceof List) {
265+
List<Object> list = (List<Object>) value;
266+
List<Object> convertedList = new ArrayList<>();
267+
for (Object item : list) {
268+
convertedList.add(processValue(item));
269+
}
270+
return convertedList;
271+
}
272+
// For other types, return as-is
273+
return value;
274+
}
275+
215276
private void buildResultSet(
216277
ResultSet resultSet,
217278
RelDataType rowTypes,
@@ -221,6 +282,7 @@ private void buildResultSet(
221282
// Get the ResultSet metadata to know about columns
222283
ResultSetMetaData metaData = resultSet.getMetaData();
223284
int columnCount = metaData.getColumnCount();
285+
List<Cursor.Accessor> accessorList = getAccessors(resultSet, columnCount);
224286
List<RelDataType> fieldTypes =
225287
rowTypes.getFieldList().stream().map(RelDataTypeField::getType).toList();
226288
List<ExprValue> values = new ArrayList<>();
@@ -230,11 +292,9 @@ private void buildResultSet(
230292
// Loop through each column
231293
for (int i = 1; i <= columnCount; i++) {
232294
String columnName = metaData.getColumnName(i);
233-
int sqlType = metaData.getColumnType(i);
234-
RelDataType fieldType = fieldTypes.get(i - 1);
235-
ExprValue exprValue =
236-
JdbcOpenSearchDataTypeConvertor.getExprValueFromSqlType(
237-
resultSet, i, sqlType, fieldType, columnName);
295+
Object value = accessorList.get(i - 1).getObject();
296+
Object converted = processValue(value);
297+
ExprValue exprValue = ExprValueUtils.fromObjectValue(converted);
238298
row.put(columnName, exprValue);
239299
}
240300
values.add(ExprTupleValue.fromExprValueMap(row));

opensearch/src/main/java/org/opensearch/sql/opensearch/util/JdbcOpenSearchDataTypeConvertor.java

Lines changed: 0 additions & 176 deletions
This file was deleted.

0 commit comments

Comments
 (0)