Skip to content

Commit cea1b65

Browse files
authored
[analytics-engine] Wire PPL eventstats / streamstats commands end-to-end (#21734)
* [analytics-engine] Allow PARTITION BY in window functions; add ROW_NUMBER Build directly on #21668's framework: cost gate on OpenSearchProject already forces SINGLETON input on any RexOver-bearing project. With all rows on the coordinator, WindowAggExec computes per-partition windows correctly regardless of whether partition keys span shards. HASH-shuffle parallel execution remains a future strict improvement, not a correctness prerequisite. Changes: - OpenSearchProjectRule.collectWindowFunctions: drop the partition-by rejection branch. ORDER BY was already unchecked (good: streamstats running window uses it). - WindowFunction enum: add ROW_NUMBER. PPL streamstats … by … emits a helper __row_number_for_streamstats__ column via ROW_NUMBER() OVER ROWS UNBOUNDED. Other ranking functions (RANK / DENSE_RANK) are not yet emitted by the PPL paths exercised by analytics-engine. - DataFusionAnalyticsBackendPlugin.windowCapabilities: add ROW_NUMBER to the declared set so the backend is viable for streamstats … by …. - WindowPlanShapeTests.testRexOverPartitionBy_rejected → renamed to testRexOverPartitionBy_2shard, asserting the cost-gate now produces the same SINGLETON-gather-then-project shape as the empty OVER case. Unblocks PPL eventstats … by <field> and the simpler streamstats forms (no reset / no global+window+by — those use a self-join lowering that doesn't flow through RexOver and aren't covered by this PR). Signed-off-by: Songkan Tang <songkant@amazon.com> * [QA] Port EventstatsCommandIT + StreamstatsCommandIT from sql repo One-to-one ports of CalcitePPLEventstatsIT and CalciteStreamstatsCommandIT from opensearch-project/sql. Every @test method here corresponds to a method of the same name in the source IT. Schema is translated to calcs (the only fixture this QA module provisions): age → int0 (11 non-null + 6 nulls — gives the null-bubbling coverage that the sql IT gets from explicit null partition rows) country → str0 (3 distinct: FURNITURE, OFFICE SUPPLIES, TECHNOLOGY) state → str3 (e + null) name → key (unique row id) Reachable cases assert exact rows. Cases that depend on functionality not yet wired through the analytics-engine route are expectThrows negative tests pinning the precise error fragment — failing those is how a future PR observes that they should be promoted to passing assertions. EventstatsCommandIT (28 cases mirroring sql IT): - 12 reachable: testEventstats, *WithNull, *By, *ByWithNull, *ByWithNullBucket, *MultipleEventstats(WithNull|WithNullBucket|WithEval), *EmptyRows, *Variance, *VarianceWithNull, *VarianceBy, *VarianceWithNullBy - 16 expectThrows: 7 span()-blocked cases, 1 percentile rejection, 1 dc, 1 distinct_count, 1 dc-with-null, 1 earliest/latest, 1 variance-by-span, 3 multiple-partition-by-span variants StreamstatsCommandIT (47 cases mirroring sql IT): - ~17 reachable: basic streamstats with/without by, current=false, window=N, bigWindow, currentAndWindow, multiple chained, streamstatsAndEventstats, streamstatsAndSort, multipleWithEval (1+2), variance + varianceBy - ~30 expectThrows: span()-blocked cases, dc / distinct_count / earliest / latest / percentile not in WindowFunction enum, reset_before/after and global+window+by which use self-join lowering not flowing through RexOver, leftJoinWithStreamstats / whereInWithStreamstatsSubquery (composed shapes not exercised on the route). Streamstats running aggregates are row-order dependent. Every reachable test inserts | sort key as the first stage to pin output to a deterministic row order (calcs has 17 rows; without the sort, parallel scan can return them in any order, making running counts/avgs non-deterministic). Both files compile clean against the analytics-engine plugin. No IT actually runs in this commit — local cluster is unavailable due to a pre-existing dataformat-native cargo/protoc gap. Marc / verifier should run against a working cluster to confirm the reachable-vs-throws split matches the comment predictions. Signed-off-by: Songkan Tang <songkant@amazon.com> * [analytics-engine] Plan-shape tests for ROW_NUMBER + PARTITION BY ORDER BY Three additions to WindowPlanShapeTests pin behaviour of the relaxation in the previous commit: - testRowNumberOverEmpty_2shard — ROW_NUMBER OVER (), the helper sequence column PPL streamstats (with hasGroup=true) emits. - testSumOverPartitionByOrderBy_2shard — SUM with both partition keys and order keys. Calcite's RexOver expansion yields different rendered text (no NULLS LAST suffix on default null ordering); the assertion pins the actual rendered shape. - testRowNumberOverPartitionByOrderBy_2shard — combined PARTITION BY + ORDER BY for ROW_NUMBER, the streamstats … by … helper-sequence shape. Also extends MockDataFusionBackend.windowCapabilities to declare ROW_NUMBER, mirroring the production DataFusionAnalyticsBackendPlugin. Without it the planner rejects ROW_NUMBER on the mock backend with "No backend supports window functions [ROW_NUMBER]". All 23 WindowPlanShapeTests + analytics-framework tests pass. Signed-off-by: Songkan Tang <songkant@amazon.com> * [analytics-engine] Preserve lifted-window Project layer in fragment rewire OpenSearchProject.liftNestedRexOver hoists nested RexOver expressions into a 2-layer LogicalProject(LogicalProject(input)) — outer rewrites to RexInputRefs into the lower's appended window column, lower computes the window. After convertStandalone the wrapper is Project_outer(Project_lower(input)). DataFusionFragmentConvertor.replaceInput previously swapped the topmost Project's direct child wholesale, dropping the lower Project. Subsequent attachFragmentOnTop calls then crashed substrait deserialisation with "Field reference offset (N) must be less than number of fields in struct (N)" because the outer's RexInputRefs pointed past the now-narrower input struct. Surfaced on multi-shard streamstats … by where SINGLETON-gather injects an exchange cut that recursively rewires through the lift wrapper. Single-shard plans avoided it because no rewire happens. Detect the lift signature (Project whose direct child is a Project containing a WindowFunctionInvocation) and recurse so newInput replaces the lower Project's input — both lift layers stay above newInput. Adds a unit-level regression in DataFusionFragmentConvertorTests and upgrades StreamstatsCommandIT.testStreamstatsBy_3shard from an error-pinning assertion to a correctness assertion on the per-partition final running counts. Signed-off-by: Songkan Tang <songkant@amazon.com> * [QA] testChainedEventstats_3shard — pin lifted-window rewire across multiple stages Two chained eventstats on a 3-shard index, each producing its own 2-layer LogicalProject(LogicalProject(input)) lift bundle that flows through a separate attachFragmentOnTop call on the multi-shard exchange path. Pins that the lifted-window detection in DataFusionFragmentConvertor.replaceInput fires correctly per attach call rather than only on the outermost lift bundle. Signed-off-by: Songkan Tang <songkant@amazon.com> * [QA] Drop redundant *WithNull tests; tighten *WithNullBucket on str3 testEventstatsWithNull / testEventstatsByWithNull and the streamstats peers were exact copies of their *non*-null variants — calcs has no nulls in str0, so they exercised nothing the base tests didn't. Removed. testEventstatsByWithNullBucket / testStreamstatsByWithNullBucket now partition by str3 (7 nulls + 10 'e' rows) instead of str0 (no nulls). The default and bucket_nullable=false outputs only diverge when a null-key partition exists, so the prior asserts proved nothing about the bucket_nullable=false path. With str3 the null partition's row carries NULL aggregates under bucket_nullable=false vs concrete values under default, which is what these tests are supposed to demonstrate. Signed-off-by: Songkan Tang <songkant@amazon.com> * [QA] Pin span()-in-PARTITION-BY assertions to exact substrait error The 14 eventstats/streamstats tests that exercise span(...) inside a window function's PARTITION BY were using assertErrorAny — anything-fails — which hides regressions. After verifying each query against the cluster, all 14 fail with the same precise error: java.lang.UnsupportedOperationException: Unable to convert the type NULL at io.substrait.isthmus.TypeConverter.toSubstrait(TypeConverter.java:192) ... at WindowFunctionConverter.generateBinding (...) PPL's span(int0, 10) lowers to SPAN($10, 10, NULL:NULL) — the third operand is a NULL-typed literal. span() works fine in scalar contexts (stats … by span(...)) because non-window paths don't visit literal operands the same way, but isthmus's WindowFunctionConverter walks every operand and trips on the NULL type. Replaced assertErrorAny with assertErrorContains pinning the substring "Unable to convert the type NULL" in all 14 cases, plus updated the class-level docs in both files. When isthmus or the PPL frontend drops the NULL operand, these tests will fail loudly so a follow-up can upgrade them to row assertions. Signed-off-by: Songkan Tang <songkant@amazon.com> * [analytics-engine] Recurse BackendPlanAdapter into RexOver.window keys PPL eventstats/streamstats … by span(field, n) lowers to RexOver(... PARTITION BY SPAN(field, n, NULL:NULL)). Substrait conversion runs through SpanAdapter, which rewrites SPAN(field, n, NULL) into FLOOR(field/n)*n so the NULL-typed third operand never reaches isthmus. But BackendPlanAdapter.adaptRex only walked RexCall.getOperands() — RexOver's window (partitionKeys / orderKeys) lives on a separate field, so the SPAN calls inside PARTITION BY were untouched and survived into substrait emission, where isthmus's WindowFunctionConverter visited every operand including the NULL literal and threw 'Unable to convert the type NULL'. Teach adaptRex to recognize RexOver and recurse into its window's partitionKeys and orderKeys, then rebuild the RexOver via rexBuilder.makeOver with the adapted window. ORDER BY keys are wrapped in RexFieldCollation; rebuild the collation only when the inner expression changed. Same SpanAdapter rewrite path now applies to window-context SPAN exactly as it does for stats-context SPAN. Flips all 14 span-in-PARTITION-BY IT cases (7 in EventstatsCommandIT, 7 in StreamstatsCommandIT) from assertErrorContains('Unable to convert the type NULL') to assertRowsEqual against the actual per-bucket aggregates. Signed-off-by: Songkan Tang <songkant@amazon.com> * [QA] Drop stats-collapse, assert per-row data on slim schema The eventstats / streamstats IT cases were collapsing 17-row outputs through | stats max(...) by <col> chains so the assertions stayed compact. With only 17 rows in calcs the collapse hides the broadcast-vs-running shape these commands are supposed to demonstrate. Reworked every reachable test (no error / no flaky-by-shard) to: * drop the | stats collapse, * select a slim schema via | fields key, <by-col?>, int0, <new eventstats / streamstats cols>, * sort key for deterministic order, * assert all 17 (or the post-filter row count) per-row outputs explicitly. Single-shard streamstats becomes deterministic with `| sort key` so per-row running aggregates are reproducible. testStreamstatsBy_3shard keeps its `| stats max(cnt) by str0` collapse — 3-shard streaming order is shard- dependent and per-row would be flaky. While in there, two `WithNullBy` companions were partitioning by str0 (no nulls in calcs) which silently negated the by-key-null coverage they claimed: * testEventstatsVarianceWithNullBy → switched to `by str3` (7 nulls + 10 'e') * testStreamstatsVarianceWithNullBy → switched to `by str3` Added testStreamstatsByWithNull as the default-mode companion to testStreamstatsByWithNullBucket (eventstats already has the equivalent pair). Verified locally: * 29/29 EventstatsCommandIT pass * 47/47 StreamstatsCommandIT pass * :sandbox:qa:analytics-engine-rest:precommit green Signed-off-by: Songkan Tang <songkant@amazon.com> --------- Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent c2d0a6c commit cea1b65

10 files changed

Lines changed: 2724 additions & 31 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/WindowFunction.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@
1212

1313
/**
1414
* Window functions a backend may support. Covers aggregate-as-window
15-
* (SUM/AVG/COUNT/MIN/MAX over a frame) — these are what PPL {@code eventstats}
16-
* lowers to — plus ranking functions (ROW_NUMBER) used by PPL {@code dedup}
17-
* lowering (ROW_NUMBER OVER PARTITION BY ... &lt;= N).
15+
* (SUM/AVG/COUNT/MIN/MAX over a frame, what PPL {@code eventstats} lowers to)
16+
* plus ROW_NUMBER, used by PPL {@code dedup} (ROW_NUMBER OVER PARTITION BY … &lt;= N)
17+
* and emitted by PPL {@code streamstats … by …} as the helper sequence column
18+
* {@code __row_number_for_streamstats__}.
19+
*
20+
* <p>PARTITION BY is allowed: {@code OpenSearchProject}'s cost gate forces SINGLETON
21+
* input on any RexOver-bearing project, so the coordinator's {@code WindowAggExec}
22+
* sees the full partition regardless of how partition keys span shards.
1823
*
1924
* @opensearch.internal
2025
*/
@@ -24,7 +29,6 @@ public enum WindowFunction {
2429
COUNT(SqlKind.COUNT),
2530
MIN(SqlKind.MIN),
2631
MAX(SqlKind.MAX),
27-
/** Sequence number per window partition — backs PPL dedup's row-number filter. */
2832
ROW_NUMBER(SqlKind.ROW_NUMBER);
2933

3034
private final SqlKind sqlKind;

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,13 @@ public Set<JoinCapability> joinCapabilities() {
430430

431431
@Override
432432
public Set<WindowCapability> windowCapabilities() {
433+
// SUM/AVG/COUNT/MIN/MAX cover PPL eventstats; ROW_NUMBER covers PPL dedup
434+
// (ROW_NUMBER OVER PARTITION BY … <= N) and the helper sequence column
435+
// PPL streamstats … by … emits as __row_number_for_streamstats__.
436+
// isthmus's RexExpressionConverter.visitOver serializes the RexOver inline as a
437+
// Substrait WindowFunctionInvocation; DataFusion's substrait consumer splits it
438+
// into a dedicated LogicalPlan::Window. No adapter or Rust UDF is needed —
439+
// row_number is a Substrait-stdlib window function and a DataFusion built-in.
433440
return Set.of(
434441
new WindowCapability(
435442
Set.of(
@@ -438,11 +445,6 @@ public Set<WindowCapability> windowCapabilities() {
438445
WindowFunction.COUNT,
439446
WindowFunction.MIN,
440447
WindowFunction.MAX,
441-
// ROW_NUMBER backs PPL `dedup` lowering (ROW_NUMBER OVER PARTITION BY ... <= N).
442-
// isthmus's RexExpressionConverter.visitOver serializes the RexOver inline as a
443-
// Substrait WindowFunctionInvocation; DataFusion's substrait consumer splits it
444-
// into a dedicated LogicalPlan::Window. No adapter or Rust UDF is needed —
445-
// row_number is a Substrait-stdlib window function and a DataFusion built-in.
446448
WindowFunction.ROW_NUMBER
447449
),
448450
Set.copyOf(plugin.getSupportedFormats())

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionFragmentConvertor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,13 @@ private static Rel replaceInput(Rel wrapper, Rel newInput) {
548548
return Filter.builder().from(filter).input(newInput).build();
549549
}
550550
if (wrapper instanceof Project project) {
551+
// Lifted-window shape: OpenSearchProject.liftNestedRexOver emits Project(Project(input))
552+
// where the lower Project computes a window column the outer references. A flat swap
553+
// here would drop the lower Project and break the outer's input refs.
554+
if (project.getInput() instanceof Project lower && containsWindowFunction(lower)) {
555+
Rel rewiredLower = replaceInput(lower, newInput);
556+
return Project.builder().from(project).input(rewiredLower).build();
557+
}
551558
return Project.builder().from(project).input(newInput).build();
552559
}
553560
if (wrapper instanceof Fetch fetch) {
@@ -561,6 +568,15 @@ private static Rel replaceInput(Rel wrapper, Rel newInput) {
561568
);
562569
}
563570

571+
private static boolean containsWindowFunction(Project project) {
572+
for (Expression expr : project.getExpressions()) {
573+
if (expr instanceof Expression.WindowFunctionInvocation) {
574+
return true;
575+
}
576+
}
577+
return false;
578+
}
579+
564580
/**
565581
* Overrides the {@link Expression.AggregationPhase} on every {@link Aggregate.Measure}
566582
* inside an {@link Aggregate} wrapper. No-op for non-aggregate wrappers.

sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DataFusionFragmentConvertorTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,4 +641,61 @@ public void testOtherFunctionsNotRenamed() throws Exception {
641641
assertTrue("must find sum in extension declarations", foundSum);
642642
}
643643

644+
/**
645+
* Regression: a lifted-window Project wrapper, shaped {@code Project_outer(Project_lower(input))}
646+
* with the outer's RexInputRefs pointing into the lower's appended window column, used
647+
* to lose its lower layer when re-wired. The next attach-on-top then crashed deserialising
648+
* with "Field reference offset (N) must be less than number of fields in struct (N)".
649+
*/
650+
public void testAttachFragmentOnTop_PreservesLiftedWindowProjectLayer() throws Exception {
651+
DataFusionFragmentConvertor convertor = newConvertor();
652+
653+
RelDataType inputRowType = rowType("a");
654+
RelNode innerStageScan = new OpenSearchStageInputScan(cluster, cluster.traitSet(), 0, inputRowType, List.of("datafusion"));
655+
byte[] innerBytes = convertor.convertFragment(innerStageScan);
656+
657+
RelNode placeholderInput = buildTableScan("__placeholder__", "a");
658+
659+
RelDataType bigintType = typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true);
660+
RexNode rowNumberOver = rexBuilder.makeOver(
661+
bigintType,
662+
(org.apache.calcite.sql.SqlAggFunction) SqlStdOperatorTable.ROW_NUMBER,
663+
List.of(),
664+
List.of(),
665+
com.google.common.collect.ImmutableList.<org.apache.calcite.rex.RexFieldCollation>of(),
666+
org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING,
667+
org.apache.calcite.rex.RexWindowBounds.CURRENT_ROW,
668+
true,
669+
true,
670+
false,
671+
false,
672+
false
673+
);
674+
RelNode lowerProject = org.apache.calcite.rel.logical.LogicalProject.create(
675+
placeholderInput,
676+
List.of(),
677+
List.of(rexBuilder.makeInputRef(placeholderInput, 0), rowNumberOver),
678+
List.of("a", "rn"),
679+
java.util.Set.of()
680+
);
681+
682+
RelNode outerProject = org.apache.calcite.rel.logical.LogicalProject.create(
683+
lowerProject,
684+
List.of(),
685+
List.of(rexBuilder.makeInputRef(lowerProject, 0), rexBuilder.makeInputRef(lowerProject, 1)),
686+
List.of("a", "rn"),
687+
java.util.Set.of()
688+
);
689+
690+
byte[] combined = convertor.attachFragmentOnTop(outerProject, innerBytes);
691+
692+
Plan plan = decodeSubstrait(combined);
693+
Rel root = rootRel(plan);
694+
assertTrue("root must be a ProjectRel (outer lift)", root.hasProject());
695+
Rel innerOfOuter = root.getProject().getInput();
696+
assertTrue("outer's input must remain the lower lift Project", innerOfOuter.hasProject());
697+
Rel innerOfLower = innerOfOuter.getProject().getInput();
698+
assertTrue("lower's input must be the rewired stage-scan", innerOfLower.hasRead());
699+
}
700+
644701
}

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
import org.apache.calcite.rel.type.RelDataTypeField;
1616
import org.apache.calcite.rex.RexBuilder;
1717
import org.apache.calcite.rex.RexCall;
18+
import org.apache.calcite.rex.RexFieldCollation;
1819
import org.apache.calcite.rex.RexInputRef;
1920
import org.apache.calcite.rex.RexNode;
21+
import org.apache.calcite.rex.RexOver;
2022
import org.apache.calcite.rex.RexShuttle;
23+
import org.apache.calcite.rex.RexWindow;
2124
import org.apache.logging.log4j.LogManager;
2225
import org.apache.logging.log4j.Logger;
2326
import org.opensearch.analytics.planner.CapabilityRegistry;
@@ -204,6 +207,52 @@ private static RexNode adaptRex(
204207
if (adapted != operand) operandsChanged = true;
205208
}
206209

210+
// Window functions: adapter recursion has to descend into PARTITION BY / ORDER BY
211+
// expressions too — they live on RexOver.window, not in getOperands(), and isthmus's
212+
// WindowFunctionConverter walks them when emitting substrait. Without this,
213+
// calls like SPAN that need a backend-specific rewrite (SPAN(field, n, NULL) →
214+
// FLOOR(field/n)*n) survive into substrait emission carrying their NULL-typed
215+
// operand and trip TypeConverter.
216+
if (call instanceof RexOver over) {
217+
RexWindow window = over.getWindow();
218+
List<RexNode> adaptedPartitionKeys = new ArrayList<>(window.partitionKeys.size());
219+
boolean windowChanged = false;
220+
for (RexNode key : window.partitionKeys) {
221+
RexNode adapted = adaptRex(key, adapters, fieldStorage, cluster);
222+
adaptedPartitionKeys.add(adapted);
223+
if (adapted != key) windowChanged = true;
224+
}
225+
List<RexFieldCollation> adaptedOrderKeys = new ArrayList<>(window.orderKeys.size());
226+
for (RexFieldCollation order : window.orderKeys) {
227+
RexNode adapted = adaptRex(order.left, adapters, fieldStorage, cluster);
228+
if (adapted != order.left) {
229+
adaptedOrderKeys.add(new RexFieldCollation(adapted, order.right));
230+
windowChanged = true;
231+
} else {
232+
adaptedOrderKeys.add(order);
233+
}
234+
}
235+
if (operandsChanged || windowChanged) {
236+
RexBuilder rexBuilder = cluster.getRexBuilder();
237+
return rexBuilder.makeOver(
238+
over.getType(),
239+
over.getAggOperator(),
240+
adaptedOperands,
241+
adaptedPartitionKeys,
242+
com.google.common.collect.ImmutableList.copyOf(adaptedOrderKeys),
243+
window.getLowerBound(),
244+
window.getUpperBound(),
245+
window.getExclude(),
246+
window.isRows(),
247+
true,
248+
false,
249+
over.isDistinct(),
250+
over.ignoreNulls()
251+
);
252+
}
253+
return over;
254+
}
255+
207256
RexCall current = operandsChanged ? call.clone(call.getType(), adaptedOperands) : call;
208257

209258
// Look up adapter for this function

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchProjectRule.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,15 @@ private boolean isOpaqueOperation(String funcName) {
285285
}
286286

287287
/** Walks project expressions and collects the {@link WindowFunction}s used by any {@link RexOver}.
288-
* PARTITION BY is rejected for aggregate-as-window functions (SUM/AVG/COUNT/MIN/MAX) since
289-
* shuffle exchange isn't wired yet — those run frame-only across all rows on a single fragment.
290-
* ROW_NUMBER is the exception: PPL {@code dedup} lowers to ROW_NUMBER OVER (PARTITION BY ...)
291-
* whose semantics are local to each partition; isthmus + DataFusion's substrait consumer
292-
* emit a Window rel that executes the partition without requiring shuffle (the partition
293-
* is the row-group boundary, not a data-redistribution operator). Unrecognized window
294-
* SqlKinds (LAG, LEAD, NTILE, etc.) also fail here. */
288+
* Unrecognized window SqlKinds (LAG, LEAD, NTILE, etc.) fail here.
289+
*
290+
* <p>PARTITION BY and ORDER BY are both accepted — {@code OpenSearchProject}'s cost gate
291+
* already forces SINGLETON input on any RexOver-bearing project, so all rows in a partition
292+
* arrive on the coordinator regardless of whether partition keys span shards. The
293+
* coordinator's WindowAggExec then computes the window correctly per partition / per frame.
294+
* Covers ROW_NUMBER OVER PARTITION BY (PPL dedup), SUM/AVG/COUNT/MIN/MAX OVER PARTITION BY
295+
* (PPL eventstats by ...), and the empty-OVER aggregate-as-window forms. HASH-shuffle is a
296+
* future strict improvement, not a correctness prerequisite. */
295297
private static Set<WindowFunction> collectWindowFunctions(List<? extends RexNode> exprs) {
296298
Set<WindowFunction> fns = new LinkedHashSet<>();
297299
for (RexNode expr : exprs) {
@@ -302,11 +304,6 @@ public RexNode visitOver(RexOver over) {
302304
if (fn == null) {
303305
throw new IllegalStateException("Window function [" + over.getAggOperator().getName() + "] is not supported");
304306
}
305-
if (fn != WindowFunction.ROW_NUMBER && !over.getWindow().partitionKeys.isEmpty()) {
306-
throw new IllegalStateException(
307-
"Window OVER (PARTITION BY ...) is not supported for [" + fn + "] — no shuffle exchange available yet"
308-
);
309-
}
310307
fns.add(fn);
311308
return super.visitOver(over);
312309
}

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/MockDataFusionBackend.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ protected Set<JoinCapability> joinCapabilities() {
138138

139139
@Override
140140
protected Set<WindowCapability> windowCapabilities() {
141+
// Mirrors DataFusionAnalyticsBackendPlugin.windowCapabilities — SUM/AVG/COUNT/MIN/MAX
142+
// for PPL eventstats; ROW_NUMBER backs PPL top/rare/dedup and the streamstats … by …
143+
// helper sequence column.
141144
return Set.of(
142145
new WindowCapability(
143146
Set.of(
@@ -146,8 +149,6 @@ protected Set<WindowCapability> windowCapabilities() {
146149
WindowFunction.COUNT,
147150
WindowFunction.MIN,
148151
WindowFunction.MAX,
149-
// ROW_NUMBER backs PPL `top` / `rare` / `dedup` lowering
150-
// (ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)).
151152
WindowFunction.ROW_NUMBER
152153
),
153154
Set.of(PARQUET_DATA_FORMAT)

0 commit comments

Comments
 (0)