|
35 | 35 | import java.util.Comparator; |
36 | 36 | import java.util.HashMap; |
37 | 37 | import java.util.HashSet; |
| 38 | +import java.util.LinkedHashSet; |
38 | 39 | import java.util.List; |
39 | 40 | import java.util.Map; |
40 | 41 | import java.util.Objects; |
|
103 | 104 | import org.opensearch.sql.ast.expression.ParseMethod; |
104 | 105 | import org.opensearch.sql.ast.expression.PatternMethod; |
105 | 106 | import org.opensearch.sql.ast.expression.PatternMode; |
| 107 | +import org.opensearch.sql.ast.expression.QualifiedName; |
106 | 108 | import org.opensearch.sql.ast.expression.Span; |
107 | 109 | import org.opensearch.sql.ast.expression.SpanUnit; |
108 | 110 | import org.opensearch.sql.ast.expression.UnresolvedExpression; |
|
191 | 193 |
|
192 | 194 | public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalcitePlanContext> { |
193 | 195 |
|
| 196 | + /** |
| 197 | + * Prefix/suffix applied to right-side fields in the streamstats self-join plan to avoid name |
| 198 | + * collisions with the left side and to make the renaming reversible. |
| 199 | + */ |
| 200 | + private static final String RIGHT_SIDE_FIELD_PREFIX = "__r_"; |
| 201 | + |
| 202 | + private static final String RIGHT_SIDE_FIELD_SUFFIX = "__"; |
| 203 | + |
| 204 | + /** Name of the right-side sequence column in the streamstats self-join plan. */ |
| 205 | + private static final String RIGHT_SIDE_SEQ_COLUMN = |
| 206 | + RIGHT_SIDE_FIELD_PREFIX + "seq" + RIGHT_SIDE_FIELD_SUFFIX; |
| 207 | + |
194 | 208 | private final CalciteRexNodeVisitor rexVisitor; |
195 | 209 | private final CalciteAggCallVisitor aggVisitor; |
196 | 210 | private final DataSourceService dataSourceService; |
@@ -2097,14 +2111,14 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) |
2097 | 2111 | context.relBuilder.projectPlus(streamSeq); |
2098 | 2112 | RelNode left = context.relBuilder.build(); |
2099 | 2113 |
|
2100 | | - // 2. Run correlate + aggregate |
2101 | | - return buildStreamWindowJoinPlan( |
| 2114 | + // 2. Use self-join approach to avoid nested correlates (which cause NPE |
| 2115 | + // in Calcite's RelDecorrelator when chaining multiple streamstats) |
| 2116 | + return buildStreamWindowSelfJoinPlan( |
2102 | 2117 | context, |
2103 | 2118 | left, |
2104 | 2119 | node, |
2105 | 2120 | groupList, |
2106 | 2121 | ROW_NUMBER_COLUMN_FOR_STREAMSTATS, |
2107 | | - null, |
2108 | 2122 | new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS}); |
2109 | 2123 | } |
2110 | 2124 |
|
@@ -2235,6 +2249,229 @@ private RelNode buildStreamWindowJoinPlan( |
2235 | 2249 | return context.relBuilder.peek(); |
2236 | 2250 | } |
2237 | 2251 |
|
| 2252 | + /** |
| 2253 | + * Builds a self-join based plan for streamstats with global=true + window + group. This avoids |
| 2254 | + * using LogicalCorrelate which causes NPE in Calcite's RelDecorrelator when chaining multiple |
| 2255 | + * streamstats commands. |
| 2256 | + * |
| 2257 | + * <p>Plan structure: |
| 2258 | + * |
| 2259 | + * <ol> |
| 2260 | + * <li>left = input + __stream_seq__ |
| 2261 | + * <li>right = trim to only aggregate input + __stream_seq__ |
| 2262 | + * <li>Join left and right on window frame + group conditions |
| 2263 | + * <li>Group by all left field indices, compute AGG(right.X) |
| 2264 | + * <li>Sort by __stream_seq__, then remove it |
| 2265 | + * </ol> |
| 2266 | + */ |
| 2267 | + private RelNode buildStreamWindowSelfJoinPlan( |
| 2268 | + CalcitePlanContext context, |
| 2269 | + RelNode leftWithHelpers, |
| 2270 | + StreamWindow node, |
| 2271 | + List<UnresolvedExpression> groupList, |
| 2272 | + String seqCol, |
| 2273 | + String[] helperColsToCleanup) { |
| 2274 | + |
| 2275 | + int leftFieldCount = leftWithHelpers.getRowType().getFieldCount(); |
| 2276 | + |
| 2277 | + // Build right side: project only the fields needed for aggregation + seq + group columns |
| 2278 | + // This avoids field name collisions and keeps the right side minimal |
| 2279 | + context.relBuilder.push(leftWithHelpers); |
| 2280 | + |
| 2281 | + // Collect fields needed on right side: seq col + group cols + aggregate input fields |
| 2282 | + List<RexNode> rightFields = new ArrayList<>(); |
| 2283 | + List<String> rightFieldNames = new ArrayList<>(); |
| 2284 | + |
| 2285 | + // Always include seq col |
| 2286 | + rightFields.add(context.relBuilder.field(seqCol)); |
| 2287 | + rightFieldNames.add(RIGHT_SIDE_SEQ_COLUMN); |
| 2288 | + |
| 2289 | + // Include group columns |
| 2290 | + for (UnresolvedExpression groupExpr : groupList) { |
| 2291 | + String groupName = extractGroupFieldName(groupExpr); |
| 2292 | + rightFields.add(context.relBuilder.field(groupName)); |
| 2293 | + rightFieldNames.add(toRightSideFieldName(groupName)); |
| 2294 | + } |
| 2295 | + |
| 2296 | + // Include aggregate input fields (extract field names from window functions) |
| 2297 | + Set<String> aggInputFields = new LinkedHashSet<>(); |
| 2298 | + for (UnresolvedExpression wfExpr : node.getWindowFunctionList()) { |
| 2299 | + collectFieldNames(wfExpr, aggInputFields); |
| 2300 | + } |
| 2301 | + // Remove already-included fields |
| 2302 | + aggInputFields.remove(seqCol); |
| 2303 | + for (UnresolvedExpression groupExpr : groupList) { |
| 2304 | + aggInputFields.remove(extractGroupFieldName(groupExpr)); |
| 2305 | + } |
| 2306 | + for (String aggField : aggInputFields) { |
| 2307 | + rightFields.add(context.relBuilder.field(aggField)); |
| 2308 | + rightFieldNames.add(toRightSideFieldName(aggField)); |
| 2309 | + } |
| 2310 | + |
| 2311 | + context.relBuilder.project(rightFields, rightFieldNames); |
| 2312 | + RelNode rightProjected = context.relBuilder.build(); |
| 2313 | + |
| 2314 | + // Push left and right |
| 2315 | + context.relBuilder.push(leftWithHelpers); |
| 2316 | + context.relBuilder.push(rightProjected); |
| 2317 | + |
| 2318 | + // Build join condition using 2-input references |
| 2319 | + RexNode leftSeq = context.relBuilder.field(2, 0, seqCol); |
| 2320 | + RexNode rightSeq = context.relBuilder.field(2, 1, RIGHT_SIDE_SEQ_COLUMN); |
| 2321 | + |
| 2322 | + // Frame filter |
| 2323 | + RexNode frameFilter; |
| 2324 | + if (node.isCurrent()) { |
| 2325 | + RexNode lower = |
| 2326 | + context.relBuilder.call( |
| 2327 | + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow() - 1)); |
| 2328 | + frameFilter = context.relBuilder.between(rightSeq, lower, leftSeq); |
| 2329 | + } else { |
| 2330 | + RexNode lower = |
| 2331 | + context.relBuilder.call( |
| 2332 | + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow())); |
| 2333 | + RexNode upper = |
| 2334 | + context.relBuilder.call( |
| 2335 | + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(1)); |
| 2336 | + frameFilter = context.relBuilder.between(rightSeq, lower, upper); |
| 2337 | + } |
| 2338 | + |
| 2339 | + // Group filter |
| 2340 | + List<RexNode> groupFilters = new ArrayList<>(); |
| 2341 | + for (UnresolvedExpression groupExpr : groupList) { |
| 2342 | + String groupName = extractGroupFieldName(groupExpr); |
| 2343 | + RexNode leftGroup = context.relBuilder.field(2, 0, groupName); |
| 2344 | + RexNode rightGroup = context.relBuilder.field(2, 1, toRightSideFieldName(groupName)); |
| 2345 | + RexNode equalCondition = context.relBuilder.equals(leftGroup, rightGroup); |
| 2346 | + if (node.isBucketNullable()) { |
| 2347 | + RexNode bothNull = |
| 2348 | + context.relBuilder.and( |
| 2349 | + context.relBuilder.isNull(leftGroup), context.relBuilder.isNull(rightGroup)); |
| 2350 | + groupFilters.add(context.relBuilder.or(equalCondition, bothNull)); |
| 2351 | + } else { |
| 2352 | + groupFilters.add(equalCondition); |
| 2353 | + } |
| 2354 | + } |
| 2355 | + |
| 2356 | + RexNode joinCondition = |
| 2357 | + groupFilters.isEmpty() |
| 2358 | + ? frameFilter |
| 2359 | + : context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters)); |
| 2360 | + context.relBuilder.join(JoinRelType.LEFT, joinCondition); |
| 2361 | + |
| 2362 | + // After join: [left_fields(0..leftFieldCount-1), right_fields(leftFieldCount..)] |
| 2363 | + // Aggregate: group by all left fields, compute AGG on right fields |
| 2364 | + // The aggregate functions need to reference the right-side fields in the joined row |
| 2365 | + |
| 2366 | + // Build aggregate calls using the right-side field references |
| 2367 | + List<AggCall> aggCalls = buildAggCallsFromJoinedRight(node.getWindowFunctionList(), context); |
| 2368 | + |
| 2369 | + RelBuilder.GroupKey groupKey = |
| 2370 | + context.relBuilder.groupKey( |
| 2371 | + IntStream.range(0, leftFieldCount).mapToObj(context.relBuilder::field).toList()); |
| 2372 | + |
| 2373 | + context.relBuilder.aggregate(groupKey, aggCalls); |
| 2374 | + |
| 2375 | + // Resort by the sequence column |
| 2376 | + context.relBuilder.sort(context.relBuilder.field(seqCol)); |
| 2377 | + |
| 2378 | + // Cleanup helper columns |
| 2379 | + List<RexNode> cleanup = new ArrayList<>(); |
| 2380 | + for (String c : helperColsToCleanup) { |
| 2381 | + cleanup.add(context.relBuilder.field(c)); |
| 2382 | + } |
| 2383 | + context.relBuilder.projectExcept(cleanup); |
| 2384 | + return context.relBuilder.peek(); |
| 2385 | + } |
| 2386 | + |
| 2387 | + /** Collect field names referenced by an expression tree. */ |
| 2388 | + private void collectFieldNames(UnresolvedExpression expr, Set<String> fieldNames) { |
| 2389 | + if (expr instanceof Field f) { |
| 2390 | + fieldNames.add(f.getField().toString()); |
| 2391 | + } else if (expr instanceof Alias a) { |
| 2392 | + collectFieldNames(a.getDelegated(), fieldNames); |
| 2393 | + } else if (expr instanceof WindowFunction wf) { |
| 2394 | + collectFieldNames(wf.getFunction(), fieldNames); |
| 2395 | + } else if (expr instanceof Function func) { |
| 2396 | + for (UnresolvedExpression arg : func.getFuncArgs()) { |
| 2397 | + collectFieldNames(arg, fieldNames); |
| 2398 | + } |
| 2399 | + } |
| 2400 | + } |
| 2401 | + |
| 2402 | + /** |
| 2403 | + * Build AggCall list for the self-join plan. The aggregate functions reference fields from the |
| 2404 | + * right side of the join, which carry the {@code __r_<name>__} prefix applied during right-side |
| 2405 | + * projection. This method rewrites the window function's field references to those prefixed |
| 2406 | + * names, unwraps the {@link WindowFunction} to its inner {@link Function}, and then delegates to |
| 2407 | + * the shared {@link #aggVisitor} so the self-join path reuses the same aggregate-resolution logic |
| 2408 | + * as regular {@code stats}/{@code eventstats} aggregations. |
| 2409 | + */ |
| 2410 | + private List<AggCall> buildAggCallsFromJoinedRight( |
| 2411 | + List<UnresolvedExpression> windowFunctionList, CalcitePlanContext context) { |
| 2412 | + List<AggCall> aggCalls = new ArrayList<>(); |
| 2413 | + for (UnresolvedExpression wfExpr : windowFunctionList) { |
| 2414 | + UnresolvedExpression rewritten = rewriteWindowFunctionForSelfJoin(wfExpr); |
| 2415 | + aggCalls.add(aggVisitor.analyze(rewritten, context)); |
| 2416 | + } |
| 2417 | + return aggCalls; |
| 2418 | + } |
| 2419 | + |
| 2420 | + /** |
| 2421 | + * Rewrites a streamstats window function expression so that {@link #aggVisitor} can resolve it |
| 2422 | + * against the joined row type, where right-side fields carry the {@code __r_<name>__} prefix: |
| 2423 | + * |
| 2424 | + * <ul> |
| 2425 | + * <li>Unwraps {@link WindowFunction} to expose its inner {@link Function} (the aggregate). |
| 2426 | + * <li>Preserves the outer {@link Alias} so the aggregate output keeps its user-visible name. |
| 2427 | + * <li>Renames every {@link QualifiedName} / {@link Field} reference inside the function body to |
| 2428 | + * the prefixed right-side column name. |
| 2429 | + * </ul> |
| 2430 | + */ |
| 2431 | + private UnresolvedExpression rewriteWindowFunctionForSelfJoin(UnresolvedExpression expr) { |
| 2432 | + if (expr instanceof Alias a) { |
| 2433 | + return new Alias(a.getName(), rewriteWindowFunctionForSelfJoin(a.getDelegated())); |
| 2434 | + } |
| 2435 | + if (expr instanceof WindowFunction wf) { |
| 2436 | + return rewriteWindowFunctionForSelfJoin(wf.getFunction()); |
| 2437 | + } |
| 2438 | + if (expr instanceof Function func) { |
| 2439 | + List<UnresolvedExpression> rewrittenArgs = |
| 2440 | + func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList(); |
| 2441 | + return new Function(func.getFuncName(), rewrittenArgs); |
| 2442 | + } |
| 2443 | + return expr; |
| 2444 | + } |
| 2445 | + |
| 2446 | + /** |
| 2447 | + * Recursively renames field references within an aggregate argument to their right-side alias. |
| 2448 | + */ |
| 2449 | + private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) { |
| 2450 | + if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) { |
| 2451 | + return new Field(toRightSideQualifiedName(qn), f.getFieldArgs()); |
| 2452 | + } |
| 2453 | + if (expr instanceof QualifiedName qn) { |
| 2454 | + return toRightSideQualifiedName(qn); |
| 2455 | + } |
| 2456 | + if (expr instanceof Alias a) { |
| 2457 | + return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated())); |
| 2458 | + } |
| 2459 | + if (expr instanceof Function func) { |
| 2460 | + List<UnresolvedExpression> rewrittenArgs = |
| 2461 | + func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList(); |
| 2462 | + return new Function(func.getFuncName(), rewrittenArgs); |
| 2463 | + } |
| 2464 | + return expr; |
| 2465 | + } |
| 2466 | + |
| 2467 | + private static QualifiedName toRightSideQualifiedName(QualifiedName original) { |
| 2468 | + return new QualifiedName(toRightSideFieldName(original.toString())); |
| 2469 | + } |
| 2470 | + |
| 2471 | + private static String toRightSideFieldName(String originalName) { |
| 2472 | + return RIGHT_SIDE_FIELD_PREFIX + originalName + RIGHT_SIDE_FIELD_SUFFIX; |
| 2473 | + } |
| 2474 | + |
2238 | 2475 | private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow node) { |
2239 | 2476 | // 1. global sequence to define order |
2240 | 2477 | RexNode rowNum = |
|
0 commit comments