Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldCollation;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.rex.RexWindow;
import org.apache.calcite.rex.RexWindowBounds;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlLibraryOperators;
Expand Down Expand Up @@ -2275,6 +2279,7 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.orderBy(deriveCollationOrderKeys(context))
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
context.relBuilder.projectPlus(streamSeq);
Expand All @@ -2291,12 +2296,21 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS});
}

// Default: first get rawExpr
List<RexNode> overExpressions =
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();

if (hasGroup) {
// only build sequence when there is by condition
// Short-term correctness workaround: streamstats/trendline are evaluated in arrival order, and
// some engines can preserve that order through window partitions without an explicit ORDER BY.
// DataFusion and Calcite EnumerableWindow do not currently provide that guarantee for
// partitioned windows, so we make grouped streamstats frames walk an explicit input sequence.
// When the input already has an explicit Sort, materialize the sequence after that Sort instead
// of stripping it; this preserves "sort, then streamstats" semantics including tie arrival
// order. This may add a small per-partition sort cost on engines that did not need it; the
// long-term fix is a real streaming window operator.
RelCollation explicitInputCollation = PlanUtils.findInputCollation(context.relBuilder.peek());
List<RexNode> windowOrderKeys = deriveCollationOrderKeys(context);
boolean useStreamSeq =
hasGroup && (windowOrderKeys.isEmpty() || explicitInputCollation != null);
if (useStreamSeq) {
// streamstats is order-sensitive. Materialize input order before any grouped window can
// repartition rows, then make each window frame walk that sequence explicitly.
RexNode streamSeq =
context
.relBuilder
Expand All @@ -2305,34 +2319,79 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
context.relBuilder.projectPlus(streamSeq);
int seqColIndex = context.relBuilder.peek().getRowType().getFieldCount() - 1;
windowOrderKeys = List.of(context.relBuilder.field(seqColIndex));
}

if (!node.isBucketNullable()) {
// construct groupNotNull predicate
List<RexNode> groupByList =
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
List<RexNode> notNullList =
PlanUtils.getSelectColumns(groupByList).stream()
.map(context.relBuilder::field)
.map(context.relBuilder::isNotNull)
.toList();
RexNode groupNotNull = context.relBuilder.and(notNullList);

// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
List<RexNode> wrappedOverExprs =
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
context.relBuilder.projectPlus(wrappedOverExprs);
} else {
context.relBuilder.projectPlus(overExpressions);
}
List<RexNode> finalWindowOrderKeys = windowOrderKeys;
List<RexNode> overExpressions =
node.getWindowFunctionList().stream()
.map(w -> rexVisitor.analyze(w, context))
.map(rex -> addWindowOrder(rex, finalWindowOrderKeys, context))
.toList();
projectStreamWindowExpressions(overExpressions, hasGroup, groupList, node, context);

// resort when there is by condition
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
if (!finalWindowOrderKeys.isEmpty()) {
context.relBuilder.sort(finalWindowOrderKeys);
}
if (useStreamSeq) {
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
}

return context.relBuilder.peek();
}

private void projectStreamWindowExpressions(
List<RexNode> overExpressions,
boolean hasGroup,
List<UnresolvedExpression> groupList,
StreamWindow node,
CalcitePlanContext context) {
if (hasGroup && !node.isBucketNullable()) {
List<RexNode> groupByList =
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
List<RexNode> notNullList =
PlanUtils.getSelectColumns(groupByList).stream()
.map(context.relBuilder::field)
.map(context.relBuilder::isNotNull)
.toList();
RexNode groupNotNull = context.relBuilder.and(notNullList);
context.relBuilder.projectPlus(
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context));
} else {
context.relBuilder.projectPlus(overExpressions);
}
}

return context.relBuilder.peek();
private RexNode addWindowOrder(RexNode rex, List<RexNode> orderKeys, CalcitePlanContext context) {
if (orderKeys.isEmpty()) {
return rex;
}
ImmutableList<RexFieldCollation> orderCollations = PlanUtils.toRexFieldCollations(orderKeys);
return rex.accept(
new RexShuttle() {
@Override
public RexNode visitOver(RexOver over) {
RexOver recursed = (RexOver) super.visitOver(over);
RexWindow window = recursed.getWindow();
if (!window.orderKeys.isEmpty()) {
return recursed;
}
return context.rexBuilder.makeOver(
recursed.getType(),
recursed.getAggOperator(),
recursed.getOperands(),
window.partitionKeys,
orderCollations,
window.getLowerBound(),
window.getUpperBound(),
window.isRows(),
true,
false,
recursed.isDistinct(),
recursed.ignoreNulls());
}
});
}

private List<RexNode> wrapWindowFunctionsWithGroupNotNull(
Expand Down Expand Up @@ -2648,6 +2707,7 @@ private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.orderBy(deriveCollationOrderKeys(context))
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
context.relBuilder.projectPlus(rowNum);
Expand Down Expand Up @@ -2683,6 +2743,7 @@ private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow
.aggregateCall(
SqlStdOperatorTable.SUM, context.relBuilder.field("__reset_before_flag__"))
.over()
.orderBy(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS))
.rowsTo(RexWindowBounds.CURRENT_ROW)
.toRex();
RexNode sumAfterPrev =
Expand All @@ -2691,6 +2752,7 @@ private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow
.aggregateCall(
SqlStdOperatorTable.SUM, context.relBuilder.field("__reset_after_flag__"))
.over()
.orderBy(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS))
.rowsBetween(
RexWindowBounds.UNBOUNDED_PRECEDING,
RexWindowBounds.preceding(context.relBuilder.literal(1)))
Expand Down Expand Up @@ -3928,6 +3990,14 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
}
});

// Short-term correctness workaround: streamstats/trendline are evaluated in arrival order, and
// some engines can preserve that order through window partitions without an explicit ORDER BY.
// DataFusion and Calcite EnumerableWindow do not currently provide that guarantee for every
// window frame, so we declare the inherited input order on the window frame. This may add a
// small per-partition sort cost on engines that did not need it; the long-term fix is a real
// streaming window operator.
List<RexNode> trendlineOrderKeys = deriveCollationOrderKeys(context);

List<RexNode> trendlineNodes = new ArrayList<>();
List<String> aliases = new ArrayList<>();
node.getComputations()
Expand All @@ -3949,7 +4019,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
null,
List.of(),
List.of(),
List.of(),
trendlineOrderKeys,
windowFrame);
// CASE WHEN count() over (ROWS (windowSize-1) PRECEDING) > windowSize - 1
RexNode whenConditionExpr =
Expand All @@ -3970,7 +4040,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
field,
List.of(),
List.of(),
List.of(),
trendlineOrderKeys,
windowFrame);
break;
case TrendlineType.WMA:
Expand All @@ -3980,6 +4050,7 @@ public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
field,
trendlineComputation.getNumberOfDataPoints(),
windowFrame,
trendlineOrderKeys,
context);
break;
default:
Expand Down Expand Up @@ -4007,6 +4078,7 @@ private RexNode buildWmaRexNode(
RexNode field,
Integer numberOfDataPoints,
WindowFrame windowFrame,
List<RexNode> orderKeys,
CalcitePlanContext context) {

// Divisor: 1 + 2 + 3 + ... + windowSize, aka (windowSize * (windowSize + 1) / 2)
Expand All @@ -4023,7 +4095,7 @@ private RexNode buildWmaRexNode(
field,
List.of(context.relBuilder.literal(i)),
List.of(),
List.of(),
orderKeys,
windowFrame);
divider =
context.relBuilder.call(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.sql.calcite.plan.rule;

import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
import static org.opensearch.sql.calcite.utils.PlanUtils.collationToOrderKeys;
import static org.opensearch.sql.calcite.utils.PlanUtils.restoreInputOrder;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -187,42 +189,6 @@ public static void buildDedupNotNull(
restoreInputOrder(relBuilder, inputCollation);
}

/**
* Convert a RelCollation to a list of RexNode order keys using the RelBuilder's field references.
*/
private static List<RexNode> collationToOrderKeys(RelBuilder relBuilder, RelCollation collation) {
if (collation == null || collation.getFieldCollations().isEmpty()) {
return List.of();
}
List<RexNode> orderKeys = new ArrayList<>();
for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
RexNode fieldRef = relBuilder.field(fieldCollation.getFieldIndex());
if (fieldCollation.direction.isDescending()) {
fieldRef = relBuilder.desc(fieldRef);
}
if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) {
fieldRef = relBuilder.nullsLast(fieldRef);
} else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) {
fieldRef = relBuilder.nullsFirst(fieldRef);
}
orderKeys.add(fieldRef);
}
return orderKeys;
}

/**
* Re-apply a sort after dedup to restore the input order that may have been disrupted by the
* window operator. EnumerableWindow can re-partition data by the PARTITION BY key, destroying any
* upstream sort order. This explicit re-sort ensures the final output preserves the original
* order.
*/
private static void restoreInputOrder(RelBuilder relBuilder, RelCollation inputCollation) {
if (inputCollation != null && !inputCollation.getFieldCollations().isEmpty()) {
List<RexNode> sortKeys = collationToOrderKeys(relBuilder, inputCollation);
relBuilder.sort(sortKeys);
}
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends OpenSearchRuleConfig {
Expand Down
Loading
Loading