Preserve stream window arrival order #5588
Make streamstats and trendline encode pipeline ordering explicitly in their Calcite RelNodes so DataFusion consumes order-sensitive window frames deterministically. Keep the change scoped to the command construction layer and add RelNode coverage for sorted streamstats and trendline windows. Signed-off-by: Songkan Tang <songkant@amazon.com>
…tats-trendline-ordering
The RelNode ordering fix makes sort followed by streamstats deterministic on the analytics-engine route, so remove the temporary STREAMSTATS_SORT_NOT_HONORED capability gate and Gradle exclude. Signed-off-by: Songkan Tang <songkant@amazon.com>
Avoid materializing __stream_seq__ when the input subtree already advertises a collation. In that case streamstats windows order directly by the input collation and reserve __stream_seq__ for grouped windows without an order contract. Signed-off-by: Songkan Tang <songkant@amazon.com>
Use advertised input collation directly for ordered stream windows, keep __stream_seq__ only for grouped fallback without an order contract, and share bucket-nullability projection logic across both branches. Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
Signed-off-by: Songkan Tang <songkant@amazon.com>
Strip explicit input sorts before grouped streamstats windows and restore them afterward so Calcite does not remove the post-window sort as redundant. This keeps sorted streamstats output in pipeline order while still declaring window order for deterministic window evaluation. Share the collation-to-order and restore-order helpers with dedup, and add RelNode coverage for the stripped input sort shape. Signed-off-by: Songkan Tang <songkant@amazon.com>
Persistent review updated to latest commit 2b3f739
Signed-off-by: Songkan Tang <songkant@amazon.com>
Persistent review updated to latest commit 6c12356
What changed
This PR makes streamstats and trendline declare explicit window ordering when the Calcite RelNode plan is consumed by engines that do not preserve arrival order through window execution.
For grouped streamstats with an explicit upstream sort, the plan now preserves the input Sort, materializes __stream_seq__ after that sort, and uses __stream_seq__ for both the partitioned window order and the final output order. This represents SPL semantics as “sort first, then compute streamstats in arrival order”, instead of rewriting it as a partition-local ORDER BY <sort key>.
Why
DataFusion/Enumerable window execution can repartition rows for grouped windows and does not guarantee that the input arrival order survives unless the window frame declares an order.
The earlier workaround that directly used inherited collation in grouped windows could produce a plan like:
Ungrouped sorted streamstats does not add __stream_seq__; it continues to use the inherited collation directly in the window order.
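As a rough illustration of the grouped case, the sketch below simulates "sort first, then compute streamstats in arrival order" in plain Python. It is not the Calcite or DataFusion code path: the function name, the `seq` field (a stand-in for __stream_seq__), and the scrambled delivery that mimics repartitioning are all assumptions made for the example.

```python
from collections import defaultdict


def grouped_streamstats_count(rows, sort_key, group_key):
    """Running count per group, computed in post-sort arrival order."""
    # 1. Upstream sort (Python's sort is stable, so ties keep input order).
    ordered = sorted(rows, key=lambda r: r[sort_key])

    # 2. Materialize a sequence number after the sort.
    #    `seq` is a hypothetical stand-in for __stream_seq__.
    tagged = [dict(r, seq=i) for i, r in enumerate(ordered, start=1)]

    # 3. Simulate an engine that repartitions by group and does not
    #    preserve arrival order (rows are delivered in reverse here).
    partitions = defaultdict(list)
    for r in reversed(tagged):
        partitions[r[group_key]].append(r)

    # 4. Each partition orders its window by seq, which recovers
    #    arrival order regardless of how rows were delivered.
    out = []
    for part in partitions.values():
        for n, r in enumerate(sorted(part, key=lambda r: r["seq"]), start=1):
            out.append(dict(r, running_count=n))

    # 5. The final output order is restored with the same key.
    out.sort(key=lambda r: r["seq"])
    return out


rows = [
    {"host": "a", "ts": 2},
    {"host": "b", "ts": 1},
    {"host": "a", "ts": 1},
    {"host": "b", "ts": 3},
]
result = grouped_streamstats_count(rows, sort_key="ts", group_key="host")
```

Because `seq` is unique, the window order inside each partition is total even when several rows share the same sort key, which a partition-local order on the sort key alone would not guarantee.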
Tradeoff
This is a correctness-focused short-term fix for engines like DataFusion. Some engines can process arrival-order streamstats without explicitly sorting inside the window partition, so declaring order keys may add extra work. The long-term fix should be a true streaming window/streamstats operator that can preserve arrival order without encoding it through ROW_NUMBER.
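For contrast, a streaming operator of the kind mentioned as the long-term fix could keep per-group state and emit rows in a single pass, with no sequence column and no sort inside the partition. The sketch below is only one possible shape, written in plain Python with a hypothetical function name; it does not describe any existing operator.

```python
from collections import defaultdict


def streaming_running_count(rows, group_key):
    """Single-pass running count per group that preserves arrival order."""
    counts = defaultdict(int)  # per-group state carried across rows
    for r in rows:
        counts[r[group_key]] += 1
        # Rows are emitted in the order they arrive, so no order key is needed.
        yield dict(r, running_count=counts[r[group_key]])


streamed = list(
    streaming_running_count(
        [{"host": "a"}, {"host": "b"}, {"host": "a"}], group_key="host"
    )
)
```

This only works when the operator sees rows in arrival order in the first place, which is the guarantee the ROW_NUMBER-style encoding has to reconstruct for engines that repartition.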
Validation