diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index c4bb8bcd91..f3c910c2b4 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -65,10 +65,14 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexFieldCollation; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.rex.RexWindow; import org.apache.calcite.rex.RexWindowBounds; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlLibraryOperators; @@ -2275,6 +2279,7 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) .relBuilder .aggregateCall(SqlStdOperatorTable.ROW_NUMBER) .over() + .orderBy(deriveCollationOrderKeys(context)) .rowsTo(RexWindowBounds.CURRENT_ROW) .as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS); context.relBuilder.projectPlus(streamSeq); @@ -2291,12 +2296,21 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS}); } - // Default: first get rawExpr - List overExpressions = - node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList(); - - if (hasGroup) { - // only build sequence when there is by condition + // Short-term correctness workaround: streamstats/trendline are evaluated in arrival order, and + // some engines can preserve that order through window partitions without an explicit ORDER BY. + // DataFusion and Calcite EnumerableWindow do not currently provide that guarantee for + // partitioned windows, so we make grouped streamstats frames walk an explicit input sequence. + // When the input already has an explicit Sort, materialize the sequence after that Sort instead + // of stripping it; this preserves "sort, then streamstats" semantics including tie arrival + // order. This may add a small per-partition sort cost on engines that did not need it; the + // long-term fix is a real streaming window operator. + RelCollation explicitInputCollation = PlanUtils.findInputCollation(context.relBuilder.peek()); + List windowOrderKeys = deriveCollationOrderKeys(context); + boolean useStreamSeq = + hasGroup && (windowOrderKeys.isEmpty() || explicitInputCollation != null); + if (useStreamSeq) { + // streamstats is order-sensitive. Materialize input order before any grouped window can + // repartition rows, then make each window frame walk that sequence explicitly. RexNode streamSeq = context .relBuilder @@ -2305,34 +2319,79 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) .rowsTo(RexWindowBounds.CURRENT_ROW) .as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS); context.relBuilder.projectPlus(streamSeq); + int seqColIndex = context.relBuilder.peek().getRowType().getFieldCount() - 1; + windowOrderKeys = List.of(context.relBuilder.field(seqColIndex)); + } - if (!node.isBucketNullable()) { - // construct groupNotNull predicate - List groupByList = - groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList(); - List notNullList = - PlanUtils.getSelectColumns(groupByList).stream() - .map(context.relBuilder::field) - .map(context.relBuilder::isNotNull) - .toList(); - RexNode groupNotNull = context.relBuilder.and(notNullList); - - // wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END - List wrappedOverExprs = - wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context); - context.relBuilder.projectPlus(wrappedOverExprs); - } else { - context.relBuilder.projectPlus(overExpressions); - } + List finalWindowOrderKeys = windowOrderKeys; + List overExpressions = + node.getWindowFunctionList().stream() + .map(w -> rexVisitor.analyze(w, context)) + .map(rex -> addWindowOrder(rex, finalWindowOrderKeys, context)) + .toList(); + projectStreamWindowExpressions(overExpressions, hasGroup, groupList, node, context); - // resort when there is by condition - context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS)); + if (!finalWindowOrderKeys.isEmpty()) { + context.relBuilder.sort(finalWindowOrderKeys); + } + if (useStreamSeq) { context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS)); + } + + return context.relBuilder.peek(); + } + + private void projectStreamWindowExpressions( + List overExpressions, + boolean hasGroup, + List groupList, + StreamWindow node, + CalcitePlanContext context) { + if (hasGroup && !node.isBucketNullable()) { + List groupByList = + groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList(); + List notNullList = + PlanUtils.getSelectColumns(groupByList).stream() + .map(context.relBuilder::field) + .map(context.relBuilder::isNotNull) + .toList(); + RexNode groupNotNull = context.relBuilder.and(notNullList); + context.relBuilder.projectPlus( + wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context)); } else { context.relBuilder.projectPlus(overExpressions); } + } - return context.relBuilder.peek(); + private RexNode addWindowOrder(RexNode rex, List orderKeys, CalcitePlanContext context) { + if (orderKeys.isEmpty()) { + return rex; + } + ImmutableList orderCollations = PlanUtils.toRexFieldCollations(orderKeys); + return rex.accept( + new RexShuttle() { + @Override + public RexNode visitOver(RexOver over) { + RexOver recursed = (RexOver) super.visitOver(over); + RexWindow window = recursed.getWindow(); + if (!window.orderKeys.isEmpty()) { + return recursed; + } + return context.rexBuilder.makeOver( + recursed.getType(), + recursed.getAggOperator(), + recursed.getOperands(), + window.partitionKeys, + orderCollations, + window.getLowerBound(), + window.getUpperBound(), + window.isRows(), + true, + false, + recursed.isDistinct(), + recursed.ignoreNulls()); + } + }); } private List wrapWindowFunctionsWithGroupNotNull( @@ -2648,6 +2707,7 @@ private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow .relBuilder .aggregateCall(SqlStdOperatorTable.ROW_NUMBER) .over() + .orderBy(deriveCollationOrderKeys(context)) .rowsTo(RexWindowBounds.CURRENT_ROW) .as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS); context.relBuilder.projectPlus(rowNum); @@ -2683,6 +2743,7 @@ private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow .aggregateCall( SqlStdOperatorTable.SUM, context.relBuilder.field("__reset_before_flag__")) .over() + .orderBy(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS)) .rowsTo(RexWindowBounds.CURRENT_ROW) .toRex(); RexNode sumAfterPrev = @@ -2691,6 +2752,7 @@ private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow .aggregateCall( SqlStdOperatorTable.SUM, context.relBuilder.field("__reset_after_flag__")) .over() + .orderBy(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS)) .rowsBetween( RexWindowBounds.UNBOUNDED_PRECEDING, RexWindowBounds.preceding(context.relBuilder.literal(1))) @@ -3928,6 +3990,14 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) { } }); + // Short-term correctness workaround: streamstats/trendline are evaluated in arrival order, and + // some engines can preserve that order through window partitions without an explicit ORDER BY. + // DataFusion and Calcite EnumerableWindow do not currently provide that guarantee for every + // window frame, so we declare the inherited input order on the window frame. This may add a + // small per-partition sort cost on engines that did not need it; the long-term fix is a real + // streaming window operator. + List trendlineOrderKeys = deriveCollationOrderKeys(context); + List trendlineNodes = new ArrayList<>(); List aliases = new ArrayList<>(); node.getComputations() @@ -3949,7 +4019,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) { null, List.of(), List.of(), - List.of(), + trendlineOrderKeys, windowFrame); // CASE WHEN count() over (ROWS (windowSize-1) PRECEDING) > windowSize - 1 RexNode whenConditionExpr = @@ -3970,7 +4040,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) { field, List.of(), List.of(), - List.of(), + trendlineOrderKeys, windowFrame); break; case TrendlineType.WMA: @@ -3980,6 +4050,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) { field, trendlineComputation.getNumberOfDataPoints(), windowFrame, + trendlineOrderKeys, context); break; default: @@ -4007,6 +4078,7 @@ private RexNode buildWmaRexNode( RexNode field, Integer numberOfDataPoints, WindowFrame windowFrame, + List orderKeys, CalcitePlanContext context) { // Divisor: 1 + 2 + 3 + ... + windowSize, aka (windowSize * (windowSize + 1) / 2) @@ -4023,7 +4095,7 @@ private RexNode buildWmaRexNode( field, List.of(context.relBuilder.literal(i)), List.of(), - List.of(), + orderKeys, windowFrame); divider = context.relBuilder.call( diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLDedupConvertRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLDedupConvertRule.java index 39bd243ea5..30dc901b0b 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLDedupConvertRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLDedupConvertRule.java @@ -6,6 +6,8 @@ package org.opensearch.sql.calcite.plan.rule; import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP; +import static org.opensearch.sql.calcite.utils.PlanUtils.collationToOrderKeys; +import static org.opensearch.sql.calcite.utils.PlanUtils.restoreInputOrder; import java.util.ArrayList; import java.util.List; @@ -187,42 +189,6 @@ public static void buildDedupNotNull( restoreInputOrder(relBuilder, inputCollation); } - /** - * Convert a RelCollation to a list of RexNode order keys using the RelBuilder's field references. - */ - private static List collationToOrderKeys(RelBuilder relBuilder, RelCollation collation) { - if (collation == null || collation.getFieldCollations().isEmpty()) { - return List.of(); - } - List orderKeys = new ArrayList<>(); - for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { - RexNode fieldRef = relBuilder.field(fieldCollation.getFieldIndex()); - if (fieldCollation.direction.isDescending()) { - fieldRef = relBuilder.desc(fieldRef); - } - if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) { - fieldRef = relBuilder.nullsLast(fieldRef); - } else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) { - fieldRef = relBuilder.nullsFirst(fieldRef); - } - orderKeys.add(fieldRef); - } - return orderKeys; - } - - /** - * Re-apply a sort after dedup to restore the input order that may have been disrupted by the - * window operator. EnumerableWindow can re-partition data by the PARTITION BY key, destroying any - * upstream sort order. This explicit re-sort ensures the final output preserves the original - * order. - */ - private static void restoreInputOrder(RelBuilder relBuilder, RelCollation inputCollation) { - if (inputCollation != null && !inputCollation.getFieldCollations().isEmpty()) { - List sortKeys = collationToOrderKeys(relBuilder, inputCollation); - relBuilder.sort(sortKeys); - } - } - /** Rule configuration. */ @Value.Immutable public interface Config extends OpenSearchRuleConfig { diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index f899f74742..4104d05909 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -16,6 +16,7 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -49,6 +50,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexFieldCollation; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; @@ -201,9 +203,9 @@ static RexNode makeOver( // sum(x) / count(x) return context.relBuilder.call( SqlStdOperatorTable.DIVIDE, - sumOver(context, field, partitions, rows, lowerBound, upperBound), + sumOver(context, field, partitions, orderKeys, rows, lowerBound, upperBound), context.relBuilder.cast( - countOver(context, field, partitions, rows, lowerBound, upperBound), + countOver(context, field, partitions, orderKeys, rows, lowerBound, upperBound), SqlTypeName.DOUBLE)); // stddev_pop(x) ==> // power((sum(x * x) - sum(x) * sum(x) / count(x)) / count(x), 0.5) @@ -217,13 +219,17 @@ static RexNode makeOver( // var_samp(x) ==> // (sum(x * x) - sum(x) * sum(x) / count(x)) / (count(x) - 1) case STDDEV_POP: - return variance(context, field, partitions, rows, lowerBound, upperBound, true, true); + return variance( + context, field, partitions, orderKeys, rows, lowerBound, upperBound, true, true); case STDDEV_SAMP: - return variance(context, field, partitions, rows, lowerBound, upperBound, false, true); + return variance( + context, field, partitions, orderKeys, rows, lowerBound, upperBound, false, true); case VARPOP: - return variance(context, field, partitions, rows, lowerBound, upperBound, true, false); + return variance( + context, field, partitions, orderKeys, rows, lowerBound, upperBound, true, false); case VARSAMP: - return variance(context, field, partitions, rows, lowerBound, upperBound, false, false); + return variance( + context, field, partitions, orderKeys, rows, lowerBound, upperBound, false, false); case ROW_NUMBER: return withOver( context.relBuilder.aggregateCall(SqlStdOperatorTable.ROW_NUMBER), @@ -255,24 +261,26 @@ private static RexNode sumOver( CalcitePlanContext ctx, RexNode operation, List partitions, + List orderKeys, boolean rows, RexWindowBound lowerBound, RexWindowBound upperBound) { return withOver( - ctx.relBuilder.sum(operation), partitions, List.of(), rows, lowerBound, upperBound); + ctx.relBuilder.sum(operation), partitions, orderKeys, rows, lowerBound, upperBound); } private static RexNode countOver( CalcitePlanContext ctx, RexNode operation, List partitions, + List orderKeys, boolean rows, RexWindowBound lowerBound, RexWindowBound upperBound) { return withOver( ctx.relBuilder.count(ImmutableList.of(operation)), partitions, - List.of(), + orderKeys, rows, lowerBound, upperBound); @@ -297,20 +305,67 @@ private static RexNode withOver( .toRex(); } + static ImmutableList toRexFieldCollations(List orderKeys) { + ImmutableList.Builder orderCollationBuilder = ImmutableList.builder(); + orderKeys.forEach(key -> orderCollationBuilder.add(toRexFieldCollation(key))); + return orderCollationBuilder.build(); + } + + static RexFieldCollation toRexFieldCollation(RexNode node) { + return toRexFieldCollation( + node, RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.UNSPECIFIED); + } + + private static RexFieldCollation toRexFieldCollation( + RexNode node, + RelFieldCollation.Direction direction, + RelFieldCollation.NullDirection nullDirection) { + switch (node.getKind()) { + case DESCENDING: + return toRexFieldCollation( + ((RexCall) node).getOperands().getFirst(), + RelFieldCollation.Direction.DESCENDING, + nullDirection); + case NULLS_FIRST: + return toRexFieldCollation( + ((RexCall) node).getOperands().getFirst(), + direction, + RelFieldCollation.NullDirection.FIRST); + case NULLS_LAST: + return toRexFieldCollation( + ((RexCall) node).getOperands().getFirst(), + direction, + RelFieldCollation.NullDirection.LAST); + default: + Set flags = EnumSet.noneOf(SqlKind.class); + if (direction == RelFieldCollation.Direction.DESCENDING) { + flags.add(SqlKind.DESCENDING); + } + if (nullDirection == RelFieldCollation.NullDirection.FIRST) { + flags.add(SqlKind.NULLS_FIRST); + } else if (nullDirection == RelFieldCollation.NullDirection.LAST) { + flags.add(SqlKind.NULLS_LAST); + } + return new RexFieldCollation(node, flags); + } + } + private static RexNode variance( CalcitePlanContext ctx, RexNode operator, List partitions, + List orderKeys, boolean rows, RexWindowBound lowerBound, RexWindowBound upperBound, boolean biased, boolean sqrt) { RexNode argSquared = ctx.relBuilder.call(SqlStdOperatorTable.MULTIPLY, operator, operator); - RexNode sumArgSquared = sumOver(ctx, argSquared, partitions, rows, lowerBound, upperBound); - RexNode sum = sumOver(ctx, operator, partitions, rows, lowerBound, upperBound); + RexNode sumArgSquared = + sumOver(ctx, argSquared, partitions, orderKeys, rows, lowerBound, upperBound); + RexNode sum = sumOver(ctx, operator, partitions, orderKeys, rows, lowerBound, upperBound); RexNode sumSquared = ctx.relBuilder.call(SqlStdOperatorTable.MULTIPLY, sum, sum); - RexNode count = countOver(ctx, operator, partitions, rows, lowerBound, upperBound); + RexNode count = countOver(ctx, operator, partitions, orderKeys, rows, lowerBound, upperBound); RexNode countCast = ctx.relBuilder.cast(count, SqlTypeName.DOUBLE); RexNode avgSumSquared = ctx.relBuilder.call(SqlStdOperatorTable.DIVIDE, sumSquared, countCast); RexNode diff = ctx.relBuilder.call(SqlStdOperatorTable.MINUS, sumArgSquared, avgSumSquared); @@ -701,6 +756,42 @@ public Void visitCorrelVariable(RexCorrelVariable correlVar) { return outputCollation; } + /** + * Convert a {@link RelCollation} to {@link RexNode} order keys using the current RelBuilder field + * references. + */ + public static List collationToOrderKeys( + RelBuilder relBuilder, @Nullable RelCollation collation) { + if (collation == null || collation.getFieldCollations().isEmpty()) { + return List.of(); + } + List orderKeys = new ArrayList<>(); + for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { + RexNode fieldRef = relBuilder.field(fieldCollation.getFieldIndex()); + if (fieldCollation.direction.isDescending()) { + fieldRef = relBuilder.desc(fieldRef); + } + if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) { + fieldRef = relBuilder.nullsLast(fieldRef); + } else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) { + fieldRef = relBuilder.nullsFirst(fieldRef); + } + orderKeys.add(fieldRef); + } + return orderKeys; + } + + /** + * Re-apply a sort to restore input order that may have been disrupted by a window operator. + * EnumerableWindow can re-partition data by the PARTITION BY key, destroying upstream sort order. + */ + public static void restoreInputOrder( + RelBuilder relBuilder, @Nullable RelCollation inputCollation) { + if (inputCollation != null && !inputCollation.getFieldCollations().isEmpty()) { + relBuilder.sort(collationToOrderKeys(relBuilder, inputCollation)); + } + } + /** * Remove the first Sort node found in the tree, replacing it with its input. Only traverses * through single-input operators (Filter, Project) that preserve order. diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 17b88a901d..fda656873f 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -1329,19 +1329,16 @@ task integTestRemote(type: RestIntegTestTask) { // === Excludes: asserts a Lucene pushdown fragment absent on the AE route === excludeTestsMatching '*CalciteSortCommandIT.testPushdownSortCastToDoubleExpression' - // === Excludes: CalciteStreamstatsCommandIT route divergences === + // === Excludes: chained CalciteStreamstatsCommandIT route divergences === // Each test also carries an in-test @RequiresCapability(...) recording the reason. - // - CHAINED_STREAMSTATS_BY: chaining two streamstats where an upstream stage has `by` - // emits two ROW_NUMBER() sequence columns the Substrait converter names identically, - // so the stacked schema has a duplicate/ambiguous field name (500) or, for chained - // window streamstats, non-deterministic values. Fails single- and multi-shard. + // Chaining two streamstats where an upstream stage has `by` emits two ROW_NUMBER() + // sequence columns the Substrait converter names identically, so the stacked schema has + // a duplicate/ambiguous field name (500) or, for chained window streamstats, + // non-deterministic values. Fails single- and multi-shard. excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstats' excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithWindow' excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithNull1' excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithEval' - // - STREAMSTATS_SORT_NOT_HONORED: streamstats computes its window over the backend scan - // order, ignoring a preceding `| sort` (the OVER clause has no explicit ORDER BY). - excludeTestsMatching '*CalciteStreamstatsCommandIT.testStreamstatsAndSort' } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java index e70812e3c3..5ab2ded656 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java @@ -8,7 +8,6 @@ import static org.opensearch.sql.legacy.TestsConstants.*; import static org.opensearch.sql.util.Capability.CHAINED_STREAMSTATS_BY; import static org.opensearch.sql.util.Capability.DOC_MUTATION; -import static org.opensearch.sql.util.Capability.STREAMSTATS_SORT_NOT_HONORED; import static org.opensearch.sql.util.MatcherUtils.*; import java.io.IOException; @@ -1017,7 +1016,6 @@ public void testStreamstatsAndEventstats() throws IOException { } @Test - @RequiresCapability(STREAMSTATS_SORT_NOT_HONORED) public void testStreamstatsAndSort() throws IOException { JSONObject actual = executeQuery( diff --git a/integ-test/src/test/java/org/opensearch/sql/util/Capability.java b/integ-test/src/test/java/org/opensearch/sql/util/Capability.java index 3bac06e7fc..86f8b8fc36 100644 --- a/integ-test/src/test/java/org/opensearch/sql/util/Capability.java +++ b/integ-test/src/test/java/org/opensearch/sql/util/Capability.java @@ -486,20 +486,7 @@ public enum Capability { "Chaining two streamstats where an upstream stage partitions by a group fails on the" + " analytics-engine route: both stages emit a ROW_NUMBER() sequence column the Substrait" + " converter names identically, producing a duplicate/ambiguous field name (500) or" - + " non-deterministic window values."), - - /** - * {@code streamstats} computes its running/window aggregate over the backend scan order on the - * analytics-engine route, ignoring a preceding {@code | sort}. The {@code OVER} clause carries no - * explicit {@code ORDER BY} (streamstats orders by encounter order by design), so DataFusion - * evaluates the window in scan order rather than the sorted order the v2/Calcite path honors. - * Verified: {@code sort age | streamstats window=2 avg(age)} yields window values computed in - * insertion order, not age order, so the per-row aggregates diverge. - */ - STREAMSTATS_SORT_NOT_HONORED( - "streamstats computes its window over the backend scan order on the analytics-engine route," - + " ignoring a preceding | sort (the OVER clause has no explicit ORDER BY), so the window" - + " values diverge from the v2/Calcite path which honors the sort."); + + " non-deterministic window values."); private final String reason; diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml index 0a280b77df..b57b021b03 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_distinct_count.yaml @@ -3,12 +3,12 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ORDER BY $17 ROWS UNBOUNDED PRECEDING)]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..12=[{inputs}], proj#0..10=[{exprs}], distinct_states=[$t12]) CalciteEnumerableTopK(sort0=[$11], dir0=[ASC], fetch=[10000]) - EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [DISTINCT_COUNT_APPROX($7)])]) + EnumerableWindow(window#0=[window(partition {4} order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [DISTINCT_COUNT_APPROX($7)])]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml index 2d6062c114..29c78d3126 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest.yaml @@ -3,12 +3,12 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1 ORDER BY $11 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1 ORDER BY $11 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | EnumerableCalc(expr#0..7=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t6], latest_message=[$t7]) CalciteEnumerableTopK(sort0=[$5], dir0=[ASC], fetch=[10000]) - EnumerableWindow(window#0=[window(partition {1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])]) + EnumerableWindow(window#0=[window(partition {1} order by [5] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["created_at","server","@timestamp","message","level"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml index 8a7612054c..0fa2c04ae1 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_earliest_latest_custom_time.yaml @@ -3,12 +3,12 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4 ORDER BY $11 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4 ORDER BY $11 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | EnumerableCalc(expr#0..7=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t6], latest_message=[$t7]) CalciteEnumerableTopK(sort0=[$5], dir0=[ASC], fetch=[10000]) - EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])]) + EnumerableWindow(window#0=[window(partition {4} order by [5] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["created_at","server","@timestamp","message","level"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml index d52457d667..d19f505337 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_null_bucket.yaml @@ -3,13 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], avg_age=[CASE(IS NOT NULL($4), /(SUM($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), CAST(COUNT($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)):DOUBLE NOT NULL), null:DOUBLE)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], avg_age=[CASE(IS NOT NULL($4), /(SUM($8) OVER (PARTITION BY $4 ORDER BY $17 ROWS UNBOUNDED PRECEDING), CAST(COUNT($8) OVER (PARTITION BY $4 ORDER BY $17 ROWS UNBOUNDED PRECEDING)):DOUBLE NOT NULL), null:DOUBLE)]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..14=[{inputs}], expr#15=[CAST($t14):DOUBLE NOT NULL], expr#16=[/($t13, $t15)], expr#17=[null:DOUBLE], expr#18=[CASE($t12, $t16, $t17)], proj#0..10=[{exprs}], avg_age=[$t18]) CalciteEnumerableTopK(sort0=[$11], dir0=[ASC], fetch=[10000]) - EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($8), COUNT($8)])]) + EnumerableWindow(window#0=[window(partition {4} order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($8), COUNT($8)])]) EnumerableCalc(expr#0..11=[{inputs}], expr#12=[IS NOT NULL($t4)], proj#0..12=[{exprs}]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml index 72f8f4d6ca..f57613734c 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml @@ -4,13 +4,13 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$21]) LogicalSort(sort0=[$17], dir0=[ASC]) LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17, 20}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($0)]) LogicalProject(age=[$8]) LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | @@ -19,7 +19,7 @@ calcite: EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) EnumerableCalc(expr#0..16=[{inputs}], expr#17=[0], expr#18=[COALESCE($t16, $t17)], expr#19=[+($t15, $t18)], proj#0..11=[{exprs}], __seg_id__=[$t19], $f16=[$t14]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $15 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(order by [11] rows between UNBOUNDED PRECEDING and $15 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) EnumerableCalc(expr#0..11=[{inputs}], expr#12=[34], expr#13=[>($t8, $t12)], expr#14=[1], expr#15=[0], expr#16=[CASE($t13, $t14, $t15)], expr#17=[25], expr#18=[<($t8, $t17)], expr#19=[CASE($t18, $t14, $t15)], expr#20=[IS NULL($t4)], proj#0..11=[{exprs}], __reset_before_flag__=[$t16], __reset_after_flag__=[$t19], $14=[$t20]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) @@ -29,12 +29,12 @@ calcite: EnumerableHashJoin(condition=[AND(=($2, $7), <($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) EnumerableAggregate(group=[{0, 1, 2, 3}]) EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..1=[{exprs}], __seg_id__=[$t9], $f16=[$t4]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(order by [1] rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], expr#11=[IS NULL($t0)], gender=[$t0], __stream_seq__=[$t2], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10], $4=[$t11]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(order by [2] rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], proj#0..2=[{exprs}], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml index 42b50e7eb5..929d00d033 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml @@ -4,13 +4,13 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$21]) LogicalSort(sort0=[$17], dir0=[ASC]) LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17, 20}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($0)]) LogicalProject(age=[$8]) LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | @@ -19,7 +19,7 @@ calcite: EnumerableMergeJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) EnumerableSort(sort0=[$4], sort1=[$11], sort2=[$12], dir0=[ASC], dir1=[ASC], dir2=[ASC]) EnumerableCalc(expr#0..15=[{inputs}], expr#16=[0], expr#17=[COALESCE($t15, $t16)], expr#18=[+($t14, $t17)], proj#0..11=[{exprs}], __seg_id__=[$t18]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(order by [11] rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) EnumerableCalc(expr#0..11=[{inputs}], expr#12=[34], expr#13=[>($t8, $t12)], expr#14=[1], expr#15=[0], expr#16=[CASE($t13, $t14, $t15)], expr#17=[25], expr#18=[<($t8, $t17)], expr#19=[CASE($t18, $t14, $t15)], proj#0..11=[{exprs}], __reset_before_flag__=[$t16], __reset_after_flag__=[$t19]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) @@ -29,12 +29,12 @@ calcite: EnumerableHashJoin(condition=[AND(=($2, $6), =($0, $3), <($5, $1))], joinType=[inner]) EnumerableAggregate(group=[{0, 1, 2}]) EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[COALESCE($t5, $t6)], expr#8=[+($t4, $t7)], proj#0..1=[{exprs}], __seg_id__=[$t8]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(order by [1] rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], gender=[$t0], __stream_seq__=[$t2], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(order by [2] rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) EnumerableCalc(expr#0..2=[{inputs}], expr#3=[34], expr#4=[>($t1, $t3)], expr#5=[1], expr#6=[0], expr#7=[CASE($t4, $t5, $t6)], expr#8=[25], expr#9=[<($t1, $t8)], expr#10=[CASE($t9, $t5, $t6)], proj#0..2=[{exprs}], __reset_before_flag__=[$t7], __reset_after_flag__=[$t10]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml index 354c7f74fb..5f6be04c24 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml @@ -1,7 +1,7 @@ calcite: logical: | LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalProject(ageTrend=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1), /(SUM($8) OVER (ROWS 1 PRECEDING), CAST(COUNT($8) OVER (ROWS 1 PRECEDING)):DOUBLE NOT NULL), null:NULL)]) + LogicalProject(ageTrend=[CASE(>(COUNT() OVER (ORDER BY $8 NULLS LAST ROWS 1 PRECEDING), 1), /(SUM($8) OVER (ORDER BY $8 NULLS LAST ROWS 1 PRECEDING), CAST(COUNT($8) OVER (ORDER BY $8 NULLS LAST ROWS 1 PRECEDING)):DOUBLE NOT NULL), null:NULL)]) LogicalFilter(condition=[IS NOT NULL($8)]) LogicalSort(sort0=[$8], dir0=[ASC]) LogicalSort(fetch=[5]) @@ -9,7 +9,7 @@ calcite: physical: | EnumerableLimit(fetch=[10000]) EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9]) - EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [0] rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])], constants=[[1]]) EnumerableCalc(expr#0=[{inputs}], expr#1=[IS NOT NULL($t0)], age=[$t0], $condition=[$t1]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{ "age" : { diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml index 550cf0ea9c..f696e69e88 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_distinct_count.yaml @@ -3,13 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4 ORDER BY $17 ROWS UNBOUNDED PRECEDING)]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], distinct_states=[$t18]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$17], dir0=[ASC]) - EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [DISTINCT_COUNT_APPROX($7)])]) + EnumerableWindow(window#0=[window(partition {4} order by [17] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [DISTINCT_COUNT_APPROX($7)])]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml index c37fae4877..2c91e94252 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest.yaml @@ -3,13 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1 ROWS UNBOUNDED PRECEDING)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1 ORDER BY $11 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1 ORDER BY $11 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | EnumerableCalc(expr#0..13=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t12], latest_message=[$t13]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableWindow(window#0=[window(partition {1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])]) + EnumerableWindow(window#0=[window(partition {1} order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml index b85e4b6b7b..fc5bc29a2d 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_earliest_latest_custom_time.yaml @@ -3,13 +3,13 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13]) LogicalSort(sort0=[$11], dir0=[ASC]) - LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)]) + LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[$11], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4 ORDER BY $11 ROWS UNBOUNDED PRECEDING)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4 ORDER BY $11 ROWS UNBOUNDED PRECEDING)]) LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) physical: | EnumerableCalc(expr#0..13=[{inputs}], proj#0..4=[{exprs}], earliest_message=[$t12], latest_message=[$t13]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])]) + EnumerableWindow(window#0=[window(partition {4} order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml index 0887604522..1126ffd7bd 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_null_bucket.yaml @@ -3,14 +3,14 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], avg_age=[CASE(IS NOT NULL($4), /(SUM($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING), CAST(COUNT($8) OVER (PARTITION BY $4 ROWS UNBOUNDED PRECEDING)):DOUBLE NOT NULL), null:DOUBLE)]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], avg_age=[CASE(IS NOT NULL($4), /(SUM($8) OVER (PARTITION BY $4 ORDER BY $17 ROWS UNBOUNDED PRECEDING), CAST(COUNT($8) OVER (PARTITION BY $4 ORDER BY $17 ROWS UNBOUNDED PRECEDING)):DOUBLE NOT NULL), null:DOUBLE)]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..14=[{inputs}], expr#15=[CAST($t14):DOUBLE NOT NULL], expr#16=[/($t13, $t15)], expr#17=[null:DOUBLE], expr#18=[CASE($t12, $t16, $t17)], proj#0..10=[{exprs}], avg_age=[$t18]) EnumerableLimit(fetch=[10000]) EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableWindow(window#0=[window(partition {4} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($8), COUNT($8)])]) + EnumerableWindow(window#0=[window(partition {4} order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($8), COUNT($8)])]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[IS NOT NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $12=[$t18]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml index 5664cc6aa8..b1eb418bbb 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset.yaml @@ -4,13 +4,13 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$21]) LogicalSort(sort0=[$17], dir0=[ASC]) LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17, 20}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($0)]) LogicalProject(age=[$8]) LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | @@ -19,7 +19,7 @@ calcite: EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) EnumerableCalc(expr#0..16=[{inputs}], expr#17=[0], expr#18=[COALESCE($t16, $t17)], expr#19=[+($t15, $t18)], proj#0..11=[{exprs}], __seg_id__=[$t19], $f16=[$t14]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $15 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(order by [11] rows between UNBOUNDED PRECEDING and $15 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], expr#26=[IS NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25], $14=[$t26]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) @@ -29,12 +29,12 @@ calcite: EnumerableHashJoin(condition=[AND(=($2, $7), <($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) EnumerableAggregate(group=[{0, 1, 2, 3}]) EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..1=[{exprs}], __seg_id__=[$t9], $f16=[$t4]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(order by [1] rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], expr#26=[IS NULL($t4)], gender=[$t4], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25], $4=[$t26]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(order by [2] rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], age=[$t8], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml index 40fb408700..959d3d5acd 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_reset_null_bucket.yaml @@ -4,13 +4,13 @@ calcite: LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$21]) LogicalSort(sort0=[$17], dir0=[ASC]) LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17, 20}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) LogicalAggregate(group=[{}], avg_age=[AVG($0)]) LogicalProject(age=[$8]) LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ORDER BY $17 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ORDER BY $17 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | @@ -19,7 +19,7 @@ calcite: EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) EnumerableSort(sort0=[$11], dir0=[ASC]) EnumerableCalc(expr#0..15=[{inputs}], expr#16=[0], expr#17=[COALESCE($t15, $t16)], expr#18=[+($t14, $t17)], proj#0..11=[{exprs}], __seg_id__=[$t18]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [11] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($12)])], window#1=[window(order by [11] rows between UNBOUNDED PRECEDING and $14 PRECEDING aggs [$SUM0($13)])], constants=[[1]]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], proj#0..10=[{exprs}], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) @@ -28,12 +28,12 @@ calcite: EnumerableHashJoin(condition=[AND(=($2, $6), =($0, $3), <($5, $1))], joinType=[inner]) EnumerableAggregate(group=[{0, 1, 2}]) EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[COALESCE($t5, $t6)], expr#8=[+($t4, $t7)], proj#0..1=[{exprs}], __seg_id__=[$t8]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($2)])], window#1=[window(order by [1] rows between UNBOUNDED PRECEDING and $4 PRECEDING aggs [$SUM0($3)])], constants=[[1]]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) EnumerableCalc(expr#0..6=[{inputs}], expr#7=[0], expr#8=[COALESCE($t6, $t7)], expr#9=[+($t5, $t8)], proj#0..2=[{exprs}], __seg_id__=[$t9]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [$SUM0($3)])], window#1=[window(order by [2] rows between UNBOUNDED PRECEDING and $5 PRECEDING aggs [$SUM0($4)])], constants=[[1]]) EnumerableCalc(expr#0..17=[{inputs}], expr#18=[34], expr#19=[>($t8, $t18)], expr#20=[1], expr#21=[0], expr#22=[CASE($t19, $t20, $t21)], expr#23=[25], expr#24=[<($t8, $t23)], expr#25=[CASE($t24, $t20, $t21)], gender=[$t4], age=[$t8], __stream_seq__=[$t17], __reset_before_flag__=[$t22], __reset_after_flag__=[$t25]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_trendline_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_trendline_sort_push.yaml index 2427a30e1a..3c87f8683f 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_trendline_sort_push.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_trendline_sort_push.yaml @@ -1,7 +1,7 @@ calcite: logical: | LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) - LogicalProject(ageTrend=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1), /(SUM($8) OVER (ROWS 1 PRECEDING), CAST(COUNT($8) OVER (ROWS 1 PRECEDING)):DOUBLE NOT NULL), null:NULL)]) + LogicalProject(ageTrend=[CASE(>(COUNT() OVER (ORDER BY $8 NULLS LAST ROWS 1 PRECEDING), 1), /(SUM($8) OVER (ORDER BY $8 NULLS LAST ROWS 1 PRECEDING), CAST(COUNT($8) OVER (ORDER BY $8 NULLS LAST ROWS 1 PRECEDING)):DOUBLE NOT NULL), null:NULL)]) LogicalFilter(condition=[IS NOT NULL($8)]) LogicalSort(sort0=[$8], dir0=[ASC]) LogicalSort(fetch=[5]) @@ -9,7 +9,7 @@ calcite: physical: | EnumerableLimit(fetch=[10000]) EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9]) - EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])], constants=[[1]]) + EnumerableWindow(window#0=[window(order by [0] rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])], constants=[[1]]) EnumerableCalc(expr#0..16=[{inputs}], expr#17=[IS NOT NULL($t8)], age=[$t8], $condition=[$t17]) EnumerableSort(sort0=[$8], dir0=[ASC]) EnumerableLimit(fetch=[5]) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java index 2e4b6a605d..f86b6ee562 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.ppl.calcite; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -24,25 +25,19 @@ public void testStreamstatsBy() { String ppl = "source=EMP | streamstats max(SAL) by DEPTNO"; RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," - + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" - + " LogicalSort(sort0=[$8], dir0=[ASC])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[MAX($5) OVER" - + " (PARTITION BY $7 ROWS UNBOUNDED PRECEDING)])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + "LogicalSort(sort0=[$0], dir0=[ASC])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], max(SAL)=[MAX($5) OVER (PARTITION BY $7 ORDER BY" + + " $0 NULLS LAST ROWS UNBOUNDED PRECEDING)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, MAX(`SAL`)" - + " OVER (PARTITION BY `DEPTNO` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)" - + " `max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `t`\n" - + "ORDER BY `__stream_seq__` NULLS LAST"; + + " OVER (PARTITION BY `DEPTNO` ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN" + + " UNBOUNDED PRECEDING AND CURRENT ROW) `max(SAL)`\n" + + "FROM `scott`.`EMP`\n" + + "ORDER BY `EMPNO` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -51,44 +46,69 @@ public void testStreamstatsByNullBucket() { String ppl = "source=EMP | streamstats bucket_nullable=false max(SAL) by DEPTNO"; RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," - + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" - + " LogicalSort(sort0=[$8], dir0=[ASC])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[CASE(IS NOT" - + " NULL($7), MAX($5) OVER (PARTITION BY $7 ROWS UNBOUNDED PRECEDING), null:DECIMAL(7," + "LogicalSort(sort0=[$0], dir0=[ASC])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], max(SAL)=[CASE(IS NOT NULL($7), MAX($5) OVER" + + " (PARTITION BY $7 ORDER BY $0 NULLS LAST ROWS UNBOUNDED PRECEDING), null:DECIMAL(7," + " 2))])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, CASE WHEN" - + " `DEPTNO` IS NOT NULL THEN MAX(`SAL`) OVER (PARTITION BY `DEPTNO` ROWS BETWEEN" - + " UNBOUNDED PRECEDING AND CURRENT ROW) ELSE NULL END `max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `t`\n" - + "ORDER BY `__stream_seq__` NULLS LAST"; + + " `DEPTNO` IS NOT NULL THEN MAX(`SAL`) OVER (PARTITION BY `DEPTNO` ORDER BY `EMPNO`" + + " NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) ELSE" + + " NULL END `max(SAL)`\n" + + "FROM `scott`.`EMP`\n" + + "ORDER BY `EMPNO` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testStreamstatsByAfterSortOrdersWindowByCollation() { + String ppl = "source=EMP | sort - SAL | streamstats max(SAL) by DEPTNO"; + RelNode root = getRelNode(ppl); + + String plan = root.explain(); + assertTrue(plan.contains("__stream_seq__=[ROW_NUMBER() OVER ()]")); + assertTrue(plan.contains("MAX($5) OVER (PARTITION BY $7 ORDER BY $8")); + assertTrue(plan.contains("LogicalSort(sort0=[$5], dir0=[DESC-nulls-last])")); + assertTrue(plan.contains("LogicalSort(sort0=[$8], dir0=[ASC])")); + assertEquals(2, countOccurrences(plan, "LogicalSort(")); + } + + @Test + public void testStreamstatsAfterSortOrdersWindowByCollation() { + String ppl = "source=EMP | sort - SAL | streamstats max(SAL)"; + RelNode root = getRelNode(ppl); + + String plan = root.explain(); + assertFalse(plan.contains("__stream_seq__")); + assertTrue(plan.contains("max(SAL)=[MAX($5) OVER (ORDER BY $5 DESC NULLS LAST")); + } + + private static int countOccurrences(String text, String target) { + return text.split(java.util.regex.Pattern.quote(target), -1).length - 1; + } + @Test public void testStreamstatsCurrent() { String ppl = "source=EMP | streamstats current = false max(SAL)"; RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," - + " COMM=[$6], DEPTNO=[$7], max(SAL)=[MAX($5) OVER (ROWS BETWEEN UNBOUNDED PRECEDING" - + " AND 1 PRECEDING)])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + "LogicalSort(sort0=[$0], dir0=[ASC])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], max(SAL)=[MAX($5) OVER (ORDER BY $0 NULLS LAST" + + " ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, MAX(`SAL`)" - + " OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) `max(SAL)`\n" - + "FROM `scott`.`EMP`"; + + " OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)" + + " `max(SAL)`\n" + + "FROM `scott`.`EMP`\n" + + "ORDER BY `EMPNO` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -105,10 +125,11 @@ public void testStreamstatsWindow() { + " LogicalJoin(condition=[AND(>=($9, -($8, 4)), <=($9, $8), IS NOT DISTINCT" + " FROM($7, $10))], joinType=[left])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER (ORDER BY $0" + + " NULLS LAST)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_DEPTNO__=[$7]," - + " __r_SAL__=[$5])\n" + + " LogicalProject(__r_seq__=[ROW_NUMBER() OVER (ORDER BY $0 NULLS LAST)]," + + " __r_DEPTNO__=[$7], __r_SAL__=[$5])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); } @@ -118,24 +139,19 @@ public void testStreamstatsGlobal() { String ppl = "source=EMP | streamstats window = 5 global= false max(SAL) by DEPTNO"; RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," - + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" - + " LogicalSort(sort0=[$8], dir0=[ASC])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[MAX($5) OVER" - + " (PARTITION BY $7 ROWS 4 PRECEDING)])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + "LogicalSort(sort0=[$0], dir0=[ASC])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], max(SAL)=[MAX($5) OVER (PARTITION BY $7 ORDER BY" + + " $0 NULLS LAST ROWS 4 PRECEDING)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, MAX(`SAL`)" - + " OVER (PARTITION BY `DEPTNO` ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) `max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `t`\n" - + "ORDER BY `__stream_seq__` NULLS LAST"; + + " OVER (PARTITION BY `DEPTNO` ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 4" + + " PRECEDING AND CURRENT ROW) `max(SAL)`\n" + + "FROM `scott`.`EMP`\n" + + "ORDER BY `EMPNO` NULLS LAST"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -152,12 +168,13 @@ public void testStreamstatsReset() { + " 11}])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], __reset_before_flag__=[$9]," - + " __reset_after_flag__=[$10], __seg_id__=[+(SUM($9) OVER (ROWS UNBOUNDED PRECEDING)," - + " COALESCE(SUM($10) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])\n" + + " __reset_after_flag__=[$10], __seg_id__=[+(SUM($9) OVER (ORDER BY $8 ROWS" + + " UNBOUNDED PRECEDING), COALESCE(SUM($10) OVER (ORDER BY $8 ROWS BETWEEN UNBOUNDED" + + " PRECEDING AND 1 PRECEDING), 0))])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()]," - + " __reset_before_flag__=[CASE(>($5, 100), 1, 0)], __reset_after_flag__=[CASE(<($5," - + " 50), 1, 0)])\n" + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER (ORDER BY $0" + + " NULLS LAST)], __reset_before_flag__=[CASE(>($5, 100), 1, 0)]," + + " __reset_after_flag__=[CASE(<($5, 50), 1, 0)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalAggregate(group=[{}], avg(SAL)=[AVG($0)])\n" + " LogicalProject(SAL=[$5])\n" @@ -167,11 +184,11 @@ public void testStreamstatsReset() { + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8]," + " __reset_before_flag__=[$9], __reset_after_flag__=[$10], __seg_id__=[+(SUM($9) OVER" - + " (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($10) OVER (ROWS BETWEEN UNBOUNDED" - + " PRECEDING AND 1 PRECEDING), 0))])\n" + + " (ORDER BY $8 ROWS UNBOUNDED PRECEDING), COALESCE(SUM($10) OVER (ORDER BY $8 ROWS" + + " BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER" - + " ()], __reset_before_flag__=[CASE(>($5, 100), 1, 0)]," + + " (ORDER BY $0 NULLS LAST)], __reset_before_flag__=[CASE(>($5, 100), 1, 0)]," + " __reset_after_flag__=[CASE(<($5, 50), 1, 0)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); @@ -181,22 +198,26 @@ public void testStreamstatsReset() { + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t5`.`avg(SAL)`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + " `__stream_seq__`, `__reset_before_flag__`, `__reset_after_flag__`," - + " (SUM(`__reset_before_flag__`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT" - + " ROW)) + COALESCE(SUM(`__reset_after_flag__`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING" - + " AND 1 PRECEDING), 0) `__seg_id__`\n" + + " (SUM(`__reset_before_flag__`) OVER (ORDER BY `__stream_seq__` NULLS LAST ROWS" + + " BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) +" + + " COALESCE(SUM(`__reset_after_flag__`) OVER (ORDER BY `__stream_seq__` NULLS LAST" + + " ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0) `__seg_id__`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`, CASE WHEN `SAL` > 100 THEN 1 ELSE 0 END" + + " ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST) `__stream_seq__`, CASE WHEN `SAL`" + + " > 100 THEN 1 ELSE 0 END" + " `__reset_before_flag__`, CASE WHEN `SAL` < 50 THEN 1 ELSE 0 END" + " `__reset_after_flag__`\n" + "FROM `scott`.`EMP`) `t`) `$cor0`,\n" + "LATERAL (SELECT AVG(`SAL`) `avg(SAL)`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," + " `__stream_seq__`, `__reset_before_flag__`, `__reset_after_flag__`," - + " (SUM(`__reset_before_flag__`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT" - + " ROW)) + COALESCE(SUM(`__reset_after_flag__`) OVER (ROWS BETWEEN UNBOUNDED PRECEDING" - + " AND 1 PRECEDING), 0) `__seg_id__`\n" + + " (SUM(`__reset_before_flag__`) OVER (ORDER BY `__stream_seq__` NULLS LAST ROWS" + + " BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) +" + + " COALESCE(SUM(`__reset_after_flag__`) OVER (ORDER BY `__stream_seq__` NULLS LAST" + + " ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0) `__seg_id__`\n" + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`, CASE WHEN `SAL` > 100 THEN 1 ELSE 0 END" + + " ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST) `__stream_seq__`, CASE WHEN `SAL`" + + " > 100 THEN 1 ELSE 0 END" + " `__reset_before_flag__`, CASE WHEN `SAL` < 50 THEN 1 ELSE 0 END" + " `__reset_after_flag__`\n" + "FROM `scott`.`EMP`) `t1`) `t2`\n" @@ -227,27 +248,21 @@ public void testMultipleStreamstatsWithWindow() { public void testStreamstatsWithReverse() { String ppl = "source=EMP | streamstats max(SAL) by DEPTNO | reverse"; RelNode root = getRelNode(ppl); - // Reverse replaces the __stream_seq__ sort in-place via backtracking + // Reverse replaces the input collation sort in-place via backtracking String expectedLogical = - "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," - + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" - + " LogicalSort(sort0=[$8], dir0=[DESC])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[$8], max(SAL)=[MAX($5) OVER" - + " (PARTITION BY $7 ROWS UNBOUNDED PRECEDING)])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + "LogicalSort(sort0=[$0], dir0=[DESC])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], max(SAL)=[MAX($5) OVER (PARTITION BY $7 ORDER BY" + + " $0 NULLS LAST ROWS UNBOUNDED PRECEDING)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " MAX(`SAL`) OVER (PARTITION BY `DEPTNO` ROWS BETWEEN UNBOUNDED" - + " PRECEDING AND CURRENT ROW) `max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `t`\n" - + "ORDER BY `__stream_seq__` DESC NULLS FIRST"; + + " MAX(`SAL`) OVER (PARTITION BY `DEPTNO` ORDER BY `EMPNO` NULLS LAST ROWS" + + " BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) `max(SAL)`\n" + + "FROM `scott`.`EMP`\n" + + "ORDER BY `EMPNO` DESC NULLS FIRST"; verifyPPLToSparkSQL(root, expectedSparkSql); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTrendlineTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTrendlineTest.java index 3c23af4b7a..bf7ba61ecc 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTrendlineTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTrendlineTest.java @@ -5,6 +5,8 @@ package org.opensearch.sql.ppl.calcite; +import static org.junit.Assert.assertTrue; + import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert.SchemaSpec; import org.junit.Test; @@ -20,18 +22,20 @@ public void testTrendlineSma() { RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(SAL=[$5], sal_trend=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1), /(SUM($5)" - + " OVER (ROWS 1 PRECEDING), CAST(COUNT($5) OVER (ROWS 1 PRECEDING)):DOUBLE NOT NULL)," + "LogicalProject(SAL=[$5], sal_trend=[CASE(>(COUNT() OVER (ORDER BY $0 NULLS LAST ROWS 1" + + " PRECEDING), 1), /(SUM($5) OVER (ORDER BY $0 NULLS LAST ROWS 1 PRECEDING)," + + " CAST(COUNT($5) OVER (ORDER BY $0 NULLS LAST ROWS 1 PRECEDING)):DOUBLE NOT NULL)," + " null:NULL)])\n" + " LogicalFilter(condition=[IS NOT NULL($5)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `SAL`, CASE WHEN (COUNT(*) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) > 1" - + " THEN (SUM(`SAL`) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) /" - + " CAST(COUNT(`SAL`) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS DOUBLE) ELSE" - + " NULL END `sal_trend`\n" + "SELECT `SAL`, CASE WHEN (COUNT(*) OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 1" + + " PRECEDING AND CURRENT ROW)) > 1 THEN (SUM(`SAL`) OVER (ORDER BY `EMPNO` NULLS LAST" + + " ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) / CAST(COUNT(`SAL`) OVER (ORDER BY" + + " `EMPNO` NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS DOUBLE) ELSE NULL" + + " END `sal_trend`\n" + "FROM `scott`.`EMP`\n" + "WHERE `SAL` IS NOT NULL"; verifyPPLToSparkSQL(root, expectedSparkSql); @@ -43,25 +47,38 @@ public void testTrendlineWma() { RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(SAL=[$5], SAL_trendline=[CASE(>(COUNT() OVER (ROWS 2 PRECEDING), 2)," - + " /(+(+(CAST(NTH_VALUE($5, 1) OVER (ROWS 2 PRECEDING)):DECIMAL(18, 2)," - + " *(NTH_VALUE($5, 2) OVER (ROWS 2 PRECEDING), 2)), *(NTH_VALUE($5, 3) OVER (ROWS 2" - + " PRECEDING), 3)), 6.0E0:DOUBLE), null:NULL)])\n" + "LogicalProject(SAL=[$5], SAL_trendline=[CASE(>(COUNT() OVER (ORDER BY $0 NULLS LAST ROWS 2" + + " PRECEDING), 2), /(+(+(CAST(NTH_VALUE($5, 1) OVER (ORDER BY $0 NULLS LAST ROWS 2" + + " PRECEDING)):DECIMAL(18, 2), *(NTH_VALUE($5, 2) OVER (ORDER BY $0 NULLS LAST ROWS 2" + + " PRECEDING), 2)), *(NTH_VALUE($5, 3) OVER (ORDER BY $0 NULLS LAST ROWS 2 PRECEDING)," + + " 3)), 6.0E0:DOUBLE), null:NULL)])\n" + " LogicalFilter(condition=[IS NOT NULL($5)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `SAL`, CASE WHEN (COUNT(*) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)) > 2" - + " THEN (CAST(NTH_VALUE(`SAL`, 1) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS" - + " DECIMAL(18, 2)) + (NTH_VALUE(`SAL`, 2) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT" - + " ROW)) * 2 + (NTH_VALUE(`SAL`, 3) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)) *" - + " 3) / 6.0E0 ELSE NULL END `SAL_trendline`\n" + "SELECT `SAL`, CASE WHEN (COUNT(*) OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 2" + + " PRECEDING AND CURRENT ROW)) > 2 THEN (CAST(NTH_VALUE(`SAL`, 1) OVER (ORDER BY" + + " `EMPNO` NULLS LAST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS DECIMAL(18, 2)) +" + + " (NTH_VALUE(`SAL`, 2) OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 2 PRECEDING AND" + + " CURRENT ROW)) * 2 + (NTH_VALUE(`SAL`, 3) OVER (ORDER BY `EMPNO` NULLS LAST ROWS" + + " BETWEEN 2 PRECEDING AND CURRENT ROW)) * 3) / 6.0E0 ELSE NULL END `SAL_trendline`\n" + "FROM `scott`.`EMP`\n" + "WHERE `SAL` IS NOT NULL"; verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testTrendlineWithSortOrdersWindowFrame() { + String ppl = "source=EMP | trendline sort - SAL sma(2, SAL) | fields SAL, SAL_trendline"; + RelNode root = getRelNode(ppl); + + String plan = root.explain(); + assertTrue(plan.contains("LogicalSort(sort0=[$5], dir0=[DESC])")); + assertTrue(plan.contains("COUNT() OVER (ORDER BY $5 DESC")); + assertTrue(plan.contains("SUM($5) OVER (ORDER BY $5 DESC")); + } + @Test public void testTrendlineMultipleFields() { String ppl = @@ -70,24 +87,28 @@ public void testTrendlineMultipleFields() { RelNode root = getRelNode(ppl); String expectedLogical = - "LogicalProject(SAL_trendline=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1)," - + " /(+(CAST(NTH_VALUE($5, 1) OVER (ROWS 1 PRECEDING)):DECIMAL(18, 2), *(NTH_VALUE($5," - + " 2) OVER (ROWS 1 PRECEDING), 2)), 3.0E0:DOUBLE), null:NULL)]," - + " DEPTNO_trendline=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1), /(SUM($7) OVER (ROWS" - + " 1 PRECEDING), CAST(COUNT($7) OVER (ROWS 1 PRECEDING)):DOUBLE NOT NULL)," + "LogicalProject(SAL_trendline=[CASE(>(COUNT() OVER (ORDER BY $0 NULLS LAST ROWS 1" + + " PRECEDING), 1), /(+(CAST(NTH_VALUE($5, 1) OVER (ORDER BY $0 NULLS LAST ROWS 1" + + " PRECEDING)):DECIMAL(18, 2), *(NTH_VALUE($5, 2) OVER (ORDER BY $0 NULLS LAST ROWS 1" + + " PRECEDING), 2)), 3.0E0:DOUBLE), null:NULL)], DEPTNO_trendline=[CASE(>(COUNT() OVER" + + " (ORDER BY $0 NULLS LAST ROWS 1 PRECEDING), 1), /(SUM($7) OVER (ORDER BY $0 NULLS" + + " LAST ROWS 1 PRECEDING), CAST(COUNT($7) OVER (ORDER BY $0 NULLS LAST ROWS 1" + + " PRECEDING)):DOUBLE NOT NULL)," + " null:NULL)])\n" + " LogicalFilter(condition=[AND(IS NOT NULL($5), IS NOT NULL($7))])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT CASE WHEN (COUNT(*) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) > 1 THEN" - + " (CAST(NTH_VALUE(`SAL`, 1) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS" - + " DECIMAL(18, 2)) + (NTH_VALUE(`SAL`, 2) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT" - + " ROW)) * 2) / 3.0E0 ELSE NULL END `SAL_trendline`, CASE WHEN (COUNT(*) OVER (ROWS" - + " BETWEEN 1 PRECEDING AND CURRENT ROW)) > 1 THEN (SUM(`DEPTNO`) OVER (ROWS BETWEEN 1" - + " PRECEDING AND CURRENT ROW)) / CAST(COUNT(`DEPTNO`) OVER (ROWS BETWEEN 1 PRECEDING" - + " AND CURRENT ROW) AS DOUBLE) ELSE NULL END `DEPTNO_trendline`\n" + "SELECT CASE WHEN (COUNT(*) OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 1 PRECEDING AND" + + " CURRENT ROW)) > 1 THEN (CAST(NTH_VALUE(`SAL`, 1) OVER (ORDER BY `EMPNO` NULLS LAST" + + " ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS DECIMAL(18, 2)) + (NTH_VALUE(`SAL`, 2)" + + " OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) * 2) /" + + " 3.0E0 ELSE NULL END `SAL_trendline`, CASE WHEN (COUNT(*) OVER (ORDER BY `EMPNO`" + + " NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) > 1 THEN (SUM(`DEPTNO`) OVER" + + " (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) /" + + " CAST(COUNT(`DEPTNO`) OVER (ORDER BY `EMPNO` NULLS LAST ROWS BETWEEN 1 PRECEDING AND" + + " CURRENT ROW) AS DOUBLE) ELSE NULL END `DEPTNO_trendline`\n" + "FROM `scott`.`EMP`\n" + "WHERE `SAL` IS NOT NULL AND `DEPTNO` IS NOT NULL"; verifyPPLToSparkSQL(root, expectedSparkSql);