Skip to content

Commit 4bc4176

Browse files
committed
feat(api): Add UDT normalizer for unified query API (#5250)
Fix the type mismatch between PPL UDT types (String-based) and standard Calcite types (int/long-based) in the unified query API path.

UdtNormalizer post-processes the logical plan to:
1. Replace UDT return types with standard Calcite types in UDF signatures
2. Wrap UDF implementors to convert between value representations at input (int/long → String) and output (String → int/long)

The normalizer is generic — all type-specific knowledge lives in the UdtMapping enum. It is applied only in the PPL V3 planning path and has zero impact on the OpenSearch plugin path.

Changes:
- UdtNormalizer + UdtMapping in api/udt/
- ImplementableUDFunction (in ImplementorUDF) in core/ exposing NotNullImplementor for wrapping
- 6 end-to-end tests

Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent 3f740fb commit 4bc4176

5 files changed

Lines changed: 351 additions & 12 deletions

File tree

api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.calcite.tools.Planner;
2121
import org.opensearch.sql.api.parser.NamedArgRewriter;
2222
import org.opensearch.sql.api.parser.UnifiedQueryParser;
23+
import org.opensearch.sql.api.udt.UdtNormalizer;
2324
import org.opensearch.sql.ast.tree.UnresolvedPlan;
2425
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
2526
import org.opensearch.sql.common.antlr.SyntaxCheckException;
@@ -101,18 +102,21 @@ private static class CustomVisitorStrategy implements PlanningStrategy {
101102
private final UnifiedQueryParser<UnresolvedPlan> parser;
102103
private final CalciteRelNodeVisitor relNodeVisitor =
103104
new CalciteRelNodeVisitor(new EmptyDataSourceService());
105+
private final UdtNormalizer udtNormalizer;
104106

105107
@SuppressWarnings("unchecked")
106108
CustomVisitorStrategy(UnifiedQueryContext context) {
107109
this.context = context;
108110
this.parser = (UnifiedQueryParser<UnresolvedPlan>) context.getParser();
111+
this.udtNormalizer = new UdtNormalizer(context.getPlanContext().rexBuilder);
109112
}
110113

111114
@Override
112115
public RelNode plan(String query) {
113116
UnresolvedPlan ast = parser.parse(query);
114117
RelNode logical = relNodeVisitor.analyze(ast, context.getPlanContext());
115-
return preserveCollation(logical);
118+
RelNode plan = preserveCollation(logical);
119+
return udtNormalizer.normalize(plan);
116120
}
117121

118122
private RelNode preserveCollation(RelNode logical) {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.udt;
7+
8+
import java.util.Optional;
9+
import lombok.Getter;
10+
import lombok.RequiredArgsConstructor;
11+
import org.apache.calcite.avatica.util.DateTimeUtils;
12+
import org.apache.calcite.linq4j.tree.Expression;
13+
import org.apache.calcite.linq4j.tree.Expressions;
14+
import org.apache.calcite.rel.type.RelDataType;
15+
import org.apache.calcite.rex.RexBuilder;
16+
import org.apache.calcite.sql.type.SqlTypeName;
17+
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
18+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
19+
20+
/** Maps a UDT to its standard Calcite equivalent with value conversion methods. */
21+
@Getter
22+
@RequiredArgsConstructor
23+
enum UdtMapping {
24+
DATE(ExprUDT.EXPR_DATE, SqlTypeName.DATE, "dateStringToUnixDate", "unixDateToString"),
25+
TIME(ExprUDT.EXPR_TIME, SqlTypeName.TIME, "timeStringToUnixDate", "unixTimeToString"),
26+
TIMESTAMP(
27+
ExprUDT.EXPR_TIMESTAMP,
28+
SqlTypeName.TIMESTAMP,
29+
"timestampStringToUnixDate",
30+
"unixTimestampToString");
31+
32+
private final ExprUDT udtType;
33+
private final SqlTypeName stdType;
34+
private final String toStdMethod;
35+
private final String fromStdMethod;
36+
37+
/** Matches a UDT type to its mapping. */
38+
static Optional<UdtMapping> fromUdtType(RelDataType type) {
39+
if (!(type instanceof AbstractExprRelDataType<?> e)) return Optional.empty();
40+
ExprUDT udt = e.getUdt();
41+
for (UdtMapping u : values()) {
42+
if (u.udtType == udt) return Optional.of(u);
43+
}
44+
return Optional.empty();
45+
}
46+
47+
/** Matches a standard Calcite type to its mapping. */
48+
static Optional<UdtMapping> fromStdType(RelDataType type) {
49+
SqlTypeName name = type.getSqlTypeName();
50+
for (UdtMapping u : values()) {
51+
if (u.stdType == name) return Optional.of(u);
52+
}
53+
return Optional.empty();
54+
}
55+
56+
RelDataType toStdType(RexBuilder rexBuilder, boolean nullable) {
57+
return rexBuilder
58+
.getTypeFactory()
59+
.createTypeWithNullability(rexBuilder.getTypeFactory().createSqlType(stdType), nullable);
60+
}
61+
62+
/** UDT value (String) → standard value (int/long). */
63+
Expression toStdValue(Expression result) {
64+
return Expressions.call(DateTimeUtils.class, toStdMethod, Expressions.call(result, "toString"));
65+
}
66+
67+
/** Standard value (int/long) → UDT value (String). */
68+
Expression fromStdValue(Expression operand) {
69+
return Expressions.call(DateTimeUtils.class, fromStdMethod, operand);
70+
}
71+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.udt;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Optional;
11+
import lombok.RequiredArgsConstructor;
12+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
13+
import org.apache.calcite.linq4j.tree.Expression;
14+
import org.apache.calcite.rel.RelHomogeneousShuttle;
15+
import org.apache.calcite.rel.RelNode;
16+
import org.apache.calcite.rel.type.RelDataType;
17+
import org.apache.calcite.rex.RexBuilder;
18+
import org.apache.calcite.rex.RexCall;
19+
import org.apache.calcite.rex.RexNode;
20+
import org.apache.calcite.rex.RexShuttle;
21+
import org.apache.calcite.sql.type.ReturnTypes;
22+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
23+
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
24+
import org.opensearch.sql.expression.function.ImplementorUDF;
25+
import org.opensearch.sql.expression.function.ImplementorUDF.ImplementableUDFunction;
26+
import org.opensearch.sql.expression.function.UDFOperandMetadata;
27+
28+
/**
29+
* Normalizes UDT types in the plan by replacing UDT return types with standard Calcite types
30+
* (signature) and wrapping UDF implementors to convert between UDT and standard values
31+
* (implementation).
32+
*/
33+
@RequiredArgsConstructor
34+
public class UdtNormalizer {
35+
36+
private final RexBuilder rexBuilder;
37+
38+
public RelNode normalize(RelNode plan) {
39+
return plan.accept(
40+
new RelHomogeneousShuttle() {
41+
@Override
42+
public RelNode visit(RelNode other) {
43+
return super.visit(other).accept(new UdtRexShuttle());
44+
}
45+
});
46+
}
47+
48+
private class UdtRexShuttle extends RexShuttle {
49+
50+
@Override
51+
public RexNode visitCall(RexCall call) {
52+
RexCall visited = (RexCall) super.visitCall(call);
53+
54+
if (!(visited.getOperator() instanceof SqlUserDefinedFunction udf
55+
&& udf.getFunction() instanceof ImplementableUDFunction func)) {
56+
return visited;
57+
}
58+
59+
Optional<UdtMapping> returnType = UdtMapping.fromUdtType(visited.getType());
60+
if (returnType.isEmpty()
61+
&& visited.getOperands().stream()
62+
.noneMatch(op -> UdtMapping.fromStdType(op.getType()).isPresent())) {
63+
return visited;
64+
}
65+
66+
RelDataType normReturnType = normalizeReturnType(visited, returnType);
67+
NotNullImplementor normFuncImpl = normalizeImplementation(visited, func, returnType);
68+
SqlUserDefinedFunction normUdf =
69+
new ImplementorUDF(normFuncImpl, func.getNullPolicy()) {
70+
@Override
71+
public SqlReturnTypeInference getReturnTypeInference() {
72+
return returnType
73+
.<SqlReturnTypeInference>map(u -> ReturnTypes.explicit(normReturnType))
74+
.orElse(udf.getReturnTypeInference());
75+
}
76+
77+
@Override
78+
public UDFOperandMetadata getOperandMetadata() {
79+
return (UDFOperandMetadata) udf.getOperandTypeChecker();
80+
}
81+
}.toUDF(udf.getName(), udf.isDeterministic());
82+
return rexBuilder.makeCall(normReturnType, normUdf, visited.getOperands());
83+
}
84+
}
85+
86+
/** Replace UDT return type with standard Calcite type. */
87+
private RelDataType normalizeReturnType(RexCall call, Optional<UdtMapping> returnUdt) {
88+
return returnUdt
89+
.map(u -> u.toStdType(rexBuilder, call.getType().isNullable()))
90+
.orElse(call.getType());
91+
}
92+
93+
/** Wrap implementor to convert inputs (standard → UDT) and output (UDT → standard). */
94+
private NotNullImplementor normalizeImplementation(
95+
RexCall originalCall, ImplementableUDFunction func, Optional<UdtMapping> returnUdt) {
96+
return (translator, rexCall, operands) -> {
97+
List<Expression> converted = new ArrayList<>(operands.size());
98+
for (int i = 0; i < operands.size(); i++) {
99+
Expression operand = operands.get(i);
100+
RelDataType opType = originalCall.getOperands().get(i).getType();
101+
converted.add(
102+
UdtMapping.fromStdType(opType).map(u -> u.fromStdValue(operand)).orElse(operand));
103+
}
104+
Expression result = func.getNotNullImplementor().implement(translator, rexCall, converted);
105+
return returnUdt.map(u -> u.toStdValue(result)).orElse(result);
106+
};
107+
}
108+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.udt;
7+
8+
import static java.sql.Types.BIGINT;
9+
import static java.sql.Types.DATE;
10+
import static java.sql.Types.INTEGER;
11+
import static java.sql.Types.TIME;
12+
import static java.sql.Types.VARCHAR;
13+
import static org.junit.Assert.assertFalse;
14+
15+
import java.sql.PreparedStatement;
16+
import java.sql.ResultSet;
17+
import java.time.LocalDate;
18+
import java.time.LocalTime;
19+
import org.apache.calcite.rel.RelHomogeneousShuttle;
20+
import org.apache.calcite.rel.RelNode;
21+
import org.apache.calcite.rex.RexCall;
22+
import org.apache.calcite.rex.RexNode;
23+
import org.apache.calcite.rex.RexShuttle;
24+
import org.apache.calcite.schema.Table;
25+
import org.apache.calcite.sql.type.SqlTypeName;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.opensearch.sql.api.ResultSetAssertion;
29+
import org.opensearch.sql.api.UnifiedQueryTestBase;
30+
import org.opensearch.sql.api.compiler.UnifiedQueryCompiler;
31+
32+
public class UdtNormalizerTest extends UnifiedQueryTestBase implements ResultSetAssertion {
33+
34+
private UnifiedQueryCompiler compiler;
35+
36+
@Before
37+
public void setUp() {
38+
super.setUp();
39+
compiler = new UnifiedQueryCompiler(context);
40+
}
41+
42+
@Override
43+
protected Table createEmployeesTable() {
44+
return SimpleTable.builder()
45+
.col("name", SqlTypeName.VARCHAR)
46+
.col("hire_date", SqlTypeName.DATE)
47+
.col("login_time", SqlTypeName.TIME)
48+
.col("updated_at", SqlTypeName.TIMESTAMP)
49+
.row(
50+
new Object[] {
51+
"Alice",
52+
(int) LocalDate.of(2020, 3, 15).toEpochDay(),
53+
(int) (LocalTime.of(9, 30).toNanoOfDay() / 1_000_000),
54+
1705312200000L
55+
}) // 2024-01-15 10:30:00 UTC
56+
.build();
57+
}
58+
59+
private void assertNoUdtInRexCalls(RelNode plan) {
60+
plan.accept(
61+
new RelHomogeneousShuttle() {
62+
@Override
63+
public RelNode visit(RelNode other) {
64+
other.accept(
65+
new RexShuttle() {
66+
@Override
67+
public RexNode visitCall(RexCall call) {
68+
assertFalse(
69+
"RexCall " + call + " has UDT return type: " + call.getType(),
70+
UdtMapping.fromUdtType(call.getType()).isPresent());
71+
return super.visitCall(call);
72+
}
73+
});
74+
return super.visit(other);
75+
}
76+
});
77+
}
78+
79+
private ResultSet planAndExecute(String query) throws Exception {
80+
RelNode plan = planner.plan(query);
81+
assertNoUdtInRexCalls(plan);
82+
PreparedStatement stmt = compiler.compile(plan);
83+
return stmt.executeQuery();
84+
}
85+
86+
@Test
87+
public void testDateUdtNormalization() throws Exception {
88+
ResultSet rs =
89+
planAndExecute(
90+
"source = catalog.employees"
91+
+ " | eval last = LAST_DAY(hire_date), y = YEAR(hire_date)"
92+
+ " | fields last, y");
93+
verify(rs)
94+
.expectSchema(col("last", DATE), col("y", INTEGER))
95+
.expectData(row(java.sql.Date.valueOf("2020-03-31"), 2020));
96+
}
97+
98+
@Test
99+
public void testTimeUdtNormalization() throws Exception {
100+
ResultSet rs =
101+
planAndExecute(
102+
"source = catalog.employees | where login_time > MAKETIME(8, 0, 0) | fields"
103+
+ " login_time");
104+
verify(rs)
105+
.expectSchema(col("login_time", TIME))
106+
.expectData(row(java.sql.Time.valueOf("09:30:00")));
107+
}
108+
109+
@Test
110+
public void testTimestampUdtNormalization() throws Exception {
111+
ResultSet rs =
112+
planAndExecute(
113+
"source = catalog.employees"
114+
+ " | where updated_at > TIMESTAMP('2024-01-01 00:00:00')"
115+
+ " | eval fmt = DATE_FORMAT(updated_at, '%Y-%m')"
116+
+ " | fields fmt");
117+
verify(rs).expectSchema(col("fmt", VARCHAR)).expectData(row("2024-01"));
118+
}
119+
120+
@Test
121+
public void testNestedUdfCalls() throws Exception {
122+
ResultSet rs =
123+
planAndExecute(
124+
"source = catalog.employees"
125+
+ " | eval days = DATEDIFF(DATE('2025-01-01'), DATE(hire_date))"
126+
+ " | fields days");
127+
verify(rs).expectSchema(col("days", BIGINT)).expectData(row(1753L));
128+
}
129+
130+
@Test
131+
public void testUnrelatedUdfUnaffected() throws Exception {
132+
ResultSet rs =
133+
planAndExecute("source = catalog.employees | eval s = CONCAT(name, ' test') | fields s");
134+
verify(rs).expectSchema(col("s", VARCHAR)).expectData(row("Alice test"));
135+
}
136+
137+
@Test
138+
public void testStandardCalciteFunctionUnaffected() throws Exception {
139+
ResultSet rs = planAndExecute("source = catalog.employees | eval u = UPPER(name) | fields u");
140+
verify(rs).expectSchema(col("u", VARCHAR)).expectData(row("ALICE"));
141+
}
142+
}

0 commit comments

Comments
 (0)