Skip to content

Commit 8497dd5

Browse files
committed
Handle common agg functions for OTHER category for timechart
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> # Conflicts: # core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java
1 parent 0d0c953 commit 8497dd5

7 files changed

Lines changed: 266 additions & 88 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1971,7 +1971,7 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
19711971
}
19721972

19731973
/** Helper method to get the function name for proper column naming */
1974-
private String getValueFunctionName(UnresolvedExpression aggregateFunction) {
1974+
private String getAggFieldAlias(UnresolvedExpression aggregateFunction) {
19751975
if (aggregateFunction instanceof Alias) {
19761976
return ((Alias) aggregateFunction).getName();
19771977
}
@@ -2141,15 +2141,15 @@ public RelNode visitTimechart(
21412141

21422142
// Handle no by field case
21432143
if (node.getByField() == null) {
2144-
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
2144+
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());
21452145

21462146
// Create group expression list with just the timestamp span but use a different alias
21472147
// to avoid @timestamp naming conflict
21482148
List<UnresolvedExpression> simpleGroupExprList = new ArrayList<>();
21492149
simpleGroupExprList.add(new Alias("timestamp", spanExpr));
21502150
// Create agg expression list with the aggregate function
21512151
List<UnresolvedExpression> simpleAggExprList =
2152-
List.of(new Alias(valueFunctionName, node.getAggregateFunction()));
2152+
List.of(new Alias(aggFieldAlias, node.getAggregateFunction()));
21532153
// Create an Aggregation object
21542154
Aggregation aggregation =
21552155
new Aggregation(
@@ -2164,9 +2164,9 @@ public RelNode visitTimechart(
21642164
context.relBuilder.push(result);
21652165
// Reorder fields: timestamp first, then count
21662166
context.relBuilder.project(
2167-
context.relBuilder.field("timestamp"), context.relBuilder.field(valueFunctionName));
2167+
context.relBuilder.field("timestamp"), context.relBuilder.field(aggFieldAlias));
21682168
// Rename timestamp to @timestamp
2169-
context.relBuilder.rename(List.of("@timestamp", valueFunctionName));
2169+
context.relBuilder.rename(List.of("@timestamp", aggFieldAlias));
21702170

21712171
context.relBuilder.sort(context.relBuilder.field(0));
21722172
return context.relBuilder.peek();
@@ -2175,7 +2175,7 @@ public RelNode visitTimechart(
21752175
// Extract parameters for byField case
21762176
UnresolvedExpression byField = node.getByField();
21772177
String byFieldName = ((Field) byField).getField().toString();
2178-
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
2178+
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());
21792179

21802180
int limit = Optional.ofNullable(node.getLimit()).orElse(10);
21812181
boolean useOther = Optional.ofNullable(node.getUseOther()).orElse(true);
@@ -2202,11 +2202,11 @@ public RelNode visitTimechart(
22022202

22032203
// Handle no limit case - just sort and return with proper field aliases
22042204
if (limit == 0) {
2205-
// Add final projection with proper aliases: [@timestamp, byField, valueFunctionName]
2205+
// Add final projection with proper aliases: [@timestamp, byField, aggFieldAlias]
22062206
context.relBuilder.project(
22072207
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
22082208
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
2209-
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
2209+
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
22102210
context.relBuilder.sort(context.relBuilder.field(0), context.relBuilder.field(1));
22112211
return context.relBuilder.peek();
22122212
}
@@ -2216,32 +2216,61 @@ public RelNode visitTimechart(
22162216

22172217
// Step 2: Find top N categories using window function approach (more efficient than separate
22182218
// aggregation)
2219-
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, context);
2219+
String aggFunctionName = getAggFunctionName(node.getAggregateFunction());
2220+
Optional<BuiltinFunctionName> aggFuncNameOptional = BuiltinFunctionName.of(aggFunctionName);
2221+
if (aggFuncNameOptional.isEmpty()) {
2222+
throw new IllegalArgumentException(
2223+
StringUtils.format("Unrecognized aggregation function: %s", aggFunctionName));
2224+
}
2225+
BuiltinFunctionName aggFunction = aggFuncNameOptional.get();
2226+
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, aggFunction, context);
22202227

22212228
// Step 3: Apply OTHER logic with single pass
22222229
return buildFinalResultWithOther(
2223-
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
2230+
completeResults,
2231+
topCategories,
2232+
byFieldName,
2233+
aggFunction,
2234+
aggFieldAlias,
2235+
useOther,
2236+
limit,
2237+
context);
22242238

22252239
} catch (Exception e) {
22262240
throw new RuntimeException("Error in visitTimechart: " + e.getMessage(), e);
22272241
}
22282242
}
22292243

2244+
private String getAggFunctionName(UnresolvedExpression aggregateFunction) {
2245+
if (aggregateFunction instanceof Alias alias) {
2246+
return getAggFunctionName(alias.getDelegated());
2247+
}
2248+
return ((AggregateFunction) aggregateFunction).getFuncName();
2249+
}
2250+
22302251
/** Build top categories query - simpler approach that works better with OTHER handling */
22312252
private RelNode buildTopCategoriesQuery(
2232-
RelNode completeResults, int limit, CalcitePlanContext context) {
2253+
RelNode completeResults,
2254+
int limit,
2255+
BuiltinFunctionName aggFunction,
2256+
CalcitePlanContext context) {
22332257
context.relBuilder.push(completeResults);
22342258

22352259
// Filter out null values when determining top categories - null should not count towards limit
22362260
context.relBuilder.filter(context.relBuilder.isNotNull(context.relBuilder.field(1)));
22372261

22382262
// Get totals for non-null categories - field positions: 0=@timestamp, 1=byField, 2=value
2263+
RexInputRef valueField = context.relBuilder.field(2);
2264+
AggCall call = buildAggCall(context.relBuilder, aggFunction, valueField);
2265+
22392266
context.relBuilder.aggregate(
2240-
context.relBuilder.groupKey(context.relBuilder.field(1)),
2241-
context.relBuilder.sum(context.relBuilder.field(2)).as("grand_total"));
2267+
context.relBuilder.groupKey(context.relBuilder.field(1)), call.as("grand_total"));
22422268

22432269
// Apply sorting and limit to non-null categories only
2244-
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field("grand_total")));
2270+
RexNode sortField = context.relBuilder.field("grand_total");
2271+
sortField =
2272+
aggFunction == BuiltinFunctionName.MIN ? sortField : context.relBuilder.desc(sortField);
2273+
context.relBuilder.sort(sortField);
22452274
if (limit > 0) {
22462275
context.relBuilder.limit(0, limit);
22472276
}
@@ -2253,18 +2282,25 @@ private RelNode buildFinalResultWithOther(
22532282
RelNode completeResults,
22542283
RelNode topCategories,
22552284
String byFieldName,
2256-
String valueFunctionName,
2285+
BuiltinFunctionName aggFunction,
2286+
String aggFieldAlias,
22572287
boolean useOther,
22582288
int limit,
22592289
CalcitePlanContext context) {
22602290

22612291
// Use zero-filling for count aggregations, standard result for others
2262-
if (valueFunctionName.equals("count")) {
2292+
if (aggFieldAlias.equals("count")) {
22632293
return buildZeroFilledResult(
2264-
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
2294+
completeResults, topCategories, byFieldName, aggFieldAlias, useOther, limit, context);
22652295
} else {
22662296
return buildStandardResult(
2267-
completeResults, topCategories, byFieldName, valueFunctionName, useOther, context);
2297+
completeResults,
2298+
topCategories,
2299+
byFieldName,
2300+
aggFunction,
2301+
aggFieldAlias,
2302+
useOther,
2303+
context);
22682304
}
22692305
}
22702306

@@ -2273,7 +2309,8 @@ private RelNode buildStandardResult(
22732309
RelNode completeResults,
22742310
RelNode topCategories,
22752311
String byFieldName,
2276-
String valueFunctionName,
2312+
BuiltinFunctionName aggFunctionName,
2313+
String aggFieldAlias,
22772314
boolean useOther,
22782315
CalcitePlanContext context) {
22792316

@@ -2296,11 +2333,13 @@ private RelNode buildStandardResult(
22962333
context.relBuilder.project(
22972334
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
22982335
context.relBuilder.alias(categoryExpr, byFieldName),
2299-
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
2336+
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
23002337

2338+
RexInputRef valueField = context.relBuilder.field(2);
2339+
AggCall aggCall = buildAggCall(context.relBuilder, aggFunctionName, valueField);
23012340
context.relBuilder.aggregate(
23022341
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
2303-
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
2342+
aggCall.as(aggFieldAlias));
23042343

23052344
applyFiltersAndSort(useOther, context);
23062345
return context.relBuilder.peek();
@@ -2335,7 +2374,7 @@ private RelNode buildZeroFilledResult(
23352374
RelNode completeResults,
23362375
RelNode topCategories,
23372376
String byFieldName,
2338-
String valueFunctionName,
2377+
String aggFieldAlias,
23392378
boolean useOther,
23402379
int limit,
23412380
CalcitePlanContext context) {
@@ -2374,7 +2413,7 @@ private RelNode buildZeroFilledResult(
23742413
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
23752414
"@timestamp"),
23762415
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
2377-
context.relBuilder.alias(context.relBuilder.literal(0), valueFunctionName));
2416+
context.relBuilder.alias(context.relBuilder.literal(0), aggFieldAlias));
23782417
RelNode zeroFilledCombinations = context.relBuilder.build();
23792418

23802419
// Get actual results with OTHER logic applied
@@ -2396,7 +2435,7 @@ private RelNode buildZeroFilledResult(
23962435
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
23972436
"@timestamp"),
23982437
context.relBuilder.alias(actualCategoryExpr, byFieldName),
2399-
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
2438+
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
24002439

24012440
context.relBuilder.aggregate(
24022441
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
@@ -2411,12 +2450,30 @@ private RelNode buildZeroFilledResult(
24112450
// Aggregate to combine actual and zero-filled data
24122451
context.relBuilder.aggregate(
24132452
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
2414-
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
2453+
context.relBuilder.sum(context.relBuilder.field(2)).as(aggFieldAlias));
24152454

24162455
applyFiltersAndSort(useOther, context);
24172456
return context.relBuilder.peek();
24182457
}
24192458

2459+
/**
2460+
* Aggregate a field based on a given built-in aggregation function name.
2461+
*
2462+
* <p>It is intended for secondary aggregations in timechart and chart commands. Using it
2463+
* elsewhere may lead to unintended results. It handles explicitly only MIN, MAX, AVG, COUNT,
2464+
* DISTINCT_COUNT, EARLIEST, and LATEST. It sums the results for the rest aggregation types,
2465+
* assuming them to be accumulative.
2466+
*/
2467+
private AggCall buildAggCall(
2468+
RelBuilder relBuilder, BuiltinFunctionName aggFunction, RexNode node) {
2469+
return switch (aggFunction) {
2470+
case MIN, EARLIEST -> relBuilder.min(node);
2471+
case MAX, LATEST -> relBuilder.max(node);
2472+
case AVG -> relBuilder.avg(node);
2473+
default -> relBuilder.sum(node);
2474+
};
2475+
}
2476+
24202477
@Override
24212478
public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
24222479
visitChildren(node, context);

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -422,10 +422,7 @@ public void testExplainWithReverse() throws IOException {
422422
@Test
423423
public void testExplainWithTimechartAvg() throws IOException {
424424
var result = explainQueryYaml("source=events | timechart span=1m avg(cpu_usage) by host");
425-
String expected =
426-
!isPushdownDisabled()
427-
? loadFromFile("expectedOutput/calcite/explain_timechart.yaml")
428-
: loadFromFile("expectedOutput/calcite/explain_timechart_no_pushdown.yaml");
425+
String expected = loadExpectedPlan("explain_timechart.yaml");
429426
assertYamlEqualsIgnoreId(expected, result);
430427
}
431428

integ-test/src/test/resources/expectedOutput/calcite/explain_timechart.yaml

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ calcite:
22
logical: |
33
LogicalSystemLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT])
44
LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
5-
LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)])
5+
LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)])
66
LogicalProject(@timestamp=[$0], host=[CASE(IS NOT NULL($3), $1, CASE(IS NULL($1), null:NULL, 'OTHER'))], avg(cpu_usage)=[$2])
77
LogicalJoin(condition=[=($1, $3)], joinType=[left])
88
LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2])
99
LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)])
1010
LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')])
1111
CalciteLogicalIndexScan(table=[[OpenSearch, events]])
1212
LogicalSort(sort0=[$1], dir0=[DESC], fetch=[10])
13-
LogicalAggregate(group=[{1}], grand_total=[SUM($2)])
13+
LogicalAggregate(group=[{1}], grand_total=[AVG($2)])
1414
LogicalFilter(condition=[IS NOT NULL($1)])
1515
LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2])
1616
LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)])
@@ -19,19 +19,21 @@ calcite:
1919
physical: |
2020
EnumerableLimit(fetch=[10000])
2121
EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
22-
EnumerableAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)])
23-
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2])
24-
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
25-
EnumerableSort(sort0=[$1], dir0=[ASC])
26-
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8])
27-
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
28-
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
29-
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
30-
EnumerableSort(sort0=[$0], dir0=[ASC])
31-
EnumerableLimit(fetch=[10])
32-
EnumerableSort(sort0=[$1], dir0=[DESC])
33-
EnumerableAggregate(group=[{0}], grand_total=[SUM($1)])
34-
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], host=[$t0], $f2=[$t8])
35-
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
36-
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
37-
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp], FILTER->IS NOT NULL($0)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"host","boost":1.0}},"_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
22+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], proj#0..1=[{exprs}], avg(cpu_usage)=[$t8])
23+
EnumerableAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)])
24+
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2])
25+
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
26+
EnumerableSort(sort0=[$1], dir0=[ASC])
27+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8])
28+
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
29+
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
30+
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
31+
EnumerableSort(sort0=[$0], dir0=[ASC])
32+
EnumerableLimit(fetch=[10])
33+
EnumerableSort(sort0=[$1], dir0=[DESC])
34+
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], host=[$t0], grand_total=[$t7])
35+
EnumerableAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
36+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], host=[$t0], $f2=[$t8])
37+
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
38+
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
39+
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp], FILTER->IS NOT NULL($0)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"host","boost":1.0}},"_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)