Skip to content

Commit e5d50f2

Browse files
committed
add earliest and latest as two condition function
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 7cbe0d0 commit e5d50f2

7 files changed

Lines changed: 265 additions & 2 deletions

File tree

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.opensearch.sql.data.type.ExprCoreType;
2828
import org.opensearch.sql.expression.datetime.DateTimeFunctions;
2929
import org.opensearch.sql.expression.function.udf.CryptographicFunction;
30+
import org.opensearch.sql.expression.function.udf.condition.EarliestFunction;
31+
import org.opensearch.sql.expression.function.udf.condition.LatestFunction;
3032
import org.opensearch.sql.expression.function.udf.datetime.AddSubDateFunction;
3133
import org.opensearch.sql.expression.function.udf.datetime.CurrentFunction;
3234
import org.opensearch.sql.expression.function.udf.datetime.DateAddSubFunction;
@@ -65,6 +67,10 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
6567
public static final SqlOperator DIVIDE = new DivideFunction().toUDF("DIVIDE");
6668
public static final SqlOperator SHA2 = CryptographicFunction.sha2().toUDF("SHA2");
6769

70+
// Condition function
71+
public static final SqlOperator EARLIEST = new EarliestFunction().toUDF("EARLIEST");
72+
public static final SqlOperator LATEST = new LatestFunction().toUDF("LATEST");
73+
6874
// Datetime function
6975
public static final SqlOperator TIMESTAMP = new TimestampFunction().toUDF("TIMESTAMP");
7076
public static final SqlOperator DATE =

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ void populate() {
203203
registerOperator(IF, SqlStdOperatorTable.CASE);
204204
registerOperator(IFNULL, SqlStdOperatorTable.COALESCE);
205205
registerOperator(IS_PRESENT, SqlStdOperatorTable.IS_NOT_NULL);
206+
registerOperator(EARLIEST, PPLBuiltinOperators.EARLIEST);
207+
registerOperator(LATEST, PPLBuiltinOperators.LATEST);
206208

207209
// Register library operator
208210
registerOperator(REGEXP, SqlLibraryOperators.REGEXP);
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.function.udf.condition;
7+
8+
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.prependFunctionProperties;
9+
import static org.opensearch.sql.utils.DateTimeUtils.getRelativeZonedDateTime;
10+
11+
import java.time.Clock;
12+
import java.time.Instant;
13+
import java.time.ZonedDateTime;
14+
import java.util.List;
15+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
16+
import org.apache.calcite.adapter.enumerable.NullPolicy;
17+
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
18+
import org.apache.calcite.linq4j.tree.Expression;
19+
import org.apache.calcite.linq4j.tree.Expressions;
20+
import org.apache.calcite.rel.type.RelDataType;
21+
import org.apache.calcite.rex.RexCall;
22+
import org.apache.calcite.rex.RexNode;
23+
import org.apache.calcite.sql.type.ReturnTypes;
24+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
25+
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
26+
import org.opensearch.sql.data.model.ExprValue;
27+
import org.opensearch.sql.expression.function.FunctionProperties;
28+
import org.opensearch.sql.expression.function.ImplementorUDF;
29+
30+
public class EarliestFunction extends ImplementorUDF {
31+
public EarliestFunction() {
32+
super(new EarliestImplementor(), NullPolicy.ANY);
33+
}
34+
35+
@Override
36+
public SqlReturnTypeInference getReturnTypeInference() {
37+
return ReturnTypes.BOOLEAN;
38+
}
39+
40+
public static class EarliestImplementor implements NotNullImplementor {
41+
@Override
42+
public Expression implement(
43+
RexToLixTranslator rexToLixTranslator, RexCall rexCall, List<Expression> list) {
44+
List<RelDataType> types = rexCall.getOperands().stream().map(RexNode::getType).toList();
45+
return Expressions.call(
46+
EarliestFunction.class,
47+
"earliest",
48+
prependFunctionProperties(
49+
UserDefinedFunctionUtils.convertToExprValues(list, types), rexToLixTranslator));
50+
}
51+
}
52+
53+
public static Boolean earliest(Object... inputs) {
54+
String expression = ((ExprValue) inputs[1]).stringValue();
55+
Instant candidate = ((ExprValue) inputs[2]).timestampValue();
56+
FunctionProperties functionProperties = (FunctionProperties) inputs[0];
57+
Clock clock = functionProperties.getQueryStartClock();
58+
ZonedDateTime candidateDatetime = ZonedDateTime.ofInstant(candidate, clock.getZone());
59+
ZonedDateTime earliest =
60+
getRelativeZonedDateTime(
61+
expression, ZonedDateTime.ofInstant(clock.instant(), clock.getZone()));
62+
return earliest.isBefore(candidateDatetime);
63+
}
64+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.function.udf.condition;
7+
8+
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.prependFunctionProperties;
9+
import static org.opensearch.sql.utils.DateTimeUtils.getRelativeZonedDateTime;
10+
11+
import java.time.Clock;
12+
import java.time.Instant;
13+
import java.time.ZonedDateTime;
14+
import java.util.List;
15+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
16+
import org.apache.calcite.adapter.enumerable.NullPolicy;
17+
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
18+
import org.apache.calcite.linq4j.tree.Expression;
19+
import org.apache.calcite.linq4j.tree.Expressions;
20+
import org.apache.calcite.rel.type.RelDataType;
21+
import org.apache.calcite.rex.RexCall;
22+
import org.apache.calcite.rex.RexNode;
23+
import org.apache.calcite.sql.type.ReturnTypes;
24+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
25+
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
26+
import org.opensearch.sql.data.model.ExprValue;
27+
import org.opensearch.sql.expression.function.FunctionProperties;
28+
import org.opensearch.sql.expression.function.ImplementorUDF;
29+
30+
public class LatestFunction extends ImplementorUDF {
31+
public LatestFunction() {
32+
super(new LatestFunction.LatestImplementor(), NullPolicy.ANY);
33+
}
34+
35+
@Override
36+
public SqlReturnTypeInference getReturnTypeInference() {
37+
return ReturnTypes.BOOLEAN;
38+
}
39+
40+
public static class LatestImplementor implements NotNullImplementor {
41+
@Override
42+
public Expression implement(
43+
RexToLixTranslator rexToLixTranslator, RexCall rexCall, List<Expression> list) {
44+
List<RelDataType> types = rexCall.getOperands().stream().map(RexNode::getType).toList();
45+
return Expressions.call(
46+
LatestFunction.class,
47+
"latest",
48+
prependFunctionProperties(
49+
UserDefinedFunctionUtils.convertToExprValues(list, types), rexToLixTranslator));
50+
}
51+
}
52+
53+
public static Boolean latest(Object... inputs) {
54+
String expression = ((ExprValue) inputs[1]).stringValue();
55+
Instant candidate = ((ExprValue) inputs[2]).timestampValue();
56+
FunctionProperties functionProperties = (FunctionProperties) inputs[0];
57+
Clock clock = functionProperties.getQueryStartClock();
58+
ZonedDateTime candidateDatetime = ZonedDateTime.ofInstant(candidate, clock.getZone());
59+
ZonedDateTime latest =
60+
getRelativeZonedDateTime(
61+
expression, ZonedDateTime.ofInstant(clock.instant(), clock.getZone()));
62+
return latest.isAfter(candidateDatetime);
63+
}
64+
}

core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
import java.time.ZoneId;
1212
import java.time.ZoneOffset;
1313
import java.time.ZonedDateTime;
14+
import java.time.format.DateTimeFormatter;
15+
import java.time.format.DateTimeParseException;
16+
import java.time.temporal.ChronoUnit;
17+
import java.util.regex.Matcher;
18+
import java.util.regex.Pattern;
1419
import lombok.experimental.UtilityClass;
1520
import org.opensearch.sql.data.model.ExprTimeValue;
1621
import org.opensearch.sql.data.model.ExprValue;
@@ -19,6 +24,10 @@
1924
@UtilityClass
2025
public class DateTimeUtils {
2126

27+
private static final Pattern OFFSET_PATTERN = Pattern.compile("([+-])(\\d+)([smhdwMy]?)");
28+
private static final DateTimeFormatter DIRECT_FORMATTER =
29+
DateTimeFormatter.ofPattern("MM/dd/yyyy:HH:mm:ss");
30+
2231
/**
2332
* Util method to round the date/time with given unit.
2433
*
@@ -151,4 +160,93 @@ public static LocalDate extractDate(ExprValue value, FunctionProperties function
151160
? ((ExprTimeValue) value).dateValue(functionProperties)
152161
: value.dateValue();
153162
}
163+
164+
public static ZonedDateTime getRelativeZonedDateTime(String input, ZonedDateTime baseTime) {
165+
try {
166+
Instant localDateTime =
167+
LocalDateTime.parse(input, DIRECT_FORMATTER).toInstant(ZoneOffset.UTC);
168+
return localDateTime.atZone(baseTime.getZone());
169+
} catch (DateTimeParseException ignored) {
170+
}
171+
172+
if ("now".equalsIgnoreCase(input) || "now()".equalsIgnoreCase(input)) {
173+
return baseTime;
174+
}
175+
176+
// 1. extract snap(like @d)
177+
String snapUnit = null;
178+
int atIndex = input.indexOf('@');
179+
if (atIndex != -1) {
180+
snapUnit = input.substring(atIndex + 1);
181+
input = input.substring(0, atIndex);
182+
}
183+
184+
// 2. apply snap
185+
ZonedDateTime result = baseTime;
186+
if (snapUnit != null && !snapUnit.isEmpty()) {
187+
result = applySnap(result, snapUnit);
188+
}
189+
190+
// 3. apply offset one by one(like -1d+2h-10m)
191+
Matcher matcher = OFFSET_PATTERN.matcher(input);
192+
while (matcher.find()) {
193+
String sign = matcher.group(1);
194+
int value = Integer.parseInt(matcher.group(2));
195+
String unit = matcher.group(3);
196+
if (unit == null || unit.isEmpty()) {
197+
unit = "s"; // default value is second
198+
}
199+
result = applyOffset(result, sign, value, unit);
200+
}
201+
202+
return result;
203+
}
204+
205+
private static ZonedDateTime applyOffset(
206+
ZonedDateTime base, String sign, int value, String unit) {
207+
ChronoUnit chronoUnit = parseUnit(unit);
208+
return sign.equals("-") ? base.minus(value, chronoUnit) : base.plus(value, chronoUnit);
209+
}
210+
211+
private static ZonedDateTime applySnap(ZonedDateTime base, String unit) {
212+
switch (unit) {
213+
case "s":
214+
return base.truncatedTo(ChronoUnit.SECONDS);
215+
case "m":
216+
return base.truncatedTo(ChronoUnit.MINUTES);
217+
case "h":
218+
return base.truncatedTo(ChronoUnit.HOURS);
219+
case "d":
220+
return base.truncatedTo(ChronoUnit.DAYS);
221+
case "w":
222+
return base.minusDays((base.getDayOfWeek().getValue() % 7)).truncatedTo(ChronoUnit.DAYS);
223+
case "M":
224+
return base.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
225+
case "y":
226+
return base.withDayOfYear(1).truncatedTo(ChronoUnit.DAYS);
227+
default:
228+
throw new IllegalArgumentException("Unsupported snap unit: " + unit);
229+
}
230+
}
231+
232+
private static ChronoUnit parseUnit(String unit) {
233+
switch (unit) {
234+
case "s":
235+
return ChronoUnit.SECONDS;
236+
case "m":
237+
return ChronoUnit.MINUTES;
238+
case "h":
239+
return ChronoUnit.HOURS;
240+
case "d":
241+
return ChronoUnit.DAYS;
242+
case "w":
243+
return ChronoUnit.WEEKS;
244+
case "M":
245+
return ChronoUnit.MONTHS;
246+
case "y":
247+
return ChronoUnit.YEARS;
248+
default:
249+
throw new IllegalArgumentException("Unsupported time unit: " + unit);
250+
}
251+
}
154252
}

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLConditionBuiltinFunctionIT.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55

66
package org.opensearch.sql.calcite.standalone;
77

8-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
9-
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY_WITH_NULL;
8+
import static org.opensearch.sql.legacy.TestsConstants.*;
109
import static org.opensearch.sql.util.MatcherUtils.*;
1110
import static org.opensearch.sql.util.MatcherUtils.rows;
1211

@@ -21,6 +20,7 @@ public void init() throws IOException {
2120
super.init();
2221
loadIndex(Index.STATE_COUNTRY);
2322
loadIndex(Index.STATE_COUNTRY_WITH_NULL);
23+
loadIndex(Index.CALCS);
2424
Request request1 =
2525
new Request("PUT", "/" + TEST_INDEX_STATE_COUNTRY_WITH_NULL + "/_doc/7?refresh=true");
2626
request1.setJsonEntity(
@@ -216,4 +216,31 @@ public void testIsBlank() throws IOException {
216216

217217
verifyDataRows(actual, rows(null, 10), rows(" ", 27), rows("", 57));
218218
}
219+
220+
@Test
221+
public void testEarliest() throws IOException {
222+
JSONObject actual =
223+
executeQuery(
224+
String.format(
225+
"source=%s | where earliest('07/28/2004:12:34:27', datetime0) | stats COUNT() as"
226+
+ " cnt",
227+
TEST_INDEX_CALCS));
228+
229+
verifySchema(actual, schema("cnt", "long"));
230+
231+
verifyDataRows(actual, rows(4));
232+
}
233+
234+
@Test
235+
public void testLatest() throws IOException {
236+
JSONObject actual =
237+
executeQuery(
238+
String.format(
239+
"source=%s | where latest('07/28/2004:12:34:27', datetime0) | stats COUNT() as cnt",
240+
TEST_INDEX_CALCS));
241+
242+
verifySchema(actual, schema("cnt", "long"));
243+
244+
verifyDataRows(actual, rows(13));
245+
}
219246
}

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,8 @@ conditionFunctionName
839839
| ISPRESENT
840840
| ISEMPTY
841841
| ISBLANK
842+
| EARLIEST
843+
| LATEST
842844
;
843845

844846
// flow control function return non-boolean value

0 commit comments

Comments
 (0)