Skip to content

Commit 7abcfce

Browse files
authored
Merge branch 'main' into unified-allocator-followup
2 parents e2e869d + cea1b65 commit 7abcfce

10 files changed

Lines changed: 2724 additions & 31 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/WindowFunction.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@
1212

1313
/**
1414
* Window functions a backend may support. Covers aggregate-as-window
15-
* (SUM/AVG/COUNT/MIN/MAX over a frame) — these are what PPL {@code eventstats}
16-
* lowers to — plus ranking functions (ROW_NUMBER) used by PPL {@code dedup}
17-
* lowering (ROW_NUMBER OVER PARTITION BY ... <= N).
15+
* (SUM/AVG/COUNT/MIN/MAX over a frame, what PPL {@code eventstats} lowers to)
16+
* plus ROW_NUMBER, used by PPL {@code dedup} (ROW_NUMBER OVER PARTITION BY … <= N)
17+
* and emitted by PPL {@code streamstats … by …} as the helper sequence column
18+
* {@code __row_number_for_streamstats__}.
19+
*
20+
* <p>PARTITION BY is allowed: {@code OpenSearchProject}'s cost gate forces SINGLETON
21+
* input on any RexOver-bearing project, so the coordinator's {@code WindowAggExec}
22+
* sees the full partition regardless of how partition keys span shards.
1823
*
1924
* @opensearch.internal
2025
*/
@@ -24,7 +29,6 @@ public enum WindowFunction {
2429
COUNT(SqlKind.COUNT),
2530
MIN(SqlKind.MIN),
2631
MAX(SqlKind.MAX),
27-
/** Sequence number per window partition — backs PPL dedup's row-number filter. */
2832
ROW_NUMBER(SqlKind.ROW_NUMBER);
2933

3034
private final SqlKind sqlKind;

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,13 @@ public Set<JoinCapability> joinCapabilities() {
430430

431431
@Override
432432
public Set<WindowCapability> windowCapabilities() {
433+
// SUM/AVG/COUNT/MIN/MAX cover PPL eventstats; ROW_NUMBER covers PPL dedup
434+
// (ROW_NUMBER OVER PARTITION BY … <= N) and the helper sequence column
435+
// PPL streamstats … by … emits as __row_number_for_streamstats__.
436+
// isthmus's RexExpressionConverter.visitOver serializes the RexOver inline as a
437+
// Substrait WindowFunctionInvocation; DataFusion's substrait consumer splits it
438+
// into a dedicated LogicalPlan::Window. No adapter or Rust UDF is needed —
439+
// row_number is a Substrait-stdlib window function and a DataFusion built-in.
433440
return Set.of(
434441
new WindowCapability(
435442
Set.of(
@@ -438,11 +445,6 @@ public Set<WindowCapability> windowCapabilities() {
438445
WindowFunction.COUNT,
439446
WindowFunction.MIN,
440447
WindowFunction.MAX,
441-
// ROW_NUMBER backs PPL `dedup` lowering (ROW_NUMBER OVER PARTITION BY ... <= N).
442-
// isthmus's RexExpressionConverter.visitOver serializes the RexOver inline as a
443-
// Substrait WindowFunctionInvocation; DataFusion's substrait consumer splits it
444-
// into a dedicated LogicalPlan::Window. No adapter or Rust UDF is needed —
445-
// row_number is a Substrait-stdlib window function and a DataFusion built-in.
446448
WindowFunction.ROW_NUMBER
447449
),
448450
Set.copyOf(plugin.getSupportedFormats())

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionFragmentConvertor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,13 @@ private static Rel replaceInput(Rel wrapper, Rel newInput) {
548548
return Filter.builder().from(filter).input(newInput).build();
549549
}
550550
if (wrapper instanceof Project project) {
551+
// Lifted-window shape: OpenSearchProject.liftNestedRexOver emits Project(Project(input))
552+
// where the lower Project computes a window column the outer references. A flat swap
553+
// here would drop the lower Project and break the outer's input refs.
554+
if (project.getInput() instanceof Project lower && containsWindowFunction(lower)) {
555+
Rel rewiredLower = replaceInput(lower, newInput);
556+
return Project.builder().from(project).input(rewiredLower).build();
557+
}
551558
return Project.builder().from(project).input(newInput).build();
552559
}
553560
if (wrapper instanceof Fetch fetch) {
@@ -561,6 +568,15 @@ private static Rel replaceInput(Rel wrapper, Rel newInput) {
561568
);
562569
}
563570

571+
private static boolean containsWindowFunction(Project project) {
572+
for (Expression expr : project.getExpressions()) {
573+
if (expr instanceof Expression.WindowFunctionInvocation) {
574+
return true;
575+
}
576+
}
577+
return false;
578+
}
579+
564580
/**
565581
* Overrides the {@link Expression.AggregationPhase} on every {@link Aggregate.Measure}
566582
* inside an {@link Aggregate} wrapper. No-op for non-aggregate wrappers.

sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DataFusionFragmentConvertorTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,4 +641,61 @@ public void testOtherFunctionsNotRenamed() throws Exception {
641641
assertTrue("must find sum in extension declarations", foundSum);
642642
}
643643

644+
/**
645+
* Regression: a lifted-window Project wrapper, shaped {@code Project_outer(Project_lower(input))}
646+
* with the outer's RexInputRefs pointing into the lower's appended window column, used
647+
* to lose its lower layer when re-wired. The next attach-on-top then crashed deserialising
648+
* with "Field reference offset (N) must be less than number of fields in struct (N)".
649+
*/
650+
public void testAttachFragmentOnTop_PreservesLiftedWindowProjectLayer() throws Exception {
651+
DataFusionFragmentConvertor convertor = newConvertor();
652+
653+
RelDataType inputRowType = rowType("a");
654+
RelNode innerStageScan = new OpenSearchStageInputScan(cluster, cluster.traitSet(), 0, inputRowType, List.of("datafusion"));
655+
byte[] innerBytes = convertor.convertFragment(innerStageScan);
656+
657+
RelNode placeholderInput = buildTableScan("__placeholder__", "a");
658+
659+
RelDataType bigintType = typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true);
660+
RexNode rowNumberOver = rexBuilder.makeOver(
661+
bigintType,
662+
(org.apache.calcite.sql.SqlAggFunction) SqlStdOperatorTable.ROW_NUMBER,
663+
List.of(),
664+
List.of(),
665+
com.google.common.collect.ImmutableList.<org.apache.calcite.rex.RexFieldCollation>of(),
666+
org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING,
667+
org.apache.calcite.rex.RexWindowBounds.CURRENT_ROW,
668+
true,
669+
true,
670+
false,
671+
false,
672+
false
673+
);
674+
RelNode lowerProject = org.apache.calcite.rel.logical.LogicalProject.create(
675+
placeholderInput,
676+
List.of(),
677+
List.of(rexBuilder.makeInputRef(placeholderInput, 0), rowNumberOver),
678+
List.of("a", "rn"),
679+
java.util.Set.of()
680+
);
681+
682+
RelNode outerProject = org.apache.calcite.rel.logical.LogicalProject.create(
683+
lowerProject,
684+
List.of(),
685+
List.of(rexBuilder.makeInputRef(lowerProject, 0), rexBuilder.makeInputRef(lowerProject, 1)),
686+
List.of("a", "rn"),
687+
java.util.Set.of()
688+
);
689+
690+
byte[] combined = convertor.attachFragmentOnTop(outerProject, innerBytes);
691+
692+
Plan plan = decodeSubstrait(combined);
693+
Rel root = rootRel(plan);
694+
assertTrue("root must be a ProjectRel (outer lift)", root.hasProject());
695+
Rel innerOfOuter = root.getProject().getInput();
696+
assertTrue("outer's input must remain the lower lift Project", innerOfOuter.hasProject());
697+
Rel innerOfLower = innerOfOuter.getProject().getInput();
698+
assertTrue("lower's input must be the rewired stage-scan", innerOfLower.hasRead());
699+
}
700+
644701
}

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
import org.apache.calcite.rel.type.RelDataTypeField;
1616
import org.apache.calcite.rex.RexBuilder;
1717
import org.apache.calcite.rex.RexCall;
18+
import org.apache.calcite.rex.RexFieldCollation;
1819
import org.apache.calcite.rex.RexInputRef;
1920
import org.apache.calcite.rex.RexNode;
21+
import org.apache.calcite.rex.RexOver;
2022
import org.apache.calcite.rex.RexShuttle;
23+
import org.apache.calcite.rex.RexWindow;
2124
import org.apache.logging.log4j.LogManager;
2225
import org.apache.logging.log4j.Logger;
2326
import org.opensearch.analytics.planner.CapabilityRegistry;
@@ -204,6 +207,52 @@ private static RexNode adaptRex(
204207
if (adapted != operand) operandsChanged = true;
205208
}
206209

210+
// Window functions: adapter recursion has to descend into PARTITION BY / ORDER BY
211+
// expressions too — they live on RexOver.window, not in getOperands(), and isthmus's
212+
// WindowFunctionConverter walks them when emitting substrait. Without this,
213+
// calls like SPAN that need a backend-specific rewrite (SPAN(field, n, NULL) →
214+
// FLOOR(field/n)*n) survive into substrait emission carrying their NULL-typed
215+
// operand and trip TypeConverter.
216+
if (call instanceof RexOver over) {
217+
RexWindow window = over.getWindow();
218+
List<RexNode> adaptedPartitionKeys = new ArrayList<>(window.partitionKeys.size());
219+
boolean windowChanged = false;
220+
for (RexNode key : window.partitionKeys) {
221+
RexNode adapted = adaptRex(key, adapters, fieldStorage, cluster);
222+
adaptedPartitionKeys.add(adapted);
223+
if (adapted != key) windowChanged = true;
224+
}
225+
List<RexFieldCollation> adaptedOrderKeys = new ArrayList<>(window.orderKeys.size());
226+
for (RexFieldCollation order : window.orderKeys) {
227+
RexNode adapted = adaptRex(order.left, adapters, fieldStorage, cluster);
228+
if (adapted != order.left) {
229+
adaptedOrderKeys.add(new RexFieldCollation(adapted, order.right));
230+
windowChanged = true;
231+
} else {
232+
adaptedOrderKeys.add(order);
233+
}
234+
}
235+
if (operandsChanged || windowChanged) {
236+
RexBuilder rexBuilder = cluster.getRexBuilder();
237+
return rexBuilder.makeOver(
238+
over.getType(),
239+
over.getAggOperator(),
240+
adaptedOperands,
241+
adaptedPartitionKeys,
242+
com.google.common.collect.ImmutableList.copyOf(adaptedOrderKeys),
243+
window.getLowerBound(),
244+
window.getUpperBound(),
245+
window.getExclude(),
246+
window.isRows(),
247+
true,
248+
false,
249+
over.isDistinct(),
250+
over.ignoreNulls()
251+
);
252+
}
253+
return over;
254+
}
255+
207256
RexCall current = operandsChanged ? call.clone(call.getType(), adaptedOperands) : call;
208257

209258
// Look up adapter for this function

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchProjectRule.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,15 @@ private boolean isOpaqueOperation(String funcName) {
285285
}
286286

287287
/** Walks project expressions and collects the {@link WindowFunction}s used by any {@link RexOver}.
288-
* PARTITION BY is rejected for aggregate-as-window functions (SUM/AVG/COUNT/MIN/MAX) since
289-
* shuffle exchange isn't wired yet — those run frame-only across all rows on a single fragment.
290-
* ROW_NUMBER is the exception: PPL {@code dedup} lowers to ROW_NUMBER OVER (PARTITION BY ...)
291-
* whose semantics are local to each partition; isthmus + DataFusion's substrait consumer
292-
* emit a Window rel that executes the partition without requiring shuffle (the partition
293-
* is the row-group boundary, not a data-redistribution operator). Unrecognized window
294-
* SqlKinds (LAG, LEAD, NTILE, etc.) also fail here. */
288+
* Unrecognized window SqlKinds (LAG, LEAD, NTILE, etc.) fail here.
289+
*
290+
* <p>PARTITION BY and ORDER BY are both accepted — {@code OpenSearchProject}'s cost gate
291+
* already forces SINGLETON input on any RexOver-bearing project, so all rows in a partition
292+
* arrive on the coordinator regardless of whether partition keys span shards. The
293+
* coordinator's WindowAggExec then computes the window correctly per partition / per frame.
294+
* Covers ROW_NUMBER OVER PARTITION BY (PPL dedup), SUM/AVG/COUNT/MIN/MAX OVER PARTITION BY
295+
* (PPL eventstats by ...), and the empty-OVER aggregate-as-window forms. HASH-shuffle is a
296+
* future strict improvement, not a correctness prerequisite. */
295297
private static Set<WindowFunction> collectWindowFunctions(List<? extends RexNode> exprs) {
296298
Set<WindowFunction> fns = new LinkedHashSet<>();
297299
for (RexNode expr : exprs) {
@@ -302,11 +304,6 @@ public RexNode visitOver(RexOver over) {
302304
if (fn == null) {
303305
throw new IllegalStateException("Window function [" + over.getAggOperator().getName() + "] is not supported");
304306
}
305-
if (fn != WindowFunction.ROW_NUMBER && !over.getWindow().partitionKeys.isEmpty()) {
306-
throw new IllegalStateException(
307-
"Window OVER (PARTITION BY ...) is not supported for [" + fn + "] — no shuffle exchange available yet"
308-
);
309-
}
310307
fns.add(fn);
311308
return super.visitOver(over);
312309
}

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/MockDataFusionBackend.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ protected Set<JoinCapability> joinCapabilities() {
138138

139139
@Override
140140
protected Set<WindowCapability> windowCapabilities() {
141+
// Mirrors DataFusionAnalyticsBackendPlugin.windowCapabilities — SUM/AVG/COUNT/MIN/MAX
142+
// for PPL eventstats; ROW_NUMBER backs PPL top/rare/dedup and the streamstats … by …
143+
// helper sequence column.
141144
return Set.of(
142145
new WindowCapability(
143146
Set.of(
@@ -146,8 +149,6 @@ protected Set<WindowCapability> windowCapabilities() {
146149
WindowFunction.COUNT,
147150
WindowFunction.MIN,
148151
WindowFunction.MAX,
149-
// ROW_NUMBER backs PPL `top` / `rare` / `dedup` lowering
150-
// (ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)).
151152
WindowFunction.ROW_NUMBER
152153
),
153154
Set.of(PARQUET_DATA_FORMAT)

0 commit comments

Comments
 (0)