Skip to content

Commit 7d82cd8

Browse files
committed
Merge remote-tracking branch 'origin/main' into issues/4201
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
2 parents 010fd06 + 5630119 commit 7d82cd8

111 files changed

Lines changed: 4390 additions & 180 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/stalled.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ jobs:
2121
with:
2222
repo-token: ${{ steps.github_app_token.outputs.token }}
2323
stale-pr-label: 'stalled'
24-
stale-pr-message: 'This PR is stalled because it has been open for 30 days with no activity.'
25-
days-before-pr-stale: 30
24+
stale-pr-message: 'This PR is stalled because it has been open for 2 weeks with no activity.'
25+
days-before-pr-stale: 14
2626
days-before-issue-stale: -1
2727
days-before-pr-close: -1
2828
days-before-issue-close: -1
29+
exempt-draft-pr: true

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public enum Key {
3333
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
3434
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
3535
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
36+
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
37+
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),
3638

3739
/** Enable Calcite as execution engine */
3840
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),

core/src/main/java/org/opensearch/sql/ast/tree/Timechart.java

Lines changed: 140 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,43 @@
55

66
package org.opensearch.sql.ast.tree;
77

8+
import static org.opensearch.sql.ast.dsl.AstDSL.aggregate;
9+
import static org.opensearch.sql.ast.dsl.AstDSL.doubleLiteral;
10+
import static org.opensearch.sql.ast.dsl.AstDSL.eval;
11+
import static org.opensearch.sql.ast.dsl.AstDSL.function;
12+
import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral;
13+
import static org.opensearch.sql.ast.expression.IntervalUnit.SECOND;
14+
import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.sum;
15+
import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.timestampadd;
16+
import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.timestampdiff;
17+
import static org.opensearch.sql.calcite.plan.OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP;
18+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE;
19+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY;
20+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUM;
21+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.TIMESTAMPADD;
22+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.TIMESTAMPDIFF;
23+
824
import com.google.common.collect.ImmutableList;
925
import java.util.List;
26+
import java.util.Locale;
27+
import java.util.Map;
28+
import java.util.Optional;
1029
import lombok.AllArgsConstructor;
1130
import lombok.EqualsAndHashCode;
1231
import lombok.Getter;
32+
import lombok.RequiredArgsConstructor;
1333
import lombok.ToString;
1434
import org.opensearch.sql.ast.AbstractNodeVisitor;
35+
import org.opensearch.sql.ast.dsl.AstDSL;
36+
import org.opensearch.sql.ast.expression.AggregateFunction;
37+
import org.opensearch.sql.ast.expression.Field;
38+
import org.opensearch.sql.ast.expression.Function;
39+
import org.opensearch.sql.ast.expression.IntervalUnit;
40+
import org.opensearch.sql.ast.expression.Let;
41+
import org.opensearch.sql.ast.expression.Span;
42+
import org.opensearch.sql.ast.expression.SpanUnit;
1543
import org.opensearch.sql.ast.expression.UnresolvedExpression;
44+
import org.opensearch.sql.calcite.utils.PlanUtils;
1645

1746
/** AST node representing the Timechart operation. */
1847
@Getter
@@ -49,8 +78,9 @@ public Timechart useOther(Boolean useOther) {
4978
}
5079

5180
@Override
52-
public Timechart attach(UnresolvedPlan child) {
53-
return toBuilder().child(child).build();
81+
public UnresolvedPlan attach(UnresolvedPlan child) {
82+
// Transform after child attached to avoid unintentionally overriding it
83+
return toBuilder().child(child).build().transformPerFunction();
5484
}
5585

5686
@Override
@@ -62,4 +92,112 @@ public List<UnresolvedPlan> getChild() {
6292
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
6393
return nodeVisitor.visitTimechart(this, context);
6494
}
95+
96+
/**
97+
* Transform per function to eval-based post-processing on sum result by timechart. Specifically,
98+
* calculate how many seconds are in the time bucket based on the span option dynamically, then
99+
* divide the aggregated sum value by the number of seconds to get the per-second rate.
100+
*
101+
* <p>For example, with span=5m per_second(field): per second rate = sum(field) / 300 seconds
102+
*
103+
* @return eval+timechart if per function present, or the original timechart otherwise.
104+
*/
105+
private UnresolvedPlan transformPerFunction() {
106+
Optional<PerFunction> perFuncOpt = PerFunction.from(aggregateFunction);
107+
if (perFuncOpt.isEmpty()) {
108+
return this;
109+
}
110+
111+
PerFunction perFunc = perFuncOpt.get();
112+
Span span = (Span) this.binExpression;
113+
Field spanStartTime = AstDSL.field(IMPLICIT_FIELD_TIMESTAMP);
114+
Function spanEndTime = timestampadd(span.getUnit(), span.getValue(), spanStartTime);
115+
Function spanSeconds = timestampdiff(SECOND, spanStartTime, spanEndTime);
116+
117+
return eval(
118+
timechart(AstDSL.alias(perFunc.aggName, sum(perFunc.aggArg))),
119+
let(perFunc.aggName).multiply(perFunc.seconds).dividedBy(spanSeconds));
120+
}
121+
122+
private Timechart timechart(UnresolvedExpression newAggregateFunction) {
123+
return this.toBuilder().aggregateFunction(newAggregateFunction).build();
124+
}
125+
126+
/** TODO: extend to support additional per_* functions */
127+
@RequiredArgsConstructor
128+
static class PerFunction {
129+
private static final Map<String, Integer> UNIT_SECONDS = Map.of("per_second", 1);
130+
private final String aggName;
131+
private final UnresolvedExpression aggArg;
132+
private final int seconds;
133+
134+
static Optional<PerFunction> from(UnresolvedExpression aggExpr) {
135+
if (!(aggExpr instanceof AggregateFunction)) {
136+
return Optional.empty();
137+
}
138+
139+
AggregateFunction aggFunc = (AggregateFunction) aggExpr;
140+
String aggFuncName = aggFunc.getFuncName().toLowerCase(Locale.ROOT);
141+
if (!UNIT_SECONDS.containsKey(aggFuncName)) {
142+
return Optional.empty();
143+
}
144+
145+
String aggName = toAggName(aggFunc);
146+
return Optional.of(
147+
new PerFunction(aggName, aggFunc.getField(), UNIT_SECONDS.get(aggFuncName)));
148+
}
149+
150+
private static String toAggName(AggregateFunction aggFunc) {
151+
String fieldName =
152+
(aggFunc.getField() instanceof Field)
153+
? ((Field) aggFunc.getField()).getField().toString()
154+
: aggFunc.getField().toString();
155+
return String.format(Locale.ROOT, "%s(%s)", aggFunc.getFuncName(), fieldName);
156+
}
157+
}
158+
159+
private PerFunctionRateExprBuilder let(String fieldName) {
160+
return new PerFunctionRateExprBuilder(AstDSL.field(fieldName));
161+
}
162+
163+
/** Fluent builder for creating Let expressions with mathematical operations. */
164+
static class PerFunctionRateExprBuilder {
165+
private final Field field;
166+
private UnresolvedExpression expr;
167+
168+
PerFunctionRateExprBuilder(Field field) {
169+
this.field = field;
170+
this.expr = field;
171+
}
172+
173+
PerFunctionRateExprBuilder multiply(Integer multiplier) {
174+
// Promote to double literal to avoid integer division in downstream
175+
this.expr =
176+
function(
177+
MULTIPLY.getName().getFunctionName(), expr, doubleLiteral(multiplier.doubleValue()));
178+
return this;
179+
}
180+
181+
Let dividedBy(UnresolvedExpression divisor) {
182+
return AstDSL.let(field, function(DIVIDE.getName().getFunctionName(), expr, divisor));
183+
}
184+
185+
static UnresolvedExpression sum(UnresolvedExpression field) {
186+
return aggregate(SUM.getName().getFunctionName(), field);
187+
}
188+
189+
static Function timestampadd(
190+
SpanUnit unit, UnresolvedExpression value, UnresolvedExpression timestampField) {
191+
UnresolvedExpression intervalUnit =
192+
stringLiteral(PlanUtils.spanUnitToIntervalUnit(unit).toString());
193+
return function(
194+
TIMESTAMPADD.getName().getFunctionName(), intervalUnit, value, timestampField);
195+
}
196+
197+
static Function timestampdiff(
198+
IntervalUnit unit, UnresolvedExpression start, UnresolvedExpression end) {
199+
return function(
200+
TIMESTAMPDIFF.getName().getFunctionName(), stringLiteral(unit.toString()), start, end);
201+
}
202+
}
65203
}

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class CalcitePlanContext {
3535
public final ExtendedRexBuilder rexBuilder;
3636
public final FunctionProperties functionProperties;
3737
public final QueryType queryType;
38-
public final Integer querySizeLimit;
38+
public final SysLimit sysLimit;
3939

4040
/** This thread local variable is only used to skip script encoding in script pushdown. */
4141
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);
@@ -61,9 +61,9 @@ public class CalcitePlanContext {
6161

6262
@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;
6363

64-
private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
64+
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
6565
this.config = config;
66-
this.querySizeLimit = querySizeLimit;
66+
this.sysLimit = sysLimit;
6767
this.queryType = queryType;
6868
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
6969
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
@@ -102,12 +102,12 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
102102
}
103103

104104
public CalcitePlanContext clone() {
105-
return new CalcitePlanContext(config, querySizeLimit, queryType);
105+
return new CalcitePlanContext(config, sysLimit, queryType);
106106
}
107107

108108
public static CalcitePlanContext create(
109-
FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
110-
return new CalcitePlanContext(config, querySizeLimit, queryType);
109+
FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
110+
return new CalcitePlanContext(config, sysLimit, queryType);
111111
}
112112

113113
/**

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@
133133
import org.opensearch.sql.ast.tree.UnresolvedPlan;
134134
import org.opensearch.sql.ast.tree.Values;
135135
import org.opensearch.sql.ast.tree.Window;
136+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
137+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
136138
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
137139
import org.opensearch.sql.calcite.utils.BinUtils;
138140
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
@@ -1136,6 +1138,15 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
11361138
public RelNode visitJoin(Join node, CalcitePlanContext context) {
11371139
List<UnresolvedPlan> children = node.getChildren();
11381140
children.forEach(c -> analyze(c, context));
1141+
// Add the join.subsearch_maxout limit to the subsearch side; zero or a negative value means unlimited.
1142+
if (context.sysLimit.joinSubsearchLimit() > 0) {
1143+
PlanUtils.replaceTop(
1144+
context.relBuilder,
1145+
LogicalSystemLimit.create(
1146+
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
1147+
context.relBuilder.peek(),
1148+
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
1149+
}
11391150
if (node.getJoinCondition().isEmpty()) {
11401151
// join-with-field-list grammar
11411152
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();
@@ -1914,6 +1925,9 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
19141925

19151926
/** Helper method to get the function name for proper column naming */
19161927
private String getValueFunctionName(UnresolvedExpression aggregateFunction) {
1928+
if (aggregateFunction instanceof Alias) {
1929+
return ((Alias) aggregateFunction).getName();
1930+
}
19171931
if (!(aggregateFunction instanceof AggregateFunction)) {
19181932
return "value";
19191933
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,13 @@
6868
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
6969
import org.opensearch.sql.ast.expression.subquery.InSubquery;
7070
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
71+
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
7172
import org.opensearch.sql.ast.tree.UnresolvedPlan;
73+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
74+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
7275
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
7376
import org.opensearch.sql.calcite.utils.PlanUtils;
77+
import org.opensearch.sql.calcite.utils.SubsearchUtils;
7478
import org.opensearch.sql.common.utils.StringUtils;
7579
import org.opensearch.sql.data.type.ExprType;
7680
import org.opensearch.sql.exception.CalciteUnsupportedException;
@@ -465,7 +469,7 @@ private RexNode extractRexNodeFromAlias(RexNode node) {
465469
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
466470
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
467471
UnresolvedPlan subquery = node.getQuery();
468-
RelNode subqueryRel = resolveSubqueryPlan(subquery, context);
472+
RelNode subqueryRel = resolveSubqueryPlan(subquery, node, context);
469473
if (subqueryRel.getRowType().getFieldCount() != nodes.size()) {
470474
throw new SemanticCheckException(
471475
"The number of columns in the left hand side of an IN subquery does not match the number"
@@ -489,7 +493,7 @@ public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext conte
489493
return context.relBuilder.scalarQuery(
490494
b -> {
491495
UnresolvedPlan subquery = node.getQuery();
492-
return resolveSubqueryPlan(subquery, context);
496+
return resolveSubqueryPlan(subquery, node, context);
493497
});
494498
}
495499

@@ -498,21 +502,39 @@ public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext conte
498502
return context.relBuilder.exists(
499503
b -> {
500504
UnresolvedPlan subquery = node.getQuery();
501-
return resolveSubqueryPlan(subquery, context);
505+
return resolveSubqueryPlan(subquery, node, context);
502506
});
503507
}
504508

505-
private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) {
509+
private RelNode resolveSubqueryPlan(
510+
UnresolvedPlan subquery, SubqueryExpression subqueryExpression, CalcitePlanContext context) {
506511
boolean isNestedSubquery = context.isResolvingSubquery();
507512
context.setResolvingSubquery(true);
508513
// clear and store the outer state
509514
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
510515
if (isResolvingJoinConditionOuter) {
511516
context.setResolvingJoinCondition(false);
512517
}
513-
RelNode subqueryRel = subquery.accept(planVisitor, context);
518+
subquery.accept(planVisitor, context);
519+
// Add the subsearch.maxout limit to exists/in subsearches; zero or a negative value means unlimited.
520+
if (context.sysLimit.subsearchLimit() > 0 && !(subqueryExpression instanceof ScalarSubquery)) {
521+
// The system limit cannot simply be added to the top of the subquery.
522+
// Instead, add system limit under the correlated conditions.
523+
SubsearchUtils.SystemLimitInsertionShuttle shuttle =
524+
new SubsearchUtils.SystemLimitInsertionShuttle(context);
525+
RelNode replacement = context.relBuilder.peek().accept(shuttle);
526+
if (!shuttle.isCorrelatedConditionFound()) {
527+
// If no correlated condition found, add system limit to the top of subquery.
528+
replacement =
529+
LogicalSystemLimit.create(
530+
SystemLimitType.SUBSEARCH_MAXOUT,
531+
replacement,
532+
context.relBuilder.literal(context.sysLimit.subsearchLimit()));
533+
}
534+
PlanUtils.replaceTop(context.relBuilder, replacement);
535+
}
514536
// pop the inner plan
515-
context.relBuilder.build();
537+
RelNode subqueryRel = context.relBuilder.build();
516538
// clear the exists subquery resolving state
517539
// restore to the previous state
518540
if (isResolvingJoinConditionOuter) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite;
7+
8+
import org.opensearch.sql.common.setting.Settings;
9+
10+
public record SysLimit(Integer querySizeLimit, Integer subsearchLimit, Integer joinSubsearchLimit) {
11+
/** Create SysLimit from Settings. */
12+
public static SysLimit fromSettings(Settings settings) {
13+
return settings == null
14+
? UNLIMITED_SUBSEARCH
15+
: new SysLimit(
16+
settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT),
17+
settings.getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT),
18+
settings.getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT));
19+
}
20+
21+
/** No limitation on subsearch */
22+
public static SysLimit UNLIMITED_SUBSEARCH = new SysLimit(10000, 0, 0);
23+
24+
/** For testing only */
25+
public static SysLimit DEFAULT = new SysLimit(10000, 10000, 50000);
26+
}

core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ public enum SystemLimitType {
3030
*
3131
* <p>This type is used to indicate that the limit is applied to the system level.
3232
*/
33-
QUERY_SIZE_LIMIT
33+
QUERY_SIZE_LIMIT,
34+
/** The max output from subsearch to join against. */
35+
JOIN_SUBSEARCH_MAXOUT,
36+
/** Max output to return from a subsearch. */
37+
SUBSEARCH_MAXOUT,
3438
}
3539

3640
@Getter private final SystemLimitType type;

core/src/main/java/org/opensearch/sql/calcite/udf/udaf/PercentileApproxFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Object result(PencentileApproAccumulator acc) {
6262
float floatRet = (float) retValue;
6363
return floatRet;
6464
default:
65-
return acc.value();
65+
return retValue;
6666
}
6767
}
6868

0 commit comments

Comments
 (0)