Skip to content

Commit 4f94859

Browse files
committed
Pushdown case function in aggregations as range queries (opensearch-project#4400)
* WIP: implementing case range analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Correct case analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Create bucket aggregation parsers that supports parsing nested sub aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unit tests Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix parsers to multi-range cases Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update leaf bucket parser Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unit test case range analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add explain ITs for pushing down case in aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update CaseRangeAnalyzerTest Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add a yaml test that replicates issue 4201 Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add integration tests for case in aggregation Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unit tests Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add a patch to CalcitePPLCaseFunctionIT Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Migrate all composite aggregation parser usage to bucket aggregate parser Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Create a parent abstract classes for BucketAggregationParsers Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove an unnecessary bucket agg in AggregationQueryBuilder Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Test pushing down case where there exists null values Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Return empty in CaseRangeAnalyzer to unblock the rest pushdown - Additionally test number as result expressions Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Document limitations of pushding case as range queries Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Make case pushdown a private method Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Chores: remove unused helper method Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unify logics for creating nested aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove a note in condition.rst Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Optmize range aggregation Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Ignore testNestedAggregationsExplain when pushdown is disabled Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix explain ITs after merge Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> (cherry picked from commit 18ab4dc)
1 parent dd749bd commit 4f94859

42 files changed

Lines changed: 2221 additions & 155 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -976,19 +976,15 @@ void populate() {
976976
XOR,
977977
SqlStdOperatorTable.NOT_EQUALS,
978978
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.BOOLEAN));
979-
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a
980-
// type checker
981-
// for it. The second and third operands are required to be of the same type. If
982-
// not,
983-
// it will throw an IllegalArgumentException with information Can't find
984-
// leastRestrictive type
979+
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a type checker
980+
// for it. The second and third operands are required to be of the same type. If not, it will
981+
// throw an IllegalArgumentException with information Can't find leastRestrictive type
985982
registerOperator(
986983
IF,
987984
SqlStdOperatorTable.CASE,
988985
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.ANY, SqlTypeFamily.ANY));
989986
// Re-define the type checker for is not null, is present, and is null since
990-
// their original
991-
// type checker ANY isn't compatible with struct types.
987+
// their original type checker ANY isn't compatible with struct types.
992988
registerOperator(
993989
IS_NOT_NULL,
994990
SqlStdOperatorTable.IS_NOT_NULL,

docs/user/ppl/functions/condition.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ Argument type: all the supported data type, (NOTE : there is no comma before "el
207207

208208
Return type: any
209209

210+
Limitations
211+
>>>>>>>>>>>
212+
213+
When each condition is a field comparison with a numeric literal and each result expression is a string literal, the query will be optimized as `range aggregations <https://docs.opensearch.org/latest/aggregations/bucket/range>`_ if pushdown optimization is enabled. However, this optimization has the following limitations:
214+
215+
- Null values will not be grouped into any bucket of a range aggregation and will be ignored
216+
- The default ELSE clause will use the string literal ``"null"`` instead of actual NULL values
217+
210218
Example::
211219

212220
os> source=accounts | eval result = case(age > 35, firstname, age < 30, lastname else employer) | fields result, firstname, lastname, age, employer

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 125 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
1111
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
1212
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS;
13+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TIME_DATA;
1314
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WEBLOGS;
1415
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORKER;
1516
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION;
@@ -18,6 +19,7 @@
1819

1920
import java.io.IOException;
2021
import java.util.Locale;
22+
import org.junit.Assume;
2123
import org.junit.Ignore;
2224
import org.junit.Test;
2325
import org.opensearch.sql.ppl.ExplainIT;
@@ -512,22 +514,6 @@ public void testExplainStatsWithSubAggregation() throws IOException {
512514
+ " @timestamp, region"));
513515
}
514516

515-
@Test
516-
public void bucketNullableNotSupportSubAggregation() throws IOException {
517-
// TODO: Don't throw exception after addressing
518-
// https://github.com/opensearch-project/sql/issues/4317
519-
// When bucketNullable is true, sub aggregation is not supported. Hence we cannot pushdown the
520-
// aggregation in this query. Caused by issue
521-
// https://github.com/opensearch-project/sql/issues/4317,
522-
// bin aggregation on timestamp field won't work if not been push down.
523-
enabledOnlyWhenPushdownIsEnabled();
524-
assertThrows(
525-
Exception.class,
526-
() ->
527-
explainQueryToString(
528-
"source=events | bin @timestamp bins=3 | stats count() by @timestamp, region"));
529-
}
530-
531517
@Test
532518
public void testExplainBinWithSpan() throws IOException {
533519
String expected = loadExpectedPlan("explain_bin_span.yaml");
@@ -1145,4 +1131,127 @@ public void testPushDownMinOrMaxAggOnDerivedField() throws IOException {
11451131
+ "| stats MIN(balance2), MAX(balance2)",
11461132
TEST_INDEX_ACCOUNT)));
11471133
}
1134+
1135+
@Test
1136+
public void testCasePushdownAsRangeQueryExplain() throws IOException {
1137+
// CASE 1: Range - Metric
1138+
// 1.1 Range - Metric
1139+
assertYamlEqualsIgnoreId(
1140+
loadExpectedPlan("agg_range_metric_push.yaml"),
1141+
explainQueryYaml(
1142+
String.format(
1143+
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100') |"
1144+
+ " stats avg(age) as avg_age by age_range",
1145+
TEST_INDEX_BANK)));
1146+
1147+
// 1.2 Range - Metric (COUNT)
1148+
assertYamlEqualsIgnoreId(
1149+
loadExpectedPlan("agg_range_count_push.yaml"),
1150+
explainQueryYaml(
1151+
String.format(
1152+
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age < 40, 'u40'"
1153+
+ " else 'u100') | stats avg(age) by age_range",
1154+
TEST_INDEX_BANK)));
1155+
1156+
// 1.3 Range - Range - Metric
1157+
assertYamlEqualsIgnoreId(
1158+
loadExpectedPlan("agg_range_range_metric_push.yaml"),
1159+
explainQueryYaml(
1160+
String.format(
1161+
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100'),"
1162+
+ " balance_range = case(balance < 20000, 'medium' else 'high') | stats"
1163+
+ " avg(balance) as avg_balance by age_range, balance_range",
1164+
TEST_INDEX_BANK)));
1165+
1166+
// 1.4 Range - Metric (With null & discontinuous ranges)
1167+
assertYamlEqualsIgnoreId(
1168+
loadExpectedPlan("agg_range_metric_complex_push.yaml"),
1169+
explainQueryYaml(
1170+
String.format(
1171+
"source=%s | eval age_range = case(age < 30, 'u30', (age >= 35 and age < 40) or age"
1172+
+ " >= 80, '30-40 or >=80') | stats avg(balance) by age_range",
1173+
TEST_INDEX_BANK)));
1174+
1175+
// 1.5 Should not be pushed because the range is not closed-open
1176+
assertYamlEqualsIgnoreId(
1177+
loadExpectedPlan("agg_case_cannot_push.yaml"),
1178+
explainQueryYaml(
1179+
String.format(
1180+
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age <= 40, 'u40'"
1181+
+ " else 'u100') | stats avg(age) as avg_age by age_range",
1182+
TEST_INDEX_BANK)));
1183+
1184+
// 1.6 Should not be pushed as range query because the result expression is not a string
1185+
// literal.
1186+
// Range aggregation keys must be strings
1187+
assertYamlEqualsIgnoreId(
1188+
loadExpectedPlan("agg_case_num_res_cannot_push.yaml"),
1189+
explainQueryYaml(
1190+
String.format(
1191+
"source=%s | eval age_range = case(age < 30, 30 else 100) | stats count() by"
1192+
+ " age_range",
1193+
TEST_INDEX_BANK)));
1194+
1195+
// CASE 2: Composite - Range - Metric
1196+
// 2.1 Composite (term) - Range - Metric
1197+
assertYamlEqualsIgnoreId(
1198+
loadExpectedPlan("agg_composite_range_metric_push.yaml"),
1199+
explainQueryYaml(
1200+
String.format(
1201+
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats avg(balance)"
1202+
+ " by state, age_range",
1203+
TEST_INDEX_BANK)));
1204+
1205+
// 2.2 Composite (date histogram) - Range - Metric
1206+
assertYamlEqualsIgnoreId(
1207+
loadExpectedPlan("agg_composite_date_range_push.yaml"),
1208+
explainQueryYaml(
1209+
"source=opensearch-sql_test_index_time_data | eval value_range = case(value < 7000,"
1210+
+ " 'small' else 'large') | stats avg(value) by value_range, span(@timestamp,"
1211+
+ " 1h)"));
1212+
1213+
// 2.3 Composite(2 fields) - Range - Metric (with count)
1214+
assertYamlEqualsIgnoreId(
1215+
loadExpectedPlan("agg_composite2_range_count_push.yaml"),
1216+
explainQueryYaml(
1217+
String.format(
1218+
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats"
1219+
+ " avg(balance), count() by age_range, state, gender",
1220+
TEST_INDEX_BANK)));
1221+
1222+
// 2.4 Composite (2 fields) - Range - Range - Metric (with count)
1223+
assertYamlEqualsIgnoreId(
1224+
loadExpectedPlan("agg_composite2_range_range_count_push.yaml"),
1225+
explainQueryYaml(
1226+
String.format(
1227+
"source=%s | eval age_range = case(age < 35, 'u35' else 'a35'), balance_range ="
1228+
+ " case(balance < 20000, 'medium' else 'high') | stats avg(balance) as"
1229+
+ " avg_balance by age_range, balance_range, state",
1230+
TEST_INDEX_BANK)));
1231+
1232+
// 2.5 Should not be pushed down as range query because case result expression is not constant
1233+
assertYamlEqualsIgnoreId(
1234+
loadExpectedPlan("agg_case_composite_cannot_push.yaml"),
1235+
explainQueryYaml(
1236+
String.format(
1237+
"source=%s | eval age_range = case(age < 35, 'u35' else email) | stats avg(balance)"
1238+
+ " as avg_balance by age_range, state",
1239+
TEST_INDEX_BANK)));
1240+
}
1241+
1242+
@Test
1243+
public void testNestedAggregationsExplain() throws IOException {
1244+
// TODO: Remove after resolving: https://github.com/opensearch-project/sql/issues/4578
1245+
Assume.assumeFalse(
1246+
"The query runs into error when pushdown is disabled due to bin's implementation",
1247+
isPushdownDisabled());
1248+
assertYamlEqualsIgnoreId(
1249+
loadExpectedPlan("agg_composite_autodate_range_metric_push.yaml"),
1250+
explainQueryYaml(
1251+
String.format(
1252+
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
1253+
+ " else 'great') | stats bucket_nullable=false avg(value), count() by"
1254+
+ " timestamp, value_range, category",
1255+
TEST_INDEX_TIME_DATA)));
1256+
}
11481257
}

0 commit comments

Comments
 (0)