diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java
index f18ac0b0479..539487a6f18 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java
@@ -81,6 +81,22 @@ public class CalcitePlanContext {
   /** Whether we're currently inside a lambda context. */
   @Getter @Setter private boolean inLambdaContext = false;
 
+  /**
+   * When enabled, the visitor records which RelNode ids exist after each AST command has been
+   * analyzed. Each entry maps an AST node class name to the ids of the plan built so far, i.e.
+   * including the nodes produced by the command's children. Consumers subtract the preceding
+   * entry to obtain the ids a command contributed on its own.
+   */
+  @Getter @Setter private boolean trackingEnabled = false;
+
+  /** Recorded mappings, in the order {@link #recordMapping} was called. */
+  @Getter private final List<NodeIdMapping> nodeIdMappings = new ArrayList<>();
+
+  /**
+   * Records a mapping from an AST command to the RelNode ids of the plan it produced.
+   *
+   * @param astNodeType simple class name of the AST command
+   * @param relNodeIds ids of the RelNodes attributed to that command
+   */
+  public void recordMapping(String astNodeType, List<Integer> relNodeIds) {
+    nodeIdMappings.add(new NodeIdMapping(astNodeType, relNodeIds));
+  }
+
+  /** A mapping from one AST command to the RelNode ids it produced. */
+  public record NodeIdMapping(String astNodeType, List<Integer> relNodeIds) {
+    public NodeIdMapping {
+      // Snapshot the ids so later changes to the caller's list cannot alter a recorded mapping.
+      relNodeIds = List.copyOf(relNodeIds);
+    }
+  }
+
   private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
     this.config = config;
     this.sysLimit = sysLimit;
diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
index 91a30361a20..8cfffc35276 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
@@ -219,15 +219,47 @@ public CalciteRelNodeVisitor(DataSourceService dataSourceService) {
   }
 
   public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
+    if (context.isTrackingEnabled()) {
+      RelNode result = unresolved.accept(this, context);
+      // The root command is not visited through visitChildren, so record it here.
+      recordProducedIds(unresolved, context);
+      return result;
+    }
     return unresolved.accept(this, context);
   }
 
+  /**
+   * Records, for the given AST node, the ids of every RelNode reachable from the plan currently on
+   * top of the builder stack.
+   *
+   * <p>The ids are collected by walking the plan instead of assuming that ids form a contiguous
+   * range: RelNode ids are allocated from a counter shared by every plan built in the process, so
+   * a numeric range would also contain nodes that belong to other queries.
+   *
+   * @param astNode AST command whose visit has just completed
+   * @param context plan context holding the RelBuilder and the recorded mappings
+   */
+  private void recordProducedIds(Node astNode, CalcitePlanContext context) {
+    java.util.Set<Integer> ids = new java.util.TreeSet<>();
+    // An empty stack still records an (empty) entry so mappings stay aligned with the commands.
+    if (context.relBuilder.size() > 0) {
+      java.util.Deque<RelNode> pending = new java.util.ArrayDeque<>();
+      pending.push(context.relBuilder.peek());
+      while (!pending.isEmpty()) {
+        RelNode current = pending.pop();
+        if (ids.add(current.getId())) {
+          current.getInputs().forEach(pending::push);
+        }
+      }
+    }
+    context.recordMapping(astNode.getClass().getSimpleName(), new java.util.ArrayList<>(ids));
+  }
+
   @Override
   public RelNode visitChildren(Node node, CalcitePlanContext context) {
+    if (context.isTrackingEnabled() && node instanceof UnresolvedPlan plan) {
+      // Visit the children one at a time (instead of delegating to super) so that the plan
+      // produced by each child command can be recorded as soon as that child has been analyzed.
+      RelNode result = null;
+      for (Node child : node.getChild()) {
+        result = child.accept(this, context);
+        if (child instanceof UnresolvedPlan) {
+          recordProducedIds(child, context);
+        }
+      }
+      mapPathMaterializer.materializePaths(plan, context);
+      return result;
+    }
     RelNode result = super.visitChildren(node, context);
     if (node instanceof UnresolvedPlan plan) {
       // Materialize MAP dotted paths as flat columns after children are analyzed
       // (so MAP/struct types are known) but before the command's own visit logic runs.
       mapPathMaterializer.materializePaths(plan, context);
     }
     return result;
diff --git a/core/src/main/java/org/opensearch/sql/executor/AnalyzeResponse.java b/core/src/main/java/org/opensearch/sql/executor/AnalyzeResponse.java
new file mode 100644
index 00000000000..649cd3617a5
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/AnalyzeResponse.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import org.opensearch.sql.monitor.profile.QueryProfile;
+
+/**
+ * Result of analyzing a query: the query text, its logical and physical plans (one entry per plan
+ * line), the profile of the execution, a per-operator summary, and the query result itself.
+ *
+ * <p>NOTE(review): several fields use snake_case names, presumably because the object is
+ * serialized as-is and the field names become the JSON keys. Confirm before renaming any of them.
+ */
+@Data
+@Builder
+public class AnalyzeResponse {
+
+  private final String query;
+  private final List<String> logicalPlan;
+  private final List<String> physicalPlan;
+  private final QueryProfile profile;
+  private final List<OperatorNode> operator_tree;
+  // NOTE(review): element type assumed to be String; only an empty list is ever supplied so far.
+  private final List<String> recommendations;
+  private final List<SchemaColumn> schema;
+  private final Object[][] datarows;
+  private final long total;
+  private final long size;
+
+  /** Name and type of one result column. */
+  @Data
+  @Builder
+  public static class SchemaColumn {
+    private final String name;
+    private final String type;
+  }
+
+  /** One command of the query: its node type and the source text it was parsed from. */
+  @Data
+  @Builder
+  public static class QuerySegment {
+    private final String nodeType;
+    private final String source;
+  }
+
+  /** One entry of the operator tree; covers one or more query segments. */
+  @Data
+  @Builder
+  public static class OperatorNode {
+    private final String source;
+    private final List<String> node_type;
+    private final List<String> description;
+    private final String estimated_cost;
+    private final Long estimated_rows;
+    private final String actual_time_ms;
+    private final Long actual_rows;
+    private final Boolean is_pushed_down;
+  }
+}
diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java
index fe9d3e55dc1..2257c3c5111 100644
--- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java
+++ 
b/core/src/main/java/org/opensearch/sql/executor/QueryService.java
@@ -5,21 +5,33 @@
 
 package org.opensearch.sql.executor;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
@@ -36,6 +48,7 @@
 import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
 import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
 import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper;
+import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners;
 import org.opensearch.sql.common.error.ErrorReport;
 import org.opensearch.sql.common.error.QueryProcessingStage;
 import org.opensearch.sql.common.error.StageErrorHandler;
@@ -48,6 +61,7 @@
 import org.opensearch.sql.monitor.profile.MetricName;
 import org.opensearch.sql.monitor.profile.ProfileContext;
 import org.opensearch.sql.monitor.profile.ProfileMetric;
+import org.opensearch.sql.monitor.profile.QueryProfile;
 import org.opensearch.sql.monitor.profile.QueryProfiling;
 import org.opensearch.sql.planner.PlanContext;
 import org.opensearch.sql.planner.Planner;
@@ -223,6 +237,415 @@ public void explainWithCalcite(
         settings);
   }
 
+  /**
+   * Executes the query with profiling forced on, then plans it a second time with RelNode tracking
+   * enabled, and reports the plans, a per-operator summary and the query result to the listener.
+   *
+   * <p>This method blocks the calling thread until the first execution has reported a result.
+   *
+   * @param query original query text, echoed back in the response
+   * @param querySegments node type and source text of each command of the query
+   * @param plan AST of the query
+   * @param queryType type of the query
+   * @param listener receives the {@link AnalyzeResponse}, or the failure of either phase
+   */
+  public void analyzeWithCalcite(
+      String query,
+      List<AnalyzeResponse.QuerySegment> querySegments,
+      UnresolvedPlan plan,
+      QueryType queryType,
+      ResponseListener<AnalyzeResponse> listener) {
+    // Phase 1: Execute via the exact same path as executeWithCalcite + executionEngine.execute
+    // to get identical profile timings. Use a latch to synchronize the async callback.
+    // Force profiling on so executeWithCalcite activates QueryProfiling.
+    QueryContext.setProfile(true);
+    AtomicReference<ExecutionEngine.QueryResponse> queryResponseRef = new AtomicReference<>();
+    AtomicReference<QueryProfile> profileRef = new AtomicReference<>();
+    AtomicReference<Exception> errorRef = new AtomicReference<>();
+    CountDownLatch latch = new CountDownLatch(1);
+
+    executeWithCalcite(
+        plan,
+        queryType,
+        null,
+        new ResponseListener<>() {
+          @Override
+          public void onResponse(ExecutionEngine.QueryResponse response) {
+            try {
+              ProfileMetric formatMetric =
+                  QueryProfiling.current().getOrCreateMetric(MetricName.FORMAT);
+              long formatStart = System.nanoTime();
+              // Convert every row so the FORMAT metric covers row conversion; the converted rows
+              // are intentionally discarded here and rebuilt for the response in phase 2.
+              for (var exprValue : response.getResults()) {
+                exprValue.tupleValue().entrySet().stream()
+                    .map(e -> e.getValue().value())
+                    .toArray(Object[]::new);
+              }
+              formatMetric.set(System.nanoTime() - formatStart);
+              profileRef.set(QueryProfiling.current().finish());
+              queryResponseRef.set(response);
+            } catch (Exception e) {
+              errorRef.set(e);
+            } finally {
+              // Always release the waiting thread, even if the conversion above failed.
+              latch.countDown();
+            }
+          }
+
+          @Override
+          public void onFailure(Exception e) {
+            errorRef.set(e);
+            latch.countDown();
+          }
+        });
+
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      listener.onFailure(new RuntimeException("Interrupted while waiting for query execution", e));
+      return;
+    }
+
+    if (errorRef.get() != null) {
+      listener.onFailure(errorRef.get());
+      return;
+    }
+
+    ExecutionEngine.QueryResponse queryResponse = queryResponseRef.get();
+    QueryProfile profile = profileRef.get();
+
+    // Phase 2: Re-run with tracking to capture logical/physical plans and node mappings.
+    // This run benefits from warm caches but we don't report its timings.
+    CalcitePlanContext.run(
+        () -> {
+          try {
+            QueryProfiling.noop();
+            CalciteClassLoaderHelper.withCalciteClassLoader(
+                () -> {
+                  CalcitePlanContext context =
+                      CalcitePlanContext.create(
+                          buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+                  context.setTrackingEnabled(true);
+                  RelNode relNode = analyze(plan, context);
+                  RelNode calcitePlan = convertToCalcitePlan(relNode, context);
+
+                  AtomicReference<String> physicalPlanRef = new AtomicReference<>();
+                  AtomicReference<RelNode> physicalRelRef = new AtomicReference<>();
+                  try (Hook.Closeable closeable =
+                      Hook.PLAN_BEFORE_IMPLEMENTATION.addThread(
+                          obj -> {
+                            RelRoot relRoot = (RelRoot) obj;
+                            physicalRelRef.set(relRoot.rel);
+                            physicalPlanRef.set(
+                                RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
+                          })) {
+                    try (java.sql.PreparedStatement ignored =
+                        OpenSearchRelRunners.run(context, calcitePlan)) {
+                      // The statement is only created so the hook above can capture the plan;
+                      // it is never executed here.
+                    } catch (java.sql.SQLException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+
+                  List<String> logicalPlanNodes =
+                      splitPlanLines(
+                          RelOptUtil.toString(calcitePlan, SqlExplainLevel.ALL_ATTRIBUTES));
+                  // The hook may not have fired; report an empty physical plan instead of failing.
+                  String physicalPlanStr = physicalPlanRef.get();
+                  List<String> physicalPlanNodes =
+                      physicalPlanStr == null ? List.of() : splitPlanLines(physicalPlanStr);
+
+                  // Build operator tree using phase 2's tracking data + phase 1's profile.
+                  List<AnalyzeResponse.OperatorNode> operatorTree =
+                      buildOperatorTree(
+                          querySegments,
+                          logicalPlanNodes,
+                          context.getNodeIdMappings(),
+                          calcitePlan,
+                          physicalRelRef.get(),
+                          profile);
+
+                  // Convert QueryResponse results to analyze format.
+                  List<AnalyzeResponse.SchemaColumn> schema = new ArrayList<>();
+                  if (queryResponse.getSchema() != null) {
+                    for (ExecutionEngine.Schema.Column col :
+                        queryResponse.getSchema().getColumns()) {
+                      schema.add(
+                          AnalyzeResponse.SchemaColumn.builder()
+                              .name(col.getName())
+                              .type(col.getExprType().typeName())
+                              .build());
+                    }
+                  }
+
+                  Object[][] datarows = new Object[queryResponse.getResults().size()][];
+                  int rowIdx = 0;
+                  for (var exprValue : queryResponse.getResults()) {
+                    datarows[rowIdx++] =
+                        exprValue.tupleValue().entrySet().stream()
+                            .map(e -> e.getValue().value())
+                            .toArray(Object[]::new);
+                  }
+
+                  AnalyzeResponse response =
+                      AnalyzeResponse.builder()
+                          .query(query)
+                          .logicalPlan(logicalPlanNodes)
+                          .physicalPlan(physicalPlanNodes)
+                          .operator_tree(operatorTree)
+                          .recommendations(List.of())
+                          .profile(profile)
+                          .schema(schema)
+                          .datarows(datarows)
+                          .total(datarows.length)
+                          .size(datarows.length)
+                          .build();
+                  listener.onResponse(response);
+                },
+                QueryService.class);
+          } catch (Throwable t) {
+            if (t instanceof Exception) {
+              listener.onFailure((Exception) t);
+            } else {
+              listener.onFailure(new RuntimeException(t));
+            }
+          }
+        },
+        settings);
+  }
+
+  /** Splits an explain string into its trimmed, non-empty lines. */
+  private static List<String> splitPlanLines(String planText) {
+    return java.util.Arrays.stream(planText.split("\n"))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .toList();
+  }
+
+  /**
+   * Builds the operator tree: one entry per group of query segments, bottom-up.
+   *
+   * <p>Segments whose logical nodes were absorbed by the physical scan form the first entry. Every
+   * following entry covers the segments up to and including the next one that owns a logical plan
+   * node. Entry {@code i} is paired with the {@code i}-th profile plan node counted from the leaf.
+   *
+   * @param querySegments node type and source text of each command, first command first
+   * @param logicalPlanNodes lines of the logical plan explain output
+   * @param nodeIdMappings RelNode ids recorded per AST command while planning
+   * @param logicalPlan root of the logical plan
+   * @param physicalPlan root of the physical plan; may be null if it was not captured
+   * @param profile profile of the real execution; may be null
+   * @return the operator entries in bottom-up order
+   */
+  private List<AnalyzeResponse.OperatorNode> buildOperatorTree(
+      List<AnalyzeResponse.QuerySegment> querySegments,
+      List<String> logicalPlanNodes,
+      List<CalcitePlanContext.NodeIdMapping> nodeIdMappings,
+      RelNode logicalPlan,
+      RelNode physicalPlan,
+      QueryProfile profile) {
+    // Build a map from RelNode id to its logical plan description string.
+    Map<Integer, String> idToDescription = new HashMap<>();
+    for (String node : logicalPlanNodes) {
+      int idIdx = node.lastIndexOf("id = ");
+      if (idIdx >= 0) {
+        try {
+          idToDescription.put(Integer.parseInt(node.substring(idIdx + 5).trim()), node);
+        } catch (NumberFormatException ignored) {
+          // The text after the last "id = " is not a bare integer; leave this line unmapped.
+        }
+      }
+    }
+
+    // Compute exclusive ids per mapping by subtracting the previous mapping's ids.
+    // Mappings are recorded bottom-up: [Relation:[0], Filter:[0,1], Project:[0,1,2]]
+    // Exclusive: Relation=[0], Filter=[1], Project=[2]
+    List<Set<Integer>> exclusiveIds = new ArrayList<>();
+    Set<Integer> previousIds = new HashSet<>();
+    for (CalcitePlanContext.NodeIdMapping mapping : nodeIdMappings) {
+      Set<Integer> current = new HashSet<>(mapping.relNodeIds());
+      Set<Integer> exclusive = new HashSet<>(current);
+      exclusive.removeAll(previousIds);
+      exclusiveIds.add(exclusive);
+      previousIds = current;
+    }
+
+    // Per segment: the ids it owns that actually appear in the logical plan, ascending.
+    // Segments without a recorded mapping own no plan nodes.
+    List<List<Integer>> segmentPlanIds = new ArrayList<>();
+    for (int i = 0; i < querySegments.size(); i++) {
+      Set<Integer> ids = i < exclusiveIds.size() ? exclusiveIds.get(i) : Set.of();
+      segmentPlanIds.add(ids.stream().filter(idToDescription::containsKey).sorted().toList());
+    }
+
+    // Determine how many segments from the bottom were pushed into the physical scan.
+    // The physical plan's leaf node (the scan) absorbs logical nodes from the bottom up.
+    // Physical depth tells us how many separate physical operators exist; everything else
+    // was pushed down. We count segments bottom-up until we've covered all pushed logical nodes.
+    // The LogicalSystemLimit added by convertToCalcitePlan counts toward the logical depth
+    // but has no segment, so we only count nodes that appear in exclusiveIds.
+    int pushedNodeCount = getLinearDepth(logicalPlan) - getLinearDepth(physicalPlan);
+    int pushedLogicalNodes = 0;
+    int pushedSegments = 0;
+    while (pushedSegments < querySegments.size() && pushedLogicalNodes < pushedNodeCount) {
+      pushedLogicalNodes += segmentPlanIds.get(pushedSegments).size();
+      pushedSegments++;
+    }
+
+    // Compute estimated row counts from the logical plan using RelMetadataQuery.
+    org.apache.calcite.rel.metadata.RelMetadataQuery mq =
+        logicalPlan.getCluster().getMetadataQuery();
+    Map<Integer, Double> idToRowCount = new HashMap<>();
+    collectRowCounts(logicalPlan, mq, idToRowCount);
+
+    // Compute exclusive time and rows per physical node from the profile plan tree.
+    // The plan tree is top-down; we flatten it bottom-up to match operator tree order.
+    // Each entry is {exclusive time in ms, rows}.
+    List<double[]> physicalTimings = new ArrayList<>();
+    if (profile != null && profile.getPlan() != null) {
+      List<QueryProfile.PlanNode> planNodes = new ArrayList<>();
+      QueryProfile.PlanNode current = profile.getPlan();
+      while (current != null) {
+        planNodes.add(current);
+        current =
+            (current.getChildren() != null && !current.getChildren().isEmpty())
+                ? current.getChildren().get(0)
+                : null;
+      }
+      // planNodes is top-down; reverse to bottom-up
+      java.util.Collections.reverse(planNodes);
+      for (int p = 0; p < planNodes.size(); p++) {
+        double inclusive = planNodes.get(p).getTimeMillis();
+        double childInclusive = (p > 0) ? planNodes.get(p - 1).getTimeMillis() : 0;
+        double exclusive = Math.max(0, inclusive - childInclusive);
+        long rows = planNodes.get(p).getRows();
+        physicalTimings.add(new double[] {exclusive, rows});
+      }
+    }
+
+    List<AnalyzeResponse.OperatorNode> operators = new ArrayList<>();
+    int idx = 0;
+
+    // Pushed-down entry: the first pushedSegments segments collapse into the scan. The entry is
+    // only flagged as pushed down when more than one segment was merged into it.
+    if (pushedSegments > 0) {
+      operators.add(
+          toOperatorNode(
+              querySegments.subList(0, pushedSegments),
+              flatten(segmentPlanIds.subList(0, pushedSegments)),
+              pushedSegments > 1 ? Boolean.TRUE : null,
+              physicalTimings.isEmpty() ? null : physicalTimings.get(0),
+              idToDescription,
+              idToRowCount));
+      idx = pushedSegments;
+    }
+
+    // Remaining segments map to non-scan physical nodes (physicalDepth - 1 of them).
+    // Each physical node corresponds to one logical plan node. Group segments so that each
+    // group covers exactly one logical plan node; segments with 0 plan nodes merge into the
+    // next group that has one.
+    while (idx < querySegments.size()) {
+      int groupStart = idx;
+      int logicalNodesInGroup = 0;
+      while (idx < querySegments.size() && logicalNodesInGroup < 1) {
+        logicalNodesInGroup += segmentPlanIds.get(idx).size();
+        idx++;
+      }
+      // Entry i is timed by the i-th physical node from the bottom, if the profile has one.
+      int physicalIdx = operators.size();
+      operators.add(
+          toOperatorNode(
+              querySegments.subList(groupStart, idx),
+              flatten(segmentPlanIds.subList(groupStart, idx)),
+              null,
+              physicalIdx < physicalTimings.size() ? physicalTimings.get(physicalIdx) : null,
+              idToDescription,
+              idToRowCount));
+    }
+
+    return operators;
+  }
+
+  /** Concatenates per-segment id lists, keeping segment order. */
+  private static List<Integer> flatten(List<List<Integer>> idsPerSegment) {
+    return idsPerSegment.stream().flatMap(List::stream).toList();
+  }
+
+  /**
+   * Builds one operator-tree entry from a group of segments and the plan node ids they own.
+   *
+   * @param segments the segments merged into this entry
+   * @param planIds ids of the logical plan nodes owned by those segments
+   * @param pushedDown value for {@code is_pushed_down}; null leaves it unset
+   * @param timing {@code {exclusive time in ms, rows}} of the matching physical node, or null
+   */
+  private AnalyzeResponse.OperatorNode toOperatorNode(
+      List<AnalyzeResponse.QuerySegment> segments,
+      List<Integer> planIds,
+      Boolean pushedDown,
+      double[] timing,
+      Map<Integer, String> idToDescription,
+      Map<Integer, Double> idToRowCount) {
+    List<String> descriptions =
+        planIds.stream().map(idToDescription::get).filter(Objects::nonNull).toList();
+    List<String> sources =
+        segments.stream().map(AnalyzeResponse.QuerySegment::getSource).toList();
+    return AnalyzeResponse.OperatorNode.builder()
+        .source(String.join(" | ", sources))
+        .node_type(segments.stream().map(AnalyzeResponse.QuerySegment::getNodeType).toList())
+        .description(descriptions.isEmpty() ? null : descriptions)
+        .is_pushed_down(pushedDown)
+        .estimated_rows(getEstimatedRows(new HashSet<>(planIds), idToRowCount))
+        // Locale.ROOT keeps the decimal separator stable regardless of the server's locale.
+        .actual_time_ms(
+            timing != null ? String.format(java.util.Locale.ROOT, "%.2f ms", timing[0]) : null)
+        .actual_rows(timing != null ? (long) timing[1] : null)
+        .build();
+  }
+
+  /** Counts the nodes on the path from {@code node} down through each node's first input. */
+  private static int getLinearDepth(RelNode node) {
+    int depth = 0;
+    RelNode current = node;
+    while (current != null) {
+      depth++;
+      List<RelNode> inputs = current.getInputs();
+      current = inputs.isEmpty() ? null : inputs.get(0);
+    }
+    return depth;
+  }
+
+  /** Stores the estimated row count of {@code node} and of all its inputs, keyed by node id. */
+  private void collectRowCounts(
+      RelNode node,
+      org.apache.calcite.rel.metadata.RelMetadataQuery mq,
+      Map<Integer, Double> idToRowCount) {
+    try {
+      Double rowCount = mq.getRowCount(node);
+      if (rowCount != null) {
+        idToRowCount.put(node.getId(), rowCount);
+      }
+    } catch (Exception ignored) {
+      // Best effort: a node without a usable estimate simply gets no estimated_rows.
+    }
+    for (RelNode input : node.getInputs()) {
+      collectRowCounts(input, mq, idToRowCount);
+    }
+  }
+
+  /** Returns the rounded row estimate of the highest id that has one, or null if none has. */
+  private Long getEstimatedRows(Set<Integer> ids, Map<Integer, Double> idToRowCount) {
+    return ids.stream()
+        .filter(idToRowCount::containsKey)
+        .max(Integer::compareTo)
+        .map(id -> Math.round(idToRowCount.get(id)))
+        .orElse(null);
+  }
+
   public void executeWithLegacy(
       UnresolvedPlan plan,
       QueryType queryType,
diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/AnalyzePlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/AnalyzePlan.java
new file mode 100644
index 00000000000..a43bc32792e
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/execution/AnalyzePlan.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.execution;
+
+import java.util.List;
+import org.opensearch.sql.ast.statement.ExplainMode;
+import org.opensearch.sql.ast.tree.UnresolvedPlan;
+import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.executor.AnalyzeResponse;
+import org.opensearch.sql.executor.AnalyzeResponse.QuerySegment;
+import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.QueryId;
+import org.opensearch.sql.executor.QueryService;
+import org.opensearch.sql.executor.QueryType;
+
+/** Plan that produces an
AnalyzeResponse (AST + logical plan). */
+public class AnalyzePlan extends AbstractPlan {
+
+  private final String query;
+  private final List<QuerySegment> querySegments;
+  private final UnresolvedPlan plan;
+  private final QueryService queryService;
+  private final ResponseListener<AnalyzeResponse> listener;
+
+  /**
+   * @param queryId id of this plan
+   * @param queryType type of the query
+   * @param query original query text
+   * @param querySegments node type and source text of each command of the query
+   * @param plan AST of the query
+   * @param queryService service that performs the analysis
+   * @param listener receives the analyze response or the failure
+   */
+  public AnalyzePlan(
+      QueryId queryId,
+      QueryType queryType,
+      String query,
+      List<QuerySegment> querySegments,
+      UnresolvedPlan plan,
+      QueryService queryService,
+      ResponseListener<AnalyzeResponse> listener) {
+    super(queryId, queryType);
+    this.query = query;
+    this.querySegments = querySegments;
+    this.plan = plan;
+    this.queryService = queryService;
+    this.listener = listener;
+  }
+
+  /** Delegates to {@link QueryService#analyzeWithCalcite}; the result goes to the listener. */
+  @Override
+  public void execute() {
+    queryService.analyzeWithCalcite(query, querySegments, plan, getQueryType(), listener);
+  }
+
+  /**
+   * Not supported for analyze plans.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void explain(
+      ResponseListener<ExecutionEngine.ExplainResponse> listener, ExplainMode mode) {
+    throw new UnsupportedOperationException("Explain is not supported for analyze plan");
+  }
+}
diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java
index 48e2b3ce5e0..60744bd6052 100644
--- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java
+++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java
@@ -7,6 +7,7 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.List;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.tuple.Pair;
 import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -19,6 +20,7 @@
 import org.opensearch.sql.ast.tree.UnresolvedPlan;
 import org.opensearch.sql.common.response.ResponseListener;
 import org.opensearch.sql.exception.UnsupportedCursorRequestException;
+import org.opensearch.sql.executor.AnalyzeResponse;
 import org.opensearch.sql.executor.ExecutionEngine;
 import org.opensearch.sql.executor.QueryId;
 import org.opensearch.sql.executor.QueryService;
@@ -146,4 +148,15 @@ public AbstractPlan visitExplain(
         node.getMode(),
         context.getRight());
   }
+
+  /**
+   * Creates an {@link AnalyzePlan} for the given query. A new query id is generated for it.
+   *
+   * @param query original query text
+   * @param querySegments node type and source text of each command of the query
+   * @param plan AST of the query
+   * @param queryType type of the query
+   * @param listener receives the analyze response or the failure
+   */
+  public AbstractPlan createAnalyzePlan(
+      String query,
+      List<AnalyzeResponse.QuerySegment> querySegments,
+      UnresolvedPlan plan,
+      QueryType queryType,
+      ResponseListener<AnalyzeResponse> listener) {
+    return new AnalyzePlan(
+        QueryId.queryId(), queryType, query, querySegments, plan, queryService, listener);
+  }
 }
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java
index bb87bf7fa91..3ea21c9e596 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java
@@ -30,6 +30,7 @@ public class PPLQueryRequestFactory {
   private static final String DEFAULT_EXPLAIN_MODE = "standard";
   private static final String QUERY_PARAMS_PRETTY = "pretty";
   private static final String QUERY_PARAMS_PROFILE = "profile";
+  private static final String QUERY_PARAMS_ANALYZE = "analyze";
   private static final String QUERY_PARAMS_FETCH_SIZE = "fetch_size";
 
   /**
@@ -82,9 +83,14 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques
     try {
       jsonContent = new JSONObject(content);
       boolean profileRequested = jsonContent.optBoolean(QUERY_PARAMS_PROFILE, false);
+      boolean analyzeRequested = jsonContent.optBoolean(QUERY_PARAMS_ANALYZE, false);
       String queryString = jsonContent.optString(PPL_FIELD_NAME, "");
-      boolean enableProfile =
-          profileRequested && isProfileSupported(restRequest.path(), format, queryString);
+      // Analyze is subject to the same support check as profile. If both profile and analyze
+      // are requested, profile overrides analyze.
+      boolean profileSupported = isProfileSupported(restRequest.path(), format, queryString);
+      boolean enableProfile = profileRequested && profileSupported;
+      boolean enableAnalyze = analyzeRequested && !profileRequested && profileSupported;
       // Support fetch_size as a URL
parameter if not already in the JSON body
       if (!jsonContent.has(QUERY_PARAMS_FETCH_SIZE)
           && restRequest.params().containsKey(QUERY_PARAMS_FETCH_SIZE)) {
@@ -104,7 +110,8 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques
               restRequest.path(),
               format.getFormatName(),
               explainMode,
-              enableProfile);
+              enableProfile,
+              enableAnalyze);
       // set sanitize option if csv format
       if (format.equals(Format.CSV)) {
         pplRequest.sanitize(getSanitizeOption(restRequest.params()));
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
index b7c2d2c9e11..2a6fe4cbbd9 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
@@ -29,6 +29,7 @@
 import org.opensearch.sql.common.utils.QueryContext;
 import org.opensearch.sql.datasource.DataSourceService;
 import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
+import org.opensearch.sql.executor.AnalyzeResponse;
 import org.opensearch.sql.executor.ExecutionEngine;
 import org.opensearch.sql.executor.QueryType;
 import org.opensearch.sql.legacy.metrics.MetricName;
@@ -193,13 +194,48 @@ protected void doExecute(
       pplService.explain(
           transformedRequest, createExplainResponseListener(transformedRequest, clearingListener));
+    } else if (transformedRequest.analyze()) {
+      // Only requests that explicitly asked for analyze take the analyze path; everything else
+      // keeps going through the regular execute path below.
+      pplService.analyze(
+          transformedRequest, createAnalyzeResponseListener(transformedRequest, clearingListener));
     } else {
       pplService.execute(
           transformedRequest,
           createListener(transformedRequest, clearingListener),
           createExplainResponseListener(transformedRequest, clearingListener));
     }
   }
+
+  /**
+   * Creates a listener that formats an {@link AnalyzeResponse} as pretty-printed JSON and forwards
+   * it, or the failure, to the transport listener.
+   *
+   * @param request the PPL request being served (currently unused)
+   * @param listener transport listener that receives the formatted response
+   */
+  private ResponseListener<AnalyzeResponse> createAnalyzeResponseListener(
+      PPLQueryRequest request, ActionListener<TransportPPLQueryResponse> listener) {
+    return new ResponseListener<AnalyzeResponse>() {
+      @Override
+      public void onResponse(AnalyzeResponse response) {
+        JsonResponseFormatter<AnalyzeResponse> formatter =
+            new JsonResponseFormatter<>(PRETTY) {
+              @Override
+              protected Object buildJsonObject(AnalyzeResponse response) {
+                // The response object itself is the JSON model; no conversion is applied.
+                return response;
+              }
+            };
+        listener.onResponse(
+            new TransportPPLQueryResponse(formatter.format(response), formatter.contentType()));
+      }
+
+      @Override
+      public void onFailure(Exception e) {
+        listener.onFailure(e);
+      }
+    };
+  }
+
   /**
    * TODO: need to extract an interface for both SQL and PPL action handler and move these common
    * methods to the interface.
This is not easy to do now because SQL action handler is still in
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java
index 4ba1a53d872..68a96a4f924 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java
@@ -53,6 +53,11 @@ public class TransportPPLQueryRequest extends ActionRequest {
   @Accessors(fluent = true)
   private boolean profile = false;
 
+  /**
+   * Whether analyze output was requested. Copied from the PPL request, sent over the wire, and
+   * passed back into the PPL request built by {@code toPPLQueryRequest()}.
+   */
+  @Setter
+  @Getter
+  @Accessors(fluent = true)
+  private boolean analyze = false;
+
   @Setter
   @Getter
   @Accessors(fluent = true)
@@ -67,6 +72,7 @@ public TransportPPLQueryRequest(PPLQueryRequest pplQueryRequest) {
     sanitize = pplQueryRequest.sanitize();
     style = pplQueryRequest.style();
     profile = pplQueryRequest.profile();
+    analyze = pplQueryRequest.analyze();
     explainMode = pplQueryRequest.mode().getModeName();
     queryId = pplQueryRequest.queryId();
   }
@@ -83,6 +89,7 @@ public TransportPPLQueryRequest(StreamInput in) throws IOException {
     sanitize = in.readBoolean();
     style = in.readEnum(JsonResponseFormatter.Style.class);
     profile = in.readBoolean();
+    // NOTE(review): this adds a boolean to the wire format without a version check. Confirm
+    // whether mixed-version clusters require gating this read (and the matching write in
+    // writeTo) on the stream version.
+    analyze = in.readBoolean();
     queryId = in.readOptionalString();
   }
 
@@ -116,6 +123,7 @@ public void writeTo(StreamOutput out) throws IOException {
     out.writeBoolean(sanitize);
     out.writeEnum(style);
     out.writeBoolean(profile);
+    // Must stay in the same position as the corresponding read in the StreamInput constructor.
+    out.writeBoolean(analyze);
     out.writeOptionalString(queryId);
   }
 
@@ -172,7 +180,7 @@ public String getDescription() {
   /** Convert to PPLQueryRequest. */
   public PPLQueryRequest toPPLQueryRequest() {
     PPLQueryRequest pplQueryRequest =
-        new PPLQueryRequest(pplQuery, jsonContent, path, format, explainMode, profile);
+        new PPLQueryRequest(pplQuery, jsonContent, path, format, explainMode, profile, analyze);
     pplQueryRequest.sanitize(sanitize);
     pplQueryRequest.style(style);
     pplQueryRequest.queryId(queryId);
diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java
index ea353066cb0..48a2dd8b686 100644
--- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java
+++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java
@@ -8,18 +8,26 @@
 import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
 import static org.opensearch.sql.executor.execution.QueryPlanFactory.NO_CONSUMER_RESPONSE_LISTENER;
 
+import java.util.ArrayList;
+import java.util.List;
 import lombok.extern.log4j.Log4j2;
+import org.antlr.v4.runtime.ParserRuleContext;
 import org.antlr.v4.runtime.tree.ParseTree;
+import org.opensearch.sql.ast.statement.Query;
 import org.opensearch.sql.ast.statement.Statement;
+import org.opensearch.sql.ast.tree.UnresolvedPlan;
 import org.opensearch.sql.common.response.ResponseListener;
 import org.opensearch.sql.common.setting.Settings;
 import org.opensearch.sql.common.utils.QueryContext;
+import org.opensearch.sql.executor.AnalyzeResponse;
+import org.opensearch.sql.executor.AnalyzeResponse.QuerySegment;
 import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
 import org.opensearch.sql.executor.QueryManager;
 import org.opensearch.sql.executor.QueryType;
 import org.opensearch.sql.executor.execution.AbstractPlan;
 import org.opensearch.sql.executor.execution.QueryPlanFactory;
 import org.opensearch.sql.ppl.antlr.PPLSyntaxParser;
+import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser;
 import org.opensearch.sql.ppl.domain.PPLQueryRequest;
 import org.opensearch.sql.ppl.parser.AstBuilder;
 import 
org.opensearch.sql.ppl.parser.AstStatementBuilder; @@ -85,6 +93,91 @@ public void explain(PPLQueryRequest request, ResponseListener l } } + /** + * Analyze the query: produces the AST node and logical plan RelNode. + * + * @param request {@link PPLQueryRequest} + * @param listener {@link ResponseListener} for analyze response + */ + public void analyze(PPLQueryRequest request, ResponseListener listener) { + try { + String queryText = request.getRequest(); + ParseTree cst = parser.parse(queryText); + Statement statement = + cst.accept( + new AstStatementBuilder( + new AstBuilder(queryText, settings), + AstStatementBuilder.StatementBuilderContext.builder() + .isExplain(false) + .fetchSize(request.getFetchSize()) + .highlightConfig(request.getHighlightConfig()) + .format(request.getFormat()) + .build())); + + log.info( + "[{}] Incoming request {}", + QueryContext.getRequestId(), + anonymizer.anonymizeStatement(statement)); + + List querySegments = extractQuerySegments(cst, queryText); + UnresolvedPlan unresolvedPlan = ((Query) statement).getPlan(); + queryManager.submit( + queryExecutionFactory.createAnalyzePlan( + queryText, querySegments, unresolvedPlan, PPL_QUERY, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private List extractQuerySegments(ParseTree cst, String queryText) { + List segments = new ArrayList<>(); + OpenSearchPPLParser.QueryStatementContext queryStmt = findQueryStatement(cst); + if (queryStmt == null) { + return segments; + } + + // First segment: the search/source command (pplCommands) + OpenSearchPPLParser.PplCommandsContext pplCommands = queryStmt.pplCommands(); + if (pplCommands != null) { + segments.add(buildSegment(pplCommands, queryText)); + } + + // Remaining segments: each piped command + for (OpenSearchPPLParser.CommandsContext cmd : queryStmt.commands()) { + segments.add(buildSegment(cmd, queryText)); + } + return segments; + } + + private OpenSearchPPLParser.QueryStatementContext findQueryStatement(ParseTree 
tree) { + if (tree instanceof OpenSearchPPLParser.QueryStatementContext ctx) { + return ctx; + } + for (int i = 0; i < tree.getChildCount(); i++) { + OpenSearchPPLParser.QueryStatementContext result = findQueryStatement(tree.getChild(i)); + if (result != null) { + return result; + } + } + return null; + } + + private QuerySegment buildSegment(ParserRuleContext ctx, String queryText) { + int start = ctx.getStart().getStartIndex(); + int stop = ctx.getStop().getStopIndex(); + String source = queryText.substring(start, stop + 1); + // For wrapper rules like CommandsContext, drill into the specific child command + ParserRuleContext target = ctx; + if (ctx.getChildCount() == 1 && ctx.getChild(0) instanceof ParserRuleContext child) { + target = child; + } + String nodeType = target.getClass().getSimpleName().replace("Context", ""); + return QuerySegment.builder() + .nodeType(nodeType) + .source(source) + .build(); + } + private AbstractPlan plan( PPLQueryRequest request, ResponseListener queryListener, diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index 06c7fe1c38e..8bb2531675c 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -52,6 +52,11 @@ public class PPLQueryRequest { @Accessors(fluent = true) private boolean profile = false; + @Setter + @Getter + @Accessors(fluent = true) + private boolean analyze = false; + @Setter @Getter @Accessors(fluent = true) @@ -62,7 +67,12 @@ public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path) { } public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path, String format) { - this(pplQuery, jsonContent, path, format, ExplainMode.STANDARD.getModeName(), false); + this(pplQuery, jsonContent, path, format, ExplainMode.STANDARD.getModeName(), false, false); + } + + public 
PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path, + String format, String explainMode, boolean profile) { + this(pplQuery, jsonContent, path, format, explainMode, profile, false); } /** Constructor of PPLQueryRequest. */ @@ -72,13 +82,15 @@ public PPLQueryRequest( String path, String format, String explainMode, - boolean profile) { + boolean profile, + boolean analyze) { this.pplQuery = pplQuery; this.jsonContent = jsonContent; this.path = Optional.ofNullable(path).orElse(DEFAULT_PPL_PATH); this.format = format; this.explainMode = explainMode; this.profile = profile; + this.analyze = analyze; } public String getRequest() {