Skip to content

Commit e8fb7de

Browse files
committed
Preserve datetime schema-column types in AnalyticsExecutionEngine
`DatetimeOutputCastRule` (api/spec/datetime) wraps every datetime-typed root column in `CAST(... AS VARCHAR)` so the analytics-engine backend can emit PPL's documented space-separator format via `to_char` (`DatetimeOutputCastRewriter` on the OpenSearch side). The downside is that `plan.getRowType()` then advertises those columns as VARCHAR — the response schema reports `"string"` for what callers expect as `"timestamp"`. The legacy / Calcite-reference path doesn't have this divergence: the wire value is the formatted string while the schema column type stays as the original datetime type. Walk the outermost `Project` (descending through a single `Sort`, the `LogicalSystemLimit` system query-size wrapper) and, for any slot of shape `CAST(<datetime> AS VARCHAR)`, swap in the inner `RelDataType` for the schema column. Values still come back as the strings DataFusion emitted — only the schema column type is restored. Falls back to the slot's reported type for non-cast slots, non-datetime sources, or when the root isn't a Project (raw scan / aggregate fragment). Surfaced by `verifySchema(... schema("@timestamp", "timestamp") ...)` assertions in `CalciteSearchCommandIT` time-modifier tests; this commit removes the schema-type divergence for those queries (the remaining data-row mismatches are downstream Lucene-secondary issues filed separately). Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent ee5798d commit e8fb7de

1 file changed

Lines changed: 76 additions & 5 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@
66
package org.opensearch.sql.executor.analytics;
77

88
import java.util.ArrayList;
9+
import java.util.EnumSet;
910
import java.util.LinkedHashMap;
1011
import java.util.List;
1112
import java.util.Map;
1213
import org.apache.calcite.plan.RelOptUtil;
1314
import org.apache.calcite.rel.RelNode;
15+
import org.apache.calcite.rel.core.Project;
16+
import org.apache.calcite.rel.core.Sort;
1417
import org.apache.calcite.rel.type.RelDataType;
1518
import org.apache.calcite.rel.type.RelDataTypeField;
19+
import org.apache.calcite.rex.RexCall;
20+
import org.apache.calcite.rex.RexNode;
21+
import org.apache.calcite.sql.SqlKind;
22+
import org.apache.calcite.sql.type.SqlTypeName;
1623
import org.opensearch.analytics.exec.QueryPlanExecutor;
1724
import org.opensearch.core.action.ActionListener;
1825
import org.opensearch.sql.ast.statement.ExplainMode;
@@ -87,7 +94,7 @@ public void onResponse(Iterable<Object[]> rows) {
8794
try {
8895
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
8996
List<ExprValue> results = convertRows(rows, fields);
90-
Schema schema = buildSchema(fields);
97+
Schema schema = buildSchema(fields, recoverOriginalDatetimeTypes(plan));
9198
execMetric.set(System.nanoTime() - execStart);
9299
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
93100
} catch (Exception e) {
@@ -132,11 +139,11 @@ private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeFie
132139
return results;
133140
}
134141

135-
private Schema buildSchema(List<RelDataTypeField> fields) {
142+
private Schema buildSchema(List<RelDataTypeField> fields, List<RelDataType> reportedTypes) {
136143
List<Schema.Column> columns = new ArrayList<>();
137-
for (RelDataTypeField field : fields) {
138-
ExprType exprType = convertType(field.getType());
139-
columns.add(new Schema.Column(field.getName(), null, exprType));
144+
for (int i = 0; i < fields.size(); i++) {
145+
ExprType exprType = convertType(reportedTypes.get(i));
146+
columns.add(new Schema.Column(fields.get(i).getName(), null, exprType));
140147
}
141148
return new Schema(columns);
142149
}
@@ -148,4 +155,68 @@ private ExprType convertType(RelDataType type) {
148155
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
149156
}
150157
}
158+
159+
/**
160+
* Datetime SqlTypeNames produced by the SQL-plugin engine pipeline. Used by {@link
161+
* #recoverOriginalDatetimeTypes} to detect {@code DatetimeOutputCastRule}'s {@code
162+
* CAST(<datetime> AS VARCHAR)} output slots.
163+
*/
164+
private static final EnumSet<SqlTypeName> DATETIME_TYPES =
165+
EnumSet.of(
166+
SqlTypeName.TIMESTAMP,
167+
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
168+
SqlTypeName.DATE,
169+
SqlTypeName.TIME);
170+
171+
/**
172+
* Recovers the pre-cast datetime types for the schema. {@code DatetimeOutputCastRule} (api
173+
* post-analysis pass) wraps every datetime-typed root column in {@code CAST(... AS VARCHAR)}
174+
* so the analytics-engine backend can emit PPL's documented space-separator format via {@code
175+
* to_char} ({@code DatetimeOutputCastRewriter} on the OpenSearch sandbox side). The downside
176+
* is that {@code plan.getRowType()} then advertises those columns as VARCHAR — the response
177+
* schema would report {@code "string"} for what tests (and callers) still expect as {@code
178+
* "timestamp"}. The legacy / Calcite-reference path doesn't have this divergence: the wire
179+
* value is a formatted string while the schema column type is the original datetime type.
180+
*
181+
* <p>This walk inspects the OUTERMOST {@link Project} (descending through a single {@link
182+
* Sort}, the {@code LogicalSystemLimit} wrapper) and, for any slot of the exact shape {@code
183+
* CAST(<datetime> AS VARCHAR)}, swaps in the inner {@link RelDataType} for the schema. Values
184+
* still come back as the strings DataFusion emitted; only the schema column type is restored.
185+
*
186+
* <p>Falls back to the slot's reported type for non-cast slots, non-datetime sources, or when
187+
* the root isn't a Project (raw scan / aggregate fragment).
188+
*/
189+
private static List<RelDataType> recoverOriginalDatetimeTypes(RelNode plan) {
190+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
191+
List<RelDataType> reportedTypes = new ArrayList<>(fields.size());
192+
for (RelDataTypeField field : fields) {
193+
reportedTypes.add(field.getType());
194+
}
195+
RelNode root = plan;
196+
if (root instanceof Sort sort && sort.getInput() instanceof Project) {
197+
root = sort.getInput();
198+
}
199+
if (!(root instanceof Project project)) {
200+
return reportedTypes;
201+
}
202+
List<RexNode> slots = project.getProjects();
203+
int n = Math.min(slots.size(), reportedTypes.size());
204+
for (int i = 0; i < n; i++) {
205+
RexNode slot = slots.get(i);
206+
if (!(slot instanceof RexCall call)) {
207+
continue;
208+
}
209+
if (call.getKind() != SqlKind.CAST) {
210+
continue;
211+
}
212+
if (call.getType().getSqlTypeName() != SqlTypeName.VARCHAR) {
213+
continue;
214+
}
215+
RexNode source = call.getOperands().get(0);
216+
if (DATETIME_TYPES.contains(source.getType().getSqlTypeName())) {
217+
reportedTypes.set(i, source.getType());
218+
}
219+
}
220+
return reportedTypes;
221+
}
151222
}

0 commit comments

Comments
 (0)