Skip to content

Commit 93beadf

Browse files
committed
Onboard PPL dedup to the analytics-engine path (ROW_NUMBER window function)
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent f1cbbba commit 93beadf

6 files changed

Lines changed: 148 additions & 0 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendCapabilityProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ default Set<ProjectCapability> projectCapabilities() {
4747
return Set.of();
4848
}
4949

50+
/** Window functions this backend can evaluate when a {@link org.apache.calcite.rex.RexOver}
51+
* appears inside a project. Reported separately from {@link #projectCapabilities()} so
52+
* the planner can pick a window-capable backend without conflating window evaluation with
53+
* ordinary scalar projection. */
54+
default Set<WindowCapability> windowCapabilities() {
55+
return Set.of();
56+
}
57+
5058
/**
5159
* Delegation types this backend can initiate — it has a custom physical operator
5260
* that calls Analytics Core's delegation API to offload work to another backend.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.spi;
10+
11+
import java.util.Set;
12+
13+
/**
14+
* Declares that a backend can evaluate a specific {@link WindowFunction} as part
15+
* of a {@link org.apache.calcite.rex.RexOver} inside a project. Window functions
16+
* are independent of {@link FieldType} because Substrait serializes them inline
17+
* with their argument expressions — the backend dispatches on the substrait
18+
* function reference at execution time, not on the Calcite operator's typed inputs.
19+
*
20+
* @opensearch.internal
21+
*/
22+
public record WindowCapability(WindowFunction function, Set<String> formats) {}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.analytics.spi;
10+
11+
import org.apache.calcite.sql.SqlOperator;
12+
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
/**
18+
* Window functions a backend may support — distinct from {@link AggregateFunction}
19+
* because {@link org.apache.calcite.rex.RexOver} occurrences are evaluated by the
20+
* backend's window operator (Substrait inline {@code WindowFunctionInvocation}),
21+
* not as scalar projections or rolled-up aggregates.
22+
*
23+
* @opensearch.internal
24+
*/
25+
public enum WindowFunction {
26+
/** Sequence number per window partition — backs PPL dedup's row-number filter. */
27+
ROW_NUMBER(SqlStdOperatorTable.ROW_NUMBER);
28+
29+
private static final Map<SqlOperator, WindowFunction> OPERATOR_INDEX = new HashMap<>();
30+
31+
static {
32+
for (WindowFunction fn : values()) {
33+
OPERATOR_INDEX.put(fn.operator, fn);
34+
}
35+
}
36+
37+
private final SqlOperator operator;
38+
39+
WindowFunction(SqlOperator operator) {
40+
this.operator = operator;
41+
}
42+
43+
public SqlOperator operator() {
44+
return operator;
45+
}
46+
47+
/** Resolves a Calcite {@link SqlOperator} to a {@link WindowFunction}, or {@code null} if unsupported. */
48+
public static WindowFunction fromOperator(SqlOperator operator) {
49+
return OPERATOR_INDEX.get(operator);
50+
}
51+
}

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.opensearch.analytics.spi.ScanCapability;
3232
import org.opensearch.analytics.spi.SearchExecEngineProvider;
3333
import org.opensearch.analytics.spi.StdOperatorRewriteAdapter;
34+
import org.opensearch.analytics.spi.WindowCapability;
35+
import org.opensearch.analytics.spi.WindowFunction;
3436
import org.opensearch.be.datafusion.indexfilter.FilterTreeCallbacks;
3537
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
3638

@@ -361,6 +363,17 @@ public Set<ProjectCapability> projectCapabilities() {
361363
return Set.copyOf(caps);
362364
}
363365

366+
@Override
367+
public Set<WindowCapability> windowCapabilities() {
368+
Set<String> formats = Set.copyOf(plugin.getSupportedFormats());
369+
// ROW_NUMBER inside a project (e.g. PPL dedup's row-number filter) goes through
370+
// isthmus's RexExpressionConverter.visitOver → Substrait WindowFunctionInvocation
371+
// → DataFusion's Substrait consumer splits it into LogicalPlan::Window. No adapter
372+
// or Rust UDF is needed — row_number is a Substrait-stdlib window function and a
373+
// DataFusion built-in.
374+
return Set.of(new WindowCapability(WindowFunction.ROW_NUMBER, formats));
375+
}
376+
364377
@Override
365378
public Set<AggregateCapability> aggregateCapabilities() {
366379
Set<String> formats = Set.copyOf(plugin.getSupportedFormats());

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/CapabilityRegistry.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.opensearch.analytics.spi.ProjectCapability;
2121
import org.opensearch.analytics.spi.ScalarFunction;
2222
import org.opensearch.analytics.spi.ScanCapability;
23+
import org.opensearch.analytics.spi.WindowCapability;
24+
import org.opensearch.analytics.spi.WindowFunction;
2325
import org.opensearch.cluster.metadata.IndexMetadata;
2426

2527
import java.util.ArrayList;
@@ -62,6 +64,7 @@ public class CapabilityRegistry {
6264
private final Map<ScalarKey, Map<String, List<String>>> filterIndex = new HashMap<>();
6365
private final Map<AggregateKey, Map<String, List<String>>> aggregateIndex = new HashMap<>();
6466
private final Map<ScalarKey, Map<String, List<String>>> scalarIndex = new HashMap<>();
67+
private final Map<WindowFunction, Map<String, List<String>>> windowIndex = new HashMap<>();
6568
// Backends that declared supportsLiteralEvaluation=true for a (function, fieldType)
6669
private final Map<ScalarKey, List<String>> literalScalarIndex = new HashMap<>();
6770
// Opaque operations keyed by name (e.g. "painless") rather than a typed key
@@ -80,6 +83,7 @@ public class CapabilityRegistry {
8083
private final Set<String> filterCapableBackends = new HashSet<>();
8184
private final Set<String> aggregateCapableBackends = new HashSet<>();
8285
private final Set<String> projectCapableBackends = new HashSet<>();
86+
private final Set<String> windowCapableBackends = new HashSet<>();
8387

8488
public CapabilityRegistry(
8589
List<AnalyticsSearchBackendPlugin> backends,
@@ -169,6 +173,13 @@ public CapabilityRegistry(
169173
}
170174
projectCapableBackends.add(name);
171175
}
176+
for (WindowCapability cap : caps.windowCapabilities()) {
177+
Map<String, List<String>> formatMap = windowIndex.computeIfAbsent(cap.function(), k -> new HashMap<>());
178+
for (String format : cap.formats()) {
179+
formatMap.computeIfAbsent(format, k -> new ArrayList<>()).add(name);
180+
}
181+
windowCapableBackends.add(name);
182+
}
172183
}
173184
}
174185

@@ -204,6 +215,15 @@ public Set<String> projectCapableBackends() {
204215
return projectCapableBackends;
205216
}
206217

218+
public Set<String> windowCapableBackends() {
219+
return windowCapableBackends;
220+
}
221+
222+
/** All backends declaring window-function support for {@code function} ignoring storage formats. */
223+
public List<String> windowBackendsAnyFormat(WindowFunction function) {
224+
return allBackends(windowIndex.getOrDefault(function, Map.of()));
225+
}
226+
207227
// ---- Scan lookups ----
208228

209229
public List<String> scanBackends(Class<? extends ScanCapability> kind, FieldType fieldType, String format) {

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchProjectRule.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.calcite.rex.RexCall;
1616
import org.apache.calcite.rex.RexInputRef;
1717
import org.apache.calcite.rex.RexNode;
18+
import org.apache.calcite.rex.RexOver;
1819
import org.apache.calcite.sql.SqlFunction;
1920
import org.apache.calcite.sql.SqlOperator;
2021
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -27,6 +28,7 @@
2728
import org.opensearch.analytics.spi.DelegationType;
2829
import org.opensearch.analytics.spi.FieldType;
2930
import org.opensearch.analytics.spi.ScalarFunction;
31+
import org.opensearch.analytics.spi.WindowFunction;
3032

3133
import java.util.ArrayList;
3234
import java.util.List;
@@ -167,6 +169,17 @@ private RexNode annotateExpr(RexNode expr, List<String> childViableBackends) {
167169
return expr;
168170
}
169171

172+
// Window functions (RexOver) — Calcite emits these from rules like PPL's dedup
173+
// (ROW_NUMBER() OVER (PARTITION BY ...)). isthmus's RexExpressionConverter.visitOver
174+
// serializes them inline as Substrait WindowFunctionInvocation expressions, and the
175+
// consumer (DataFusion) splits them into a dedicated Window plan node. The planner's
176+
// job here is only to confine the project to a backend that declared support for the
177+
// specific WindowFunction; no scalar-style annotation walk applies because RexOver's
178+
// operands are window arguments, not subordinate scalar expressions to resolve.
179+
if (rexCall instanceof RexOver rexOver) {
180+
return annotateWindow(rexOver, childViableBackends);
181+
}
182+
170183
// Baseline operators — arithmetic, CAST, null-handling, conditional — are assumed
171184
// supported by every backend and are not subject to capability-registry enforcement.
172185
// Recurse into operands so a non-baseline function nested inside (e.g.
@@ -219,6 +232,27 @@ private RexNode annotateExpr(RexNode expr, List<String> childViableBackends) {
219232
return new AnnotatedProjectExpression(target.getType(), target, scalarViable, context.nextAnnotationId());
220233
}
221234

235+
private RexNode annotateWindow(RexOver rexOver, List<String> childViableBackends) {
236+
SqlOperator op = rexOver.getOperator();
237+
WindowFunction windowFn = WindowFunction.fromOperator(op);
238+
if (windowFn == null) {
239+
throw new IllegalStateException("Unsupported window function [" + op.getName() + "]");
240+
}
241+
List<String> windowCapable = context.getCapabilityRegistry().windowBackendsAnyFormat(windowFn);
242+
List<String> viable = new ArrayList<>();
243+
for (String candidate : childViableBackends) {
244+
if (windowCapable.contains(candidate)) {
245+
viable.add(candidate);
246+
}
247+
}
248+
if (viable.isEmpty()) {
249+
throw new IllegalStateException(
250+
"No backend supports window function [" + windowFn + "] among " + childViableBackends
251+
);
252+
}
253+
return new AnnotatedProjectExpression(rexOver.getType(), rexOver, viable, context.nextAnnotationId());
254+
}
255+
222256
private List<String> resolveOpaqueViableBackends(String funcName, List<String> childViableBackends) {
223257
CapabilityRegistry registry = context.getCapabilityRegistry();
224258
List<String> viable = registry.opaqueBackendsAnyFormat(funcName);

0 commit comments

Comments
 (0)