Skip to content

Commit 65c549f

Browse files
committed
feat(api): Add datetime UDT extension for unified query API
Bridge the type mismatch between PPL UDT types (String-based) and standard Calcite types (int/long-based) in the unified query API path by contributing two post-analysis rules to PPL's LanguageSpec: 1. DatetimeUdtNormalizeRule rewrites UDT calls — replaces UDT return types with standard Calcite types and wraps UDF implementors to convert values at input (int/long -> String) and output (String -> int/long). 2. DatetimeUdtOutputCastRule wraps the plan root with a projection that casts remaining datetime output columns to VARCHAR so the wire format matches PPL's String datetime contract. Both rules are registered via DatetimeUdtExtension, which encapsulates the ordering invariant (normalize before cast). The extension plugs into the LanguageExtension mechanism introduced in #5360 via a new postAnalysisRules hook, applied once at the top of UnifiedQueryPlanner.plan() after the language-specific strategy returns. Applied only on the PPL path; zero impact on the SQL or OpenSearch plugin paths. Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent a375f98 commit 65c549f

8 files changed

Lines changed: 482 additions & 20 deletions

File tree

api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,15 @@ public UnifiedQueryPlanner(UnifiedQueryContext context) {
6060
*/
6161
public RelNode plan(String query) {
6262
try {
63-
return context.measure(ANALYZE, () -> strategy.plan(query));
63+
return context.measure(
64+
ANALYZE,
65+
() -> {
66+
RelNode plan = strategy.plan(query);
67+
for (var rule : context.getLangSpec().postAnalysisRules()) {
68+
plan = rule.apply(plan);
69+
}
70+
return plan;
71+
});
6472
} catch (SyntaxCheckException | UnsupportedOperationException e) {
6573
throw e;
6674
} catch (Exception e) {

api/src/main/java/org/opensearch/sql/api/spec/LanguageSpec.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.ArrayList;
99
import java.util.List;
10+
import org.apache.calcite.rel.RelNode;
1011
import org.apache.calcite.sql.SqlNode;
1112
import org.apache.calcite.sql.SqlOperatorTable;
1213
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -17,8 +18,8 @@
1718

1819
/**
1920
* Language specification defining the dialect the engine accepts. Provides parser configuration,
20-
* validator configuration, and composable {@link LanguageExtension}s that contribute operators and
21-
* post-parse rewrite rules.
21+
* validator configuration, and composable {@link LanguageExtension}s that contribute operators,
22+
* post-parse rewrite rules, and post-analysis rewrite rules.
2223
*
2324
* <p>Implementations define a complete language surface — for example, {@link UnifiedSqlSpec}
2425
* provides ANSI and extended SQL modes. A future PPL spec would implement this same interface once
@@ -27,8 +28,18 @@
2728
public interface LanguageSpec {
2829

2930
/**
30-
* A composable language extension that contributes operators and post-parse rewrite rules. All
31-
* methods have defaults so extensions only override what they need.
31+
* A RelNode rewrite rule applied after analysis and before execution. Takes a logical plan and
32+
* returns a rewritten plan.
33+
*/
34+
@FunctionalInterface
35+
interface PostAnalysisRule {
36+
RelNode apply(RelNode plan);
37+
}
38+
39+
/**
40+
* A composable language extension that contributes operators, post-parse rewrite rules, and
41+
* post-analysis rewrite rules. All methods have defaults so extensions only override what they
42+
* need.
3243
*/
3344
interface LanguageExtension {
3445

@@ -47,6 +58,15 @@ default SqlOperatorTable operators() {
4758
default List<SqlVisitor<SqlNode>> postParseRules() {
4859
return List.of();
4960
}
61+
62+
/**
63+
* RelNode rewrite rules applied after analysis and before execution. Rules within a single
64+
* extension are applied in list order; extensions that depend on ordering should return their
65+
* rules together from one extension rather than relying on cross-extension ordering.
66+
*/
67+
default List<PostAnalysisRule> postAnalysisRules() {
68+
return List.of();
69+
}
5070
}
5171

5272
/**
@@ -62,9 +82,9 @@ default List<SqlVisitor<SqlNode>> postParseRules() {
6282
SqlValidator.Config validatorConfig();
6383

6484
/**
65-
* Language extensions registered with this spec. Each extension contributes operators and
66-
* post-parse rewrite rules that are composed by {@link #operatorTable()} and {@link
67-
* #postParseRules()}.
85+
* Language extensions registered with this spec. Each extension contributes operators, post-parse
86+
* rewrite rules, and post-analysis rewrite rules composed by {@link #operatorTable()}, {@link
87+
* #postParseRules()}, and {@link #postAnalysisRules()}.
6888
*/
6989
List<LanguageExtension> extensions();
7090

@@ -86,4 +106,12 @@ default SqlOperatorTable operatorTable() {
86106
default List<SqlVisitor<SqlNode>> postParseRules() {
87107
return extensions().stream().flatMap(ext -> ext.postParseRules().stream()).toList();
88108
}
109+
110+
/**
111+
* All post-analysis RelNode rewrite rules from registered extensions, flattened in registration
112+
* order. Applied to the logical plan after analysis and before execution.
113+
*/
114+
default List<PostAnalysisRule> postAnalysisRules() {
115+
return extensions().stream().flatMap(ext -> ext.postAnalysisRules().stream()).toList();
116+
}
89117
}

api/src/main/java/org/opensearch/sql/api/spec/UnifiedPplSpec.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import lombok.NoArgsConstructor;
1111
import org.apache.calcite.sql.parser.SqlParser;
1212
import org.apache.calcite.sql.validate.SqlValidator;
13+
import org.opensearch.sql.api.spec.datetime.DatetimeUdtExtension;
1314

1415
/**
1516
* PPL language specification.
@@ -37,6 +38,6 @@ public SqlValidator.Config validatorConfig() {
3738

3839
@Override
3940
public List<LanguageExtension> extensions() {
40-
return List.of();
41+
return List.of(new DatetimeUdtExtension());
4142
}
4243
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.spec.datetime;
7+
8+
import java.util.List;
9+
import java.util.Optional;
10+
import lombok.Getter;
11+
import lombok.RequiredArgsConstructor;
12+
import org.apache.calcite.avatica.util.DateTimeUtils;
13+
import org.apache.calcite.linq4j.tree.Expression;
14+
import org.apache.calcite.linq4j.tree.Expressions;
15+
import org.apache.calcite.rel.type.RelDataType;
16+
import org.apache.calcite.rex.RexBuilder;
17+
import org.apache.calcite.sql.type.SqlTypeName;
18+
import org.opensearch.sql.api.spec.LanguageSpec.LanguageExtension;
19+
import org.opensearch.sql.api.spec.LanguageSpec.PostAnalysisRule;
20+
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
21+
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
22+
23+
/**
24+
* Normalizes datetime UDT operations in the logical plan and casts remaining datetime output
25+
* columns to VARCHAR so the wire output matches PPL's String datetime contract.
26+
*
27+
* <p>Contributes two ordered post-analysis rules: {@link DatetimeUdtNormalizeRule} rewrites UDT
28+
* calls; {@link DatetimeUdtOutputCastRule} wraps the root with a varchar projection. The cast
29+
* depends on the normalized row type, so both rules live in a single extension to keep their
30+
* ordering encapsulated.
31+
*/
32+
public class DatetimeUdtExtension implements LanguageExtension {
33+
34+
@Override
35+
public List<PostAnalysisRule> postAnalysisRules() {
36+
return List.of(new DatetimeUdtNormalizeRule(), new DatetimeUdtOutputCastRule());
37+
}
38+
39+
/** Maps a datetime UDT to its standard Calcite equivalent with value conversion methods. */
40+
@Getter
41+
@RequiredArgsConstructor
42+
enum UdtMapping {
43+
DATE(ExprUDT.EXPR_DATE, SqlTypeName.DATE, "dateStringToUnixDate", "unixDateToString"),
44+
TIME(ExprUDT.EXPR_TIME, SqlTypeName.TIME, "timeStringToUnixDate", "unixTimeToString"),
45+
TIMESTAMP(
46+
ExprUDT.EXPR_TIMESTAMP,
47+
SqlTypeName.TIMESTAMP,
48+
"timestampStringToUnixDate",
49+
"unixTimestampToString");
50+
51+
private final ExprUDT udtType;
52+
private final SqlTypeName stdType;
53+
private final String toStdMethod;
54+
private final String fromStdMethod;
55+
56+
/** Matches a UDT type to its mapping. */
57+
static Optional<UdtMapping> fromUdtType(RelDataType type) {
58+
if (!(type instanceof AbstractExprRelDataType<?> e)) return Optional.empty();
59+
ExprUDT udt = e.getUdt();
60+
for (UdtMapping u : values()) {
61+
if (u.udtType == udt) return Optional.of(u);
62+
}
63+
return Optional.empty();
64+
}
65+
66+
/** Matches a standard Calcite type to its mapping. */
67+
static Optional<UdtMapping> fromStdType(RelDataType type) {
68+
SqlTypeName name = type.getSqlTypeName();
69+
for (UdtMapping u : values()) {
70+
if (u.stdType == name) return Optional.of(u);
71+
}
72+
return Optional.empty();
73+
}
74+
75+
RelDataType toStdType(RexBuilder rexBuilder, boolean nullable) {
76+
return rexBuilder
77+
.getTypeFactory()
78+
.createTypeWithNullability(rexBuilder.getTypeFactory().createSqlType(stdType), nullable);
79+
}
80+
81+
/** UDT value (String) → standard value (int/long). */
82+
Expression toStdValue(Expression result) {
83+
return Expressions.call(
84+
DateTimeUtils.class, toStdMethod, Expressions.call(result, "toString"));
85+
}
86+
87+
/** Standard value (int/long) → UDT value (String). */
88+
Expression fromStdValue(Expression operand) {
89+
return Expressions.call(DateTimeUtils.class, fromStdMethod, operand);
90+
}
91+
}
92+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.spec.datetime;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Optional;
11+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
12+
import org.apache.calcite.linq4j.tree.Expression;
13+
import org.apache.calcite.rel.RelHomogeneousShuttle;
14+
import org.apache.calcite.rel.RelNode;
15+
import org.apache.calcite.rel.type.RelDataType;
16+
import org.apache.calcite.rex.RexBuilder;
17+
import org.apache.calcite.rex.RexCall;
18+
import org.apache.calcite.rex.RexNode;
19+
import org.apache.calcite.rex.RexShuttle;
20+
import org.apache.calcite.sql.type.ReturnTypes;
21+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
22+
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
23+
import org.opensearch.sql.api.spec.LanguageSpec.PostAnalysisRule;
24+
import org.opensearch.sql.api.spec.datetime.DatetimeUdtExtension.UdtMapping;
25+
import org.opensearch.sql.expression.function.ImplementorUDF;
26+
import org.opensearch.sql.expression.function.ImplementorUDF.ImplementableUDFunction;
27+
import org.opensearch.sql.expression.function.UDFOperandMetadata;
28+
29+
/**
30+
* Normalizes UDT types in the plan by replacing UDT return types with standard Calcite types
31+
* (signature) and wrapping UDF implementors to convert between UDT and standard values
32+
* (implementation).
33+
*/
34+
public class DatetimeUdtNormalizeRule implements PostAnalysisRule {
35+
36+
@Override
37+
public RelNode apply(RelNode plan) {
38+
RexBuilder rexBuilder = plan.getCluster().getRexBuilder();
39+
return plan.accept(
40+
new RelHomogeneousShuttle() {
41+
@Override
42+
public RelNode visit(RelNode other) {
43+
return super.visit(other).accept(new UdtRexShuttle(rexBuilder));
44+
}
45+
});
46+
}
47+
48+
private static class UdtRexShuttle extends RexShuttle {
49+
50+
private final RexBuilder rexBuilder;
51+
52+
UdtRexShuttle(RexBuilder rexBuilder) {
53+
this.rexBuilder = rexBuilder;
54+
}
55+
56+
@Override
57+
public RexNode visitCall(RexCall call) {
58+
RexCall visited = (RexCall) super.visitCall(call);
59+
60+
if (!(visited.getOperator() instanceof SqlUserDefinedFunction udf
61+
&& udf.getFunction() instanceof ImplementableUDFunction func)) {
62+
return visited;
63+
}
64+
65+
Optional<UdtMapping> returnType = UdtMapping.fromUdtType(visited.getType());
66+
if (returnType.isEmpty()
67+
&& visited.getOperands().stream()
68+
.noneMatch(op -> UdtMapping.fromStdType(op.getType()).isPresent())) {
69+
return visited;
70+
}
71+
72+
RelDataType normReturnType = normalizeReturnType(rexBuilder, visited, returnType);
73+
NotNullImplementor normFuncImpl = normalizeImplementation(visited, func, returnType);
74+
SqlUserDefinedFunction normUdf =
75+
new ImplementorUDF(normFuncImpl, func.getNullPolicy()) {
76+
@Override
77+
public SqlReturnTypeInference getReturnTypeInference() {
78+
return returnType
79+
.<SqlReturnTypeInference>map(u -> ReturnTypes.explicit(normReturnType))
80+
.orElse(udf.getReturnTypeInference());
81+
}
82+
83+
@Override
84+
public UDFOperandMetadata getOperandMetadata() {
85+
return (UDFOperandMetadata) udf.getOperandTypeChecker();
86+
}
87+
}.toUDF(udf.getName(), udf.isDeterministic());
88+
return rexBuilder.makeCall(normReturnType, normUdf, visited.getOperands());
89+
}
90+
}
91+
92+
/** Replace UDT return type with standard Calcite type. */
93+
private static RelDataType normalizeReturnType(
94+
RexBuilder rexBuilder, RexCall call, Optional<UdtMapping> returnUdt) {
95+
return returnUdt
96+
.map(u -> u.toStdType(rexBuilder, call.getType().isNullable()))
97+
.orElse(call.getType());
98+
}
99+
100+
/** Wrap implementor to convert inputs (standard → UDT) and output (UDT → standard). */
101+
private static NotNullImplementor normalizeImplementation(
102+
RexCall originalCall, ImplementableUDFunction func, Optional<UdtMapping> returnUdt) {
103+
return (translator, rexCall, operands) -> {
104+
List<Expression> converted = new ArrayList<>(operands.size());
105+
for (int i = 0; i < operands.size(); i++) {
106+
Expression operand = operands.get(i);
107+
RelDataType opType = originalCall.getOperands().get(i).getType();
108+
converted.add(
109+
UdtMapping.fromStdType(opType).map(u -> u.fromStdValue(operand)).orElse(operand));
110+
}
111+
Expression result = func.getNotNullImplementor().implement(translator, rexCall, converted);
112+
return returnUdt.map(u -> u.toStdValue(result)).orElse(result);
113+
};
114+
}
115+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.spec.datetime;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Set;
11+
import org.apache.calcite.rel.RelNode;
12+
import org.apache.calcite.rel.logical.LogicalProject;
13+
import org.apache.calcite.rel.type.RelDataType;
14+
import org.apache.calcite.rel.type.RelDataTypeField;
15+
import org.apache.calcite.rex.RexBuilder;
16+
import org.apache.calcite.rex.RexNode;
17+
import org.apache.calcite.sql.type.SqlTypeName;
18+
import org.opensearch.sql.api.spec.LanguageSpec.PostAnalysisRule;
19+
import org.opensearch.sql.api.spec.datetime.DatetimeUdtExtension.UdtMapping;
20+
21+
/**
22+
* Wraps the plan with a projection that casts standard datetime output columns to VARCHAR so the
23+
* wire output matches PPL's String datetime contract. Runs after {@link DatetimeUdtNormalizeRule}
24+
* has converted UDT columns to their standard datetime types.
25+
*/
26+
public class DatetimeUdtOutputCastRule implements PostAnalysisRule {
27+
28+
@Override
29+
public RelNode apply(RelNode plan) {
30+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
31+
boolean anyDatetime =
32+
fields.stream().anyMatch(f -> UdtMapping.fromStdType(f.getType()).isPresent());
33+
if (!anyDatetime) {
34+
return plan;
35+
}
36+
37+
RexBuilder rexBuilder = plan.getCluster().getRexBuilder();
38+
RelDataType varcharType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
39+
List<RexNode> projects = new ArrayList<>(fields.size());
40+
List<String> names = new ArrayList<>(fields.size());
41+
for (RelDataTypeField field : fields) {
42+
RexNode ref = rexBuilder.makeInputRef(plan, field.getIndex());
43+
if (UdtMapping.fromStdType(field.getType()).isPresent()) {
44+
RelDataType nullableVarchar =
45+
rexBuilder
46+
.getTypeFactory()
47+
.createTypeWithNullability(varcharType, field.getType().isNullable());
48+
projects.add(rexBuilder.makeCast(nullableVarchar, ref));
49+
} else {
50+
projects.add(ref);
51+
}
52+
names.add(field.getName());
53+
}
54+
return LogicalProject.create(plan, List.of(), projects, names, Set.of());
55+
}
56+
}

0 commit comments

Comments
 (0)