Skip to content

Commit 96e5303

Browse files
committed
Ignore rows without a row split in chart command
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 285780e commit 96e5303

10 files changed

Lines changed: 106 additions & 68 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.common.collect.Streams;
3131
import java.util.ArrayList;
3232
import java.util.Arrays;
33+
import java.util.BitSet;
3334
import java.util.Collections;
3435
import java.util.Comparator;
3536
import java.util.HashSet;
@@ -39,6 +40,7 @@
3940
import java.util.Optional;
4041
import java.util.Set;
4142
import java.util.stream.Collectors;
43+
import java.util.stream.IntStream;
4244
import java.util.stream.Stream;
4345
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
4446
import org.apache.calcite.plan.RelOptTable;
@@ -1097,7 +1099,15 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
10971099

10981100
@Override
10991101
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
1100-
visitAggregation(node, context, true);
1102+
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
1103+
Boolean bucketNullable =
1104+
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
1105+
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
1106+
BitSet nonNullGroupMask = new BitSet(nGroup);
1107+
if (!bucketNullable) {
1108+
nonNullGroupMask.set(0, nGroup);
1109+
}
1110+
visitAggregation(node, context, true, nonNullGroupMask);
11011111
return context.relBuilder.peek();
11021112
}
11031113

@@ -1108,8 +1118,10 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11081118
* @param context the Calcite plan context for building RelNodes
11091119
* @param aggFirst if true, aggregation results (metrics) appear first in output schema (agg,
11101120
* group-by fields); if false, group expressions appear first (group-by fields, agg).
1121+
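* @param nonNullGroupMask bit set indexed by group-by position (the span, when present, is at index 0); for each set bit, an IS NOT NULL filter on the corresponding group-by field is added before aggregation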
11111122
*/
1112-
private void visitAggregation(Aggregation node, CalcitePlanContext context, boolean aggFirst) {
1123+
private void visitAggregation(
1124+
Aggregation node, CalcitePlanContext context, boolean aggFirst, BitSet nonNullGroupMask) {
11131125
visitChildren(node, context);
11141126

11151127
List<UnresolvedExpression> aggExprList = node.getAggExprList();
@@ -1119,36 +1131,30 @@ private void visitAggregation(Aggregation node, CalcitePlanContext context, bool
11191131
UnresolvedExpression span = node.getSpan();
11201132
if (Objects.nonNull(span)) {
11211133
groupExprList.add(span);
1122-
List<RexNode> timeSpanFilters =
1123-
getTimeSpanField(span).stream()
1124-
.map(f -> rexVisitor.analyze(f, context))
1125-
.map(context.relBuilder::isNotNull)
1126-
.toList();
1127-
if (!timeSpanFilters.isEmpty()) {
1128-
// add isNotNull filter before aggregation for time span
1129-
context.relBuilder.filter(timeSpanFilters);
1134+
if (getTimeSpanField(span).isPresent()){
1135+
nonNullGroupMask.set(0);
11301136
}
11311137
}
11321138
groupExprList.addAll(node.getGroupExprList());
11331139

11341140
// add stats hint to LogicalAggregation
1135-
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
1136-
Boolean bucketNullable =
1137-
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
1138-
boolean toAddHintsOnAggregate = false;
1139-
if (!bucketNullable
1140-
&& !groupExprList.isEmpty()
1141-
&& !(groupExprList.size() == 1 && getTimeSpanField(span).isPresent())) {
1142-
toAddHintsOnAggregate = true;
1143-
// add isNotNull filter before aggregation for non-nullable buckets
1144-
List<RexNode> groupByList =
1145-
groupExprList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1146-
context.relBuilder.filter(
1147-
PlanUtils.getSelectColumns(groupByList).stream()
1148-
.map(context.relBuilder::field)
1149-
.map(context.relBuilder::isNotNull)
1150-
.toList());
1151-
}
1141+
boolean toAddHintsOnAggregate =
1142+
nonNullGroupMask.nextClearBit(0) >= groupExprList.size() // This checks if all group-bys are nonnull
1143+
&& !groupExprList.isEmpty()
1144+
&& !(groupExprList.size() == 1 && getTimeSpanField(span).isPresent());
1145+
// add isNotNull filter before aggregation for non-nullable buckets
1146+
List<RexNode> groupByList =
1147+
groupExprList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1148+
List<RexNode> nonNullGroupBys =
1149+
IntStream.range(0, groupByList.size())
1150+
.filter(nonNullGroupMask::get)
1151+
.mapToObj(groupByList::get)
1152+
.toList();
1153+
context.relBuilder.filter(
1154+
PlanUtils.getSelectColumns(nonNullGroupBys).stream()
1155+
.map(context.relBuilder::field)
1156+
.map(context.relBuilder::isNotNull)
1157+
.toList());
11521158

11531159
Pair<List<RexNode>, List<AggCall>> aggregationAttributes =
11541160
aggregateWithTrimming(groupExprList, aggExprList, context, toAddHintsOnAggregate);
@@ -2397,12 +2403,15 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) {
23972403
ChartConfig config = ChartConfig.fromArguments(argMap);
23982404
Aggregation aggregation =
23992405
new Aggregation(
2400-
List.of(node.getAggregationFunction()),
2401-
List.of(),
2402-
groupExprList,
2403-
null,
2404-
List.of(new Argument(Argument.BUCKET_NULLABLE, AstDSL.booleanLiteral(config.useNull))));
2405-
visitAggregation(aggregation, context, false);
2406+
List.of(node.getAggregationFunction()), List.of(), groupExprList, null, List.of());
2407+
BitSet nonNullGroupMask = new BitSet(groupExprList.size());
2408+
// Rows without a row-split are always ignored
2409+
if (config.useNull) {
2410+
nonNullGroupMask.set(0);
2411+
} else {
2412+
nonNullGroupMask.set(0, groupExprList.size());
2413+
}
2414+
visitAggregation(aggregation, context, false, nonNullGroupMask);
24062415
RelBuilder relBuilder = context.relBuilder;
24072416
String columnSplitName =
24082417
relBuilder.peek().getRowType().getFieldNames().size() > 2

docs/user/ppl/cmd/chart.rst

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,28 +30,29 @@ Syntax
3030
3131
**Parameters:**
3232

33-
* **limit**: optional. Specifies the number of distinct values to display when using column split.
33+
* **limit**: optional. Specifies the number of categories to display when using column split. Each unique value in the column split field represents a category.
3434

3535
* Default: top10
3636
* Syntax: ``limit=(top|bottom)<number>`` or ``limit=<number>`` (defaults to top)
37-
* When ``limit=K`` is set, the top or bottom K distinct column split values are retained; the additional values are grouped into an "OTHER" category if ``useother`` is not set to false.
38-
* Set limit to 0 to show all distinct values without any limit.
39-
* Use ``limit=topK`` or ``limit=bottomK`` to specify whether to retain the top or bottom K column categories. The ranking is based on the aggregated values for each distinct column-split value. For example, ``chart limit=top3 count() by a b`` retains the 3 most common b categories; ``chart limit=top5 min(value) by a b`` selects the 5 b categories that contains smallest aggregated values. If not specified, top is used by default.
40-
* Only applies when column split presents (by 2 fields or over...by... coexists).
37+
* When ``limit=K`` is set, the top or bottom K categories from the column split field are retained; the remaining categories are grouped into an "OTHER" category if ``useother`` is not set to false.
38+
* Set limit to 0 to show all categories without any limit.
39+
* Use ``limit=topK`` or ``limit=bottomK`` to specify whether to retain the top or bottom K column categories. The ranking is based on the aggregated values for each category. For example, ``chart limit=top3 count() by a b`` retains the 3 most common b categories; ``chart limit=top5 min(value) by a b`` selects the 5 b categories that contain the smallest aggregated values. If not specified, top is used by default.
40+
* Only applies when a column split is present (i.e. when grouping by two fields, or when ``over`` and ``by`` are used together).
4141

42-
* **useother**: optional. Controls whether to create an "OTHER" category for distinct column-split values beyond the limit.
42+
* **useother**: optional. Controls whether to create an "OTHER" category for categories beyond the limit.
4343

4444
* Default: true
45-
* When set to false, only the top/bottom N distinct column-split values (based on limit) are shown without an "OTHER" category.
46-
* When set to true, distinct values beyond the limit are grouped into an "OTHER" category.
47-
* Only applies when using column split and when there are more distinct column-split values than the limit.
45+
* When set to false, only the top/bottom N categories (based on limit) are shown without an "OTHER" category.
46+
* When set to true, categories beyond the limit are grouped into an "OTHER" category.
47+
* Only applies when using column split and when there are more categories than the limit.
4848

4949
* **usenull**: optional. Controls whether to group events without a column split (i.e. whose column split is null) into a separate "NULL" category.
5050

5151
* Default: true
52+
* ``usenull`` only applies to column split.
53+
* The row split must always be non-null. Events with a null value in the row split field are ignored.
5254
* When ``usenull=false``, events with a null column split are excluded from results.
5355
* When ``usenull=true``, events with a null column split are grouped into a separate "NULL" category.
54-
* ``usenull`` only applies to column split. Null values in the row split are handled in the same way as normal aggregations.
5556

5657
* **nullstr**: optional. Specifies the category name for rows that do not contain the column split value.
5758

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteChartCommandIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,4 +314,21 @@ public void testChartUseNullFalseWithNullStr() throws IOException {
314314
result, schema("gender", "string"), schema("age", "string"), schema("count()", "bigint"));
315315
verifyDataRows(result, rows("M", "30", 4), rows("F", "30", 1), rows("F", "20", 1));
316316
}
317+
318+
@Test
319+
public void testChartNullsInRowSplitShouldBeIgnored() throws IOException {
320+
JSONObject result =
321+
executeQuery(
322+
"source=events_null | chart min(cpu_usage) by host region");
323+
verifySchema(
324+
result,
325+
schema("host", "string"),
326+
schema("region", "string"),
327+
schema("min(cpu_usage)", "double"));
328+
verifyDataRows(
329+
result,
330+
rows("db-01", "eu-west", 42.1),
331+
rows("web-01", "us-east", 45.2),
332+
rows("web-02", "us-west", 38.7));
333+
}
317334
}

integ-test/src/test/resources/expectedOutput/calcite/explain_chart_multiple_group_keys.yaml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ calcite:
88
LogicalProject(gender=[$0], age=[SAFE_CAST($1)], avg(balance)=[$2])
99
LogicalAggregate(group=[{0, 1}], avg(balance)=[AVG($2)])
1010
LogicalProject(gender=[$4], age=[$10], balance=[$7])
11-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
11+
LogicalFilter(condition=[IS NOT NULL($4)])
12+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
1213
LogicalProject(age=[$0], __grand_total__=[$1], _row_number_chart_=[ROW_NUMBER() OVER (ORDER BY $1 DESC NULLS LAST)])
1314
LogicalAggregate(group=[{0}], __grand_total__=[AVG($1)])
1415
LogicalFilter(condition=[IS NOT NULL($0)])
1516
LogicalProject(age=[SAFE_CAST($1)], avg(balance)=[$2])
1617
LogicalAggregate(group=[{0, 1}], avg(balance)=[AVG($2)])
1718
LogicalProject(gender=[$4], age=[$10], balance=[$7])
18-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
19+
LogicalFilter(condition=[IS NOT NULL($4)])
20+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
1921
physical: |
2022
EnumerableLimit(fetch=[10000])
2123
EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
@@ -25,11 +27,11 @@ calcite:
2527
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
2628
EnumerableSort(sort0=[$1], dir0=[ASC])
2729
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[SAFE_CAST($t1)], gender=[$t0], age=[$t3], avg(balance)=[$t2])
28-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(balance)=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
30+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[gender, balance, age], FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(balance)=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["gender","balance","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
2931
EnumerableSort(sort0=[$0], dir0=[ASC])
3032
EnumerableCalc(expr#0..2=[{inputs}], age=[$t0], $1=[$t2])
3133
EnumerableWindow(window#0=[window(order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
3234
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], age=[$t0], __grand_total__=[$t7])
3335
EnumerableAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
34-
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[SAFE_CAST($t0)], expr#3=[IS NOT NULL($t2)], $f0=[$t2], avg(balance)=[$t1], $condition=[$t3])
35-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(balance)=AVG($2)), PROJECT->[age, avg(balance)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
36+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[SAFE_CAST($t0)], expr#3=[IS NOT NULL($t2)], age=[$t2], avg(balance)=[$t1], $condition=[$t3])
37+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[gender, balance, age], FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},avg(balance)=AVG($2)), PROJECT->[age, avg(balance)]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"exists":{"field":"gender","boost":1.0}},"_source":{"includes":["gender","balance","age"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"age":{"terms":{"field":"age","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"avg(balance)":{"avg":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)