Skip to content

Commit 1510d07

Browse files
committed
Merge branch 'main' into ad-operator
2 parents c3f7f50 + 96370bf commit 1510d07

352 files changed

Lines changed: 4077 additions & 1659 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1313
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
1414
import org.opensearch.plugins.Plugin;
15-
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
1615
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
1716
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
1817
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
@@ -21,6 +20,8 @@
2120
import org.opensearch.threadpool.ThreadPool;
2221
import org.opensearch.transport.client.Client;
2322

23+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
24+
2425
/**
2526
* The job runner class for scheduling async query.
2627
*
@@ -37,7 +38,7 @@
3738
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
3839
// Share SQL plugin thread pool
3940
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
40-
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
41+
SQL_WORKER_THREAD_POOL_NAME;
4142
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
4243

4344
private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();

async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.mockito.Mockito.spy;
1616
import static org.mockito.Mockito.verify;
1717
import static org.mockito.Mockito.when;
18+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1819

1920
import java.time.Instant;
2021
import org.apache.logging.log4j.LogManager;
@@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() {
8788
spyJobRunner.runJob(request, context);
8889

8990
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
90-
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
91+
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
9192
.submit(captor.capture());
9293

9394
Runnable runnable = captor.getValue();
@@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() {
145146
spyJobRunner.runJob(request, context);
146147

147148
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
148-
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
149+
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
149150
.submit(captor.capture());
150151

151152
Runnable runnable = captor.getValue();

core/src/main/java/org/opensearch/sql/ast/tree/Chart.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,14 @@ private UnresolvedPlan transformPerFunction() {
9797

9898
PerFunction perFunc = perFuncOpt.get();
9999
// For chart, the rowSplit should contain the span information
100-
UnresolvedExpression spanExpr = rowSplit;
101-
if (rowSplit instanceof Alias) {
102-
spanExpr = ((Alias) rowSplit).getDelegated();
103-
}
100+
UnresolvedExpression spanExpr =
101+
rowSplit instanceof Alias ? ((Alias) rowSplit).getDelegated() : rowSplit;
104102
if (!(spanExpr instanceof Span)) {
105103
return this; // Cannot transform without span information
106104
}
107105

108106
Span span = (Span) spanExpr;
109-
Field spanStartTime = AstDSL.implicitTimestampField();
107+
Field spanStartTime = (Field) span.getField();
110108
Function spanEndTime = timestampadd(span.getUnit(), span.getValue(), spanStartTime);
111109
Function spanMillis = timestampdiff(MILLISECOND, spanStartTime, spanEndTime);
112110
final int SECOND_IN_MILLISECOND = 1000;

core/src/main/java/org/opensearch/sql/ast/tree/StreamWindow.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,50 +9,27 @@
99
import java.util.List;
1010
import lombok.EqualsAndHashCode;
1111
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
1213
import lombok.ToString;
1314
import org.opensearch.sql.ast.AbstractNodeVisitor;
1415
import org.opensearch.sql.ast.expression.UnresolvedExpression;
1516

1617
@Getter
1718
@ToString
1819
@EqualsAndHashCode(callSuper = false)
20+
@RequiredArgsConstructor
1921
public class StreamWindow extends UnresolvedPlan {
2022

2123
private final List<UnresolvedExpression> windowFunctionList;
2224
private final List<UnresolvedExpression> groupList;
2325
private final boolean current;
2426
private final int window;
2527
private final boolean global;
28+
private final boolean bucketNullable;
2629
private final UnresolvedExpression resetBefore;
2730
private final UnresolvedExpression resetAfter;
2831
@ToString.Exclude private UnresolvedPlan child;
2932

30-
/** StreamWindow Constructor. */
31-
public StreamWindow(
32-
List<UnresolvedExpression> windowFunctionList,
33-
List<UnresolvedExpression> groupList,
34-
boolean current,
35-
int window,
36-
boolean global,
37-
UnresolvedExpression resetBefore,
38-
UnresolvedExpression resetAfter) {
39-
this.windowFunctionList = windowFunctionList;
40-
this.groupList = groupList;
41-
this.current = current;
42-
this.window = window;
43-
this.global = global;
44-
this.resetBefore = resetBefore;
45-
this.resetAfter = resetAfter;
46-
}
47-
48-
public boolean isCurrent() {
49-
return current;
50-
}
51-
52-
public boolean isGlobal() {
53-
return global;
54-
}
55-
5633
@Override
5734
public StreamWindow attach(UnresolvedPlan child) {
5835
this.child = child;

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 66 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
1616
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
1717
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
18+
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP;
1819
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
1920
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_RARE_TOP;
2021
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_STREAMSTATS;
@@ -50,9 +51,6 @@
5051
import org.apache.calcite.rel.RelNode;
5152
import org.apache.calcite.rel.core.Aggregate;
5253
import org.apache.calcite.rel.core.JoinRelType;
53-
import org.apache.calcite.rel.hint.HintStrategyTable;
54-
import org.apache.calcite.rel.hint.RelHint;
55-
import org.apache.calcite.rel.logical.LogicalAggregate;
5654
import org.apache.calcite.rel.logical.LogicalValues;
5755
import org.apache.calcite.rel.type.RelDataType;
5856
import org.apache.calcite.rel.type.RelDataTypeFamily;
@@ -147,6 +145,7 @@
147145
import org.opensearch.sql.ast.tree.UnresolvedPlan;
148146
import org.opensearch.sql.ast.tree.Values;
149147
import org.opensearch.sql.ast.tree.Window;
148+
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
150149
import org.opensearch.sql.calcite.plan.LogicalAD;
151150
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
152151
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
@@ -201,7 +200,11 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) {
201200
throw new CalciteUnsupportedException("information_schema is unsupported in Calcite");
202201
}
203202
context.relBuilder.scan(node.getTableQualifiedName().getParts());
204-
return context.relBuilder.peek();
203+
RelNode scan = context.relBuilder.peek();
204+
if (scan instanceof AliasFieldsWrappable) {
205+
return ((AliasFieldsWrappable) scan).wrapProjectForAliasFields(context.relBuilder);
206+
}
207+
return scan;
205208
}
206209

207210
// This is a utility method to add an existing RelOptTable to the builder stack; not used for now
@@ -1057,7 +1060,7 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
10571060
List<String> intendedGroupKeyAliases = getGroupKeyNamesAfterAggregation(reResolved.getLeft());
10581061
context.relBuilder.aggregate(
10591062
context.relBuilder.groupKey(reResolved.getLeft()), reResolved.getRight());
1060-
if (hintBucketNonNull) addIgnoreNullBucketHintToAggregate(context);
1063+
if (hintBucketNonNull) PlanUtils.addIgnoreNullBucketHintToAggregate(context.relBuilder);
10611064
// During aggregation, Calcite projects both input dependencies and output group-by fields.
10621065
// When names conflict, Calcite adds numeric suffixes (e.g., "value0").
10631066
// Apply explicit renaming to restore the intended aliases.
@@ -1128,8 +1131,7 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
11281131
@Override
11291132
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
11301133
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
1131-
Boolean bucketNullable =
1132-
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
1134+
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
11331135
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
11341136
BitSet nonNullGroupMask = new BitSet(nGroup);
11351137
if (!bucketNullable) {
@@ -1320,7 +1322,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13201322
: duplicatedFieldNames.stream()
13211323
.map(a -> (RexNode) context.relBuilder.field(a))
13221324
.toList();
1323-
buildDedupNotNull(context, dedupeFields, allowedDuplication);
1325+
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
13241326
}
13251327
context.relBuilder.join(
13261328
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
@@ -1376,7 +1378,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13761378
List<RexNode> dedupeFields =
13771379
getRightColumnsInJoinCriteria(context.relBuilder, joinCondition);
13781380

1379-
buildDedupNotNull(context, dedupeFields, allowedDuplication);
1381+
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
13801382
}
13811383
context.relBuilder.join(
13821384
JoinAndLookupUtils.translateJoinType(node.getJoinType()), joinCondition);
@@ -1541,24 +1543,20 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
15411543
if (keepEmpty) {
15421544
buildDedupOrNull(context, dedupeFields, allowedDuplication);
15431545
} else {
1544-
buildDedupNotNull(context, dedupeFields, allowedDuplication);
1546+
buildDedupNotNull(context, dedupeFields, allowedDuplication, false);
15451547
}
15461548
return context.relBuilder.peek();
15471549
}
15481550

15491551
private static void buildDedupOrNull(
15501552
CalcitePlanContext context, List<RexNode> dedupeFields, Integer allowedDuplication) {
15511553
/*
1552-
* | dedup 2 a, b keepempty=false
1553-
* DropColumns('_row_number_dedup_)
1554-
* +- Filter ('_row_number_dedup_ <= n OR isnull('a) OR isnull('b))
1555-
* +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
1554+
* | dedup 2 a, b keepempty=true
1555+
* LogicalProject(...)
1556+
* +- LogicalFilter(condition=[OR(IS NULL(a), IS NULL(b), <=(_row_number_dedup_, n))])
1557+
* +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a, b)])
15561558
* +- ...
15571559
*/
1558-
// Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
1559-
// specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a
1560-
// ASC
1561-
// NULLS FIRST, 'b ASC NULLS FIRST]
15621560
RexNode rowNumber =
15631561
context
15641562
.relBuilder
@@ -1581,16 +1579,21 @@ private static void buildDedupOrNull(
15811579
}
15821580

15831581
private static void buildDedupNotNull(
1584-
CalcitePlanContext context, List<RexNode> dedupeFields, Integer allowedDuplication) {
1582+
CalcitePlanContext context,
1583+
List<RexNode> dedupeFields,
1584+
Integer allowedDuplication,
1585+
boolean fromJoinMaxOption) {
15851586
/*
15861587
* | dedup 2 a, b keepempty=false
1587-
* DropColumns('_row_number_dedup_)
1588-
* +- Filter ('_row_number_dedup_ <= n)
1589-
* +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
1590-
* +- Filter (isnotnull('a) AND isnotnull('b))
1591-
* +- ...
1588+
* LogicalProject(...)
1589+
* +- LogicalFilter(condition=[<=(_row_number_dedup_, n)])
1590+
* +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a, b)])
1591+
* +- LogicalFilter(condition=[AND(IS NOT NULL(a), IS NOT NULL(b))])
1592+
* +- ...
15921593
*/
15931594
// Filter (isnotnull('a) AND isnotnull('b))
1595+
String rowNumberAlias =
1596+
fromJoinMaxOption ? ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP : ROW_NUMBER_COLUMN_FOR_DEDUP;
15941597
context.relBuilder.filter(
15951598
context.relBuilder.and(dedupeFields.stream().map(context.relBuilder::isNotNull).toList()));
15961599
// Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
@@ -1604,15 +1607,15 @@ private static void buildDedupNotNull(
16041607
.partitionBy(dedupeFields)
16051608
.orderBy(dedupeFields)
16061609
.rowsTo(RexWindowBounds.CURRENT_ROW)
1607-
.as(ROW_NUMBER_COLUMN_FOR_DEDUP);
1610+
.as(rowNumberAlias);
16081611
context.relBuilder.projectPlus(rowNumber);
1609-
RexNode _row_number_dedup_ = context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_DEDUP);
1612+
RexNode rowNumberField = context.relBuilder.field(rowNumberAlias);
16101613
// Filter ('_row_number_dedup_ <= n)
16111614
context.relBuilder.filter(
16121615
context.relBuilder.lessThanOrEqual(
1613-
_row_number_dedup_, context.relBuilder.literal(allowedDuplication)));
1616+
rowNumberField, context.relBuilder.literal(allowedDuplication)));
16141617
// DropColumns('_row_number_dedup_)
1615-
context.relBuilder.projectExcept(_row_number_dedup_);
1618+
context.relBuilder.projectExcept(rowNumberField);
16161619
}
16171620

16181621
@Override
@@ -1745,20 +1748,25 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
17451748
.as(ROW_NUMBER_COLUMN_FOR_STREAMSTATS);
17461749
context.relBuilder.projectPlus(streamSeq);
17471750

1748-
// construct groupNotNull predicate
1749-
List<RexNode> groupByList =
1750-
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1751-
List<RexNode> notNullList =
1752-
PlanUtils.getSelectColumns(groupByList).stream()
1753-
.map(context.relBuilder::field)
1754-
.map(context.relBuilder::isNotNull)
1755-
.toList();
1756-
RexNode groupNotNull = context.relBuilder.and(notNullList);
1751+
if (!node.isBucketNullable()) {
1752+
// construct groupNotNull predicate
1753+
List<RexNode> groupByList =
1754+
groupList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList();
1755+
List<RexNode> notNullList =
1756+
PlanUtils.getSelectColumns(groupByList).stream()
1757+
.map(context.relBuilder::field)
1758+
.map(context.relBuilder::isNotNull)
1759+
.toList();
1760+
RexNode groupNotNull = context.relBuilder.and(notNullList);
1761+
1762+
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1763+
List<RexNode> wrappedOverExprs =
1764+
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1765+
context.relBuilder.projectPlus(wrappedOverExprs);
1766+
} else {
1767+
context.relBuilder.projectPlus(overExpressions);
1768+
}
17571769

1758-
// wrap each expr: CASE WHEN groupNotNull THEN rawExpr ELSE CAST(NULL AS rawType) END
1759-
List<RexNode> wrappedOverExprs =
1760-
wrapWindowFunctionsWithGroupNotNull(overExpressions, groupNotNull, context);
1761-
context.relBuilder.projectPlus(wrappedOverExprs);
17621770
// re-sort by the row-number column when there is a by condition
17631771
context.relBuilder.sort(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
17641772
context.relBuilder.projectExcept(context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_STREAMSTATS));
@@ -1814,11 +1822,11 @@ private RelNode buildStreamWindowJoinPlan(
18141822
RexNode segRight = context.relBuilder.field(segmentCol);
18151823
RexNode segOuter = context.relBuilder.field(v.get(), segmentCol);
18161824
RexNode frame = buildResetFrameFilter(context, node, outerSeq, rightSeq, segOuter, segRight);
1817-
RexNode group = buildGroupFilter(context, groupList, v.get());
1825+
RexNode group = buildGroupFilter(context, node, groupList, v.get());
18181826
filter = (group == null) ? frame : context.relBuilder.and(frame, group);
18191827
} else { // global + window + by condition
18201828
RexNode frame = buildFrameFilter(context, node, outerSeq, rightSeq);
1821-
RexNode group = buildGroupFilter(context, groupList, v.get());
1829+
RexNode group = buildGroupFilter(context, node, groupList, v.get());
18221830
filter = context.relBuilder.and(frame, group);
18231831
}
18241832
context.relBuilder.filter(filter);
@@ -1968,7 +1976,10 @@ private RexNode buildResetFrameFilter(
19681976
}
19691977

19701978
private RexNode buildGroupFilter(
1971-
CalcitePlanContext context, List<UnresolvedExpression> groupList, RexCorrelVariable correl) {
1979+
CalcitePlanContext context,
1980+
StreamWindow node,
1981+
List<UnresolvedExpression> groupList,
1982+
RexCorrelVariable correl) {
19721983
// build conjunctive equality filters: right.g_i = outer.g_i
19731984
if (groupList.isEmpty()) {
19741985
return null;
@@ -1980,7 +1991,17 @@ private RexNode buildGroupFilter(
19801991
String groupName = extractGroupFieldName(expr);
19811992
RexNode rightGroup = context.relBuilder.field(groupName);
19821993
RexNode outerGroup = context.relBuilder.field(correl, groupName);
1983-
return context.relBuilder.equals(rightGroup, outerGroup);
1994+
RexNode equalCondition = context.relBuilder.equals(rightGroup, outerGroup);
1995+
// handle bucket_nullable case
1996+
if (!node.isBucketNullable()) {
1997+
return equalCondition;
1998+
} else {
1999+
RexNode bothNull =
2000+
context.relBuilder.and(
2001+
context.relBuilder.isNull(rightGroup),
2002+
context.relBuilder.isNull(outerGroup));
2003+
return context.relBuilder.or(equalCondition, bothNull);
2004+
}
19842005
})
19852006
.toList();
19862007
return context.relBuilder.and(equalsList);
@@ -2395,25 +2416,6 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
23952416
return context.relBuilder.peek();
23962417
}
23972418

2398-
private static void addIgnoreNullBucketHintToAggregate(CalcitePlanContext context) {
2399-
final RelHint statHits =
2400-
RelHint.builder("stats_args").hintOption(Argument.BUCKET_NULLABLE, "false").build();
2401-
assert context.relBuilder.peek() instanceof LogicalAggregate
2402-
: "Stats hits should be added to LogicalAggregate";
2403-
context.relBuilder.hints(statHits);
2404-
context
2405-
.relBuilder
2406-
.getCluster()
2407-
.setHintStrategies(
2408-
HintStrategyTable.builder()
2409-
.hintStrategy(
2410-
"stats_args",
2411-
(hint, rel) -> {
2412-
return rel instanceof LogicalAggregate;
2413-
})
2414-
.build());
2415-
}
2416-
24172419
@Override
24182420
public RelNode visitTableFunction(TableFunction node, CalcitePlanContext context) {
24192421
throw new CalciteUnsupportedException("Table function is unsupported in Calcite");

0 commit comments

Comments
 (0)