Skip to content

Commit 3537210

Browse files
committed
fix: recover datetime schema labels on analytics-engine path (opensearch-project#5420)
Problem: PPL datetime columns on the force-routed analytics-engine path return wire schema `"type": "string"` instead of the documented `"timestamp"`/`"date"`/`"time"`. The cause is `DatetimeOutputCastRule`, which wraps the root output with `CAST(<datetime> AS VARCHAR)` so every backend can serialize datetimes uniformly — but this also strips the pre-cast type from the RelNode's row type that `AnalyticsExecutionEngine` hands to the response builder. Solution: in `AnalyticsExecutionEngine.buildSchema`, detect the rule's output Project structurally — a Project where every slot is either an identity `RexInputRef` or `CAST(<datetime InputRef> AS VARCHAR)`, with at least one cast slot. Walk past a single `Sort` wrapper (the unified planner's system query-size limit). For each cast slot, recover the pre-cast operand type for the wire schema; values continue to render as VARCHAR so per-row output is unchanged. Detection is structural rather than hint-based to avoid registering a new hint name in Calcite's strict-mode HintStrategyTable; it mirrors the shape check `DatetimeOutputCastRewriter` (analytics-backend-datafusion) already uses to locate the same Project for value-format rewriting. Signed-off-by: Eric Wei <mengwei.eric@gmail.com>
1 parent b10d541 commit 3537210

2 files changed

Lines changed: 234 additions & 9 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,15 @@
1111
import java.util.Map;
1212
import org.apache.calcite.plan.RelOptUtil;
1313
import org.apache.calcite.rel.RelNode;
14+
import org.apache.calcite.rel.core.Project;
15+
import org.apache.calcite.rel.core.Sort;
1416
import org.apache.calcite.rel.type.RelDataType;
1517
import org.apache.calcite.rel.type.RelDataTypeField;
18+
import org.apache.calcite.rex.RexCall;
19+
import org.apache.calcite.rex.RexInputRef;
20+
import org.apache.calcite.rex.RexNode;
21+
import org.apache.calcite.sql.SqlKind;
22+
import org.apache.calcite.sql.type.SqlTypeName;
1623
import org.opensearch.analytics.exec.QueryPlanExecutor;
1724
import org.opensearch.core.action.ActionListener;
1825
import org.opensearch.sql.ast.statement.ExplainMode;
@@ -71,10 +78,7 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
7178
@Override
7279
public void execute(
7380
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
74-
// QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched
75-
// to a worker pool and results arrive on the listener. Record the execute metric in the
76-
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
77-
// taken by SimpleJsonResponseFormatter sees the correct value.
81+
// Record EXECUTE inside the callback so the formatter's metric snapshot sees the final value.
7882
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
7983
long execStart = System.nanoTime();
8084

@@ -87,7 +91,7 @@ public void onResponse(Iterable<Object[]> rows) {
8791
try {
8892
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
8993
List<ExprValue> results = convertRows(rows, fields);
90-
Schema schema = buildSchema(fields);
94+
Schema schema = buildSchema(plan);
9195
execMetric.set(System.nanoTime() - execStart);
9296
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
9397
} catch (Exception e) {
@@ -132,15 +136,91 @@ private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeFie
132136
return results;
133137
}
134138

135-
private Schema buildSchema(List<RelDataTypeField> fields) {
139+
/**
140+
* Recovers pre-cast datetime types stripped by {@code DatetimeOutputCastRule}. The wire format
141+
* keeps {@code timestamp}/{@code date}/{@code time} labels even though values render as VARCHAR.
142+
*
143+
* <p>Detection is structural: the rule's output Project has only identity {@link RexInputRef}
144+
* slots and {@code CAST(<datetime InputRef> AS VARCHAR)} slots — no user-authored expressions.
145+
* That shape uniquely identifies the rule's emit and avoids unwrapping user CASTs (which can
146+
* legitimately VARCHAR-wrap a datetime expression and should round-trip as VARCHAR).
147+
*/
148+
private Schema buildSchema(RelNode plan) {
149+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
150+
Project castProject = findOutputCastProject(plan);
151+
List<RexNode> projects = castProject == null ? null : castProject.getProjects();
136152
List<Schema.Column> columns = new ArrayList<>();
137-
for (RelDataTypeField field : fields) {
138-
ExprType exprType = convertType(field.getType());
139-
columns.add(new Schema.Column(field.getName(), null, exprType));
153+
for (int i = 0; i < fields.size(); i++) {
154+
RelDataTypeField field = fields.get(i);
155+
RelDataType labelType = field.getType();
156+
if (projects != null && i < projects.size()) {
157+
labelType = unwrapDatetimeCast(projects.get(i), labelType);
158+
}
159+
columns.add(new Schema.Column(field.getName(), null, convertType(labelType)));
140160
}
141161
return new Schema(columns);
142162
}
143163

164+
/**
165+
* Returns the Project emitted by {@code DatetimeOutputCastRule} — root, or sitting under a single
166+
* {@code Sort} (system query-size limit). Detection is by structural shape: every slot is either
167+
* an identity {@link RexInputRef} or {@code CAST(<datetime InputRef> AS VARCHAR)}, and at least
168+
* one slot is the cast form. Anything else (user-authored expression, function call) means this
169+
* is not the rule's emit.
170+
*/
171+
private static Project findOutputCastProject(RelNode plan) {
172+
RelNode current = plan;
173+
while (current instanceof Sort) {
174+
current = current.getInput(0);
175+
}
176+
if (!(current instanceof Project project)) {
177+
return null;
178+
}
179+
boolean sawDatetimeCast = false;
180+
for (RexNode slot : project.getProjects()) {
181+
if (slot instanceof RexInputRef) {
182+
continue;
183+
}
184+
if (isDatetimeToVarcharCast(slot)) {
185+
sawDatetimeCast = true;
186+
continue;
187+
}
188+
return null;
189+
}
190+
return sawDatetimeCast ? project : null;
191+
}
192+
193+
private static boolean isDatetimeToVarcharCast(RexNode expr) {
194+
if (!(expr instanceof RexCall call) || call.getKind() != SqlKind.CAST) {
195+
return false;
196+
}
197+
if (call.getOperands().size() != 1) {
198+
return false;
199+
}
200+
RexNode source = call.getOperands().get(0);
201+
if (!(source instanceof RexInputRef)) {
202+
return false;
203+
}
204+
if (call.getType().getSqlTypeName() != SqlTypeName.VARCHAR) {
205+
return false;
206+
}
207+
return isDatetime(source.getType().getSqlTypeName());
208+
}
209+
210+
private static boolean isDatetime(SqlTypeName type) {
211+
return type == SqlTypeName.DATE
212+
|| type == SqlTypeName.TIME
213+
|| type == SqlTypeName.TIMESTAMP
214+
|| type == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
215+
}
216+
217+
private static RelDataType unwrapDatetimeCast(RexNode projection, RelDataType fallback) {
218+
if (!isDatetimeToVarcharCast(projection)) {
219+
return fallback;
220+
}
221+
return ((RexCall) projection).getOperands().get(0).getType();
222+
}
223+
144224
private ExprType convertType(RelDataType type) {
145225
try {
146226
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,26 @@
1515
import static org.mockito.Mockito.when;
1616

1717
import java.lang.reflect.Field;
18+
import java.util.ArrayList;
1819
import java.util.Arrays;
1920
import java.util.Collections;
21+
import java.util.List;
2022
import java.util.concurrent.atomic.AtomicReference;
2123
import java.util.stream.Collectors;
24+
import org.apache.calcite.plan.RelOptCluster;
25+
import org.apache.calcite.plan.hep.HepPlanner;
26+
import org.apache.calcite.plan.hep.HepProgramBuilder;
27+
import org.apache.calcite.rel.RelCollations;
2228
import org.apache.calcite.rel.RelNode;
29+
import org.apache.calcite.rel.logical.LogicalProject;
30+
import org.apache.calcite.rel.logical.LogicalSort;
31+
import org.apache.calcite.rel.logical.LogicalValues;
2332
import org.apache.calcite.rel.type.RelDataType;
2433
import org.apache.calcite.rel.type.RelDataTypeFactory;
2534
import org.apache.calcite.rel.type.RelDataTypeSystem;
35+
import org.apache.calcite.rex.RexBuilder;
36+
import org.apache.calcite.rex.RexNode;
37+
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
2638
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
2739
import org.apache.calcite.sql.type.SqlTypeName;
2840
import org.junit.jupiter.api.BeforeEach;
@@ -276,8 +288,141 @@ void physicalPlanExplain_callsOnFailure() {
276288
+ errorRef.get().getMessage());
277289
}
278290

291+
// --- schema recovery: structural detection of CAST(<datetime> AS VARCHAR) projects ---
292+
293+
@Test
294+
void buildSchema_recoversDatetimeLabelsFromOutputCastProject() {
295+
RelNode plan =
296+
buildOutputCastPlan(
297+
new String[] {"ts", "d", "t"},
298+
new SqlTypeName[] {SqlTypeName.TIMESTAMP, SqlTypeName.DATE, SqlTypeName.TIME});
299+
Iterable<Object[]> rows =
300+
Collections.singletonList(new Object[] {"2024-01-15 10:30:00", "2024-01-15", "10:30:00"});
301+
stubExecutorWith(plan, rows);
302+
303+
QueryResponse response = executeAndCapture(plan);
304+
String dump = dumpResponse(response);
305+
306+
assertEquals(
307+
ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(0).getExprType(), dump);
308+
assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(1).getExprType(), dump);
309+
assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(2).getExprType(), dump);
310+
}
311+
312+
@Test
313+
void buildSchema_walksThroughLogicalSortWrapper() {
314+
RelNode castProject =
315+
buildOutputCastPlan(new String[] {"ts"}, new SqlTypeName[] {SqlTypeName.TIMESTAMP});
316+
// Mimic the LogicalSystemLimit wrapper that wraps the rule-emitted Project at the root.
317+
RexBuilder rexBuilder = castProject.getCluster().getRexBuilder();
318+
RexNode fetch =
319+
rexBuilder.makeLiteral(
320+
10000, castProject.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
321+
RelNode wrapped = LogicalSort.create(castProject, RelCollations.EMPTY, null, fetch);
322+
stubExecutorWith(wrapped, Collections.emptyList());
323+
324+
QueryResponse response = executeAndCapture(wrapped);
325+
String dump = dumpResponse(response);
326+
327+
assertEquals(
328+
ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(0).getExprType(), dump);
329+
}
330+
331+
@Test
332+
void buildSchema_projectWithUserExpressionDoesNotRecover() {
333+
// A Project that mixes a CAST(<datetime> AS VARCHAR) slot with a user-authored
334+
// expression slot (here: ts + INTERVAL — an unrelated function call) is NOT the
335+
// rule's emit shape. Recovery must NOT happen; the wire schema reflects the
336+
// Project's row type as-is (the cast slot stays VARCHAR/STRING).
337+
RelNode plan =
338+
buildMixedProject(
339+
new String[] {"ts_str", "calc"},
340+
new SqlTypeName[] {SqlTypeName.TIMESTAMP, SqlTypeName.INTEGER});
341+
stubExecutorWith(plan, Collections.emptyList());
342+
343+
QueryResponse response = executeAndCapture(plan);
344+
String dump = dumpResponse(response);
345+
346+
// ts_str slot is CAST(<TIMESTAMP> AS VARCHAR) but sits next to a user expression,
347+
// so the structural shape doesn't match — schema stays STRING (VARCHAR).
348+
assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump);
349+
}
350+
351+
@Test
352+
void buildSchema_nonProjectRootKeepsFieldType() {
353+
// When the rule didn't fire (no datetime fields), the root is whatever the
354+
// planner produced — the recovery path must fall back to the field type.
355+
RelNode plan = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
356+
stubExecutorWith(plan, Collections.emptyList());
357+
358+
QueryResponse response = executeAndCapture(plan);
359+
String dump = dumpResponse(response);
360+
361+
assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump);
362+
assertEquals(
363+
ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump);
364+
}
365+
279366
// --- helpers ---
280367

368+
/**
369+
* Builds a {@code LogicalProject(CAST(<typed> AS VARCHAR))} over a {@link LogicalValues} input —
370+
* mirrors what {@code DatetimeOutputCastRule} emits at the root.
371+
*/
372+
private RelNode buildOutputCastPlan(String[] names, SqlTypeName[] types) {
373+
SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
374+
RexBuilder rexBuilder = new RexBuilder(typeFactory);
375+
HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
376+
RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
377+
378+
RelDataTypeFactory.Builder rowBuilder = typeFactory.builder();
379+
for (int i = 0; i < names.length; i++) {
380+
rowBuilder.add(names[i], types[i]).nullable(true);
381+
}
382+
RelDataType rowType = rowBuilder.build();
383+
LogicalValues input = LogicalValues.createEmpty(cluster, rowType);
384+
385+
RelDataType varchar =
386+
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true);
387+
List<RexNode> projects = new ArrayList<>(names.length);
388+
List<String> projectNames = new ArrayList<>(names.length);
389+
for (int i = 0; i < names.length; i++) {
390+
RexNode ref = rexBuilder.makeInputRef(input, i);
391+
projects.add(rexBuilder.makeCast(varchar, ref));
392+
projectNames.add(names[i]);
393+
}
394+
return LogicalProject.create(input, List.of(), projects, projectNames);
395+
}
396+
397+
/**
398+
* Builds a {@code LogicalProject} where the first slot is {@code CAST(<datetime> AS VARCHAR)} and
399+
* the second slot is a user-authored expression ({@code col + 1}) — i.e. NOT the shape that the
400+
* rule emits.
401+
*/
402+
private RelNode buildMixedProject(String[] names, SqlTypeName[] types) {
403+
SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
404+
RexBuilder rexBuilder = new RexBuilder(typeFactory);
405+
HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
406+
RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
407+
408+
RelDataTypeFactory.Builder rowBuilder = typeFactory.builder();
409+
for (int i = 0; i < names.length; i++) {
410+
rowBuilder.add(names[i], types[i]).nullable(true);
411+
}
412+
LogicalValues input = LogicalValues.createEmpty(cluster, rowBuilder.build());
413+
414+
RelDataType varchar =
415+
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true);
416+
RexNode castSlot = rexBuilder.makeCast(varchar, rexBuilder.makeInputRef(input, 0));
417+
RexNode plusSlot =
418+
rexBuilder.makeCall(
419+
SqlStdOperatorTable.PLUS,
420+
rexBuilder.makeInputRef(input, 1),
421+
rexBuilder.makeLiteral(1, typeFactory.createSqlType(SqlTypeName.INTEGER)));
422+
return LogicalProject.create(
423+
input, List.of(), List.of(castSlot, plusSlot), List.of(names[0], names[1]));
424+
}
425+
281426
private QueryResponse executeAndCapture(RelNode relNode) {
282427
AtomicReference<QueryResponse> ref = new AtomicReference<>();
283428
engine.execute(relNode, mockContext, captureListener(ref));

0 commit comments

Comments
 (0)