Skip to content

Commit 6e7e8c3

Browse files
committed
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
1 parent 9890adb commit 6e7e8c3

25 files changed

Lines changed: 735 additions & 207 deletions

core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,50 +9,27 @@
99
import java.util.List;
1010
import lombok.EqualsAndHashCode;
1111
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
1213
import lombok.ToString;
1314
import org.opensearch.sql.ast.AbstractNodeVisitor;
1415
import org.opensearch.sql.ast.expression.UnresolvedExpression;
1516

1617
@Getter
1718
@ToString
1819
@EqualsAndHashCode(callSuper = false)
20+
@RequiredArgsConstructor
1921
public class StreamWindow extends UnresolvedPlan {
2022

2123
private final List<UnresolvedExpression> windowFunctionList;
2224
private final List<UnresolvedExpression> groupList;
2325
private final boolean current;
2426
private final int window;
2527
private final boolean global;
28+
private final boolean bucketNullable;
2629
private final UnresolvedExpression resetBefore;
2730
private final UnresolvedExpression resetAfter;
2831
@ToString.Exclude private UnresolvedPlan child;
2932

30-
/** StreamWindow Constructor. */
31-
public StreamWindow(
32-
List<UnresolvedExpression> windowFunctionList,
33-
List<UnresolvedExpression> groupList,
34-
boolean current,
35-
int window,
36-
boolean global,
37-
UnresolvedExpression resetBefore,
38-
UnresolvedExpression resetAfter) {
39-
this.windowFunctionList = windowFunctionList;
40-
this.groupList = groupList;
41-
this.current = current;
42-
this.window = window;
43-
this.global = global;
44-
this.resetBefore = resetBefore;
45-
this.resetAfter = resetAfter;
46-
}
47-
48-
public boolean isCurrent() {
49-
return current;
50-
}
51-
52-
public boolean isGlobal() {
53-
return global;
54-
}
55-
5633
@Override
5734
public StreamWindow attach(UnresolvedPlan child) {
5835
this.child = child;

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 70 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,8 +1130,7 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
11301130
@Override
11311131
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11321132
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
1133-
Boolean bucketNullable =
1134-
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
1133+
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
11351134
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
11361135
BitSet nonNullGroupMask = new BitSet(nGroup);
11371136
if (!bucketNullable) {
@@ -1748,26 +1747,31 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
17481747
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
17491748
context.relBuilder.projectPlus(streamSeq);
17501749

1751-
// construct groupNotNull predicate
1752-
List<RexNode> groupByList =
1753-
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).collect(Collectors.toList());
1754-
List<RexNode> notNullList =
1755-
PlanUtils.getSelectColumns(groupByList).stream()
1756-
.map(context.relBuilder::field)
1757-
.map(context.relBuilder::isNotNull)
1758-
.collect(Collectors.toList());
1759-
RexNode groupNotNull = context.relBuilder.and(notNullList);
1760-
1761-
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1762-
List<RexNode> wrappedOverExprs =
1763-
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1764-
context.relBuilder.projectPlus(wrappedOverExprs);
1765-
// resort when there is by condition
1766-
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
1767-
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
1768-
} else {
1769-
context.relBuilder.projectPlus(overExpressions);
1770-
}
1750+
if (!node.isBucketNullable()) {
1751+
// construct groupNotNull predicate
1752+
List<RexNode> groupByList =
1753+
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).collect(Collectors.toList());
1754+
List<RexNode> notNullList =
1755+
PlanUtils.getSelectColumns(groupByList).stream()
1756+
.map(context.relBuilder::field)
1757+
.map(context.relBuilder::isNotNull)
1758+
.collect(Collectors.toList());
1759+
RexNode groupNotNull = context.relBuilder.and(notNullList);
1760+
1761+
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1762+
List<RexNode> wrappedOverExprs =
1763+
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1764+
context.relBuilder.projectPlus(wrappedOverExprs);
1765+
} else {
1766+
context.relBuilder.projectPlus(overExpressions);
1767+
}
1768+
1769+
// resort when there is by condition
1770+
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
1771+
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
1772+
} else {
1773+
context.relBuilder.projectPlus(overExpressions);
1774+
}
17711775

17721776
return context.relBuilder.peek();
17731777
}
@@ -1816,19 +1820,19 @@ private RelNode buildStreamWindowJoinPlan(
18161820
RexNode rightSeq = context.relBuilder.field(seqCol);
18171821
RexNode outerSeq = context.relBuilder.field(v.get(), seqCol);
18181822

1819-
RexNode filter;
1820-
if (segmentCol != null) { // reset condition
1821-
RexNode segRight = context.relBuilder.field(segmentCol);
1822-
RexNode segOuter = context.relBuilder.field(v.get(), segmentCol);
1823-
RexNode frame = buildResetFrameFilter(context, node, outerSeq, rightSeq, segOuter, segRight);
1824-
RexNode group = buildGroupFilter(context, groupList, v.get());
1825-
filter = (group == null) ? frame : context.relBuilder.and(frame, group);
1826-
} else { // global + window + by condition
1827-
RexNode frame = buildFrameFilter(context, node, outerSeq, rightSeq);
1828-
RexNode group = buildGroupFilter(context, groupList, v.get());
1829-
filter = context.relBuilder.and(frame, group);
1830-
}
1831-
context.relBuilder.filter(filter);
1823+
RexNode filter;
1824+
if (segmentCol != null) { // reset condition
1825+
RexNode segRight = context.relBuilder.field(segmentCol);
1826+
RexNode segOuter = context.relBuilder.field(v.get(), segmentCol);
1827+
RexNode frame = buildResetFrameFilter(context, node, outerSeq, rightSeq, segOuter, segRight);
1828+
RexNode group = buildGroupFilter(context, node, groupList, v.get());
1829+
filter = (group == null) ? frame : context.relBuilder.and(frame, group);
1830+
} else { // global + window + by condition
1831+
RexNode frame = buildFrameFilter(context, node, outerSeq, rightSeq);
1832+
RexNode group = buildGroupFilter(context, node, groupList, v.get());
1833+
filter = context.relBuilder.and(frame, group);
1834+
}
1835+
context.relBuilder.filter(filter);
18321836

18331837
// aggregate all window functions on right side
18341838
List<AggCall> aggCalls = buildAggCallsForWindowFunctions(node.getWindowFunctionList(), context);
@@ -1974,24 +1978,37 @@ private RexNode buildResetFrameFilter(
19741978
return context.relBuilder.and(seqFilter, segFilter);
19751979
}
19761980

1977-
private RexNode buildGroupFilter(
1978-
CalcitePlanContext context, List<UnresolvedExpression> groupList, RexCorrelVariable correl) {
1979-
// build conjunctive equality filters: right.g_i = outer.g_i
1980-
if (groupList.isEmpty()) {
1981-
return null;
1982-
}
1983-
List<RexNode> equalsList =
1984-
groupList.stream()
1985-
.map(
1986-
expr -> {
1987-
String groupName = extractGroupFieldName(expr);
1988-
RexNode rightGroup = context.relBuilder.field(groupName);
1989-
RexNode outerGroup = context.relBuilder.field(correl, groupName);
1990-
return context.relBuilder.equals(rightGroup, outerGroup);
1991-
})
1992-
.collect(Collectors.toList());
1993-
return context.relBuilder.and(equalsList);
1994-
}
1981+
private RexNode buildGroupFilter(
1982+
CalcitePlanContext context,
1983+
StreamWindow node,
1984+
List<UnresolvedExpression> groupList,
1985+
RexCorrelVariable correl) {
1986+
// build conjunctive equality filters: right.g_i = outer.g_i
1987+
if (groupList.isEmpty()) {
1988+
return null;
1989+
}
1990+
List<RexNode> equalsList =
1991+
groupList.stream()
1992+
.map(
1993+
expr -> {
1994+
String groupName = extractGroupFieldName(expr);
1995+
RexNode rightGroup = context.relBuilder.field(groupName);
1996+
RexNode outerGroup = context.relBuilder.field(correl, groupName);
1997+
RexNode equalCondition = context.relBuilder.equals(rightGroup, outerGroup);
1998+
// handle bucket_nullable case
1999+
if (!node.isBucketNullable()) {
2000+
return equalCondition;
2001+
} else {
2002+
RexNode bothNull =
2003+
context.relBuilder.and(
2004+
context.relBuilder.isNull(rightGroup),
2005+
context.relBuilder.isNull(outerGroup));
2006+
return context.relBuilder.or(equalCondition, bothNull);
2007+
}
2008+
})
2009+
.collect(Collectors.toList());
2010+
return context.relBuilder.and(equalsList);
2011+
}
19952012

19962013
private String extractGroupFieldName(UnresolvedExpression groupExpr) {
19972014
if (groupExpr instanceof Alias) {

docs/user/ppl/cmd/streamstats.rst

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,14 @@ All of these commands can be used to generate aggregations such as average, sum,
4343

4444
Syntax
4545
======
46-
streamstats [current=<bool>] [window=<int>] [global=<bool>] [reset_before="("<eval-expression>")"] [reset_after="("<eval-expression>")"] <function>... [by-clause]
46+
streamstats [bucket_nullable=<bool>] [current=<bool>] [window=<int>] [global=<bool>] [reset_before="("<eval-expression>")"] [reset_after="("<eval-expression>")"] <function>... [by-clause]
4747

4848
* function: mandatory. An aggregation function or a window function.
49+
* bucket_nullable: optional. Controls whether the streamstats command treats null group-by values as a valid group. When set to ``true``, rows whose group-by value is null are aggregated together as their own group. When set to ``false``, null group-by values do not form a group, and the aggregation result for those rows is null. Syntax: bucket_nullable=<boolean>. **Default:** Determined by ``plugins.ppl.syntax.legacy.preferred``.
50+
51+
* When ``plugins.ppl.syntax.legacy.preferred=true``, ``bucket_nullable`` defaults to ``true``
52+
* When ``plugins.ppl.syntax.legacy.preferred=false``, ``bucket_nullable`` defaults to ``false``
53+
4954
* current: optional. If true, the search includes the given, or current, event in the summary calculations. If false, the search uses the field value from the previous event. Syntax: current=<boolean>. **Default:** true.
5055
* window: optional. Specifies the number of events to use when computing the statistics. Syntax: window=<integer>. **Default:** 0, which means that all previous and current events are used.
5156
* global: optional. Used only when the window argument is set. Defines whether to use a single window, global=true, or to use separate windows based on the by clause. If global=false and window is set to a non-zero value, a separate window is used for each group of values of the field specified in the by clause. Syntax: global=<boolean>. **Default:** true.
@@ -226,4 +231,34 @@ PPL query::
226231
| Peter | Canada | B.C | 4 | 2023 | 57 | null |
227232
| Rick | Canada | B.C | 4 | 2023 | 70 | null |
228233
| David | USA | Washington | 4 | 2023 | 40 | null |
229-
+-------+---------+------------+-------+------+-----+---------+
234+
+-------+---------+------------+-------+------+-----+---------+
235+
236+
237+
Example 5: Null buckets handling
238+
================================
239+
240+
PPL query::
241+
242+
os> source=accounts | streamstats bucket_nullable=false count() as cnt by employer | fields account_number, firstname, employer, cnt;
243+
fetched rows / total rows = 4/4
244+
+----------------+-----------+----------+------+
245+
| account_number | firstname | employer | cnt |
246+
|----------------+-----------+----------+------|
247+
| 1 | Amber | Pyrami | 1 |
248+
| 6 | Hattie | Netagy | 1 |
249+
| 13 | Nanette | Quility | 1 |
250+
| 18 | Dale | null | null |
251+
+----------------+-----------+----------+------+
252+
253+
PPL query::
254+
255+
os> source=accounts | streamstats bucket_nullable=true count() as cnt by employer | fields account_number, firstname, employer, cnt;
256+
fetched rows / total rows = 4/4
257+
+----------------+-----------+----------+-----+
258+
| account_number | firstname | employer | cnt |
259+
|----------------+-----------+----------+-----|
260+
| 1 | Amber | Pyrami | 1 |
261+
| 6 | Hattie | Netagy | 1 |
262+
| 13 | Nanette | Quility | 1 |
263+
| 18 | Dale | null | 1 |
264+
+----------------+-----------+----------+-----+

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,56 @@ public void testStreamstatsResetExplain() throws IOException {
668668
assertYamlEqualsIgnoreId(expected, result);
669669
}
670670

671+
@Test
672+
public void testStreamstatsNullBucketExplain() throws IOException {
673+
String query =
674+
"source=opensearch-sql_test_index_account | streamstats bucket_nullable=false avg(age) as"
675+
+ " avg_age by gender";
676+
var result = explainQueryYaml(query);
677+
String expected = loadExpectedPlan("explain_streamstats_null_bucket.yaml");
678+
assertYamlEqualsIgnoreId(expected, result);
679+
}
680+
681+
@Test
682+
public void testStreamstatsGlobalNullBucketExplain() throws IOException {
683+
String query =
684+
"source=opensearch-sql_test_index_account | streamstats bucket_nullable=false window=2"
685+
+ " global=true avg(age) as avg_age by gender";
686+
var result = explainQueryYaml(query);
687+
String expected = loadExpectedPlan("explain_streamstats_global_null_bucket.yaml");
688+
assertYamlEqualsIgnoreId(expected, result);
689+
}
690+
691+
@Test
692+
public void testStreamstatsResetNullBucketExplain() throws IOException {
693+
String query =
694+
"source=opensearch-sql_test_index_account | streamstats bucket_nullable=false current=false"
695+
+ " reset_before=age>34 reset_after=age<25 avg(age) as avg_age by gender";
696+
var result = explainQueryYaml(query);
697+
String expected = loadExpectedPlan("explain_streamstats_reset_null_bucket.yaml");
698+
assertYamlEqualsIgnoreId(expected, result);
699+
}
700+
701+
@Test
702+
public void testKeywordILikeFunctionExplain() throws IOException {
703+
// ilike is only supported in v3
704+
String expected = loadExpectedPlan("explain_keyword_ilike_function.yaml");
705+
assertYamlEqualsIgnoreId(
706+
expected,
707+
explainQueryYaml(
708+
"source=opensearch-sql_test_index_account | where ilike(firstname, '%mbe%')"));
709+
}
710+
711+
@Test
712+
public void testTextILikeFunctionExplain() throws IOException {
713+
// ilike is only supported in v3
714+
String expected = loadExpectedPlan("explain_text_ilike_function.yaml");
715+
assertYamlEqualsIgnoreId(
716+
expected,
717+
explainQueryYaml(
718+
"source=opensearch-sql_test_index_account | where ilike(address, '%Holmes%')"));
719+
}
720+
671721
// Only for Calcite, as v2 gets unstable serialized string for function
672722
@Test
673723
public void testExplainOnAggregationWithSumEnhancement() throws IOException {

0 commit comments

Comments
 (0)