Skip to content

Commit cbb8507

Browse files
committed
Support serializing & deserializing UDTs
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent f244bd6 commit cbb8507

5 files changed

Lines changed: 325 additions & 6 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,4 +977,16 @@ public void testMedian() throws IOException {
977977
verifySchema(actual, schema("median(balance)", "bigint"));
978978
verifyDataRows(actual, rows(32838));
979979
}
980+
981+
@Test
982+
public void testStatsCountOnFunctionsWithUDTArg() throws IOException {
983+
JSONObject response =
984+
executeQuery(
985+
String.format(
986+
"source=%s | eval t = unix_timestamp(birthdate) | stats count() by t | sort t |"
987+
+ " head 3",
988+
TEST_INDEX_BANK));
989+
verifySchema(response, schema("count()", "bigint"), schema("t", "double"));
990+
verifyDataRows(response, rows(1, 1508716800), rows(1, 1511136000), rows(1, 1529712000));
991+
}
980992
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.storage.serde;
7+
8+
import static java.util.Objects.requireNonNull;
9+
10+
import java.util.List;
11+
import java.util.Map;
12+
import org.apache.calcite.rel.externalize.RelJson;
13+
import org.apache.calcite.rel.type.RelDataType;
14+
import org.apache.calcite.rel.type.RelDataTypeFactory;
15+
import org.apache.calcite.rel.type.RelDataTypeField;
16+
import org.apache.calcite.util.JsonBuilder;
17+
import org.checkerframework.checker.nullness.qual.Nullable;
18+
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
19+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
20+
21+
/** An extension to {@link RelJson} to allow serialization & deserialization of UDTs */
22+
public class ExtendedRelJson extends RelJson {
23+
private final JsonBuilder jsonBuilder;
24+
25+
private ExtendedRelJson(JsonBuilder jsonBuilder) {
26+
super(jsonBuilder);
27+
this.jsonBuilder = jsonBuilder;
28+
}
29+
30+
/** Creates a ExtendedRelJson. */
31+
public static ExtendedRelJson create(JsonBuilder jsonBuilder) {
32+
return new ExtendedRelJson(jsonBuilder);
33+
}
34+
35+
public @Nullable Object toJson(@Nullable Object value) {
36+
if (value instanceof RelDataTypeField) {
37+
return toJson((RelDataTypeField) value);
38+
} else if (value instanceof RelDataType) {
39+
return toJson((RelDataType) value);
40+
}
41+
return super.toJson(value);
42+
}
43+
44+
private Object toJson(RelDataTypeField node) {
45+
final Map<String, @Nullable Object> map;
46+
if (node.getType().isStruct()) {
47+
map = jsonBuilder().map();
48+
map.put("fields", toJson(node.getType()));
49+
map.put("nullable", node.getType().isNullable());
50+
} else {
51+
//noinspection unchecked
52+
map = (Map<String, @Nullable Object>) toJson(node.getType());
53+
}
54+
map.put("name", node.getName());
55+
return map;
56+
}
57+
58+
/** Modifies behavior for AbstractExprRelDataType instances, delegates to RelJson otherwise. */
59+
private Object toJson(RelDataType node) {
60+
final Map<String, @Nullable Object> map = jsonBuilder().map();
61+
if (node.isStruct()) {
62+
final List<@Nullable Object> list = jsonBuilder().list();
63+
for (RelDataTypeField field : node.getFieldList()) {
64+
list.add(toJson(field));
65+
}
66+
map.put("fields", list);
67+
map.put("nullable", node.isNullable());
68+
} else {
69+
// For UDT like EXPR_TIMESTAMP, we additionally save its UDT info as a tag.
70+
if (node instanceof AbstractExprRelDataType) {
71+
map.put("udt", ((AbstractExprRelDataType<?>) node).getUdt().name());
72+
}
73+
map.put("type", node.getSqlTypeName().name());
74+
map.put("nullable", node.isNullable());
75+
if (node.getComponentType() != null) {
76+
map.put("component", toJson(node.getComponentType()));
77+
}
78+
RelDataType keyType = node.getKeyType();
79+
if (keyType != null) {
80+
map.put("key", toJson(keyType));
81+
}
82+
RelDataType valueType = node.getValueType();
83+
if (valueType != null) {
84+
map.put("value", toJson(valueType));
85+
}
86+
if (node.getSqlTypeName().allowsPrec()) {
87+
map.put("precision", node.getPrecision());
88+
}
89+
if (node.getSqlTypeName().allowsScale()) {
90+
map.put("scale", node.getScale());
91+
}
92+
}
93+
return map;
94+
}
95+
96+
@Override
97+
public RelDataType toType(RelDataTypeFactory typeFactory, Object o) {
98+
if (o instanceof Map
99+
&& ((Map<?, ?>) o).containsKey("udt")
100+
&& typeFactory instanceof OpenSearchTypeFactory) {
101+
// Reconstruct UDT from its udt tag
102+
Object udtName = ((Map<?, ?>) o).get("udt");
103+
OpenSearchTypeFactory.ExprUDT udt = OpenSearchTypeFactory.ExprUDT.valueOf((String) udtName);
104+
return ((OpenSearchTypeFactory) typeFactory).createUDT(udt);
105+
}
106+
return super.toType(typeFactory, o);
107+
}
108+
109+
private JsonBuilder jsonBuilder() {
110+
return requireNonNull(jsonBuilder, "jsonBuilder");
111+
}
112+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,17 @@ public RelJsonSerializer(RelOptCluster cluster) {
7474
* <li>Encodes the resulting map into a final object string
7575
*
7676
* @param rexNode pushed down RexNode
77-
* @param relDataType row type of RexNode input
77+
* @param rowType row type of RexNode input
7878
* @param fieldTypes input field and ExprType mapping
7979
* @return serialized string of map structure for inputs
8080
*/
81-
public String serialize(
82-
RexNode rexNode, RelDataType relDataType, Map<String, ExprType> fieldTypes) {
81+
public String serialize(RexNode rexNode, RelDataType rowType, Map<String, ExprType> fieldTypes) {
8382
try {
8483
// Serialize RexNode and RelDataType by JSON
8584
JsonBuilder jsonBuilder = new JsonBuilder();
86-
RelJson relJson = RelJson.create().withJsonBuilder(jsonBuilder);
85+
RelJson relJson = ExtendedRelJson.create(jsonBuilder);
8786
String rexNodeJson = jsonBuilder.toJsonString(relJson.toJson(rexNode));
88-
String rowTypeJson = jsonBuilder.toJsonString(relJson.toJson(relDataType));
87+
String rowTypeJson = jsonBuilder.toJsonString(relJson.toJson(rowType));
8988
// Construct envelope of serializable objects
9089
Map<String, Object> envelope =
9190
Map.of(EXPR, rexNodeJson, FIELD_TYPES, fieldTypes, ROW_TYPE, rowTypeJson);
@@ -121,7 +120,7 @@ public Map<String, Object> deserialize(String struct) {
121120
// PPL Expr types are all serializable
122121
Map<String, ExprType> fieldTypes = (Map<String, ExprType>) objectMap.get(FIELD_TYPES);
123122
// Deserialize RelDataType and RexNode by JSON
124-
RelJson relJson = RelJson.create();
123+
RelJson relJson = ExtendedRelJson.create((JsonBuilder) null);
125124
Map<String, Object> rowTypeMap = mapper.readValue((String) objectMap.get(ROW_TYPE), TYPE_REF);
126125
RelDataType rowType = relJson.toType(cluster.getTypeFactory(), rowTypeMap);
127126
OpenSearchRelInputTranslator inputTranslator = new OpenSearchRelInputTranslator(rowType);
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.storage.serde;
7+
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
10+
import java.util.Map;
11+
import org.apache.calcite.rel.type.RelDataType;
12+
import org.apache.calcite.rel.type.RelDataTypeField;
13+
import org.apache.calcite.sql.type.SqlTypeName;
14+
import org.apache.calcite.util.JsonBuilder;
15+
import org.junit.jupiter.api.Test;
16+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
17+
18+
public class ExtendedRelJsonTest {
19+
private final ExtendedRelJson relJson = ExtendedRelJson.create(new JsonBuilder());
20+
21+
@Test
22+
void testSerializeSqlType() {
23+
RelDataType varcharType = OpenSearchTypeFactory.TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR);
24+
RelDataType integerType =
25+
OpenSearchTypeFactory.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER, true);
26+
RelDataType decimalType =
27+
OpenSearchTypeFactory.TYPE_FACTORY.createSqlType(SqlTypeName.DECIMAL, 4, 4);
28+
29+
assertEquals(
30+
Map.of("type", "VARCHAR", "nullable", false, "precision", -1), relJson.toJson(varcharType));
31+
assertEquals(Map.of("type", "INTEGER", "nullable", true), relJson.toJson(integerType));
32+
assertEquals(
33+
Map.of("type", "DECIMAL", "nullable", false, "precision", 4, "scale", 4),
34+
relJson.toJson(decimalType));
35+
}
36+
37+
@Test
38+
void testSerializeUDT() {
39+
RelDataType dateType =
40+
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_DATE);
41+
RelDataType timeType =
42+
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_TIME, true);
43+
RelDataType timestampType =
44+
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_TIMESTAMP);
45+
46+
assertEquals(
47+
Map.of("udt", "EXPR_DATE", "type", "VARCHAR", "nullable", false, "precision", -1),
48+
relJson.toJson(dateType));
49+
assertEquals(
50+
Map.of("udt", "EXPR_TIME", "type", "VARCHAR", "nullable", true, "precision", -1),
51+
relJson.toJson(timeType));
52+
assertEquals(
53+
Map.of("udt", "EXPR_TIMESTAMP", "type", "VARCHAR", "nullable", false, "precision", -1),
54+
relJson.toJson(timestampType));
55+
}
56+
57+
@Test
58+
void testDeserializeSqlType() {
59+
Map<String, Object> serializedDecimal =
60+
Map.of("type", "DECIMAL", "nullable", false, "precision", 4, "scale", 4);
61+
assertEquals(
62+
OpenSearchTypeFactory.TYPE_FACTORY.createSqlType(SqlTypeName.DECIMAL, 4, 4),
63+
relJson.toType(OpenSearchTypeFactory.TYPE_FACTORY, serializedDecimal));
64+
}
65+
66+
@Test
67+
void testDeserializeUDT() {
68+
Map<String, Object> serializedTimestamp =
69+
Map.of("udt", "EXPR_TIMESTAMP", "type", "VARCHAR", "nullable", true, "precision", -1);
70+
assertEquals(
71+
OpenSearchTypeFactory.TYPE_FACTORY
72+
.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_TIMESTAMP, true)
73+
.toString(),
74+
relJson.toType(OpenSearchTypeFactory.TYPE_FACTORY, serializedTimestamp).toString());
75+
}
76+
77+
@Test
78+
void testSerializeRelDataTypeField() {
79+
RelDataType structType =
80+
OpenSearchTypeFactory.TYPE_FACTORY
81+
.builder()
82+
.add("name", OpenSearchTypeFactory.TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR))
83+
.add(
84+
"timestamp",
85+
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(
86+
OpenSearchTypeFactory.ExprUDT.EXPR_TIMESTAMP))
87+
.build();
88+
89+
RelDataTypeField nameField = structType.getFieldList().get(0);
90+
RelDataTypeField timestampField = structType.getFieldList().get(1);
91+
92+
// Test serialization of regular field
93+
Object nameFieldJson = relJson.toJson((Object) nameField);
94+
assertEquals(
95+
Map.of("type", "VARCHAR", "nullable", false, "precision", -1, "name", "name"),
96+
nameFieldJson);
97+
98+
// Test serialization of UDT field
99+
Object timestampFieldJson = relJson.toJson(timestampField);
100+
assertEquals(
101+
Map.of(
102+
"udt",
103+
"EXPR_TIMESTAMP",
104+
"type",
105+
"VARCHAR",
106+
"nullable",
107+
false,
108+
"precision",
109+
-1,
110+
"name",
111+
"timestamp"),
112+
timestampFieldJson);
113+
}
114+
115+
@Test
116+
void testDeserializeRelDataTypeField() {
117+
RelDataType expectedType =
118+
OpenSearchTypeFactory.TYPE_FACTORY
119+
.builder()
120+
.add("name", OpenSearchTypeFactory.TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR))
121+
.add(
122+
"timestamp",
123+
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(
124+
OpenSearchTypeFactory.ExprUDT.EXPR_TIMESTAMP))
125+
.build();
126+
127+
Map<String, Object> nameFieldMap =
128+
Map.of("type", "VARCHAR", "nullable", false, "precision", -1, "name", "name");
129+
Map<String, Object> udtFieldMap =
130+
Map.of(
131+
"udt",
132+
"EXPR_TIMESTAMP",
133+
"type",
134+
"VARCHAR",
135+
"nullable",
136+
false,
137+
"precision",
138+
-1,
139+
"name",
140+
"timestamp");
141+
Map<String, Object> structMap =
142+
Map.of(
143+
"fields",
144+
java.util.Arrays.asList(nameFieldMap, udtFieldMap),
145+
"type",
146+
"struct",
147+
"nullable",
148+
false);
149+
RelDataType resultType = relJson.toType(OpenSearchTypeFactory.TYPE_FACTORY, structMap);
150+
151+
assertEquals(resultType.toString(), expectedType.toString());
152+
}
153+
}

opensearch/src/test/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializerTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import org.junit.jupiter.api.DisplayNameGenerator;
2222
import org.junit.jupiter.api.Test;
2323
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
24+
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
2425
import org.opensearch.sql.data.type.ExprCoreType;
2526
import org.opensearch.sql.data.type.ExprType;
2627
import org.opensearch.sql.expression.function.BuiltinFunctionName;
2728
import org.opensearch.sql.expression.function.PPLFuncImpTable;
29+
import org.opensearch.sql.opensearch.data.type.OpenSearchBinaryType;
30+
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
31+
import org.opensearch.sql.opensearch.data.type.OpenSearchDateType;
2832

2933
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
3034
public class RelJsonSerializerTest {
@@ -57,6 +61,45 @@ void testSerializeAndDeserialize() {
5761
assertEquals(fieldTypes, objects.get(RelJsonSerializer.FIELD_TYPES));
5862
}
5963

64+
@Test
65+
void testSerializeAndDeserializeUDT() {
66+
RelDataType rowTypeWithUDT =
67+
rexBuilder
68+
.getTypeFactory()
69+
.builder()
70+
.kind(StructKind.FULLY_QUALIFIED)
71+
.add("date", UserDefinedFunctionUtils.NULLABLE_DATE_UDT)
72+
.add("time", UserDefinedFunctionUtils.NULLABLE_TIME_UDT)
73+
.add("timestamp", UserDefinedFunctionUtils.NULLABLE_TIMESTAMP_UDT)
74+
.add("ip", UserDefinedFunctionUtils.NULLABLE_IP_UDT)
75+
.add(
76+
"binary",
77+
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(
78+
OpenSearchTypeFactory.ExprUDT.EXPR_BINARY))
79+
.build();
80+
Map<String, ExprType> fieldTypesWithUDT =
81+
Map.ofEntries(
82+
Map.entry("date", OpenSearchDateType.of(ExprCoreType.DATE)),
83+
Map.entry("time", OpenSearchDateType.of(ExprCoreType.TIME)),
84+
Map.entry("timestamp", OpenSearchDateType.of(ExprCoreType.TIMESTAMP)),
85+
Map.entry("ip", OpenSearchDataType.of(ExprCoreType.IP)),
86+
Map.entry("binary", OpenSearchBinaryType.of()));
87+
RexNode rexNode =
88+
PPLFuncImpTable.INSTANCE.resolve(
89+
rexBuilder,
90+
BuiltinFunctionName.JSON_ARRAY,
91+
rexBuilder.makeInputRef(rowTypeWithUDT.getFieldList().get(0).getType(), 0),
92+
rexBuilder.makeInputRef(rowTypeWithUDT.getFieldList().get(1).getType(), 1),
93+
rexBuilder.makeInputRef(rowTypeWithUDT.getFieldList().get(2).getType(), 2),
94+
rexBuilder.makeInputRef(rowTypeWithUDT.getFieldList().get(3).getType(), 3),
95+
rexBuilder.makeInputRef(rowTypeWithUDT.getFieldList().get(4).getType(), 4));
96+
String serialized = serializer.serialize(rexNode, rowTypeWithUDT, fieldTypesWithUDT);
97+
Map<String, Object> objects = serializer.deserialize(serialized);
98+
assertEquals(rexNode, objects.get(RelJsonSerializer.EXPR));
99+
assertEquals(rowTypeWithUDT.toString(), objects.get(RelJsonSerializer.ROW_TYPE).toString());
100+
assertEquals(fieldTypesWithUDT, objects.get(RelJsonSerializer.FIELD_TYPES));
101+
}
102+
60103
@Test
61104
void testSerializeUnsupportedRexNode() {
62105
RexNode illegalRex = rexBuilder.makeRangeReference(rowType, 0, true);

0 commit comments

Comments
 (0)