Commit c2d0a6c
authored
[analytics-engine] wire PPL TAKE / FIRST / LAST / LIST / VALUES end-to-end (#21731)
* analytics-framework: extend AggregateFunction SPI for state-expanding aggregates
Adds five enum entries for PPL TAKE / FIRST / LAST / LIST / VALUES and an
input-parameterised IntermediateField type resolver. Existing fixed-shape
aggregates (HLL Binary sketch, COUNT Int64 counter) keep their constant
intermediate types via IntermediateTypeResolver.fixed(); state-expanding
aggregates whose FINAL state shape derives from arg 0 use
IntermediateTypeResolver.passThroughArg0() so the planner can resolve the
exchange column type from the actual call's arg type rather than a constant.
Internalises the Arrow → Calcite type mapping (previously in the planner
module's ArrowCalciteTypes) so the SPI is self-contained.
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
* analytics-backend-datafusion: wire 5 PPL state-expanding aggregates end-to-end
Wires PPL TAKE / FIRST / LAST / LIST / VALUES through the DataFusion backend:
* Rust UDAF for take(field, n) at rust/src/udaf/take.rs. Limit is read from a
Substrait Literal when present, or from row 0 of arg 1 when Calcite has
materialised n as a constant Project column. Default n = 10. State is a
bounded List<elem> emitted from state(); coordinator-side merge_batch
concatenates and re-truncates. UDAF registered on every SessionContext
via crate::udaf::register_all.
* Substrait extension entries in opensearch_aggregate_functions.yaml for
take, first_value, last_value, and array_agg. take's 2-arg shape uses
distinct any1/any2 generics so isthmus's wildcard binding accepts
mismatched arg types (state list<elem> + integer n).
* DataFusionFragmentConvertor: LOCAL_TAKE_OP / LOCAL_FIRST_OP / LOCAL_LAST_OP /
LOCAL_ARRAY_AGG_OP stub SqlAggFunctions plus a pre-emit RelShuttle that
rewrites PPL aggregation calls (TAKE / FIRST / LAST / LIST / VALUES) onto
the stubs so isthmus's AggregateFunctionConverter resolves them through
ADDITIONAL_AGGREGATE_SIGS to the YAML extension names. LIST/VALUES rebuild
the call's return type as ARRAY<actual-arg0> to override PPL's lossy
STRING_ARRAY which would otherwise force ARRAY<VARCHAR>.
* SubstraitPlanRewriter.visit(Aggregate): post-emit pass that inlines
Project-column literals into Aggregate.Measure args. Required because
Calcite's RelBuilder.aggregate auto-projects literal aggregate-args as
$f1 columns, leaving the substrait emit with FieldRefs instead of
Literals.
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
* analytics-engine: distributed planner rewrites for PPL state-expanding aggregates
Three planner-level changes that compose to make TAKE / FIRST / LAST / LIST /
VALUES correct across shards:
1. Move AggregateCallAnnotation out of AggregateCall.rexList onto an
OpenSearchAggregate side-map keyed by call index. Calcite's
AggCallBinding.preOperands treats every rexList entry as a leading
operand for inferReturnType, which corrupts return-type inference for
PPL operators that read getOperandType(0) (ARG0_ARRAY on TAKE / LIST /
VALUES would double-wrap). Storing annotations out-of-band on
OpenSearchAggregate sidesteps the issue without touching PPL. Drops
the AGG_CALL_ANNOTATION marker class and the annotation-stripping
logic in stripAnnotations / copyResolved.
2. DistributedAggregateRewriter: drive intermediate-field types through
the SPI's IntermediateTypeResolver (replacing the static ArrowCalciteTypes
helper, which is removed). For STATE_EXPANDING + APPROXIMATE engine-native-merge
aggregates, pin the FINAL aggCall's explicit return type to the StageInputScan
column type so substrait declares what Rust's derive_schema_from_partial_plan
produces — PPL's stock ReturnTypes are wrong here (ARG0_ARRAY double-wraps,
STRING_ARRAY ignores the actual element type).
3. Cross-shard literal-arg forwarding for TAKE's N. OpenSearchAggregateSplitRule
captures any RexLiteral aggregate-args from the original SINGLE aggregate's
underlying Project and stashes them on the FINAL OpenSearchAggregate as a
side-map. DistributedAggregateRewriter (Phase 2b) wraps the FINAL's
StageInputScan in an OpenSearchProject that re-creates each captured literal
as a constant column, and rebuilds the FINAL aggCall's argList to
[stateColIdx, ...litColIdxs]. The convertor's existing SubstraitPlanRewriter
inliner then emits the literals as Substrait Literal expressions. The
producer-side PARTIAL stage still consumes N inside its own accumulator;
without this, FINAL would re-aggregate without N and fall back to the
default limit.
IT cluster runs with -da:org.apache.calcite... so Calcite's typeMatchesInferred
assertion is silenced — post #21690 the wire schema is derived in Rust, not
Java, and PPL operators with non-idempotent return-type inference would
otherwise trip this assertion on the FINAL side. Production runs without -ea.
CoordinatorReduceIT: ten new tests (single-shard + cross-shard for each of
the five aggregates). LIST/VALUES across-shards remain @AwaitsFix until the
PPL frontend stops declaring STRING_ARRAY for them.
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
* analytics-engine: cross-shard LIST and VALUES via list_merge UDAF
Two stacked issues blocked cross-shard LIST/VALUES:
1. PPL declares LIST/VALUES return type via PPLReturnTypes.STRING_ARRAY which
forces ARRAY<VARCHAR> regardless of input element type. Fix: repair the
PARTIAL aggCall return type to ARRAY<actual-arg0> in OpenSearchAggregateSplitRule
(PARTIAL only — FINAL keeps the original list to satisfy Volcano's parent
row-type check on transformTo). The corrected type propagates through
PARTIAL output → StageInputScan → FINAL substrait base_schema.
2. DataFusion's substrait consumer ignores AggregationPhase, so a FINAL
array_agg(state) gets lowered as a single-pass aggregate that re-wraps each
shard's list rather than concatenating. Fix: custom Rust UDAFs `list_merge`
and `list_merge_distinct` whose `update_batch` un-nests each List<elem> row
into elements (with optional dedup). Same pattern as TAKE's accumulator.
The convertor's pre-emit RelShuttle detects the FINAL form (arg0 already
a list type) and routes LIST → list_merge, VALUES → list_merge_distinct.
PARTIAL still uses DataFusion's native array_agg (with INVOCATION_DISTINCT for
VALUES). Cross-shard testListAcrossShards / testValuesAcrossShards pass.
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
* analytics-backend-datafusion: relocate state-expanding aggregate rewrites
Addresses two review comments on the state-expanding aggregate (TAKE / FIRST
/ LAST / LIST / VALUES) wiring.
1. Move literal-arg inlining off the post-emit Substrait IR pass and onto
the isthmus binding layer. Calcite's RelBuilder.aggregate auto-projects
literal aggregate-args (e.g. TAKE's N) as separate columns and
AggregateCall.argList only carries column indices, so without inlining
our Rust UDAFs only see a column reference at construction time. With
DataFusion's two-stage execution, the Final accumulator dispatches
through merge_batch — which receives only state columns, not the
constant — so the limit can never be resolved on the coordinator.
The fix overrides AggregateFunctionConverter.convert() in createVisitor()
(next to the existing getSigs() override): after isthmus binds the
AggregateFunctionInvocation, replace any FieldReference arg pointing to
a literal Project column with the converted RexLiteral via the supplied
Function<RexNode, Expression> — same Substrait literal an inlined arg
would produce, routed through isthmus's standard literal converter.
SubstraitPlanRewriter.visit(Aggregate) and its simpleStructOffset helper
are deleted; the post-emit pass now only handles Filter rewrites and
the PrecisionTimestampLiteral precision fix.
2. Extract the PPL aggregate-name → LOCAL_*_OP rewrite from
DataFusionFragmentConvertor into a new package-private
PplAggregateCallRewriter, matching the UntypedNullPreprocessor /
DatetimeOutputCastRewriter sibling pattern. The if/else-if chain on
operator name becomes a switch on toUpperCase(Locale.ROOT) and the
six-way `==` chain that gates already-rewritten calls becomes a
Set<SqlAggFunction> contains check. The two call sites in
DataFusionFragmentConvertor now read like the other preprocessor
invocations.
CoordinatorReduceIT (18 tests), integTestMemtable, and integTestStreaming
all pass.
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
---------
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>1 parent f15c326 commit c2d0a6c
34 files changed
Lines changed: 2193 additions & 602 deletions
File tree
- sandbox
- libs/analytics-framework/src
- main/java/org/opensearch/analytics/spi
- test/java/org/opensearch/analytics/spi
- plugins
- analytics-backend-datafusion
- rust/src
- udaf
- src/main
- java/org/opensearch/be/datafusion
- resources
- analytics-engine/src
- main/java/org/opensearch/analytics/planner
- dag
- rel
- rules
- test/java/org/opensearch/analytics/planner
- dag
- qa/analytics-engine-rest
- src/test/java/org/opensearch/analytics/qa
sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java
Lines changed: 80 additions & 8 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
9 | 9 | | |
10 | 10 | | |
11 | 11 | | |
| 12 | + | |
| 13 | + | |
12 | 14 | | |
13 | 15 | | |
14 | 16 | | |
| 17 | + | |
15 | 18 | | |
16 | 19 | | |
17 | 20 | | |
| |||
50 | 53 | | |
51 | 54 | | |
52 | 55 | | |
53 | | - | |
54 | | - | |
55 | | - | |
56 | | - | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
57 | 62 | | |
58 | 63 | | |
59 | 64 | | |
| |||
63 | 68 | | |
64 | 69 | | |
65 | 70 | | |
66 | | - | |
67 | | - | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
| 106 | + | |
| 107 | + | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
| 111 | + | |
| 112 | + | |
| 113 | + | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
68 | 136 | | |
69 | 137 | | |
70 | 138 | | |
| |||
93 | 161 | | |
94 | 162 | | |
95 | 163 | | |
96 | | - | |
| 164 | + | |
97 | 165 | | |
98 | 166 | | |
99 | 167 | | |
| |||
163 | 231 | | |
164 | 232 | | |
165 | 233 | | |
166 | | - | |
| 234 | + | |
| 235 | + | |
| 236 | + | |
| 237 | + | |
| 238 | + | |
167 | 239 | | |
168 | 240 | | |
Lines changed: 88 additions & 4 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
8 | 8 | | |
9 | 9 | | |
10 | 10 | | |
11 | | - | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
12 | 14 | | |
| 15 | + | |
13 | 16 | | |
14 | 17 | | |
15 | 18 | | |
16 | 19 | | |
17 | 20 | | |
18 | 21 | | |
19 | 22 | | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
20 | 26 | | |
21 | 27 | | |
22 | 28 | | |
| 29 | + | |
| 30 | + | |
23 | 31 | | |
24 | 32 | | |
25 | 33 | | |
| |||
34 | 42 | | |
35 | 43 | | |
36 | 44 | | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
37 | 52 | | |
38 | 53 | | |
39 | 54 | | |
| |||
52 | 67 | | |
53 | 68 | | |
54 | 69 | | |
55 | | - | |
56 | | - | |
| 70 | + | |
57 | 71 | | |
58 | 72 | | |
59 | 73 | | |
| |||
77 | 91 | | |
78 | 92 | | |
79 | 93 | | |
80 | | - | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
| 106 | + | |
| 107 | + | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
| 111 | + | |
| 112 | + | |
| 113 | + | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
| 137 | + | |
| 138 | + | |
| 139 | + | |
| 140 | + | |
| 141 | + | |
| 142 | + | |
| 143 | + | |
| 144 | + | |
| 145 | + | |
| 146 | + | |
| 147 | + | |
| 148 | + | |
| 149 | + | |
| 150 | + | |
| 151 | + | |
| 152 | + | |
| 153 | + | |
| 154 | + | |
| 155 | + | |
| 156 | + | |
| 157 | + | |
| 158 | + | |
| 159 | + | |
| 160 | + | |
| 161 | + | |
| 162 | + | |
| 163 | + | |
| 164 | + | |
81 | 165 | | |
82 | 166 | | |
83 | 167 | | |
| |||
Lines changed: 2 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
534 | 534 | | |
535 | 535 | | |
536 | 536 | | |
| 537 | + | |
537 | 538 | | |
538 | 539 | | |
539 | 540 | | |
| |||
585 | 586 | | |
586 | 587 | | |
587 | 588 | | |
| 589 | + | |
588 | 590 | | |
589 | 591 | | |
590 | 592 | | |
| |||
Lines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
148 | 148 | | |
149 | 149 | | |
150 | 150 | | |
| 151 | + | |
151 | 152 | | |
152 | 153 | | |
153 | 154 | | |
| |||
Lines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
33 | 33 | | |
34 | 34 | | |
35 | 35 | | |
| 36 | + | |
36 | 37 | | |
37 | 38 | | |
38 | 39 | | |
Lines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
74 | 74 | | |
75 | 75 | | |
76 | 76 | | |
| 77 | + | |
77 | 78 | | |
78 | 79 | | |
79 | 80 | | |
| |||
Lines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
113 | 113 | | |
114 | 114 | | |
115 | 115 | | |
| 116 | + | |
116 | 117 | | |
117 | 118 | | |
118 | 119 | | |
| |||
Lines changed: 1 addition & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
133 | 133 | | |
134 | 134 | | |
135 | 135 | | |
| 136 | + | |
136 | 137 | | |
137 | 138 | | |
138 | 139 | | |
| |||
0 commit comments