Skip to content

Commit f7b860b

Browse files
authored
Add bucket_nullable argument for Streamstats command (opensearch-project#4831)
* add bucket_nullable for streamstats Signed-off-by: Xinyu Hao <haoxinyu@amazon.com> * little change Signed-off-by: Xinyu Hao <haoxinyu@amazon.com> * fix IT, UT and doc Signed-off-by: Xinyu Hao <haoxinyu@amazon.com> * fix Signed-off-by: Xinyu Hao <haoxinyu@amazon.com> * change bucketNullable getOrDefault to get Signed-off-by: Xinyu Hao <haoxinyu@amazon.com> * fix error Signed-off-by: Xinyu Hao <haoxinyu@amazon.com> --------- Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
1 parent 164c3f0 commit f7b860b

25 files changed

Lines changed: 672 additions & 171 deletions

core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,50 +9,27 @@
99
import java.util.List;
1010
import lombok.EqualsAndHashCode;
1111
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
1213
import lombok.ToString;
1314
import org.opensearch.sql.ast.AbstractNodeVisitor;
1415
import org.opensearch.sql.ast.expression.UnresolvedExpression;
1516

1617
@Getter
1718
@ToString
1819
@EqualsAndHashCode(callSuper = false)
20+
@RequiredArgsConstructor
1921
public class StreamWindow extends UnresolvedPlan {
2022

2123
private final List<UnresolvedExpression> windowFunctionList;
2224
private final List<UnresolvedExpression> groupList;
2325
private final boolean current;
2426
private final int window;
2527
private final boolean global;
28+
private final boolean bucketNullable;
2629
private final UnresolvedExpression resetBefore;
2730
private final UnresolvedExpression resetAfter;
2831
@ToString.Exclude private UnresolvedPlan child;
2932

30-
/** StreamWindow Constructor. */
31-
public StreamWindow(
32-
List<UnresolvedExpression> windowFunctionList,
33-
List<UnresolvedExpression> groupList,
34-
boolean current,
35-
int window,
36-
boolean global,
37-
UnresolvedExpression resetBefore,
38-
UnresolvedExpression resetAfter) {
39-
this.windowFunctionList = windowFunctionList;
40-
this.groupList = groupList;
41-
this.current = current;
42-
this.window = window;
43-
this.global = global;
44-
this.resetBefore = resetBefore;
45-
this.resetAfter = resetAfter;
46-
}
47-
48-
public boolean isCurrent() {
49-
return current;
50-
}
51-
52-
public boolean isGlobal() {
53-
return global;
54-
}
55-
5633
@Override
5734
public StreamWindow attach(UnresolvedPlan child) {
5835
this.child = child;

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,8 +1125,7 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
11251125
@Override
11261126
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11271127
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
1128-
Boolean bucketNullable =
1129-
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
1128+
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
11301129
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
11311130
BitSet nonNullGroupMask = new BitSet(nGroup);
11321131
if (!bucketNullable) {
@@ -1742,20 +1741,25 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
17421741
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
17431742
context.relBuilder.projectPlus(streamSeq);
17441743

1745-
// construct groupNotNull predicate
1746-
List<RexNode> groupByList =
1747-
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1748-
List<RexNode> notNullList =
1749-
PlanUtils.getSelectColumns(groupByList).stream()
1750-
.map(context.relBuilder::field)
1751-
.map(context.relBuilder::isNotNull)
1752-
.toList();
1753-
RexNode groupNotNull = context.relBuilder.and(notNullList);
1744+
if (!node.isBucketNullable()) {
1745+
// construct groupNotNull predicate
1746+
List<RexNode> groupByList =
1747+
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1748+
List<RexNode> notNullList =
1749+
PlanUtils.getSelectColumns(groupByList).stream()
1750+
.map(context.relBuilder::field)
1751+
.map(context.relBuilder::isNotNull)
1752+
.toList();
1753+
RexNode groupNotNull = context.relBuilder.and(notNullList);
1754+
1755+
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1756+
List<RexNode> wrappedOverExprs =
1757+
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1758+
context.relBuilder.projectPlus(wrappedOverExprs);
1759+
} else {
1760+
context.relBuilder.projectPlus(overExpressions);
1761+
}
17541762

1755-
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1756-
List<RexNode> wrappedOverExprs =
1757-
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1758-
context.relBuilder.projectPlus(wrappedOverExprs);
17591763
// resort when there is by condition
17601764
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
17611765
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
@@ -1811,11 +1815,11 @@ private RelNode buildStreamWindowJoinPlan(
18111815
RexNode segRight = context.relBuilder.field(segmentCol);
18121816
RexNode segOuter = context.relBuilder.field(v.get(), segmentCol);
18131817
RexNode frame = buildResetFrameFilter(context, node, outerSeq, rightSeq, segOuter, segRight);
1814-
RexNode group = buildGroupFilter(context, groupList, v.get());
1818+
RexNode group = buildGroupFilter(context, node, groupList, v.get());
18151819
filter = (group == null) ? frame : context.relBuilder.and(frame, group);
18161820
} else { // global + window + by condition
18171821
RexNode frame = buildFrameFilter(context, node, outerSeq, rightSeq);
1818-
RexNode group = buildGroupFilter(context, groupList, v.get());
1822+
RexNode group = buildGroupFilter(context, node, groupList, v.get());
18191823
filter = context.relBuilder.and(frame, group);
18201824
}
18211825
context.relBuilder.filter(filter);
@@ -1965,7 +1969,10 @@ private RexNode buildResetFrameFilter(
19651969
}
19661970

19671971
private RexNode buildGroupFilter(
1968-
CalcitePlanContext context, List<UnresolvedExpression> groupList, RexCorrelVariable correl) {
1972+
CalcitePlanContext context,
1973+
StreamWindow node,
1974+
List<UnresolvedExpression> groupList,
1975+
RexCorrelVariable correl) {
19691976
// build conjunctive equality filters: right.g_i = outer.g_i
19701977
if (groupList.isEmpty()) {
19711978
return null;
@@ -1977,7 +1984,17 @@ private RexNode buildGroupFilter(
19771984
String groupName = extractGroupFieldName(expr);
19781985
RexNode rightGroup = context.relBuilder.field(groupName);
19791986
RexNode outerGroup = context.relBuilder.field(correl, groupName);
1980-
return context.relBuilder.equals(rightGroup, outerGroup);
1987+
RexNode equalCondition = context.relBuilder.equals(rightGroup, outerGroup);
1988+
// handle bucket_nullable case
1989+
if (!node.isBucketNullable()) {
1990+
return equalCondition;
1991+
} else {
1992+
RexNode bothNull =
1993+
context.relBuilder.and(
1994+
context.relBuilder.isNull(rightGroup),
1995+
context.relBuilder.isNull(outerGroup));
1996+
return context.relBuilder.or(equalCondition, bothNull);
1997+
}
19811998
})
19821999
.toList();
19832000
return context.relBuilder.and(equalsList);

docs/user/ppl/cmd/streamstats.rst

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ All of these commands can be used to generate aggregations such as average, sum,
5050

5151
Syntax
5252
======
53-
streamstats [current=<bool>] [window=<int>] [global=<bool>] [reset_before="("<eval-expression>")"] [reset_after="("<eval-expression>")"] <function>... [by-clause]
53+
streamstats [bucket_nullable=bool] [current=<bool>] [window=<int>] [global=<bool>] [reset_before="("<eval-expression>")"] [reset_after="("<eval-expression>")"] <function>... [by-clause]
5454

5555
* function: mandatory. A aggregation function or window function.
56+
* bucket_nullable: optional. Controls whether the streamstats command consider null buckets as a valid group in group-by aggregations. When set to ``false``, it will not treat null group-by values as a distinct group during aggregation. **Default:** Determined by ``plugins.ppl.syntax.legacy.preferred``.
57+
58+
* When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true``
59+
* When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false``
60+
5661
* current: optional. If true, the search includes the given, or current, event in the summary calculations. If false, the search uses the field value from the previous event. Syntax: current=<boolean>. **Default:** true.
5762
* window: optional. Specifies the number of events to use when computing the statistics. Syntax: window=<integer>. **Default:** 0, which means that all previous and current events are used.
5863
* global: optional. Used only when the window argument is set. Defines whether to use a single window, global=true, or to use separate windows based on the by clause. If global=false and window is set to a non-zero value, a separate window is used for each group of values of the field specified in the by clause. Syntax: global=<boolean>. **Default:** true.
@@ -235,4 +240,34 @@ PPL query::
235240
| Peter | Canada | B.C | 4 | 2023 | 57 | null |
236241
| Rick | Canada | B.C | 4 | 2023 | 70 | null |
237242
| David | USA | Washington | 4 | 2023 | 40 | null |
238-
+-------+---------+------------+-------+------+-----+---------+
243+
+-------+---------+------------+-------+------+-----+---------+
244+
245+
246+
Example 5: Null buckets handling
247+
================================
248+
249+
PPL query::
250+
251+
os> source=accounts | streamstats bucket_nullable=false count() as cnt by employer | fields account_number, firstname, employer, cnt;
252+
fetched rows / total rows = 4/4
253+
+----------------+-----------+----------+------+
254+
| account_number | firstname | employer | cnt |
255+
|----------------+-----------+----------+------|
256+
| 1 | Amber | Pyrami | 1 |
257+
| 6 | Hattie | Netagy | 1 |
258+
| 13 | Nanette | Quility | 1 |
259+
| 18 | Dale | null | null |
260+
+----------------+-----------+----------+------+
261+
262+
PPL query::
263+
264+
os> source=accounts | streamstats bucket_nullable=true count() as cnt by employer | fields account_number, firstname, employer, cnt;
265+
fetched rows / total rows = 4/4
266+
+----------------+-----------+----------+-----+
267+
| account_number | firstname | employer | cnt |
268+
|----------------+-----------+----------+-----|
269+
| 1 | Amber | Pyrami | 1 |
270+
| 6 | Hattie | Netagy | 1 |
271+
| 13 | Nanette | Quility | 1 |
272+
| 18 | Dale | null | 1 |
273+
+----------------+-----------+----------+-----+

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,36 @@ public void testStreamstatsResetExplain() throws IOException {
667667
assertYamlEqualsIgnoreId(expected, result);
668668
}
669669

670+
@Test
671+
public void testStreamstatsNullBucketExplain() throws IOException {
672+
String query =
673+
"source=opensearch-sql_test_index_account | streamstats bucket_nullable=false avg(age) as"
674+
+ " avg_age by gender";
675+
var result = explainQueryYaml(query);
676+
String expected = loadExpectedPlan("explain_streamstats_null_bucket.yaml");
677+
assertYamlEqualsIgnoreId(expected, result);
678+
}
679+
680+
@Test
681+
public void testStreamstatsGlobalNullBucketExplain() throws IOException {
682+
String query =
683+
"source=opensearch-sql_test_index_account | streamstats bucket_nullable=false window=2"
684+
+ " global=true avg(age) as avg_age by gender";
685+
var result = explainQueryYaml(query);
686+
String expected = loadExpectedPlan("explain_streamstats_global_null_bucket.yaml");
687+
assertYamlEqualsIgnoreId(expected, result);
688+
}
689+
690+
@Test
691+
public void testStreamstatsResetNullBucketExplain() throws IOException {
692+
String query =
693+
"source=opensearch-sql_test_index_account | streamstats bucket_nullable=false current=false"
694+
+ " reset_before=age>34 reset_after=age<25 avg(age) as avg_age by gender";
695+
var result = explainQueryYaml(query);
696+
String expected = loadExpectedPlan("explain_streamstats_reset_null_bucket.yaml");
697+
assertYamlEqualsIgnoreId(expected, result);
698+
}
699+
670700
@Test
671701
public void testKeywordILikeFunctionExplain() throws IOException {
672702
// ilike is only supported in v3

0 commit comments

Comments
 (0)