Commit 6f333fc

Force hash/merge join on no-BY rewrite to fix N-call OS round-trip blowup
A perf A/B on a local 20k-doc index uncovered a real problem in the no-BY rewrite that was hidden by tests with bounded result sets. Before this fix, the no-BY case emitted:

  EnumerableNestedLoopJoin(condition=[true], joinType=[inner])
    leftScan   (returns N rows)
    rightScan  (returns 1 row: the COUNT() scalar)

Calcite's NestedLoopJoin contract calls Enumerable.enumerator() on the right side once per left tuple. Each enumerator open on a CalciteEnumerableIndexScan triggers a fresh OpenSearch _search request. For a 10k-row left side that means 10k OpenSearch calls. On a remote cluster (1-10ms RTT per call), the head-less query would take tens of seconds.

Measured on the local node, 20k docs, single shard, no head:

  before: 10,004 OS calls per PPL query, ~1055ms wall
  after:       4 OS calls per PPL query,  ~174ms wall

That's ~6x faster wall and ~2500x fewer OS round-trips, with no correctness change (results identical).

Fix: in the no-BY branch of rewriteWindowAsAggregateJoin, project a literal-0 key column onto both sides (left: append after orig cols; right: append after agg outputs) and join on equality. The equi-join condition makes the planner pick EnumerableHashJoin, which drains the single-row right side once into a hash table and probes per left row in O(1).

Pushdown still fires on the right side: verified via EXPLAIN that the right scan still carries `AGGREGATION->...COUNT()` and `size:0` in PushDownContext; the literal-0 projection is a top-level wrapper that doesn't disrupt the Aggregate→Scan operand chain AggregateIndexScanRule.AGGREGATE_SCAN matches.

The BY case is unchanged: it already has an equi-join condition (or IS NOT DISTINCT FROM for bucketNullable=true) which Calcite handles correctly via EnumerableMergeJoin.

Test expectation updates:

  - CalcitePPLEventstatsTest.testEventstatsCount / testEventstatsAvg
  - CalcitePPLEventstatsEarliestLatestTest no-BY variants (4 tests)
  - explain_eventstats_dc.json (no-BY, pushdown)
  - explain_eventstats_earliest_latest_no_group.json (no-BY, both modes)

Outer LogicalProject now appears in the no-BY case because we must strip the literal-0 key columns from the join output; it's no longer a no-op passthrough that Calcite folds.

Signed-off-by: Jialiang Liang <jiallian@amazon.com>
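The difference between the two join strategies described in the commit message can be illustrated with a toy model. This is a minimal sketch, not Calcite or OpenSearch code: `JoinOpenCountSketch`, `CountingSource`, and the method names are hypothetical, and each call to `open()` stands in for one enumerator open (one `_search` request in the real scan).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical names): counts how often each strategy opens the right side. */
public class JoinOpenCountSketch {

  /** Stand-in for a scan where every open() would cost one remote request. */
  static final class CountingSource {
    int opens = 0;
    private final List<long[]> rows;

    CountingSource(List<long[]> rows) {
      this.rows = rows;
    }

    List<long[]> open() {
      opens++;
      return rows;
    }
  }

  /** Right side of the no-BY rewrite: a single row [aggValue, joinKey] with joinKey = 0. */
  static CountingSource singleRowRight(long aggValue) {
    List<long[]> rows = new ArrayList<>();
    rows.add(new long[] {aggValue, 0L});
    return new CountingSource(rows);
  }

  /** Nested loop with condition=true: re-opens the right side for every left row. */
  public static long[] nestedLoop(int leftRows) {
    CountingSource right = singleRowRight(leftRows);
    long out = 0;
    for (int i = 0; i < leftRows; i++) {
      for (long[] ignored : right.open()) {
        out++; // condition=true, so every pair is emitted
      }
    }
    return new long[] {right.opens, out};
  }

  /** Hash join on the constant key: opens the right side once, then probes per left row. */
  public static long[] hashJoin(int leftRows) {
    CountingSource right = singleRowRight(leftRows);
    Map<Long, List<long[]>> table = new HashMap<>();
    for (long[] r : right.open()) {
      table.computeIfAbsent(r[1], k -> new ArrayList<>()).add(r);
    }
    long out = 0;
    for (int i = 0; i < leftRows; i++) {
      long leftKey = 0L; // the literal-0 key projected onto the left side
      out += table.getOrDefault(leftKey, List.of()).size();
    }
    return new long[] {right.opens, out};
  }

  public static void main(String[] args) {
    long[] nl = nestedLoop(10_000);
    long[] hj = hashJoin(10_000);
    System.out.println("nested loop: opens=" + nl[0] + " rows=" + nl[1]);
    System.out.println("hash join:   opens=" + hj[0] + " rows=" + hj[1]);
  }
}
```

With 10,000 left rows the sketch reports 10,000 opens for the nested loop and 1 for the hash join, with 10,000 output rows in both cases. That matches the shape of the measured drop in OS calls, although the real figures also include requests the model ignores.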
1 parent bb8d9b2 commit 6f333fc

6 files changed

Lines changed: 116 additions & 51 deletions

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 44 additions & 12 deletions
@@ -2165,17 +2165,45 @@ private RelNode rewriteWindowAsAggregateJoin(Window node, CalcitePlanContext con
     aggregateWithTrimming(groupList, aggExprList, context, !bucketNullable);
     RelNode rightAggregate = context.relBuilder.build();
 
-    // Join left and right. Cross-join for no-BY (right is a single scalar row); equi-join on each
-    // partition key for BY. The condition for bucketNullable=true is IS NOT DISTINCT FROM so the
-    // NULL bucket on each side matches; LEFT for bucketNullable=false so NULL-keyed left rows
-    // survive with NULL aggregate values (right has no NULL bucket to match).
-    context.relBuilder.push(leftInput);
-    context.relBuilder.push(rightAggregate);
+    // Join left and right. For BY: equi-join on each partition key (IS NOT DISTINCT FROM for
+    // bucketNullable=true so NULL buckets match; plain equality + LEFT join for false so NULL-keyed
+    // left rows survive with NULL agg values). For no-BY: a true cross-join with `condition=true`
+    // would force EnumerableNestedLoopJoin to re-open the right-side enumerator per left row —
+    // which is catastrophic when the right side is a CalciteEnumerableIndexScan (one OpenSearch
+    // request per left row, e.g. 10k OS calls for 10k matching docs). Instead, project a literal-0
+    // key on both sides and join on equality so Calcite picks EnumerableMergeJoin /
+    // EnumerableHashJoin, which drains the (single-row) right side once and probes per left row.
     int leftFieldCount = leftInput.getRowType().getFieldCount();
+    int rightGroupKeyCount = hasGroup ? groupList.size() : 0;
+    int aggCount = node.getWindowFunctionList().size();
+
+    // For no-BY, append the literal-0 key column to both sides AFTER the existing columns.
+    // Left side becomes [orig cols..., key]; right side becomes [agg outputs..., key].
+    RelNode leftForJoin = leftInput;
+    RelNode rightForJoin = rightAggregate;
+    int leftJoinKeyOffset = 0;
+    if (!hasGroup) {
+      context.relBuilder.push(leftInput);
+      context.relBuilder.projectPlus(
+          context.relBuilder.alias(context.relBuilder.literal(0), EVENTSTATS_NOGROUP_JOIN_KEY));
+      leftForJoin = context.relBuilder.build();
+      context.relBuilder.push(rightAggregate);
+      context.relBuilder.projectPlus(
+          context.relBuilder.alias(context.relBuilder.literal(0), EVENTSTATS_NOGROUP_JOIN_KEY));
+      rightForJoin = context.relBuilder.build();
+      leftJoinKeyOffset = 1;
+    }
+
+    context.relBuilder.push(leftForJoin);
+    context.relBuilder.push(rightForJoin);
 
     RexNode joinCondition;
     if (!hasGroup) {
-      joinCondition = context.relBuilder.literal(true);
+      // Equi-join on the literal-0 keys we just projected (positions: leftFieldCount on the left
+      // input post-projection, aggCount on the right after the agg outputs).
+      RexNode leftKey = context.relBuilder.field(2, 0, leftFieldCount);
+      RexNode rightKey = context.relBuilder.field(2, 1, aggCount);
+      joinCondition = context.relBuilder.equals(leftKey, rightKey);
     } else {
       List<RexNode> perKeyConditions = new ArrayList<>();
       for (UnresolvedExpression groupExpr : groupList) {
@@ -2199,18 +2227,20 @@ private RelNode rewriteWindowAsAggregateJoin(Window node, CalcitePlanContext con
     context.relBuilder.join(joinType, joinCondition);
 
     // Final projection: keep all original left columns, then append the aggregate output columns
-    // (skipping the right-side group key columns). The output row type matches what the legacy
-    // RexOver lowering produced: [left cols ..., agg outputs ...] with the user-supplied aliases.
-    int rightGroupKeyCount = hasGroup ? groupList.size() : 0;
-    int aggCount = node.getWindowFunctionList().size();
+    // (skipping the right-side group key columns AND the literal-0 key columns when present).
+    // Output row type matches what the legacy RexOver lowering produced: [left cols ..., agg
+    // outputs ...] with the user-supplied aliases.
     List<RexNode> finalProjects = new ArrayList<>();
     List<String> finalNames = new ArrayList<>();
     List<String> leftNames = leftInput.getRowType().getFieldNames();
     for (int i = 0; i < leftFieldCount; i++) {
       finalProjects.add(context.relBuilder.field(i));
       finalNames.add(leftNames.get(i));
     }
-    int rightAggStart = leftFieldCount + rightGroupKeyCount;
+    // Right-side agg outputs come BEFORE the literal-0 key (key was added to the end via
+    // projectPlus). Absolute position in the join row = leftWidth + (right-side group-key prefix).
+    int leftWidth = leftFieldCount + leftJoinKeyOffset;
+    int rightAggStart = leftWidth + rightGroupKeyCount;
     for (int i = 0; i < aggCount; i++) {
       finalProjects.add(context.relBuilder.field(rightAggStart + i));
       finalNames.add(extractAliasName(node.getWindowFunctionList().get(i)));
@@ -2219,6 +2249,8 @@ private RelNode rewriteWindowAsAggregateJoin(Window node, CalcitePlanContext con
     return context.relBuilder.peek();
   }
 
+  private static final String EVENTSTATS_NOGROUP_JOIN_KEY = "__eventstats_join_key__";
+
   /**
    * Returns true if {@code node} matches the shape PPL {@code eventstats} actually emits — all
    * window functions resolve to a windowable aggregation (present in both {@link
Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "calcite": {
-    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$17])\n LogicalJoin(condition=[true], joinType=[inner])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n LogicalAggregate(group=[{}], distinct_states=[DISTINCT_COUNT_APPROX($0)])\n LogicalProject(state=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
-    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableNestedLoopJoin(condition=[true], joinType=[inner])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#48:LogicalAggregate.NONE.[](input=RelSubset#47,group={},distinct_states=DISTINCT_COUNT_APPROX($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"distinct_states\":{\"cardinality\":{\"field\":\"state.keyword\"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$18])\n LogicalJoin(condition=[=($17, $19)], joinType=[inner])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __eventstats_join_key__=[0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n LogicalProject(distinct_states=[$0], __eventstats_join_key__=[0])\n LogicalAggregate(group=[{}], distinct_states=[DISTINCT_COUNT_APPROX($0)])\n LogicalProject(state=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
+    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t2], firstname=[$t3], address=[$t4], balance=[$t5], gender=[$t6], city=[$t7], employer=[$t8], state=[$t9], age=[$t10], email=[$t11], lastname=[$t12], distinct_states=[$t0])\n EnumerableHashJoin(condition=[=($1, $13)], joinType=[inner])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[0], proj#0..1=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#2342:LogicalAggregate.NONE.[](input=RelSubset#2341,group={},distinct_states=DISTINCT_COUNT_APPROX($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"distinct_states\":{\"cardinality\":{\"field\":\"state.keyword\"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..10=[{inputs}], expr#11=[0], proj#0..11=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
   }
 }
Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "calcite": {
-    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$11], latest_message=[$12])\n LogicalJoin(condition=[true], joinType=[inner])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n LogicalAggregate(group=[{}], earliest_message=[ARG_MIN($0, $1)], latest_message=[ARG_MAX($0, $1)])\n LogicalProject(message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n",
-    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableNestedLoopJoin(condition=[true], joinType=[inner])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"created_at\",\"server\",\"@timestamp\",\"message\",\"level\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[AGGREGATION->rel#333:LogicalAggregate.NONE.[](input=RelSubset#332,group={},earliest_message=ARG_MIN($0, $1),latest_message=ARG_MAX($0, $1))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"earliest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":false,\"fields\":[{\"field\":\"message\"}],\"sort\":[{\"@timestamp\":{\"order\":\"asc\"}}]}},\"latest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":false,\"fields\":[{\"field\":\"message\"}],\"sort\":[{\"@timestamp\":{\"order\":\"desc\"}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13])\n LogicalJoin(condition=[=($11, $14)], joinType=[inner])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __eventstats_join_key__=[0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n LogicalProject(earliest_message=[$0], latest_message=[$1], __eventstats_join_key__=[0])\n LogicalAggregate(group=[{}], earliest_message=[ARG_MIN($0, $1)], latest_message=[ARG_MAX($0, $1)])\n LogicalProject(message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n",
+    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..8=[{inputs}], created_at=[$t3], server=[$t4], @timestamp=[$t5], message=[$t6], level=[$t7], earliest_message=[$t0], latest_message=[$t1])\n EnumerableHashJoin(condition=[=($2, $8)], joinType=[inner])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[AGGREGATION->rel#2527:LogicalAggregate.NONE.[](input=RelSubset#2526,group={},earliest_message=ARG_MIN($0, $1),latest_message=ARG_MAX($0, $1))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"earliest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":false,\"fields\":[{\"field\":\"message\"}],\"sort\":[{\"@timestamp\":{\"order\":\"asc\"}}]}},\"latest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":false,\"fields\":[{\"field\":\"message\"}],\"sort\":[{\"@timestamp\":{\"order\":\"desc\"}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], proj#0..5=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"created_at\",\"server\",\"@timestamp\",\"message\",\"level\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
   }
 }
Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "calcite": {
-    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$11], latest_message=[$12])\n LogicalJoin(condition=[true], joinType=[inner])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n LogicalAggregate(group=[{}], earliest_message=[ARG_MIN($0, $1)], latest_message=[ARG_MAX($0, $1)])\n LogicalProject(message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n",
-    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableNestedLoopJoin(condition=[true], joinType=[inner])\n EnumerableCalc(expr#0..10=[{inputs}], proj#0..4=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n EnumerableAggregate(group=[{}], earliest_message=[ARG_MIN($3, $2)], latest_message=[ARG_MAX($3, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n"
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[$12], latest_message=[$13])\n LogicalJoin(condition=[=($11, $14)], joinType=[inner])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], _id=[$5], _index=[$6], _score=[$7], _maxscore=[$8], _sort=[$9], _routing=[$10], __eventstats_join_key__=[0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n LogicalProject(earliest_message=[$0], latest_message=[$1], __eventstats_join_key__=[0])\n LogicalAggregate(group=[{}], earliest_message=[ARG_MIN($0, $1)], latest_message=[ARG_MAX($0, $1)])\n LogicalProject(message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n",
+    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..8=[{inputs}], created_at=[$t3], server=[$t4], @timestamp=[$t5], message=[$t6], level=[$t7], earliest_message=[$t0], latest_message=[$t1])\n EnumerableHashJoin(condition=[=($2, $8)], joinType=[inner])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], proj#0..2=[{exprs}])\n EnumerableAggregate(group=[{}], earliest_message=[ARG_MIN($3, $2)], latest_message=[ARG_MAX($3, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n EnumerableCalc(expr#0..10=[{inputs}], expr#11=[0], proj#0..4=[{exprs}], __eventstats_join_key__=[$t11])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n"
   }
 }
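
The column positions in the updated logical plans follow from the offset arithmetic in rewriteWindowAsAggregateJoin. The sketch below is a standalone restatement of that arithmetic, not the project's code: `JoinRowLayoutSketch` and its method names are hypothetical. The field counts (17 left fields with 1 aggregate, 11 left fields with 2 aggregates) are read off the logical plans in the fixtures above.

```java
/** Hypothetical helper: positions in the joined row [left cols, key?, group keys, aggs, key?]. */
public final class JoinRowLayoutSketch {
  private JoinRowLayoutSketch() {}

  /** Index of the first aggregate output in the joined row. */
  public static int rightAggStart(int leftFieldCount, int rightGroupKeyCount, boolean hasGroup) {
    // For no-BY the left side gains one literal-0 key column at its end.
    int leftJoinKeyOffset = hasGroup ? 0 : 1;
    int leftWidth = leftFieldCount + leftJoinKeyOffset;
    return leftWidth + rightGroupKeyCount;
  }

  /** no-BY only: index of the left literal-0 key (appended after the original columns). */
  public static int leftKeyIndex(int leftFieldCount) {
    return leftFieldCount;
  }

  /** no-BY only: index of the right literal-0 key (appended after the aggregate outputs). */
  public static int rightKeyIndex(int leftFieldCount, int aggCount) {
    return (leftFieldCount + 1) + aggCount;
  }

  public static void main(String[] args) {
    // Account index fixture: 17 left fields, 1 aggregate (distinct_states).
    System.out.println(
        "join on $" + leftKeyIndex(17) + " = $" + rightKeyIndex(17, 1)
            + ", aggs start at $" + rightAggStart(17, 0, false));
    // Logs index fixture: 11 left fields, 2 aggregates (earliest_message, latest_message).
    System.out.println(
        "join on $" + leftKeyIndex(11) + " = $" + rightKeyIndex(11, 2)
            + ", aggs start at $" + rightAggStart(11, 0, false));
  }
}
```

The first case yields a join on `$17 = $19` with the aggregate at `$18`, and the second a join on `$11 = $14` with aggregates at `$12` and `$13`, which agrees with the `LogicalJoin` conditions and outer `LogicalProject` references in the updated fixtures.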