Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
Expand All @@ -25,13 +26,14 @@
*/
public class CalciteAliasFieldAggregationIT extends PPLIntegTestCase {

private static final String TEST_INDEX_ALIAS = "test_alias_bug";
private static final String TEST_ALIAS_BUG = "test_alias_bug";

/**
 * Test fixture setup. Calls the parent {@code init()}, then {@code enableCalcite()}, then
 * builds the dedicated index through {@code createTestIndexWithAliasFields()}, and finally
 * loads {@code Index.DATA_TYPE_ALIAS} through {@code loadIndex}.
 *
 * @throws Exception if any of the setup calls fails
 */
@Override
public void init() throws Exception {
super.init();
enableCalcite();
createTestIndexWithAliasFields();
loadIndex(Index.DATA_TYPE_ALIAS);
}

/**
Expand All @@ -41,14 +43,14 @@ public void init() throws Exception {
private void createTestIndexWithAliasFields() throws IOException {
// Delete the index if it exists (for test isolation)
try {
Request deleteIndex = new Request("DELETE", "/" + TEST_INDEX_ALIAS);
Request deleteIndex = new Request("DELETE", "/" + TEST_ALIAS_BUG);
client().performRequest(deleteIndex);
} catch (ResponseException e) {
// Index doesn't exist, which is fine
}

// Create index with alias fields
Request createIndex = new Request("PUT", "/" + TEST_INDEX_ALIAS);
Request createIndex = new Request("PUT", "/" + TEST_ALIAS_BUG);
createIndex.setJsonEntity(
"{\n"
+ " \"mappings\": {\n"
Expand All @@ -63,7 +65,7 @@ private void createTestIndexWithAliasFields() throws IOException {
client().performRequest(createIndex);

// Insert test documents
Request bulkRequest = new Request("POST", "/" + TEST_INDEX_ALIAS + "/_bulk?refresh=true");
Request bulkRequest = new Request("POST", "/" + TEST_ALIAS_BUG + "/_bulk?refresh=true");
bulkRequest.setJsonEntity(
"{\"index\":{}}\n"
+ "{\"created_at\": \"2024-01-01T10:00:00Z\", \"value\": 100}\n"
Expand All @@ -77,15 +79,15 @@ private void createTestIndexWithAliasFields() throws IOException {
/**
 * Runs {@code stats MIN(@timestamp)} and checks that the result has a single
 * {@code timestamp} column named {@code MIN(@timestamp)} whose only row is
 * {@code 2024-01-01 10:00:00}.
 */
@Test
public void testMinWithDateAliasField() throws IOException {
JSONObject actual =
// NOTE(review): the two executeQuery lines below differ only in the index constant
// (TEST_INDEX_ALIAS vs TEST_ALIAS_BUG); they look like the before/after sides of one
// changed line. Confirm that only one of them is meant to remain.
executeQuery(String.format("source=%s | stats MIN(@timestamp)", TEST_INDEX_ALIAS));
executeQuery(String.format("source=%s | stats MIN(@timestamp)", TEST_ALIAS_BUG));
verifySchema(actual, schema("MIN(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-01 10:00:00"));
}

/**
 * Runs {@code stats MAX(@timestamp)} and checks that the result has a single
 * {@code timestamp} column named {@code MAX(@timestamp)} whose only row is
 * {@code 2024-01-03 10:00:00}.
 */
@Test
public void testMaxWithDateAliasField() throws IOException {
JSONObject actual =
// NOTE(review): the two executeQuery lines below differ only in the index constant
// (TEST_INDEX_ALIAS vs TEST_ALIAS_BUG); they look like the before/after sides of one
// changed line. Confirm that only one of them is meant to remain.
executeQuery(String.format("source=%s | stats MAX(@timestamp)", TEST_INDEX_ALIAS));
executeQuery(String.format("source=%s | stats MAX(@timestamp)", TEST_ALIAS_BUG));
verifySchema(actual, schema("MAX(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-03 10:00:00"));
}
Expand All @@ -94,8 +96,7 @@ public void testMaxWithDateAliasField() throws IOException {
public void testMinMaxWithNumericAliasField() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats MIN(value_alias), MAX(value_alias)", TEST_INDEX_ALIAS));
String.format("source=%s | stats MIN(value_alias), MAX(value_alias)", TEST_ALIAS_BUG));
verifySchemaInOrder(
actual, schema("MIN(value_alias)", "int"), schema("MAX(value_alias)", "int"));
verifyDataRows(actual, rows(100, 300));
Expand All @@ -105,8 +106,7 @@ public void testMinMaxWithNumericAliasField() throws IOException {
public void testFirstWithAliasField() throws IOException {
// Sorts by @timestamp, takes FIRST(@timestamp), and expects one timestamp column named
// FIRST(@timestamp) holding 2024-01-01 10:00:00.
JSONObject actual =
executeQuery(
// NOTE(review): two String.format variants follow, differing only in line wrapping and in
// the index constant (TEST_INDEX_ALIAS vs TEST_ALIAS_BUG); they look like the before/after
// sides of one changed call. Confirm that only one of them is meant to remain.
String.format(
"source=%s | sort @timestamp | stats FIRST(@timestamp)", TEST_INDEX_ALIAS));
String.format("source=%s | sort @timestamp | stats FIRST(@timestamp)", TEST_ALIAS_BUG));
verifySchema(actual, schema("FIRST(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-01 10:00:00"));
}
Expand All @@ -115,8 +115,7 @@ public void testFirstWithAliasField() throws IOException {
public void testLastWithAliasField() throws IOException {
// Sorts by @timestamp, takes LAST(@timestamp), and expects one timestamp column named
// LAST(@timestamp) holding 2024-01-03 10:00:00.
JSONObject actual =
executeQuery(
// NOTE(review): two String.format variants follow, differing only in line wrapping and in
// the index constant (TEST_INDEX_ALIAS vs TEST_ALIAS_BUG); they look like the before/after
// sides of one changed call. Confirm that only one of them is meant to remain.
String.format(
"source=%s | sort @timestamp | stats LAST(@timestamp)", TEST_INDEX_ALIAS));
String.format("source=%s | sort @timestamp | stats LAST(@timestamp)", TEST_ALIAS_BUG));
verifySchema(actual, schema("LAST(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-03 10:00:00"));
}
Expand All @@ -126,7 +125,7 @@ public void testTakeWithAliasField() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | sort @timestamp | stats TAKE(@timestamp, 2)", TEST_INDEX_ALIAS));
"source=%s | sort @timestamp | stats TAKE(@timestamp, 2)", TEST_ALIAS_BUG));
verifySchema(actual, schema("TAKE(@timestamp, 2)", "array"));
verifyDataRows(actual, rows(List.of("2024-01-01T10:00:00.000Z", "2024-01-02T10:00:00.000Z")));
}
Expand All @@ -135,7 +134,7 @@ public void testTakeWithAliasField() throws IOException {
public void testAggregationsWithOriginalFieldsStillWork() throws IOException {
JSONObject actual =
executeQuery(
String.format("source=%s | stats MIN(created_at), MAX(value)", TEST_INDEX_ALIAS));
String.format("source=%s | stats MIN(created_at), MAX(value)", TEST_ALIAS_BUG));
verifySchemaInOrder(
actual, schema("MIN(created_at)", "timestamp"), schema("MAX(value)", "int"));
verifyDataRows(actual, rows("2024-01-01 10:00:00", 300));
Expand All @@ -147,12 +146,50 @@ public void testUnaffectedAggregationsWithAliasFields() throws IOException {
executeQuery(
String.format(
"source=%s | stats SUM(value_alias), AVG(value_alias), COUNT(value_alias)",
TEST_INDEX_ALIAS));
TEST_ALIAS_BUG));
verifySchemaInOrder(
actual,
schema("SUM(value_alias)", "bigint"),
schema("AVG(value_alias)", "double"),
schema("COUNT(value_alias)", "bigint"));
verifyDataRows(actual, rows(600, 200.0, 3));
}

/**
 * Runs take/last/first and latest/earliest over {@code original_text}, {@code alias_text},
 * {@code original_col} and {@code alias_col} in a single stats command against {@code
 * TEST_INDEX_ALIAS}. The expected row lists one value per aggregation, in query order; each
 * {@code alias_*} aggregation is expected to produce the same value as its {@code original_*}
 * counterpart.
 */
@Test
public void testAliasTypeWithLastFirstTakeLatestEarliestAggregation() throws IOException {
  String ppl =
      String.format(
          "source=%s | stats take(original_text, 2), last(original_text),"
              + " first(original_text), take(alias_text, 2), last(alias_text),"
              + " first(alias_text), take(original_col, 2), last(original_col),"
              + " first(original_col), take(alias_col, 2), last(alias_col), first(alias_col),"
              + " latest(original_col), earliest(original_col), latest(alias_col),"
              + " earliest(alias_col),latest(original_text), earliest(original_text),"
              + " latest(alias_text), earliest(alias_text)",
          TEST_INDEX_ALIAS);
  JSONObject response = executeQuery(ppl);

  // Expected take(..., 2) results, one list per field.
  List<String> takeOriginalText = List.of("a b c", "d e f");
  List<String> takeAliasText = List.of("a b c", "d e f");
  List<Integer> takeOriginalCol = List.of(1, 2);
  List<Integer> takeAliasCol = List.of(1, 2);

  verifyDataRows(
      response,
      rows(
          // take, last, first on original_text
          takeOriginalText,
          "x y z",
          "a b c",
          // take, last, first on alias_text
          takeAliasText,
          "x y z",
          "a b c",
          // take, last, first on original_col
          takeOriginalCol,
          3,
          1,
          // take, last, first on alias_col
          takeAliasCol,
          3,
          1,
          // latest, earliest on original_col then alias_col
          3,
          1,
          3,
          1,
          // latest, earliest on original_text then alias_text
          "x y z",
          "a b c",
          "x y z",
          "a b c"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,31 @@ public void testExplainOnFirstLast() throws IOException {
TEST_INDEX_BANK)));
}

/**
 * Explains a first/last aggregation over {@code employer} grouped by {@code gender} and
 * compares the YAML plan, ignoring ids, with {@code explain_first_last_text.yaml}.
 */
@Test
public void testExplainOnTextFirstLast() throws IOException {
  String expectedPlan = loadExpectedPlan("explain_first_last_text.yaml");
  String ppl =
      String.format(
          "source=%s | stats first(employer) as first_employer, last(employer) as"
              + " last_employer by gender",
          TEST_INDEX_BANK);
  assertYamlEqualsIgnoreId(expectedPlan, explainQueryYaml(ppl));
}

/**
 * Explains {@code take} with a zero and a negative size, grouped by {@code gender}, and
 * compares the YAML plan, ignoring ids, with {@code explain_take_negative.yaml}. Guarded by
 * {@code enabledOnlyWhenPushdownIsEnabled()}.
 */
@Test
public void testExplainTakeAggregationWithNegative() throws IOException {
  enabledOnlyWhenPushdownIsEnabled();
  // without agg pushdown
  String expectedPlan = loadExpectedPlan("explain_take_negative.yaml");
  String ppl =
      String.format(
          "source=%s | stats take(employer, 0), take(balance, -2) by gender", TEST_INDEX_BANK);
  assertYamlEqualsIgnoreId(expectedPlan, explainQueryYaml(ppl));
}

// Only for Calcite
@Test
public void testExplainOnEventstatsEarliestLatest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,75 @@ public void testFirstLastDifferentFields() throws IOException {
verifyDataRows(actual, rows(1L, 48086L, 32L));
}

/**
 * Checks {@code first(employer)} and {@code first(email)} on the bank index: both columns
 * are expected to be {@code string} and the single row to be Pyrami / amberduke@pyrami.com.
 */
@Test
public void testFirstAggregationOnTextField() throws IOException {
  String ppl =
      String.format("source=%s | stats first(employer), first(email)", TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(response, schema("first(employer)", "string"), schema("first(email)", "string"));
  verifyDataRows(response, rows("Pyrami", "amberduke@pyrami.com"));
}

/**
 * Checks {@code last(employer)} together with {@code first(email)} on the bank index: both
 * columns are expected to be {@code string} and the single row to be Quailcom /
 * amberduke@pyrami.com.
 */
@Test
public void testLastAggregationOnTextField() throws IOException {
  String ppl =
      String.format("source=%s | stats last(employer), first(email)", TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(response, schema("last(employer)", "string"), schema("first(email)", "string"));
  verifyDataRows(response, rows("Quailcom", "amberduke@pyrami.com"));
}

/**
 * Checks {@code first(employer)} and {@code last(email)} grouped by {@code gender} on the
 * bank index. Expects three {@code string} columns and one row each for "F" and "M".
 */
@Test
public void testFirstLastByGroupOnTextField() throws IOException {
  String ppl =
      String.format("source=%s | stats first(employer), last(email) by gender", TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(
      response,
      schema("first(employer)", "string"),
      schema("last(email)", "string"),
      schema("gender", "string"));

  // One expected row per gender bucket.
  verifyDataRows(
      response,
      rows("Quility", "dillardmcpherson@quailcom.com", "F"),
      rows("Pyrami", "elinorratliff@scentric.com", "M"));
}

/**
 * Combines {@code first(employer)} and {@code last(email)} with {@code count()} and {@code
 * avg(age)}, grouped by {@code gender}, on the bank index. Verifies the five-column schema
 * and one row each for "F" and "M".
 */
@Test
public void testFirstLastWithOtherAggregationsOnTextField() throws IOException {
  String ppl =
      String.format(
          "source=%s | stats first(employer), last(email), count(), avg(age) by gender",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(
      response,
      schema("first(employer)", "string"),
      schema("last(email)", "string"),
      schema("count()", "bigint"),
      schema("avg(age)", "double"),
      schema("gender", "string"));

  // One expected row per gender bucket: first employer, last email, count, average age.
  verifyDataRows(
      response,
      rows("Quility", "dillardmcpherson@quailcom.com", 3, 33.666666666666664, "F"),
      rows("Pyrami", "elinorratliff@scentric.com", 4, 34.25, "M"));
}

/**
 * Mixes first/last over fields of different types on the bank index: {@code first(employer)}
 * as {@code string}, {@code last(balance)} as {@code bigint} and {@code first(age)} as {@code
 * int}, with the single expected row Pyrami / 48086 / 32.
 */
@Test
public void testFirstLastMixedFields() throws IOException {
  String ppl =
      String.format(
          "source=%s | stats first(employer), last(balance), first(age)", TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  verifySchema(
      response,
      schema("first(employer)", "string"),
      schema("last(balance)", "bigint"),
      schema("first(age)", "int"));
  verifyDataRows(response, rows("Pyrami", 48086L, 32));
}
Comment thread
LantaoJin marked this conversation as resolved.

@Test
public void testFirstLastWithBirthdate() throws IOException {
JSONObject actual =
Expand Down Expand Up @@ -1533,4 +1602,62 @@ public void testMixedTypesNestedFieldAggregations() throws IOException {
schema("first_lang", "string"));
verifyDataRows(actual, rows(10, 14, false, true, "java"));
}

/**
 * Runs take/last/first over {@code address}, {@code state} and {@code balance} on the bank
 * index in one stats command. The expected row lists one value per aggregation, in query
 * order.
 */
@Test
public void testTextTypeWithLastFirstTakeAggregation() throws IOException {
  String ppl =
      String.format(
          "source=%s | stats take(address, 2), last(address), first(address), "
              + "take(state, 2), last(state), first(state), "
              + "take(balance, 2), last(balance), first(balance)",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  // Expected take(..., 2) results, one list per field.
  List<String> takenAddresses = List.of("880 Holmes Lane", "671 Bristol Street");
  List<String> takenStates = List.of("IL", "TN");
  List<Integer> takenBalances = List.of(39225, 5686);

  verifyDataRows(
      response,
      rows(
          // take, last, first on address
          takenAddresses,
          "702 Quentin Street",
          "880 Holmes Lane",
          // take, last, first on state
          takenStates,
          "IN",
          "IL",
          // take, last, first on balance
          takenBalances,
          48086,
          39225));
}

/**
 * Runs take/last/first over fields computed with eval ({@code upper(address)}, {@code
 * lower(state)} and {@code balance * 10}) on the bank index. The expected row lists one value
 * per aggregation, in query order.
 */
@Test
public void testScriptWithLastFirstTakeAggregation() throws IOException {
  String ppl =
      String.format(
          "source=%s | eval new_address = upper(address), new_state = lower(state),"
              + " new_balance = balance * 10 | stats take(new_address, 2), last(new_address),"
              + " first(new_address), take(new_state, 2), last(new_state), first(new_state),"
              + " take(new_balance, 2), last(new_balance), first(new_balance)",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  // Expected take(..., 2) results, one list per computed field.
  List<String> takenAddresses = List.of("880 HOLMES LANE", "671 BRISTOL STREET");
  List<String> takenStates = List.of("il", "tn");
  List<Integer> takenBalances = List.of(392250, 56860);

  verifyDataRows(
      response,
      rows(
          // take, last, first on new_address
          takenAddresses,
          "702 QUENTIN STREET",
          "880 HOLMES LANE",
          // take, last, first on new_state
          takenStates,
          "in",
          "il",
          // take, last, first on new_balance
          takenBalances,
          480860,
          392250));
}

/**
 * Runs {@code take} with sizes 0 and -1 over both a computed field ({@code new_balance}) and
 * a stored field ({@code balance}) on the bank index. All four aggregations are expected to
 * return an empty list.
 */
@Test
public void testTakeAggregationWithNegative() throws IOException {
  String ppl =
      String.format(
          "source=%s | eval new_balance = balance * 10 | stats take(new_balance, 0),"
              + " take(new_balance, -1), take(balance, 0), take(balance, -1)",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);

  // One empty list per take(...) call, in query order.
  verifyDataRows(response, rows(List.of(), List.of(), List.of(), List.of()));
}
}
6 changes: 3 additions & 3 deletions integ-test/src/test/resources/alias.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"index":{"_id":"1"}}
{"original_col" : 1}
{"original_col" : 1,"original_text" : "a b c","original_date":"2026-01-01T10:00:00Z"}
{"index":{"_id":"2"}}
{"original_col" : 2}
{"original_col" : 2,"original_text" : "d e f","original_date":"2026-01-02T10:00:00Z"}
{"index":{"_id":"3"}}
{"original_col" : 3}
{"original_col" : 3,"original_text" : "x y z","original_date":"2026-01-03T10:00:00Z"}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ calcite:
physical: |
EnumerableCalc(expr#0..15=[{inputs}], proj#0..12=[{exprs}], aws=[$t14], event=[$t15])
CalciteEnumerableTopK(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","process","log","message","tags","cloud","input","@timestamp","ecs","data_stream","meta","host","metrics","aws","event"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"metrics.size"},{"field":"message"},{"field":"tags"},{"field":"@timestamp"}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Loading
Loading