Skip to content

Commit 6b7b915

Browse files
committed
Apply a non-null filter on fields referred by aggregations
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 718135e commit 6b7b915

29 files changed

Lines changed: 256 additions & 212 deletions

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,7 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
10971097
return Pair.of(groupByList, aggCallList);
10981098
}
10991099

1100+
/** Visits an aggregation for stats command */
11001101
@Override
11011102
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11021103
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
@@ -1107,7 +1108,7 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11071108
if (!bucketNullable) {
11081109
nonNullGroupMask.set(0, nGroup);
11091110
}
1110-
visitAggregation(node, context, true, nonNullGroupMask);
1111+
visitAggregation(node, context, nonNullGroupMask, true, false);
11111112
return context.relBuilder.peek();
11121113
}
11131114

@@ -1116,12 +1117,19 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11161117
*
11171118
* @param node the aggregation node containing group expressions and aggregation functions
11181119
* @param context the Calcite plan context for building RelNodes
1119-
* @param aggFirst if true, aggregation results (metrics) appear first in output schema (agg,
1120-
* group-by fields); if false, group expressions appear first (group-by fields, agg).
11211120
* @param nonNullGroupMask bit set indicating group by fields that need to be non-null
1121+
* @param metricsFirst if true, aggregation results (metrics) appear first in output schema
1122+
* (metrics, group-by fields); if false, group expressions appear first (group-by fields,
1123+
* metrics).
1124+
* @param includeAggFieldsInNullFilter if true, also applies non-null filters to aggregation input
1125+
* fields in addition to group-by fields
11221126
*/
11231127
private void visitAggregation(
1124-
Aggregation node, CalcitePlanContext context, boolean aggFirst, BitSet nonNullGroupMask) {
1128+
Aggregation node,
1129+
CalcitePlanContext context,
1130+
BitSet nonNullGroupMask,
1131+
boolean metricsFirst,
1132+
boolean includeAggFieldsInNullFilter) {
11251133
visitChildren(node, context);
11261134

11271135
List<UnresolvedExpression> aggExprList = node.getAggExprList();
@@ -1143,15 +1151,23 @@ private void visitAggregation(
11431151
// This checks if all group-bys should be nonnull
11441152
&& nonNullGroupMask.nextClearBit(0) >= groupExprList.size();
11451153
// Add isNotNull filter before aggregation for non-nullable buckets
1146-
List<RexNode> groupByList =
1147-
groupExprList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1148-
List<RexNode> nonNullGroupBys =
1149-
IntStream.range(0, groupByList.size())
1154+
List<RexNode> nonNullCandidates =
1155+
groupExprList.stream()
1156+
.map(expr -> rexVisitor.analyze(expr, context))
1157+
.collect(Collectors.toCollection(ArrayList::new));
1158+
if (includeAggFieldsInNullFilter) {
1159+
nonNullCandidates.addAll(
1160+
PlanUtils.getInputRefsFromAggCall(
1161+
aggExprList.stream().map(expr -> aggVisitor.analyze(expr, context)).toList()));
1162+
nonNullGroupMask.set(groupExprList.size(), nonNullCandidates.size());
1163+
}
1164+
List<RexNode> nonNullFields =
1165+
IntStream.range(0, nonNullCandidates.size())
11501166
.filter(nonNullGroupMask::get)
1151-
.mapToObj(groupByList::get)
1167+
.mapToObj(nonNullCandidates::get)
11521168
.toList();
11531169
context.relBuilder.filter(
1154-
PlanUtils.getSelectColumns(nonNullGroupBys).stream()
1170+
PlanUtils.getSelectColumns(nonNullFields).stream()
11551171
.map(context.relBuilder::field)
11561172
.map(context.relBuilder::isNotNull)
11571173
.toList());
@@ -1175,7 +1191,7 @@ private void visitAggregation(
11751191
.map(context.relBuilder::field)
11761192
.map(f -> (RexNode) f)
11771193
.toList();
1178-
if (aggFirst) {
1194+
if (metricsFirst) {
11791195
// As an example, in command `stats count() by colA, colB`,
11801196
// the sequence of output schema is "count, colA, colB".
11811197
reordered.addAll(aggRexList);
@@ -2411,11 +2427,11 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) {
24112427
} else {
24122428
nonNullGroupMask.set(0, groupExprList.size());
24132429
}
2414-
visitAggregation(aggregation, context, false, nonNullGroupMask);
2430+
visitAggregation(aggregation, context, nonNullGroupMask, false, true);
24152431
RelBuilder relBuilder = context.relBuilder;
24162432

2417-
// If row or column split does not present or limit equals 0, this is the same as `stats agg
2418-
// [group by col]` because all truncating is performed on the column split
2433+
// If a second split does not present or limit equals 0, we go no further for limit, nullstr,
2434+
// otherstr parameters because all truncating & renaming is performed on the column split
24192435
if (node.getRowSplit() == null
24202436
|| node.getColumnSplit() == null
24212437
|| Objects.equals(config.limit, 0)) {

docs/user/ppl/cmd/chart.rst

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ Syntax
3636
* Syntax: ``limit=(top|bottom)<number>`` or ``limit=<number>`` (defaults to top)
3737
* When ``limit=K`` is set, the top or bottom K categories from the column split field are retained; the remaining categories are grouped into an "OTHER" category if ``useother`` is not set to false.
3838
* Set limit to 0 to show all categories without any limit.
39-
* Use ``limit=topK`` or ``limit=bottomK`` to specify whether to retain the top or bottom K column categories. The ranking is based on the aggregated values for each category. For example, ``chart limit=top3 count() by a b`` retains the 3 most common b categories; ``chart limit=top5 min(value) by a b`` selects the 5 b categories that contain the smallest aggregated values. If not specified, top is used by default.
39+
* Use ``limit=topK`` or ``limit=bottomK`` to specify whether to retain the top or bottom K column categories. The ranking is based on the sum of aggregated values for each column category. For example, ``chart limit=top3 count() by region, product`` keeps the 3 products with the highest total counts across all regions. If not specified, top is used by default.
4040
* Only applies when column split is present (by 2 fields or over...by... coexists).
4141

4242
* **useother**: optional. Controls whether to create an "OTHER" category for categories beyond the limit.
@@ -50,7 +50,7 @@ Syntax
5050

5151
* Default: true
5252
* ``usenull`` only applies to column split.
53-
* Row split should always be non-null value. Events with null values in row split will be ignored.
53+
* Row split should always be non-null value. Documents with null values in row split will be ignored.
5454
* When ``usenull=false``, events with a null column split are excluded from results.
5555
* When ``usenull=true``, events with a null column split are grouped into a separate "NULL" category.
5656

@@ -84,7 +84,8 @@ Notes
8484
=====
8585

8686
* The fields generated by column splitting are converted to strings so that they are compatible with ``nullstr`` and ``otherstr`` and can be used as column names once pivoted.
87-
* The aggregation metric appears as the last column in the result. Result columns are ordered as: [row-split] [column-split] [aggregation-metrics]
87+
* Documents with null values in fields used by the aggregation function are excluded from aggregation. For example, in ``chart avg(balance) over deptno, group``, documents where ``balance`` is null are excluded from the average calculation.
88+
* The aggregation metric appears as the last column in the result. Result columns are ordered as: [row-split] [column-split] [aggregation-metrics].
8889

8990
Examples
9091
========

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteChartCommandIT.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,7 @@ public void testChartUseNullTrueWithNullStr() throws IOException {
279279
schema("age", "string"),
280280
schema("avg(balance)", "double"));
281281
verifyDataRows(
282-
result,
283-
rows("M", "30", 21702.5),
284-
rows("F", "30", 48086.0),
285-
rows("F", "20", 32838.0),
286-
rows("F", "nil", null));
282+
result, rows("M", "30", 21702.5), rows("F", "30", 48086.0), rows("F", "20", 32838.0));
287283
}
288284

289285
@Test

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,43 +1409,48 @@ public void testPushDownMinOrMaxAggOnDerivedField() throws IOException {
14091409
@Test
14101410
public void testExplainChartWithSingleGroupKey() throws IOException {
14111411
assertYamlEqualsIgnoreId(
1412-
loadExpectedPlan("explain_chart_single_group_key.yaml"),
1412+
loadExpectedPlan("chart_single_group_key.yaml"),
14131413
explainQueryYaml(
14141414
String.format("source=%s | chart avg(balance) by gender", TEST_INDEX_BANK)));
14151415

14161416
assertYamlEqualsIgnoreId(
1417-
loadExpectedPlan("explain_chart_with_span.yaml"),
1417+
loadExpectedPlan("chart_with_integer_span.yaml"),
14181418
explainQueryYaml(
14191419
String.format("source=%s | chart max(balance) by age span=10", TEST_INDEX_BANK)));
14201420

14211421
assertYamlEqualsIgnoreId(
1422-
loadExpectedPlan("explain_chart_timestamp_span.yaml"),
1422+
loadExpectedPlan("chart_with_timestamp_span.yaml"),
14231423
explainQueryYaml(
14241424
String.format(
1425-
"source=%s | chart max(value) over timestamp span=1week by category",
1426-
TEST_INDEX_TIME_DATA)));
1425+
"source=%s | chart count by @timestamp span=1day", TEST_INDEX_TIME_DATA)));
14271426
}
14281427

14291428
@Test
14301429
public void testExplainChartWithMultipleGroupKeys() throws IOException {
1431-
String expected = loadExpectedPlan("explain_chart_multiple_group_keys.yaml");
14321430
assertYamlEqualsIgnoreId(
1433-
expected,
1431+
loadExpectedPlan("chart_multiple_group_keys.yaml"),
14341432
explainQueryYaml(
14351433
String.format("source=%s | chart avg(balance) over gender by age", TEST_INDEX_BANK)));
1434+
1435+
assertYamlEqualsIgnoreId(
1436+
loadExpectedPlan("chart_timestamp_span_and_category.yaml"),
1437+
explainQueryYaml(
1438+
String.format(
1439+
"source=%s | chart max(value) over timestamp span=1week by category",
1440+
TEST_INDEX_TIME_DATA)));
14361441
}
14371442

14381443
@Test
14391444
public void testExplainChartWithLimits() throws IOException {
1440-
String expected = loadExpectedPlan("explain_chart_with_limit.yaml");
1445+
String expected = loadExpectedPlan("chart_with_limit.yaml");
14411446
assertYamlEqualsIgnoreId(
14421447
expected,
14431448
explainQueryYaml(
14441449
String.format(
14451450
"source=%s | chart limit=0 avg(balance) over state by gender", TEST_INDEX_BANK)));
14461451

14471452
assertYamlEqualsIgnoreId(
1448-
loadExpectedPlan("explain_chart_use_other.yaml"),
1453+
loadExpectedPlan("chart_use_other.yaml"),
14491454
explainQueryYaml(
14501455
String.format(
14511456
"source=%s | chart limit=2 useother=true otherstr='max_among_other'"
@@ -1455,7 +1460,7 @@ public void testExplainChartWithLimits() throws IOException {
14551460

14561461
@Test
14571462
public void testExplainChartWithNullStr() throws IOException {
1458-
String expected = loadExpectedPlan("explain_chart_null_str.yaml");
1463+
String expected = loadExpectedPlan("chart_null_str.yaml");
14591464
assertYamlEqualsIgnoreId(
14601465
expected,
14611466
explainQueryYaml(

integ-test/src/test/resources/expectedOutput/calcite/chart_multiple_group_keys.yaml

Lines changed: 36 additions & 0 deletions
Large diffs are not rendered by default.

integ-test/src/test/resources/expectedOutput/calcite/chart_null_str.yaml

Lines changed: 40 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$0], dir0=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$0], dir0=[ASC])
5+
LogicalAggregate(group=[{0}], avg(balance)=[AVG($1)])
6+
LogicalProject(gender=[$4], balance=[$7])
7+
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
8+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
9+
physical: |
10+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[FILTER->AND(IS NOT NULL($4), IS NOT NULL($7)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},avg(balance)=AVG($1)), SORT->[0], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"gender","boost":1.0}},{"exists":{"field":"balance","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_chart_timestamp_span.yaml renamed to integ-test/src/test/resources/expectedOutput/calcite/chart_timestamp_span_and_category.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,25 @@ calcite:
88
LogicalProject(timestamp=[$1], category=[$0], max(value)=[$2])
99
LogicalAggregate(group=[{0, 2}], max(value)=[MAX($1)])
1010
LogicalProject(category=[$1], value=[$2], timestamp0=[SPAN($3, 1, 'w')])
11-
LogicalFilter(condition=[IS NOT NULL($3)])
11+
LogicalFilter(condition=[AND(IS NOT NULL($3), IS NOT NULL($2))])
1212
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
1313
LogicalProject(category=[$0], __grand_total__=[$1], _row_number_chart_=[ROW_NUMBER() OVER (ORDER BY $1 DESC NULLS LAST)])
1414
LogicalAggregate(group=[{0}], __grand_total__=[SUM($1)])
1515
LogicalFilter(condition=[IS NOT NULL($0)])
1616
LogicalProject(category=[$0], max(value)=[$2])
1717
LogicalAggregate(group=[{0, 2}], max(value)=[MAX($1)])
1818
LogicalProject(category=[$1], value=[$2], timestamp0=[SPAN($3, 1, 'w')])
19-
LogicalFilter(condition=[IS NOT NULL($3)])
19+
LogicalFilter(condition=[AND(IS NOT NULL($3), IS NOT NULL($2))])
2020
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
2121
physical: |
2222
EnumerableLimit(fetch=[10000])
2323
EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
2424
EnumerableAggregate(group=[{0, 1}], max(value)=[MAX($2)])
2525
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NULL($t1)], expr#6=['NULL'], expr#7=[10], expr#8=[<=($t4, $t7)], expr#9=['OTHER'], expr#10=[CASE($t5, $t6, $t8, $t1, $t9)], timestamp=[$t0], category=[$t10], max(value)=[$t2])
2626
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
27-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[category, value, timestamp], FILTER->IS NOT NULL($2), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},max(value)=MAX($1)), PROJECT->[timestamp0, category, max(value)], SORT->[1]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"timestamp","boost":1.0}},"_source":{"includes":["category","value","timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":true,"missing_order":"last","order":"asc"}}},{"timestamp0":{"date_histogram":{"field":"timestamp","missing_bucket":false,"order":"asc","calendar_interval":"1w"}}}]},"aggregations":{"max(value)":{"max":{"field":"value"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
27+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[category, value, timestamp], FILTER->AND(IS NOT NULL($2), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},max(value)=MAX($1)), PROJECT->[timestamp0, category, max(value)], SORT->[1]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"timestamp","boost":1.0}},{"exists":{"field":"value","boost":1.0}}],"filter":[{"exists":{"field":"category","boost":1.0}},{"exists":{"field":"category","boost":1.0}},{"exists":{"field":"category","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["category","value","timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":true,"missing_order":"last","order":"asc"}}},{"timestamp0":{"date_histogram":{"field":"timestamp","missing_bucket":false,"order":"asc","calendar_interval":"1w"}}}]},"aggregations":{"max(value)":{"max":{"field":"value"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
2828
EnumerableSort(sort0=[$0], dir0=[ASC])
2929
EnumerableCalc(expr#0..2=[{inputs}], category=[$t0], $1=[$t2])
3030
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
3131
EnumerableAggregate(group=[{0}], __grand_total__=[SUM($1)])
32-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[category, value, timestamp], FILTER->IS NOT NULL($2), FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},max(value)=MAX($1)), PROJECT->[category, max(value)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"filter":[{"exists":{"field":"timestamp","boost":1.0}},{"exists":{"field":"category","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["category","value","timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"timestamp0":{"date_histogram":{"field":"timestamp","missing_bucket":false,"order":"asc","calendar_interval":"1w"}}}]},"aggregations":{"max(value)":{"max":{"field":"value"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
32+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[PROJECT->[category, value, timestamp], FILTER->AND(IS NOT NULL($2), IS NOT NULL($1)), FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},max(value)=MAX($1)), PROJECT->[category, max(value)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"timestamp","boost":1.0}},{"exists":{"field":"value","boost":1.0}}],"filter":[{"exists":{"field":"category","boost":1.0}},{"exists":{"field":"category","boost":1.0}},{"exists":{"field":"category","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["category","value","timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"timestamp0":{"date_histogram":{"field":"timestamp","missing_bucket":false,"order":"asc","calendar_interval":"1w"}}}]},"aggregations":{"max(value)":{"max":{"field":"value"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)