Skip to content

Commit ba2a5a2

Browse files
authored
[analytics-datafusion] Apply aggregate mode in indexed executor, remove dc() gate (opensearch-project#22102)
The indexed executor now applies aggregate_mode stripping (Mode::Partial) after building the physical plan, producing Binary HLL state instead of Int64. This fixes the dc() schema mismatch without needing to suppress filter delegation at marking time.

Changes:
- indexed_executor.rs: apply_aggregate_mode after physical plan build
- substrait_to_tree.rs: delegation_possible UDF returns first arg (transparent pass-through) instead of returning an internal error
- Remove engine-native-merge gate from OpenSearchFilterRule
- Remove dead code: PlannerContext.hasEngineNativeMergeAggregate, PlannerImpl.containsEngineNativeMergeAggregate
- Expand FilterDelegationExtendedIT with stddev_pop/samp, percentile, additional dc() shapes (29 exact-value + 6 double-tolerance cases)

Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
1 parent 797128a commit ba2a5a2

7 files changed

Lines changed: 88 additions & 53 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/rust/src/agg_mode.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,23 @@ mod tests {
319319
!rules.iter().any(|r| r.name() == combine_name),
320320
"CombinePartialFinalAggregate should be filtered out"
321321
);
322-
// Verify we still have other rules
323322
assert!(!rules.is_empty(), "Should have other optimizer rules");
324323
}
324+
325+
/// Verifies apply_aggregate_mode(Partial) strips the Final aggregate and keeps
326+
/// only the Partial subtree — the core behavior the indexed executor relies on
327+
/// for engine-native-merge (dc/HLL) queries.
328+
#[tokio::test]
329+
async fn test_apply_partial_strips_final() {
330+
let plan = make_agg_plan().await;
331+
let display_before = plan_string(&plan);
332+
assert!(display_before.contains("AggregateExec: mode=Final"), "expected Final in plan");
333+
assert!(display_before.contains("AggregateExec: mode=Partial"), "expected Partial in plan");
334+
335+
let stripped = apply_aggregate_mode(plan, Mode::Partial).unwrap();
336+
let display_after = plan_string(&stripped);
337+
assert!(!display_after.contains("mode=Final"), "Final should be stripped");
338+
assert!(display_after.contains("mode=Partial"), "Partial should remain");
339+
}
340+
325341
}

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ async unsafe fn execute_indexed_with_context_inner(
500500

501501
let query_config = Arc::new(handle.query_config);
502502
let num_partitions = query_config.target_partitions.max(1);
503+
let aggregate_mode = handle.aggregate_mode;
503504
let ctx = handle.ctx;
504505
let table_name = handle.table_name;
505506
let table_path = handle.table_path;
@@ -880,6 +881,13 @@ async unsafe fn execute_indexed_with_context_inner(
880881
// types. The target is schema_coerce::coerce_inferred_schema(physical_schema) — same
881882
// narrowing the partition-stream registration uses, so consumer-side StreamingTable
882883
// and producer-side batches agree by construction (see crate::relabel_exec).
884+
// Apply aggregate mode stripping when prepare_partial_plan was called (engine-native-merge).
885+
// This makes the indexed executor produce Binary HLL state (Partial) instead of Int64 (Final).
886+
let physical_plan = if aggregate_mode != crate::agg_mode::Mode::Default {
887+
crate::agg_mode::apply_aggregate_mode(physical_plan, aggregate_mode)?
888+
} else {
889+
physical_plan
890+
};
883891
let target_schema = crate::schema_coerce::coerce_inferred_schema(physical_plan.schema());
884892
let physical_plan = crate::relabel_exec::wrap_if_relabel_needed(physical_plan, target_schema)?;
885893
log_debug!("DataFusion physical plan:\n{}", displayable(physical_plan.as_ref()).indent(true));

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/substrait_to_tree.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -460,13 +460,24 @@ impl ScalarUDFImpl for DelegationPossibleUdf {
460460
}
461461
fn invoke_with_args(
462462
&self,
463-
_args: ScalarFunctionArgs,
463+
args: ScalarFunctionArgs,
464464
) -> datafusion::common::Result<ColumnarValue> {
465-
Err(datafusion::common::DataFusionError::Internal(format!(
466-
"{} UDF body invoked — expr_to_bool_tree did not unwrap the marker; \
467-
treat as a serious correctness bug",
468-
DELEGATION_POSSIBLE_FUNCTION_NAME
469-
)))
465+
// Pass-through: return the original predicate (first arg) unchanged.
466+
// Expected on the prepare_partial_plan path where FilterExec retains the
467+
// delegation_possible wrapper. On the indexed path, expr_to_bool_tree should
468+
// unwrap the marker before execution. The warning below is logged on every
469+
// call, on either path, so a missed marker surfaces in logs without crashing.
470+
log::warn!(
471+
"delegation_possible UDF evaluated at runtime — expected only on \
472+
prepare_partial_plan path; if this appears on a data-node indexed query, \
473+
expr_to_bool_tree may have missed the marker"
474+
);
475+
args.args.into_iter().next().ok_or_else(|| {
476+
datafusion::common::DataFusionError::Internal(format!(
477+
"{} UDF invoked with no arguments",
478+
DELEGATION_POSSIBLE_FUNCTION_NAME
479+
))
480+
})
470481
}
471482
}
472483

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/PlannerContext.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class PlannerContext {
3333
private final boolean preferMetadataDriver;
3434
private int annotationIdCounter;
3535
private RuleProfilingListener.PlannerProfile lastProfile;
36-
private boolean hasEngineNativeMergeAggregate;
3736
// Cluster settings the planner consults at planning time (oversampling factor + delegation
3837
// block-list). Defaults to planner defaults; DefaultPlanExecutor injects the live, settings-backed
3938
// instance via setPlannerSettings before planning.
@@ -128,14 +127,6 @@ public OpenSearchDistributionTraitDef getDistributionTraitDef() {
128127
return distributionTraitDef;
129128
}
130129

131-
public boolean hasEngineNativeMergeAggregate() {
132-
return hasEngineNativeMergeAggregate;
133-
}
134-
135-
public void setHasEngineNativeMergeAggregate(boolean value) {
136-
this.hasEngineNativeMergeAggregate = value;
137-
}
138-
139130
/**
140131
* Mirrors the {@code analytics.planner.prefer_metadata_driver} cluster setting at planning
141132
* time. When {@code false}, {@code OpenSearchTableScanRule} skips the permissive

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/PlannerImpl.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
import org.apache.calcite.rel.RelHomogeneousShuttle;
1919
import org.apache.calcite.rel.RelNode;
2020
import org.apache.calcite.rel.RelShuttle;
21-
import org.apache.calcite.rel.core.Aggregate;
22-
import org.apache.calcite.rel.core.AggregateCall;
2321
import org.apache.calcite.rel.core.Filter;
2422
import org.apache.calcite.rel.core.Project;
2523
import org.apache.calcite.rel.core.Sort;
@@ -54,7 +52,6 @@
5452
import org.opensearch.analytics.planner.rules.OpenSearchUnionRule;
5553
import org.opensearch.analytics.planner.rules.OpenSearchUnionSplitRule;
5654
import org.opensearch.analytics.planner.rules.OpenSearchValuesRule;
57-
import org.opensearch.analytics.spi.AggregateFunction;
5855

5956
import java.util.List;
6057
import java.util.Optional;
@@ -102,7 +99,6 @@ public static RelNode runAllOptimizations(RelNode rawRelNode, PlannerContext con
10299
modifiedRelNode = reduceExpressions(modifiedRelNode, listener);
103100
modifiedRelNode = pushdownRules(modifiedRelNode, listener);
104101
modifiedRelNode = decomposeAggregates(modifiedRelNode, listener);
105-
context.setHasEngineNativeMergeAggregate(containsEngineNativeMergeAggregate(modifiedRelNode));
106102
modifiedRelNode = mark(modifiedRelNode, context, listener);
107103
LOGGER.debug("After marking:\n{}", RelOptUtil.toString(modifiedRelNode));
108104
// TODO(combine-delegated-predicates): a post-marking HEP rule should fuse same-backend
@@ -341,25 +337,6 @@ private static RelNode decomposeAggregates(RelNode input, RuleProfilingListener
341337
* <p>TODO: add SortPushdown rule here — pushes Sort below Exchange to data nodes for top-K
342338
* optimization.
343339
*/
344-
/**
345-
* True if the plan contains an aggregate with engine-native-merge semantics (intermediate
346-
* wire type differs from output type, e.g. APPROX_COUNT_DISTINCT emits Binary HLL state).
347-
* When present, filter delegation must be suppressed to avoid plan-shape divergence between
348-
* derive_schema_from_partial_plan and the data-node execution.
349-
*/
350-
private static boolean containsEngineNativeMergeAggregate(RelNode node) {
351-
if (node instanceof Aggregate agg) {
352-
for (AggregateCall call : agg.getAggCallList()) {
353-
if (AggregateFunction.isEngineNativeMerge(call)) {
354-
return true;
355-
}
356-
}
357-
}
358-
for (RelNode child : node.getInputs()) {
359-
if (containsEngineNativeMergeAggregate(child)) return true;
360-
}
361-
return false;
362-
}
363340

364341
private static RelNode mark(RelNode input, PlannerContext context, RuleProfilingListener listener) {
365342
return HepPhase.named("marking")

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchFilterRule.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -254,19 +254,6 @@ private List<String> resolveViableBackends(
254254
viableSet.retainAll(registry.scalarBackendsAnyFormat(scalarFunc, returnType));
255255
}
256256

257-
// TODO: Temporary workaround — suppress dual-viable filters for dc() (engine-native-merge)
258-
// queries. When delegation_possible UDF wraps a predicate in an APPROX_COUNT_DISTINCT plan,
259-
// the shard execution path produces Int64 instead of the expected Binary HLL intermediate,
260-
// causing a schema mismatch at the reduce sink. Proper fix requires changes to the DataFusion
261-
// local execution path to honour the prepared partial plan when delegation_possible is present.
262-
if (context.hasEngineNativeMergeAggregate() && viableSet.size() > 1) {
263-
List<String> delegationAcceptors = registry.delegationAcceptors(DelegationType.FILTER);
264-
boolean drivingBackendSurvives = viableSet.stream().anyMatch(b -> !delegationAcceptors.contains(b));
265-
if (drivingBackendSurvives) {
266-
viableSet.removeAll(delegationAcceptors);
267-
}
268-
}
269-
270257
// Per-backend delegation block-list. Drop backends that block this predicate so it is never
271258
// marked viable for them — but only when a non-blocked backend survives: blocking is a
272259
// delegation knob and must never make a predicate unexecutable.

sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/FilterDelegationExtendedIT.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,30 @@ private record Case(String pplSuffix, long expected, boolean isRowCount) {}
8484

8585
// ===== IN =====
8686
new Case("where str0 in ('apple', 'cherry') | stats count() as c", 5, false),
87-
new Case("where str0 in ('apple', 'cherry') | stats sum(num0) as c", 210, false)
87+
new Case("where str0 in ('apple', 'cherry') | stats sum(num0) as c", 210, false),
88+
89+
// ===== AVG — additional exact-integer cases =====
90+
new Case("where str0 != 'cherry' | stats avg(num0) as c", 50, false),
91+
92+
// ===== dc() across more predicate shapes =====
93+
new Case("where str0 != 'banana' | stats dc(str0) as c", 3, false),
94+
new Case("where isnull(str1) | stats dc(num0) as c", 4, false),
95+
new Case("where isnotnull(str1) | stats dc(num0) as c", 6, false),
96+
97+
// ===== PERCENTILE =====
98+
new Case("where str0 != 'apple' | stats percentile(num0, 50) as c", 70, false),
99+
new Case("where isnull(str1) | stats percentile(num0, 50) as c", 70, false)
100+
);
101+
102+
private record StatisticalCase(String pplSuffix, double expected) {}
103+
104+
private static final List<StatisticalCase> STATISTICAL_CASES = List.of(
105+
new StatisticalCase("where str0 != 'apple' | stats stddev_pop(num0) as c", 20.0),
106+
new StatisticalCase("where str0 != 'apple' | stats stddev_samp(num0) as c", 21.602469),
107+
new StatisticalCase("where isnull(str1) | stats stddev_pop(num0) as c", 25.860201),
108+
new StatisticalCase("where isnull(str1) | stats stddev_samp(num0) as c", 29.860788),
109+
new StatisticalCase("where isnotnull(str1) | stats stddev_pop(num0) as c", 27.487371),
110+
new StatisticalCase("where isnotnull(str1) | stats stddev_samp(num0) as c", 30.110906)
88111
);
89112

90113
private static boolean dataProvisioned = false;
@@ -116,6 +139,28 @@ public void testFilterDelegationExtended() throws Exception {
116139
}
117140
}
118141

142+
@SuppressWarnings("unchecked")
143+
public void testStatisticalAggregatesWithDelegation() throws Exception {
144+
List<String> failures = new java.util.ArrayList<>();
145+
for (StatisticalCase dc : STATISTICAL_CASES) {
146+
try {
147+
String ppl = "source = " + DATASET.indexName + " | " + dc.pplSuffix;
148+
Map<String, Object> result = executePpl(ppl);
149+
List<List<Object>> rows = (List<List<Object>>) result.get("datarows");
150+
assertNotNull("datarows null for [" + dc.pplSuffix + "]", rows);
151+
assertEquals("expected 1 agg row for [" + dc.pplSuffix + "]", 1, rows.size());
152+
double actual = ((Number) rows.get(0).get(0)).doubleValue();
153+
assertEquals("[" + dc.pplSuffix + "]", dc.expected, actual, 0.1);
154+
} catch (AssertionError | Exception e) {
155+
failures.add(dc.pplSuffix + " → " + e.getMessage());
156+
}
157+
}
158+
if (!failures.isEmpty()) {
159+
fail("Statistical aggregate failures (" + failures.size() + " of " + STATISTICAL_CASES.size() + "):\n"
160+
+ String.join("\n", failures));
161+
}
162+
}
163+
119164
@SuppressWarnings("unchecked")
120165
private void assertAggCount(String pplSuffix, long expected) throws Exception {
121166
String ppl = "source = " + DATASET.indexName + " | " + pplSuffix;

0 commit comments

Comments
 (0)