Skip to content

Preserve stream window arrival order#5588

Open
songkant-aws wants to merge 10 commits into
opensearch-project:mainfrom
songkant-aws:relnode-streamstats-trendline-ordering
Open

Preserve stream window arrival order#5588
songkant-aws wants to merge 10 commits into
opensearch-project:mainfrom
songkant-aws:relnode-streamstats-trendline-ordering

Conversation

@songkant-aws

@songkant-aws songkant-aws commented Jun 26, 2026

Copy link
Copy Markdown
Collaborator

What changed

This PR makes streamstats and trendline declare explicit window ordering when the Calcite RelNode plan is consumed by engines that do not preserve arrival order through window execution.

For grouped streamstats with an explicit upstream sort, the plan now preserves the input Sort, materializes __stream_seq__ after that sort, and uses __stream_seq__ for both the partitioned window order and the final output order. This represents SPL semantics as “sort first, then compute streamstats in arrival order”, instead of rewriting it as partition-local ORDER BY <sort key>.

Why

DataFusion/Enumerable window execution can repartition rows for grouped windows and does not guarantee that the input arrival order survives unless the window frame declares an order.

The earlier workaround that directly used inherited collation in grouped windows could produce a plan like:

PARTITION BY service ORDER BY service
For sort service | streamstats ... by service, that order key is constant inside each partition and does not encode arrival order. The outer sort could also be optimized away as redundant, which caused the streamstats doctest output to diverge from the documented sorted order.
Plan shape
For grouped sorted streamstats, the intended shape is now:
input Sort(...)
  -> ROW_NUMBER() OVER () AS __stream_seq__
  -> window aggregate PARTITION BY ... ORDER BY __stream_seq__
  -> final Sort(__stream_seq__)
  -> project out __stream_seq__

Ungrouped sorted streamstats does not add stream_seq; it continues to use the inherited collation directly in the window order.

Tradeoff

This is a correctness-focused short-term fix for engines like DataFusion. Some engines can process arrival-order streamstats without explicitly sorting inside the window partition, so declaring order keys may add extra work. The long-term fix should be a true streaming window/streamstats operator that can preserve arrival order without encoding it through ROW_NUMBER.

Validation

./gradlew :ppl:test --tests org.opensearch.sql.ppl.calcite.CalcitePPLStreamstatsTest --tests org.opensearch.sql.ppl.calcite.CalcitePPLTrendlineTest
./gradlew :integ-test:integTest --tests org.opensearch.sql.calcite.remote.CalciteExplainIT
./gradlew :integ-test:integTest --tests org.opensearch.sql.calcite.remote.CalcitePPLTrendlineIT
./gradlew :doctest:doctest -Pdocs=user/ppl/cmd/streamstats.md -DignorePrometheus -Pdebug=true
./gradlew :core:spotlessJavaCheck :ppl:spotlessJavaCheck

Make streamstats and trendline encode pipeline ordering explicitly in their Calcite RelNodes so DataFusion consumes order-sensitive window frames deterministically.

Keep the change scoped to the command construction layer and add RelNode coverage for sorted streamstats and trendline windows.

Signed-off-by: Songkan Tang <songkant@amazon.com>
The RelNode ordering fix makes sort followed by streamstats deterministic on the analytics-engine route, so remove the temporary STREAMSTATS_SORT_NOT_HONORED capability gate and Gradle exclude.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Avoid materializing __stream_seq__ when the input subtree already advertises a collation. In that case streamstats windows order directly by the input collation and reserve __stream_seq__ for grouped windows without an order contract.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Use advertised input collation directly for ordered stream windows, keep __stream_seq__ only for grouped fallback without an order contract, and share bucket-nullability projection logic across both branches.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions

github-actions Bot commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

(Review updated until commit 6c12356)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ No major issues detected

@songkant-aws songkant-aws self-assigned this Jun 26, 2026
@github-actions

github-actions Bot commented Jun 26, 2026

Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Latest suggestions up to 6c12356

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Use field name instead of index

The seqColIndex calculation assumes the sequence column is always the last field,
but this may not hold if the input already has trailing columns or if other
transformations have occurred. Consider using the column name
ROW_NUMBER_COLUMN_FOR_STREAMSTATS to retrieve the field index instead of relying on
positional assumptions.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2322-2323]

 if (useStreamSeq) {
   RexNode streamSeq =
       context
           .relBuilder
           .aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
           .over()
           .rowsTo(RexWindowBounds.CURRENT_ROW)
           .as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
   context.relBuilder.projectPlus(streamSeq);
-  int seqColIndex = context.relBuilder.peek().getRowType().getFieldCount() - 1;
-  windowOrderKeys = List.of(context.relBuilder.field(seqColIndex));
+  windowOrderKeys = List.of(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
 }
Suggestion importance[1-10]: 7

__

Why: This is a valid improvement that makes the code more robust by using the field name ROW_NUMBER_COLUMN_FOR_STREAMSTATS instead of relying on positional assumptions. This reduces the risk of errors if the schema changes.

Medium
Verify recursion order in shuttle visitor

The RexShuttle recursively visits nested expressions, but the logic only checks
window.orderKeys.isEmpty() at the top level. If a nested RexOver already has order
keys, it returns early without recursing into its operands. This may leave deeply
nested window expressions without the required order keys. Ensure recursion
completes before checking the condition.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2371-2394]

 return rex.accept(
     new RexShuttle() {
       @Override
       public RexNode visitOver(RexOver over) {
         RexOver recursed = (RexOver) super.visitOver(over);
         RexWindow window = recursed.getWindow();
         if (!window.orderKeys.isEmpty()) {
           return recursed;
         }
-        return context.rexBuilder.makeOver(
-            recursed.getType(),
-            recursed.getAggOperator(),
-            recursed.getOperands(),
-            window.partitionKeys,
-            orderCollations,
-            window.getLowerBound(),
-            window.getUpperBound(),
-            window.isRows(),
-            true,
-            false,
-            recursed.isDistinct(),
-            recursed.ignoreNulls());
+        ...
       }
     });
Suggestion importance[1-10]: 4

__

Why: The concern about recursion is noted, but the current implementation calls super.visitOver(over) first, which does recurse into operands before checking the condition. The improved code snippet is incomplete and doesn't demonstrate a clear fix. The suggestion is somewhat valid but lacks concrete evidence of an issue.

Low
Possible issue
Fix conditional logic for sequence usage

The logic for useStreamSeq may produce incorrect behavior when windowOrderKeys is
non-empty but explicitInputCollation is null. In this case, useStreamSeq becomes
false, skipping sequence materialization, yet the code later attempts to use
windowOrderKeys which may reference fields that don't exist without the sequence
column. Verify that the condition correctly handles all combinations of
windowOrderKeys and explicitInputCollation.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2308-2310]

 List<RexNode> windowOrderKeys = deriveCollationOrderKeys(context);
-boolean useStreamSeq =
-    hasGroup && (windowOrderKeys.isEmpty() || explicitInputCollation != null);
+boolean useStreamSeq = hasGroup && windowOrderKeys.isEmpty();
+if (hasGroup && !windowOrderKeys.isEmpty() && explicitInputCollation != null) {
+  useStreamSeq = true;
+}
Suggestion importance[1-10]: 3

__

Why: The suggestion raises a valid concern about the logic, but the improved code doesn't accurately reflect the original intent. The current condition hasGroup && (windowOrderKeys.isEmpty() || explicitInputCollation != null) is designed to handle multiple scenarios. The suggested fix oversimplifies and may break existing functionality.

Low

Previous suggestions

Suggestions up to commit 2b3f739
CategorySuggestion                                                                                                                                    Impact
General
Ensure nested window expressions are visited

The RexShuttle recursively visits nested expressions but only modifies the top-level
RexOver. If operands contain nested window functions, those won't receive the order
keys. Consider whether nested windows should also be updated or if this is
intentional.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2373-2397]

 return rex.accept(
     new RexShuttle() {
       @Override
       public RexNode visitOver(RexOver over) {
+        RexWindow window = over.getWindow();
+        if (!window.orderKeys.isEmpty()) {
+          return super.visitOver(over);
+        }
         RexOver recursed = (RexOver) super.visitOver(over);
-        RexWindow window = recursed.getWindow();
-        if (!window.orderKeys.isEmpty()) {
-          return recursed;
-        }
         return context.rexBuilder.makeOver(
             recursed.getType(),
             recursed.getAggOperator(),
             recursed.getOperands(),
             window.partitionKeys,
             orderCollations,
             window.getLowerBound(),
             window.getUpperBound(),
             window.isRows(),
             true,
             false,
             recursed.isDistinct(),
             recursed.ignoreNulls());
       }
     });
Suggestion importance[1-10]: 4

__

Why: The suggestion raises a valid concern about whether nested window functions should also receive order keys. The proposed change to call super.visitOver(over) earlier when orderKeys is not empty is a minor improvement in code structure, though the current implementation using RexShuttle should handle recursion correctly. This is more of a code clarity improvement than a bug fix.

Low
Add explicit variable extraction for clarity

The recursive calls to toRexFieldCollation may cause a ClassCastException if the
operand is not a RexCall. Verify that getOperands().getFirst() always returns a
RexCall or add type checking before casting.

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java [319-339]

 private static RexFieldCollation toRexFieldCollation(
     RexNode node,
     RelFieldCollation.Direction direction,
     RelFieldCollation.NullDirection nullDirection) {
   switch (node.getKind()) {
     case DESCENDING:
+      RexNode descNode = ((RexCall) node).getOperands().getFirst();
       return toRexFieldCollation(
-          ((RexCall) node).getOperands().getFirst(),
+          descNode,
           RelFieldCollation.Direction.DESCENDING,
           nullDirection);
     case NULLS_FIRST:
+      RexNode firstNode = ((RexCall) node).getOperands().getFirst();
       return toRexFieldCollation(
-          ((RexCall) node).getOperands().getFirst(),
+          firstNode,
           direction,
           RelFieldCollation.NullDirection.FIRST);
     case NULLS_LAST:
+      RexNode lastNode = ((RexCall) node).getOperands().getFirst();
       return toRexFieldCollation(
-          ((RexCall) node).getOperands().getFirst(),
+          lastNode,
           direction,
           RelFieldCollation.NullDirection.LAST);
Suggestion importance[1-10]: 2

__

Why: The suggestion proposes extracting ((RexCall) node).getOperands().getFirst() into explicit variables for clarity. While this could improve readability slightly, it doesn't address a real issue since the code already handles the casting correctly within the switch cases. The concern about ClassCastException is unfounded as the node.getKind() check ensures the node is a RexCall for these specific cases.

Low
Possible issue
Fix window ordering logic condition

The logic for useStreamSeq may produce incorrect results when windowOrderKeys is
empty but hasGroup is true. If no collation exists and no order keys are derived,
the window should still be ordered by a sequence column to ensure deterministic
results, regardless of grouping.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2306-2310]

 List<RexNode> windowOrderKeys =
     strippedInputCollation == null
         ? deriveCollationOrderKeys(context)
         : PlanUtils.collationToOrderKeys(context.relBuilder, strippedInputCollation);
-boolean useStreamSeq = windowOrderKeys.isEmpty() && hasGroup;
+boolean useStreamSeq = windowOrderKeys.isEmpty();
Suggestion importance[1-10]: 3

__

Why: The suggestion questions whether useStreamSeq should depend on both windowOrderKeys.isEmpty() and hasGroup, proposing to remove the hasGroup condition. However, the PR code intentionally uses both conditions to determine when a stream sequence is needed. The logic appears correct for the use case where a sequence column is only needed when there are no order keys AND there is grouping. This is a minor optimization concern rather than a correctness issue.

Low
Suggestions up to commit 22c5154
CategorySuggestion                                                                                                                                    Impact
General
Validate operands before accessing elements

The recursive calls in toRexFieldCollation don't validate that getOperands() returns
a non-empty list before calling getFirst(). If a malformed RexCall node has zero
operands, this will throw an exception at runtime.

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java [319-351]

 private static RexFieldCollation toRexFieldCollation(
     RexNode node,
     RelFieldCollation.Direction direction,
     RelFieldCollation.NullDirection nullDirection) {
   switch (node.getKind()) {
     case DESCENDING:
+      List<RexNode> descOps = ((RexCall) node).getOperands();
+      if (descOps.isEmpty()) {
+        throw new IllegalArgumentException("DESCENDING node has no operands");
+      }
       return toRexFieldCollation(
-          ((RexCall) node).getOperands().getFirst(),
+          descOps.getFirst(),
           RelFieldCollation.Direction.DESCENDING,
           nullDirection);
     case NULLS_FIRST:
+      List<RexNode> firstOps = ((RexCall) node).getOperands();
+      if (firstOps.isEmpty()) {
+        throw new IllegalArgumentException("NULLS_FIRST node has no operands");
+      }
       return toRexFieldCollation(
-          ((RexCall) node).getOperands().getFirst(),
+          firstOps.getFirst(),
           direction,
           RelFieldCollation.NullDirection.FIRST);
     case NULLS_LAST:
+      List<RexNode> lastOps = ((RexCall) node).getOperands();
+      if (lastOps.isEmpty()) {
+        throw new IllegalArgumentException("NULLS_LAST node has no operands");
+      }
       return toRexFieldCollation(
-          ((RexCall) node).getOperands().getFirst(),
+          lastOps.getFirst(),
           direction,
           RelFieldCollation.NullDirection.LAST);
     default:
       ...
   }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion identifies a potential runtime exception if RexCall nodes have empty operands lists. While this is a valid defensive programming concern, Calcite's type system typically ensures well-formed nodes, making this scenario unlikely. The suggestion adds robustness but addresses an edge case rather than a critical bug.

Medium
Process nested window operands recursively

The addWindowOrder method modifies window expressions by adding order keys, but it
doesn't recursively process nested expressions within the window operands. If window
function operands contain nested RexOver nodes, they won't be transformed,
potentially causing inconsistent ordering semantics.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2362-2391]

 private RexNode addWindowOrder(RexNode rex, List<RexNode> orderKeys, CalcitePlanContext context) {
   if (orderKeys.isEmpty()) {
     return rex;
   }
   ImmutableList<RexFieldCollation> orderCollations = PlanUtils.toRexFieldCollations(orderKeys);
   return rex.accept(
       new RexShuttle() {
         @Override
         public RexNode visitOver(RexOver over) {
-          RexOver recursed = (RexOver) super.visitOver(over);
-          RexWindow window = recursed.getWindow();
+          List<RexNode> newOperands = visitList(over.getOperands(), this);
+          RexWindow window = over.getWindow();
           if (!window.orderKeys.isEmpty()) {
-            return recursed;
+            return over;
           }
           return context.rexBuilder.makeOver(
-              recursed.getType(),
-              recursed.getAggOperator(),
-              recursed.getOperands(),
+              over.getType(),
+              over.getAggOperator(),
+              newOperands,
               window.partitionKeys,
               orderCollations,
               window.getLowerBound(),
               window.getUpperBound(),
               window.isRows(),
               true,
               false,
-              recursed.isDistinct(),
-              recursed.ignoreNulls());
+              over.isDistinct(),
+              over.ignoreNulls());
         }
       });
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that nested RexOver nodes in operands might not be transformed. However, the current implementation using super.visitOver(over) should already handle recursion through the parent RexShuttle class. The suggestion's concern is valid but may not be a critical issue in practice, as the parent class typically handles operand traversal.

Low

Strip explicit input sorts before grouped streamstats windows and restore them afterward so Calcite does not remove the post-window sort as redundant. This keeps sorted streamstats output in pipeline order while still declaring window order for deterministic window evaluation.

Share the collation-to-order and restore-order helpers with dedup, and add RelNode coverage for the stripped input sort shape.

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 2b3f739

@songkant-aws songkant-aws requested a review from ahkcs as a code owner June 26, 2026 07:03
Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6c12356

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant