Skip to content

Commit 7d2813d

Browse files
committed
add earliest and latest
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 94fb171 commit 7d2813d

9 files changed

Lines changed: 225 additions & 10 deletions

File tree

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.udf.udaf;
7+
8+
import java.util.Objects;
9+
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;
10+
11+
public class EarliestFunction
12+
implements UserDefinedAggFunction<EarliestFunction.EarliestAccumulator> {
13+
@Override
14+
public EarliestAccumulator init() {
15+
return new EarliestAccumulator();
16+
}
17+
18+
@Override
19+
public Object result(EarliestAccumulator accumulator) {
20+
return accumulator.value();
21+
}
22+
23+
@Override
24+
public EarliestAccumulator add(EarliestAccumulator acc, Object... values) {
25+
acc.add(values[0]);
26+
return acc;
27+
}
28+
29+
public static class EarliestAccumulator implements UserDefinedAggFunction.Accumulator {
30+
private String earliest;
31+
32+
public EarliestAccumulator() {
33+
earliest = null;
34+
}
35+
36+
@Override
37+
public Object value(Object... argList) {
38+
return earliest;
39+
}
40+
41+
public void add(Object value) {
42+
if (!Objects.isNull(value)) {
43+
if (earliest == null) {
44+
earliest = String.valueOf(value);
45+
} else if (earliest.compareTo(value.toString()) > 0) {
46+
earliest = String.valueOf(value);
47+
}
48+
}
49+
}
50+
}
51+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.udf.udaf;
7+
8+
import java.util.Objects;
9+
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;
10+
11+
public class LatestFunction implements UserDefinedAggFunction<LatestFunction.LatestAccumulator> {
12+
@Override
13+
public LatestAccumulator init() {
14+
return new LatestAccumulator();
15+
}
16+
17+
@Override
18+
public Object result(LatestFunction.LatestAccumulator accumulator) {
19+
return accumulator.value();
20+
}
21+
22+
@Override
23+
public LatestFunction.LatestAccumulator add(
24+
LatestFunction.LatestAccumulator acc, Object... values) {
25+
acc.add(values[0]);
26+
return acc;
27+
}
28+
29+
public static class LatestAccumulator implements UserDefinedAggFunction.Accumulator {
30+
private String latest;
31+
32+
public LatestAccumulator() {
33+
latest = null;
34+
}
35+
36+
@Override
37+
public Object value(Object... argList) {
38+
return latest;
39+
}
40+
41+
public void add(Object value) {
42+
if (!Objects.isNull(value)) {
43+
if (latest == null) {
44+
latest = String.valueOf(value);
45+
} else if (latest.compareTo(value.toString()) < 0) {
46+
latest = String.valueOf(value);
47+
}
48+
}
49+
}
50+
}
51+
}

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.opensearch.sql.ast.expression.WindowBound;
3434
import org.opensearch.sql.ast.expression.WindowFrame;
3535
import org.opensearch.sql.calcite.CalcitePlanContext;
36+
import org.opensearch.sql.calcite.udf.udaf.EarliestFunction;
37+
import org.opensearch.sql.calcite.udf.udaf.LatestFunction;
3638
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
3739
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
3840
import org.opensearch.sql.expression.function.BuiltinFunctionName;
@@ -256,6 +258,22 @@ static RelBuilder.AggCall makeAggCall(
256258
List.of(field),
257259
newArgList,
258260
context.relBuilder);
261+
case EARLIEST:
262+
return TransferUserDefinedAggFunction(
263+
EarliestFunction.class,
264+
"earliest",
265+
ReturnTypes.ARG0_FORCE_NULLABLE,
266+
List.of(field),
267+
argList,
268+
context.relBuilder);
269+
case LATEST:
270+
return TransferUserDefinedAggFunction(
271+
LatestFunction.class,
272+
"latest",
273+
ReturnTypes.ARG0_FORCE_NULLABLE,
274+
List.of(field),
275+
argList,
276+
context.relBuilder);
259277
default:
260278
throw new UnsupportedOperationException(
261279
"Unexpected aggregation: " + functionName.getName().getFunctionName());

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ public enum BuiltinFunctionName {
188188
TAKE(FunctionName.of("take")),
189189
// t-digest percentile which is used in OpenSearch core by default.
190190
PERCENTILE_APPROX(FunctionName.of("percentile_approx")),
191+
EARLIEST(FunctionName.of("earliest")),
192+
LATEST(FunctionName.of("latest")),
191193
// Not always an aggregation query
192194
NESTED(FunctionName.of("nested")),
193195

@@ -314,6 +316,8 @@ public enum BuiltinFunctionName {
314316
.put("take", BuiltinFunctionName.TAKE)
315317
.put("percentile", BuiltinFunctionName.PERCENTILE_APPROX)
316318
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
319+
.put("earliest", BuiltinFunctionName.EARLIEST)
320+
.put("latest", BuiltinFunctionName.LATEST)
317321
.build();
318322

319323
private static final Map<String, BuiltinFunctionName> WINDOW_FUNC_MAPPING =
@@ -330,6 +334,8 @@ public enum BuiltinFunctionName {
330334
.put("stddev", BuiltinFunctionName.STDDEV_POP)
331335
.put("stddev_pop", BuiltinFunctionName.STDDEV_POP)
332336
.put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP)
337+
.put("earliest", BuiltinFunctionName.EARLIEST)
338+
.put("latest", BuiltinFunctionName.LATEST)
333339
.build();
334340

335341
public static Optional<BuiltinFunctionName> of(String str) {

integ-test/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,8 @@ integTest {
448448

449449
dependsOn ':opensearch-sql-plugin:bundlePlugin'
450450
if(getOSFamilyType() != "windows") {
451-
dependsOn startPrometheus
452-
finalizedBy stopPrometheus
451+
//dependsOn startPrometheus
452+
//finalizedBy stopPrometheus
453453
}
454454

455455
// enable calcite codegen in IT

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLAggregationIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,52 @@ public void testApproxCountDistinct() {
532532
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
533533
}
534534

535+
@Test
536+
public void testEarliestAndLatest() {
537+
JSONObject actual =
538+
executeQuery(
539+
String.format(
540+
"source=%s | stats latest(datetime0), earliest(datetime0)", TEST_INDEX_CALCS));
541+
542+
verifySchema(
543+
actual,
544+
schema("latest(datetime0)", "timestamp"),
545+
schema("earliest(datetime0)", "timestamp"));
546+
verifyDataRows(actual, rows("2004-08-02 07:59:23", "2004-07-04 22:49:28"));
547+
}
548+
549+
@Test
550+
public void testEarliestAndLatestWithAlias() {
551+
JSONObject actual =
552+
executeQuery(
553+
String.format(
554+
"source=%s | stats latest(datetime0) as late, earliest(datetime0) as early",
555+
TEST_INDEX_CALCS));
556+
557+
verifySchema(actual, schema("late", "timestamp"), schema("early", "timestamp"));
558+
verifyDataRows(actual, rows("2004-08-02 07:59:23", "2004-07-04 22:49:28"));
559+
}
560+
561+
@Test
562+
public void testEarliestAndLatestWithBy() {
563+
JSONObject actual =
564+
executeQuery(
565+
String.format(
566+
"source=%s | stats latest(datetime0) as late, earliest(datetime0) as early by"
567+
+ " bool2",
568+
TEST_INDEX_CALCS));
569+
570+
verifySchema(
571+
actual,
572+
schema("late", "timestamp"),
573+
schema("early", "timestamp"),
574+
schema("bool2", "boolean"));
575+
verifyDataRows(
576+
actual,
577+
rows("2004-07-31 11:57:52", "2004-07-12 17:30:16", true),
578+
rows("2004-08-02 07:59:23", "2004-07-04 22:49:28", false));
579+
}
580+
535581
@Test
536582
public void testVarSampVarPop() {
537583
JSONObject actual =

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLEventstatsIT.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,8 @@
55

66
package org.opensearch.sql.calcite.standalone;
77

8-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
9-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY_WITH_NULL;
10-
import static org.opensearch.sql.util.MatcherUtils.rows;
11-
import static org.opensearch.sql.util.MatcherUtils.schema;
12-
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
13-
import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains;
14-
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
15-
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;
8+
import static org.opensearch.sql.legacy.TestsConstants.*;
9+
import static org.opensearch.sql.util.MatcherUtils.*;
1610

1711
import java.io.IOException;
1812
import java.util.List;
@@ -28,6 +22,7 @@ public void init() throws IOException {
2822
super.init();
2923
loadIndex(Index.STATE_COUNTRY);
3024
loadIndex(Index.STATE_COUNTRY_WITH_NULL);
25+
loadIndex(Index.BANK_TWO);
3126
}
3227

3328
@Test
@@ -600,4 +595,48 @@ public void testEventstatVarianceWithNullBy() {
600595
58.333333333333314),
601596
rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800));
602597
}
598+
599+
@Test
600+
public void testEventstatEarliestAndLatest() {
601+
JSONObject actual =
602+
executeQuery(
603+
String.format(
604+
"source=%s | eventstats earliest(birthdate), latest(birthdate) | head 1",
605+
TEST_INDEX_BANK_TWO));
606+
verifySchema(
607+
actual,
608+
schema("account_number", "long"),
609+
schema("firstname", "string"),
610+
schema("address", "string"),
611+
schema("birthdate", "timestamp"),
612+
schema("gender", "string"),
613+
schema("city", "string"),
614+
schema("lastname", "string"),
615+
schema("balance", "long"),
616+
schema("employer", "string"),
617+
schema("state", "string"),
618+
schema("age", "integer"),
619+
schema("email", "string"),
620+
schema("male", "boolean"),
621+
schema("earliest(birthdate)", "timestamp"),
622+
schema("latest(birthdate)", "timestamp"));
623+
verifyDataRows(
624+
actual,
625+
rows(
626+
1,
627+
"Amber JOHnny",
628+
"880 Holmes Lane",
629+
"2017-10-23 00:00:00",
630+
"M",
631+
"Brogan",
632+
"Duke Willmington",
633+
39225,
634+
"Pyrami",
635+
"IL",
636+
32,
637+
"amberduke@pyrami.com",
638+
true,
639+
"1970-01-18 20:22:32",
640+
"2018-08-19 00:00:00"));
641+
}
603642
}

ppl/src/main/antlr/OpenSearchPPLLexer.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ STDDEV_SAMP: 'STDDEV_SAMP';
221221
STDDEV_POP: 'STDDEV_POP';
222222
PERCENTILE: 'PERCENTILE';
223223
PERCENTILE_APPROX: 'PERCENTILE_APPROX';
224+
EARLIEST: 'EARLIEST';
225+
LATEST: 'LATEST';
224226
TAKE: 'TAKE';
225227
LIST: 'LIST';
226228
VALUES: 'VALUES';

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,8 @@ statsFunctionName
405405
| STDDEV_POP
406406
| PERCENTILE
407407
| PERCENTILE_APPROX
408+
| EARLIEST
409+
| LATEST
408410
;
409411

410412
takeAggFunction

0 commit comments

Comments
 (0)