Skip to content

Commit 1ecf42d

Browse files
authored
add bucket_nullable for eventstats (#4817)
1 parent bf37067 commit 1ecf42d

11 files changed

Lines changed: 199 additions & 6 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/tree/Window.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
public class Window extends UnresolvedPlan {
2222

2323
private final List<UnresolvedExpression> windowFunctionList;
24+
private final List<UnresolvedExpression> groupList;
25+
private final boolean bucketNullable;
2426
@ToString.Exclude private UnresolvedPlan child;
2527

2628
@Override

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1614,9 +1614,32 @@ private static void buildDedupNotNull(
16141614
@Override
16151615
public RelNode visitWindow(Window node, CalcitePlanContext context) {
16161616
visitChildren(node, context);
1617+
1618+
List<UnresolvedExpression> groupList = node.getGroupList();
1619+
boolean hasGroup = groupList != null && !groupList.isEmpty();
1620+
boolean bucketNullable = node.isBucketNullable();
1621+
16171622
List<RexNode> overExpressions =
16181623
node.getWindowFunctionList().stream().map(w -> rexVisitor.analyze(w, context)).toList();
1619-
context.relBuilder.projectPlus(overExpressions);
1624+
1625+
if (hasGroup && !bucketNullable) {
1626+
// construct groupNotNull predicate: true only when every group-by field is non-null
1627+
List<RexNode> groupByList =
1628+
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1629+
List<RexNode> notNullList =
1630+
PlanUtils.getSelectColumns(groupByList).stream()
1631+
.map(context.relBuilder::field)
1632+
.map(context.relBuilder::isNotNull)
1633+
.toList();
1634+
RexNode groupNotNull = context.relBuilder.and(notNullList);
1635+
1636+
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1637+
List<RexNode> wrappedOverExprs =
1638+
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1639+
context.relBuilder.projectPlus(wrappedOverExprs);
1640+
} else {
1641+
context.relBuilder.projectPlus(overExpressions);
1642+
}
16201643
return context.relBuilder.peek();
16211644
}
16221645

docs/user/ppl/cmd/eventstats.rst

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,14 @@ The ``stats`` and ``eventstats`` commands are both used for calculating statisti
4040

4141
Syntax
4242
======
43-
eventstats <function>... [by-clause]
43+
eventstats [bucket_nullable=bool] <function>... [by-clause]
4444

4545
* function: mandatory. An aggregation function or window function.
46+
* bucket_nullable: optional. Controls whether the eventstats command considers null buckets as a valid group in group-by aggregations. When set to ``false``, null group-by values are not treated as a distinct group, and rows whose group-by value is null receive ``null`` for the aggregated fields. **Default:** Determined by ``plugins.ppl.syntax.legacy.preferred``.
47+
48+
* When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true``
49+
* When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false``
50+
4651
* by-clause: optional. Groups results by specified fields or expressions. Syntax: by [span-expression,] [field,]... **Default:** aggregation over the entire result set.
4752
* span-expression: optional, at most one. Splits field into buckets by intervals. Syntax: span(field_expr, interval_expr). For example, ``span(age, 10)`` creates 10-year age buckets, ``span(timestamp, 1h)`` creates hourly buckets.
4853

@@ -126,3 +131,32 @@ PPL query::
126131
| 13 | F | 28 | 1 |
127132
| 18 | M | 33 | 2 |
128133
+----------------+--------+-----+-----+
134+
135+
Example 3: Null buckets handling
136+
================================
137+
138+
PPL query::
139+
140+
os> source=accounts | eventstats bucket_nullable=false count() as cnt by employer | fields account_number, firstname, employer, cnt | sort account_number;
141+
fetched rows / total rows = 4/4
142+
+----------------+-----------+----------+------+
143+
| account_number | firstname | employer | cnt |
144+
|----------------+-----------+----------+------|
145+
| 1 | Amber | Pyrami | 1 |
146+
| 6 | Hattie | Netagy | 1 |
147+
| 13 | Nanette | Quility | 1 |
148+
| 18 | Dale | null | null |
149+
+----------------+-----------+----------+------+
150+
151+
PPL query::
152+
153+
os> source=accounts | eventstats bucket_nullable=true count() as cnt by employer | fields account_number, firstname, employer, cnt | sort account_number;
154+
fetched rows / total rows = 4/4
155+
+----------------+-----------+----------+-----+
156+
| account_number | firstname | employer | cnt |
157+
|----------------+-----------+----------+-----|
158+
| 1 | Amber | Pyrami | 1 |
159+
| 6 | Hattie | Netagy | 1 |
160+
| 13 | Nanette | Quility | 1 |
161+
| 18 | Dale | null | 1 |
162+
+----------------+-----------+----------+-----+

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,16 @@ public void testEventstatsDistinctCountFunctionExplain() throws IOException {
618618
assertJsonEqualsIgnoreId(expected, result);
619619
}
620620

621+
@Test
622+
public void testEventstatsNullBucketExplain() throws IOException {
623+
String query =
624+
"source=opensearch-sql_test_index_account | eventstats bucket_nullable=false count() by"
625+
+ " state";
626+
var result = explainQueryYaml(query);
627+
String expected = loadExpectedPlan("explain_eventstats_null_bucket.yaml");
628+
assertYamlEqualsIgnoreId(expected, result);
629+
}
630+
621631
@Test
622632
public void testStreamstatsDistinctCountExplain() throws IOException {
623633
String query =

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLEventstatsIT.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,40 @@ public void testEventstatsByWithNull() throws IOException {
165165
rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30));
166166
}
167167

168+
@Test
169+
public void testEventstatsByWithNullBucket() throws IOException {
170+
JSONObject actual =
171+
executeQuery(
172+
String.format(
173+
"source=%s | eventstats bucket_nullable=false count() as cnt, avg(age) as avg,"
174+
+ " min(age) as min, max(age) as max by country",
175+
TEST_INDEX_STATE_COUNTRY_WITH_NULL));
176+
177+
verifyDataRows(
178+
actual,
179+
rows("Kevin", null, null, 4, 2023, null, null, null, null, null),
180+
rows(null, "Canada", null, 4, 2023, 10, 3, 18.333333333333332, 10, 25),
181+
rows("John", "Canada", "Ontario", 4, 2023, 25, 3, 18.333333333333332, 10, 25),
182+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3, 18.333333333333332, 10, 25),
183+
rows("Jake", "USA", "California", 4, 2023, 70, 2, 50, 30, 70),
184+
rows("Hello", "USA", "New York", 4, 2023, 30, 2, 50, 30, 70));
185+
186+
actual =
187+
executeQuery(
188+
String.format(
189+
"source=%s | eventstats bucket_nullable=false count() as cnt, avg(age) as avg,"
190+
+ " min(age) as min, max(age) as max by state",
191+
TEST_INDEX_STATE_COUNTRY_WITH_NULL));
192+
verifyDataRows(
193+
actual,
194+
rows(null, "Canada", null, 4, 2023, 10, null, null, null, null),
195+
rows("Kevin", null, null, 4, 2023, null, null, null, null, null),
196+
rows("John", "Canada", "Ontario", 4, 2023, 25, 1, 25, 25, 25),
197+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 1, 20, 20, 20),
198+
rows("Jake", "USA", "California", 4, 2023, 70, 1, 70, 70, 70),
199+
rows("Hello", "USA", "New York", 4, 2023, 30, 1, 30, 30, 30));
200+
}
201+
168202
@Test
169203
public void testEventstatsBySpan() throws IOException {
170204
JSONObject actual =
@@ -324,6 +358,26 @@ public void testMultipleEventstatsWithNull() throws IOException {
324358
rows("Hello", "USA", "New York", 4, 2023, 30, 30.0, 50.0));
325359
}
326360

361+
@Test
362+
public void testMultipleEventstatsWithNullBucket() throws IOException {
363+
JSONObject actual =
364+
executeQuery(
365+
String.format(
366+
"source=%s | eventstats bucket_nullable=false avg(age) as avg_age by state, country"
367+
+ " | eventstats bucket_nullable=false avg(avg_age) as avg_state_age by"
368+
+ " country",
369+
TEST_INDEX_STATE_COUNTRY_WITH_NULL));
370+
371+
verifyDataRows(
372+
actual,
373+
rows("Kevin", null, null, 4, 2023, null, null, null),
374+
rows(null, "Canada", null, 4, 2023, 10, null, 22.5),
375+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20.0, 22.5),
376+
rows("John", "Canada", "Ontario", 4, 2023, 25, 25.0, 22.5),
377+
rows("Jake", "USA", "California", 4, 2023, 70, 70.0, 50.0),
378+
rows("Hello", "USA", "New York", 4, 2023, 30, 30.0, 50.0));
379+
}
380+
327381
@Test
328382
public void testMultipleEventstatsWithEval() throws IOException {
329383
JSONObject actual =
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], count()=[CASE(IS NOT NULL($7), COUNT() OVER (PARTITION BY $7), null:BIGINT)])
5+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
6+
physical: |
7+
EnumerableLimit(fetch=[10000])
8+
EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], expr#14=[CASE($t11, $t12, $t13)], proj#0..10=[{exprs}], count()=[$t14])
9+
EnumerableWindow(window#0=[window(partition {7} aggs [COUNT()])])
10+
EnumerableCalc(expr#0..10=[{inputs}], expr#11=[IS NOT NULL($t7)], proj#0..11=[{exprs}])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], count()=[CASE(IS NOT NULL($7), COUNT() OVER (PARTITION BY $7), null:BIGINT)])
5+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
6+
physical: |
7+
EnumerableLimit(fetch=[10000])
8+
EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], expr#14=[CASE($t11, $t12, $t13)], proj#0..10=[{exprs}], count()=[$t14])
9+
EnumerableWindow(window#0=[window(partition {7} aggs [COUNT()])])
10+
EnumerableCalc(expr#0..16=[{inputs}], expr#17=[IS NOT NULL($t7)], proj#0..10=[{exprs}], $11=[$t17])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ dedupSplitArg
255255
;
256256

257257
eventstatsCommand
258-
: EVENTSTATS eventstatsAggTerm (COMMA eventstatsAggTerm)* (statsByClause)?
258+
: EVENTSTATS (bucketNullableArg)? eventstatsAggTerm (COMMA eventstatsAggTerm)* (statsByClause)?
259259
;
260260

261261
streamstatsCommand

ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,14 +481,24 @@ public UnresolvedPlan visitStatsCommand(StatsCommandContext ctx) {
481481

482482
/** Eventstats command. */
483483
public UnresolvedPlan visitEventstatsCommand(OpenSearchPPLParser.EventstatsCommandContext ctx) {
484+
// 1. Parse arguments from the eventstats command
485+
List<Argument> argExprList = ArgumentFactory.getArgumentList(ctx, settings);
486+
ArgumentMap arguments = ArgumentMap.of(argExprList);
487+
488+
// bucket_nullable
489+
boolean bucketNullable =
490+
(Boolean) arguments.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
491+
492+
// 2. Build groupList
493+
List<UnresolvedExpression> groupList = getPartitionExprList(ctx.statsByClause());
494+
484495
ImmutableList.Builder<UnresolvedExpression> windownFunctionListBuilder =
485496
new ImmutableList.Builder<>();
486497
for (OpenSearchPPLParser.EventstatsAggTermContext aggCtx : ctx.eventstatsAggTerm()) {
487498
UnresolvedExpression windowFunction = internalVisitExpression(aggCtx.windowFunction());
488499
// set partition by list for window function
489500
if (windowFunction instanceof WindowFunction) {
490-
((WindowFunction) windowFunction)
491-
.setPartitionByList(getPartitionExprList(ctx.statsByClause()));
501+
((WindowFunction) windowFunction).setPartitionByList(groupList);
492502
}
493503
String name =
494504
aggCtx.alias == null
@@ -498,7 +508,7 @@ public UnresolvedPlan visitEventstatsCommand(OpenSearchPPLParser.EventstatsComma
498508
windownFunctionListBuilder.add(alias);
499509
}
500510

501-
return new Window(windownFunctionListBuilder.build());
511+
return new Window(windownFunctionListBuilder.build(), groupList, bucketNullable);
502512
}
503513

504514
/** Streamstats command. */

ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.DecimalLiteralContext;
2727
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.DedupCommandContext;
2828
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.DefaultSortFieldContext;
29+
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.EventstatsCommandContext;
2930
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext;
3031
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IntegerLiteralContext;
3132
import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.PrefixSortFieldContext;
@@ -111,6 +112,22 @@ public static List<Argument> getArgumentList(StreamstatsCommandContext ctx) {
111112
: new Argument("global", new Literal(true, DataType.BOOLEAN)));
112113
}
113114

115+
/**
116+
* Get list of {@link Argument}.
117+
*
118+
* @param ctx EventstatsCommandContext instance
119+
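* @return a singleton list holding the {@code bucket_nullable} argument: the value given in the command or, when the argument is omitted, {@code true} if legacy syntax is preferred per {@code settings} and {@code false} otherwise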
120+
*/
121+
public static List<Argument> getArgumentList(EventstatsCommandContext ctx, Settings settings) {
122+
return Collections.singletonList(
123+
ctx.bucketNullableArg() != null && !ctx.bucketNullableArg().isEmpty()
124+
? new Argument(
125+
Argument.BUCKET_NULLABLE, getArgumentValue(ctx.bucketNullableArg().bucket_nullable))
126+
: new Argument(
127+
Argument.BUCKET_NULLABLE,
128+
legacyPreferred(settings) ? Literal.TRUE : Literal.FALSE));
129+
}
130+
114131
/**
115132
* Get list of {@link Argument}.
116133
*

0 commit comments

Comments
 (0)