Skip to content

Commit e146792

Browse files
committed
Pushdown case function in aggregations as range queries (opensearch-project#4400)
* WIP: implementing case range analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Correct case analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Create bucket aggregation parsers that supports parsing nested sub aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unit tests Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix parsers to multi-range cases Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update leaf bucket parser Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unit test case range analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add explain ITs for pushing down case in aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update CaseRangeAnalyzerTest Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add a yaml test that replicates issue 4201 Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add integration tests for case in aggregation Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unit tests Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add a patch to CalcitePPLCaseFunctionIT Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Migrate all composite aggregation parser usage to bucket aggregate parser Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Create a parent abstract classes for BucketAggregationParsers Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove an unnecessary bucket agg in AggregationQueryBuilder Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Test pushing down case where there exists null values Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Return empty in CaseRangeAnalyzer to unblock the rest pushdown - Additionally test number as result expressions Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Document limitations of pushding case as range queries Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Make case pushdown a private method Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Chores: remove unused helper method Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unify logics for creating nested aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove a note in condition.rst Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Optmize range aggregation Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Ignore testNestedAggregationsExplain when pushdown is disabled Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix explain ITs after merge Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> (cherry picked from commit 18ab4dc)
1 parent 02135be commit e146792

42 files changed

Lines changed: 2221 additions & 155 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -976,19 +976,15 @@ void populate() {
976976
XOR,
977977
SqlStdOperatorTable.NOT_EQUALS,
978978
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.BOOLEAN));
979-
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a
980-
// type checker
981-
// for it. The second and third operands are required to be of the same type. If
982-
// not,
983-
// it will throw an IllegalArgumentException with information Can't find
984-
// leastRestrictive type
979+
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a type checker
980+
// for it. The second and third operands are required to be of the same type. If not, it will
981+
// throw an IllegalArgumentException with information Can't find leastRestrictive type
985982
registerOperator(
986983
IF,
987984
SqlStdOperatorTable.CASE,
988985
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.ANY, SqlTypeFamily.ANY));
989986
// Re-define the type checker for is not null, is present, and is null since
990-
// their original
991-
// type checker ANY isn't compatible with struct types.
987+
// their original type checker ANY isn't compatible with struct types.
992988
registerOperator(
993989
IS_NOT_NULL,
994990
SqlStdOperatorTable.IS_NOT_NULL,

docs/user/ppl/functions/condition.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ Argument type: all the supported data type, (NOTE : there is no comma before "el
207207

208208
Return type: any
209209

210+
Limitations
211+
>>>>>>>>>>>
212+
213+
When each condition is a field comparison with a numeric literal and each result expression is a string literal, the query will be optimized as `range aggregations <https://docs.opensearch.org/latest/aggregations/bucket/range>`_ if pushdown optimization is enabled. However, this optimization has the following limitations:
214+
215+
- Null values will not be grouped into any bucket of a range aggregation and will be ignored
216+
- The default ELSE clause will use the string literal ``"null"`` instead of actual NULL values
217+
210218
Example::
211219

212220
os> source=accounts | eval result = case(age > 35, firstname, age < 30, lastname else employer) | fields result, firstname, lastname, age, employer

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 125 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
1111
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
1212
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS;
13+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TIME_DATA;
1314
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WEBLOGS;
1415
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORKER;
1516
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION;
@@ -18,6 +19,7 @@
1819

1920
import java.io.IOException;
2021
import java.util.Locale;
22+
import org.junit.Assume;
2123
import org.junit.Ignore;
2224
import org.junit.Test;
2325
import org.opensearch.sql.ppl.ExplainIT;
@@ -514,22 +516,6 @@ public void testExplainStatsWithSubAggregation() throws IOException {
514516
+ " @timestamp, region"));
515517
}
516518

517-
@Test
518-
public void bucketNullableNotSupportSubAggregation() throws IOException {
519-
// TODO: Don't throw exception after addressing
520-
// https://github.com/opensearch-project/sql/issues/4317
521-
// When bucketNullable is true, sub aggregation is not supported. Hence we cannot pushdown the
522-
// aggregation in this query. Caused by issue
523-
// https://github.com/opensearch-project/sql/issues/4317,
524-
// bin aggregation on timestamp field won't work if not been push down.
525-
enabledOnlyWhenPushdownIsEnabled();
526-
assertThrows(
527-
Exception.class,
528-
() ->
529-
explainQueryToString(
530-
"source=events | bin @timestamp bins=3 | stats count() by @timestamp, region"));
531-
}
532-
533519
@Test
534520
public void testExplainBinWithSpan() throws IOException {
535521
String expected = loadExpectedPlan("explain_bin_span.yaml");
@@ -1147,4 +1133,127 @@ public void testPushDownMinOrMaxAggOnDerivedField() throws IOException {
11471133
+ "| stats MIN(balance2), MAX(balance2)",
11481134
TEST_INDEX_ACCOUNT)));
11491135
}
1136+
1137+
@Test
1138+
public void testCasePushdownAsRangeQueryExplain() throws IOException {
1139+
// CASE 1: Range - Metric
1140+
// 1.1 Range - Metric
1141+
assertYamlEqualsIgnoreId(
1142+
loadExpectedPlan("agg_range_metric_push.yaml"),
1143+
explainQueryYaml(
1144+
String.format(
1145+
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100') |"
1146+
+ " stats avg(age) as avg_age by age_range",
1147+
TEST_INDEX_BANK)));
1148+
1149+
// 1.2 Range - Metric (COUNT)
1150+
assertYamlEqualsIgnoreId(
1151+
loadExpectedPlan("agg_range_count_push.yaml"),
1152+
explainQueryYaml(
1153+
String.format(
1154+
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age < 40, 'u40'"
1155+
+ " else 'u100') | stats avg(age) by age_range",
1156+
TEST_INDEX_BANK)));
1157+
1158+
// 1.3 Range - Range - Metric
1159+
assertYamlEqualsIgnoreId(
1160+
loadExpectedPlan("agg_range_range_metric_push.yaml"),
1161+
explainQueryYaml(
1162+
String.format(
1163+
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100'),"
1164+
+ " balance_range = case(balance < 20000, 'medium' else 'high') | stats"
1165+
+ " avg(balance) as avg_balance by age_range, balance_range",
1166+
TEST_INDEX_BANK)));
1167+
1168+
// 1.4 Range - Metric (With null & discontinuous ranges)
1169+
assertYamlEqualsIgnoreId(
1170+
loadExpectedPlan("agg_range_metric_complex_push.yaml"),
1171+
explainQueryYaml(
1172+
String.format(
1173+
"source=%s | eval age_range = case(age < 30, 'u30', (age >= 35 and age < 40) or age"
1174+
+ " >= 80, '30-40 or >=80') | stats avg(balance) by age_range",
1175+
TEST_INDEX_BANK)));
1176+
1177+
// 1.5 Should not be pushed because the range is not closed-open
1178+
assertYamlEqualsIgnoreId(
1179+
loadExpectedPlan("agg_case_cannot_push.yaml"),
1180+
explainQueryYaml(
1181+
String.format(
1182+
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age <= 40, 'u40'"
1183+
+ " else 'u100') | stats avg(age) as avg_age by age_range",
1184+
TEST_INDEX_BANK)));
1185+
1186+
// 1.6 Should not be pushed as range query because the result expression is not a string
1187+
// literal.
1188+
// Range aggregation keys must be strings
1189+
assertYamlEqualsIgnoreId(
1190+
loadExpectedPlan("agg_case_num_res_cannot_push.yaml"),
1191+
explainQueryYaml(
1192+
String.format(
1193+
"source=%s | eval age_range = case(age < 30, 30 else 100) | stats count() by"
1194+
+ " age_range",
1195+
TEST_INDEX_BANK)));
1196+
1197+
// CASE 2: Composite - Range - Metric
1198+
// 2.1 Composite (term) - Range - Metric
1199+
assertYamlEqualsIgnoreId(
1200+
loadExpectedPlan("agg_composite_range_metric_push.yaml"),
1201+
explainQueryYaml(
1202+
String.format(
1203+
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats avg(balance)"
1204+
+ " by state, age_range",
1205+
TEST_INDEX_BANK)));
1206+
1207+
// 2.2 Composite (date histogram) - Range - Metric
1208+
assertYamlEqualsIgnoreId(
1209+
loadExpectedPlan("agg_composite_date_range_push.yaml"),
1210+
explainQueryYaml(
1211+
"source=opensearch-sql_test_index_time_data | eval value_range = case(value < 7000,"
1212+
+ " 'small' else 'large') | stats avg(value) by value_range, span(@timestamp,"
1213+
+ " 1h)"));
1214+
1215+
// 2.3 Composite(2 fields) - Range - Metric (with count)
1216+
assertYamlEqualsIgnoreId(
1217+
loadExpectedPlan("agg_composite2_range_count_push.yaml"),
1218+
explainQueryYaml(
1219+
String.format(
1220+
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats"
1221+
+ " avg(balance), count() by age_range, state, gender",
1222+
TEST_INDEX_BANK)));
1223+
1224+
// 2.4 Composite (2 fields) - Range - Range - Metric (with count)
1225+
assertYamlEqualsIgnoreId(
1226+
loadExpectedPlan("agg_composite2_range_range_count_push.yaml"),
1227+
explainQueryYaml(
1228+
String.format(
1229+
"source=%s | eval age_range = case(age < 35, 'u35' else 'a35'), balance_range ="
1230+
+ " case(balance < 20000, 'medium' else 'high') | stats avg(balance) as"
1231+
+ " avg_balance by age_range, balance_range, state",
1232+
TEST_INDEX_BANK)));
1233+
1234+
// 2.5 Should not be pushed down as range query because case result expression is not constant
1235+
assertYamlEqualsIgnoreId(
1236+
loadExpectedPlan("agg_case_composite_cannot_push.yaml"),
1237+
explainQueryYaml(
1238+
String.format(
1239+
"source=%s | eval age_range = case(age < 35, 'u35' else email) | stats avg(balance)"
1240+
+ " as avg_balance by age_range, state",
1241+
TEST_INDEX_BANK)));
1242+
}
1243+
1244+
@Test
1245+
public void testNestedAggregationsExplain() throws IOException {
1246+
// TODO: Remove after resolving: https://github.com/opensearch-project/sql/issues/4578
1247+
Assume.assumeFalse(
1248+
"The query runs into error when pushdown is disabled due to bin's implementation",
1249+
isPushdownDisabled());
1250+
assertYamlEqualsIgnoreId(
1251+
loadExpectedPlan("agg_composite_autodate_range_metric_push.yaml"),
1252+
explainQueryYaml(
1253+
String.format(
1254+
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
1255+
+ " else 'great') | stats bucket_nullable=false avg(value), count() by"
1256+
+ " timestamp, value_range, category",
1257+
TEST_INDEX_TIME_DATA)));
1258+
}
11501259
}

0 commit comments

Comments
 (0)