Skip to content

Commit 4f6bcf5

Browse files
authored
[analytics-engine] Wire FINAL aggregate filter drop and join-condition adapter dispatch (opensearch-project#21911)
* [analytics-engine] Wire FINAL aggregate filter drop and join-condition adapter dispatch Two small fixes that together unblock CalciteTransposeCommandIT on the analytics-engine route end-to-end. Both surface only when a single PPL command produces (a) a non-prefix groupSet, (b) a FILTER aggCall, and (c) a Join whose condition carries PPL UDFs — which is how PPL transpose lowers via RelBuilder.unpivot()/pivot(). Today transpose is the only PPL command that hits this combination, but the bugs are general. 1. DistributedAggregateRewriter.buildOne — drop filterArg on FINAL ----------------------------------------------------------------- FINAL's input is PARTIAL output, laid out as [group keys, agg states]. The boolean column referenced by an aggCall's FILTER predicate exists only on the ORIGINAL child input that PARTIAL consumed; PARTIAL has already applied the filter while accumulating state. The Aggregate constructor's `isPredicate(input, filterArg)` check then fires when it reads filterArg=N against PARTIAL output that has fewer than N+1 columns (or whose Nth column is non-boolean). Set filterArg = -1 (Calcite's "no FILTER" sentinel — there's no create() overload that omits it) on the FINAL call. Semantically correct: FILTER is a row-level gate consumed once during accumulation; merging states never re-applies it. This generalises to multi-stage chains (PARTIAL → PARTIAL2 → FINAL): only the first stage that consumes raw rows keeps filterArg. Without this fix, transpose IT fails with `IllegalArgumentException: filter must be BOOLEAN NOT NULL` from Aggregate.<init>:178. 2. BackendPlanAdapter — dispatch OpenSearchJoin for adapter rewrite ------------------------------------------------------------------ adaptNode() walks Filter / Project / Aggregate(FINAL) and runs each RexNode through the backend's ScalarFunctionAdapter chain (e.g. ToStringFunctionAdapter rewrites NUMBER_TO_STRING to a plain CAST that isthmus understands). Calcite's FILTER_INTO_JOIN rule inlines an outer Filter's predicate into an inner Join's condition, so any PPL UDF that lived in the Project below the Filter rides into the Join condition. With Join missing from adaptNode's dispatch list, that copy of the UDF reaches isthmus unrewritten and trips "Unable to convert call NUMBER_TO_STRING(fp64?)" in RexExpressionConverter. Add an OpenSearchJoin branch that runs the join's condition through adaptRex with concatenated left+right field storage — same convention OpenSearchJoin#getOutputFieldStorage() uses on the output side. Verified -------- CalciteTransposeCommandIT (with `-Dtests.analytics.parquet_indices=true`) on 21804-merged main: * Without these fixes: 0/5 pass (5/5 hit "filter must be BOOLEAN NOT NULL") * With #1 only: 4/5 pass (testTransposeWithValueFieldNameCollision hits "Unable to convert call NUMBER_TO_STRING(fp64?)") * With #1 + #2: 5/5 pass Signed-off-by: Songkan Tang <songkant@amazon.com> * Read join field storage from the join node, not re-derived from children Address review feedback (expani): use OpenSearchJoin#getOutputFieldStorage() directly in adaptJoin instead of re-assembling it by unwrapping the children — same result (the node derives it from left ++ right child storage, which traces back to the FieldStorageInfo marked on the leaf OpenSearchTableScan), and consistent with how adaptFilter/adaptProject read storage off their node. Also drop the fieldStorage.isEmpty() short-circuit: adaptRex never indexes the storage list (it only hands it to scalar adapters, which no-op when a ref has no storage), so an empty list flows through harmlessly and yields the same result as the explicit guard — matching adaptFilter, which has no such check. BackendPlanAdapterTests 8/8 pass (incl. both join-condition tests). Signed-off-by: Songkan Tang <songkant@amazon.com> --------- Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent 4cafb38 commit 4f6bcf5

3 files changed

Lines changed: 120 additions & 1 deletion

File tree

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.analytics.planner.rel.AggregateMode;
2929
import org.opensearch.analytics.planner.rel.OpenSearchAggregate;
3030
import org.opensearch.analytics.planner.rel.OpenSearchFilter;
31+
import org.opensearch.analytics.planner.rel.OpenSearchJoin;
3132
import org.opensearch.analytics.planner.rel.OpenSearchProject;
3233
import org.opensearch.analytics.planner.rel.OpenSearchRelNode;
3334
import org.opensearch.analytics.planner.rel.OperatorAnnotation;
@@ -104,6 +105,9 @@ private static RelNode adaptNode(RelNode node, Adapters adapters) {
104105
if (node instanceof OpenSearchProject project) {
105106
return adaptProject(project, adapters, adaptedChildren, childrenChanged);
106107
}
108+
if (node instanceof OpenSearchJoin join) {
109+
return adaptJoin(join, adapters, adaptedChildren, childrenChanged);
110+
}
107111
if (node instanceof OpenSearchAggregate agg && agg.getMode() == AggregateMode.FINAL) {
108112
OpenSearchAggregate withAdaptedChildren = childrenChanged
109113
? (OpenSearchAggregate) agg.copy(agg.getTraitSet(), adaptedChildren)
@@ -114,6 +118,30 @@ private static RelNode adaptNode(RelNode node, Adapters adapters) {
114118
return childrenChanged ? node.copy(node.getTraitSet(), adaptedChildren) : node;
115119
}
116120

121+
/**
122+
* Adapts {@link OpenSearchJoin#getCondition()} so PPL UDFs inlined by
123+
* Calcite's FILTER_INTO_JOIN reach the fragment converter in their adapted shape.
124+
* Field storage is left ++ right output storage (Calcite join row-type ordering).
125+
*/
126+
private static RelNode adaptJoin(OpenSearchJoin join, Adapters adapters, List<RelNode> adaptedChildren, boolean childrenChanged) {
127+
RelNode left = childrenChanged ? adaptedChildren.get(0) : join.getLeft();
128+
RelNode right = childrenChanged ? adaptedChildren.get(1) : join.getRight();
129+
List<FieldStorageInfo> fieldStorage = join.getOutputFieldStorage();
130+
RexNode adaptedCondition = adaptRex(join.getCondition(), adapters, fieldStorage, join.getCluster());
131+
if (adaptedCondition != join.getCondition() || childrenChanged) {
132+
return new OpenSearchJoin(
133+
join.getCluster(),
134+
join.getTraitSet(),
135+
left,
136+
right,
137+
adaptedCondition,
138+
join.getJoinType(),
139+
join.getViableBackends()
140+
);
141+
}
142+
return join;
143+
}
144+
117145
private static RelNode adaptFilter(OpenSearchFilter filter, Adapters adapters, List<RelNode> adaptedChildren, boolean childrenChanged) {
118146
List<FieldStorageInfo> fieldStorage = filter.getOutputFieldStorage();
119147
RexNode adaptedCondition = adaptRex(filter.getCondition(), adapters, fieldStorage, filter.getCluster());

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/DistributedAggregateRewriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ private static AggregateCall buildOne(
304304
call.ignoreNulls(),
305305
call.rexList,
306306
List.copyOf(argList),
307-
call.filterArg,
307+
-1, // filterArg dropped: FILTER consumed by PARTIAL on raw rows; FINAL only merges states.
308308
call.distinctKeys,
309309
call.collation,
310310
hasEmptyGroup,

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/dag/BackendPlanAdapterTests.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import org.apache.calcite.plan.RelOptTable;
1212
import org.apache.calcite.plan.RelOptUtil;
1313
import org.apache.calcite.rel.RelNode;
14+
import org.apache.calcite.rel.core.JoinRelType;
1415
import org.apache.calcite.rel.logical.LogicalFilter;
16+
import org.apache.calcite.rel.logical.LogicalJoin;
1517
import org.apache.calcite.rel.logical.LogicalProject;
1618
import org.apache.calcite.rex.RexCall;
1719
import org.apache.calcite.rex.RexInputRef;
@@ -30,6 +32,7 @@
3032
import org.opensearch.analytics.planner.PlannerContext;
3133
import org.opensearch.analytics.planner.rel.AnnotatedPredicate;
3234
import org.opensearch.analytics.planner.rel.OpenSearchFilter;
35+
import org.opensearch.analytics.planner.rel.OpenSearchJoin;
3336
import org.opensearch.analytics.planner.rel.OperatorAnnotation;
3437
import org.opensearch.analytics.spi.FieldType;
3538
import org.opensearch.analytics.spi.ProjectCapability;
@@ -311,6 +314,94 @@ protected Map<ScalarFunction, ScalarFunctionAdapter> scalarFunctionAdapters() {
311314
);
312315
}
313316

317+
/**
318+
* Join condition carries SIN(integer_column) — a PPL UDF that the SIN adapter rewrites to
319+
* SIN(CAST(... AS DOUBLE)). Calcite's FILTER_INTO_JOIN rule inlines outer-Filter predicates
320+
* into inner-Join conditions, so any PPL UDF that lived above a Join can ride into the
321+
* Join's condition. Without dispatching OpenSearchJoin in adaptNode, that inlined UDF
322+
* reaches the substrait converter unrewritten and fails (e.g. PPL transpose's
323+
* `Unable to convert call NUMBER_TO_STRING(fp64?)`).
324+
*
325+
* <p>Verifies the adapter chain runs on the join condition: SIN's INPUT_REF operand
326+
* should be wrapped in a CAST after adaptation, exactly as it would on a Filter.
327+
*/
328+
public void testJoinConditionAdapterInsertsCastForIntegerField() {
329+
MockDataFusionBackend dfWithAdapter = new MockDataFusionBackend() {
330+
@Override
331+
protected Map<ScalarFunction, ScalarFunctionAdapter> scalarFunctionAdapters() {
332+
return Map.of(ScalarFunction.SIN, sinCastAdapter);
333+
}
334+
};
335+
336+
PlannerContext context = buildContext("parquet", 1, intFields(), List.of(dfWithAdapter));
337+
338+
RelOptTable leftTable = mockTable("test_index", "status", "size");
339+
RelOptTable rightTable = mockTable("test_index", "status", "size");
340+
RelNode leftScan = stubScan(leftTable);
341+
RelNode rightScan = stubScan(rightTable);
342+
343+
// Join condition: SIN(left.$0) > 0.5. SIN is a PPL UDF whose adapter inserts
344+
// CAST(... AS DOUBLE) around any RexInputRef operand whose type is INTEGER/BIGINT.
345+
RexNode sinCall = rexBuilder.makeCall(SIN_FUNCTION, rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0));
346+
RexNode condition = rexBuilder.makeCall(
347+
SqlStdOperatorTable.GREATER_THAN,
348+
sinCall,
349+
rexBuilder.makeLiteral(0.5, typeFactory.createSqlType(SqlTypeName.DOUBLE), true)
350+
);
351+
RelNode join = LogicalJoin.create(leftScan, rightScan, List.of(), condition, Set.of(), JoinRelType.INNER);
352+
353+
RelNode marked = runPlanner(join, context);
354+
355+
QueryDAG dag = DAGBuilder.build(marked, context.getCapabilityRegistry(), mockClusterService(), TEST_RESOLVER);
356+
PlanForker.forkAll(dag, context.getCapabilityRegistry());
357+
BackendPlanAdapter.adaptAll(dag, context.getCapabilityRegistry());
358+
359+
StagePlan plan = findStagePlanByFragmentType(dag, OpenSearchJoin.class);
360+
OpenSearchJoin adaptedJoin = (OpenSearchJoin) plan.resolvedFragment();
361+
RexCall sinResult = findCallByName(adaptedJoin.getCondition(), "SIN");
362+
assertNotNull("SIN call should exist in adapted join condition", sinResult);
363+
assertEquals(
364+
"SIN operand should be CAST after adaptation in join condition",
365+
SqlKind.CAST,
366+
sinResult.getOperands().getFirst().getKind()
367+
);
368+
}
369+
370+
/**
371+
* Join with no adapted UDFs in its condition — adapter chain runs but rewrites nothing,
372+
* the Join's condition passes through structurally unchanged. Guards against the
373+
* dispatcher accidentally rebuilding (and possibly mis-configuring) Joins that don't
374+
* need adaptation.
375+
*/
376+
public void testJoinConditionNoOpWhenNoAdaptedFunctions() {
377+
PlannerContext context = buildContext("parquet", 1, intFields());
378+
379+
RelOptTable table = mockTable("test_index", "status", "size");
380+
RelNode leftScan = stubScan(table);
381+
RelNode rightScan = stubScan(table);
382+
383+
// Equi-join on left.$0 == right.$0 — no PPL UDF, nothing for adapter to rewrite.
384+
RexNode condition = rexBuilder.makeCall(
385+
SqlStdOperatorTable.EQUALS,
386+
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0),
387+
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 2)
388+
);
389+
RelNode join = LogicalJoin.create(leftScan, rightScan, List.of(), condition, Set.of(), JoinRelType.INNER);
390+
391+
RelNode marked = runPlanner(join, context);
392+
QueryDAG dag = DAGBuilder.build(marked, context.getCapabilityRegistry(), mockClusterService(), TEST_RESOLVER);
393+
PlanForker.forkAll(dag, context.getCapabilityRegistry());
394+
BackendPlanAdapter.adaptAll(dag, context.getCapabilityRegistry());
395+
396+
StagePlan plan = findStagePlanByFragmentType(dag, OpenSearchJoin.class);
397+
OpenSearchJoin adaptedJoin = (OpenSearchJoin) plan.resolvedFragment();
398+
// Condition is structurally identical: EQUALS($0, $2) — both operands still INPUT_REF.
399+
RexCall eq = (RexCall) adaptedJoin.getCondition();
400+
assertEquals(SqlKind.EQUALS, eq.getKind());
401+
assertEquals(SqlKind.INPUT_REF, eq.getOperands().get(0).getKind());
402+
assertEquals(SqlKind.INPUT_REF, eq.getOperands().get(1).getKind());
403+
}
404+
314405
private static RexCall findCallByName(RexNode node, String name) {
315406
if (node instanceof AnnotatedPredicate annotated) return findCallByName(annotated.getOriginal(), name);
316407
if (node instanceof RexCall call) {

0 commit comments

Comments
 (0)