Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This module provides components organized into two main areas aligned with the [

### Unified Language Specification

- **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as intermediate representation.
- **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language) or SQL queries and returns Calcite `RelNode` logical plans as intermediate representation.
- **`UnifiedQueryTranspiler`**: Converts Calcite logical plans (`RelNode`) into SQL strings for various target databases using different SQL dialects.

### Unified Execution Runtime
Expand All @@ -17,7 +17,7 @@ This module provides components organized into two main areas aligned with the [
- **`UnifiedFunction`**: Engine-agnostic function interface that enables functions to be evaluated across different execution engines without engine-specific code duplication.
- **`UnifiedFunctionRepository`**: Repository for discovering and loading functions as `UnifiedFunction` instances, providing a bridge between function definitions and external execution engines.

Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines.
Together, these components enable complete workflows: parse PPL or SQL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines.

### Experimental API Design

Expand All @@ -33,7 +33,7 @@ Create a context with catalog configuration, query type, and optional settings:

```java
UnifiedQueryContext context = UnifiedQueryContext.builder()
.language(QueryType.PPL)
.language(QueryType.PPL) // or QueryType.SQL for SQL
.catalog("opensearch", opensearchSchema)
.catalog("spark_catalog", sparkSchema)
.defaultNamespace("opensearch")
Expand All @@ -44,7 +44,7 @@ UnifiedQueryContext context = UnifiedQueryContext.builder()

### UnifiedQueryPlanner

Use `UnifiedQueryPlanner` to parse and analyze PPL queries into Calcite logical plans. The planner accepts a `UnifiedQueryContext` and can be reused for multiple queries.
Use `UnifiedQueryPlanner` to parse and analyze PPL or SQL queries into Calcite logical plans. The planner accepts a `UnifiedQueryContext` and can be reused for multiple queries.

```java
// Create planner with context
Expand All @@ -53,6 +53,9 @@ UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
// Plan multiple queries (context is reused)
RelNode plan1 = planner.plan("source = logs | where status = 200");
RelNode plan2 = planner.plan("source = metrics | stats avg(cpu)");

// SQL queries are also supported (with QueryType.SQL context)
RelNode plan3 = planner.plan("SELECT * FROM logs WHERE status = 200");
```

### UnifiedQueryTranspiler
Expand Down Expand Up @@ -226,5 +229,4 @@ public class MySchema extends AbstractSchema {

## Future Work

- Expand support to SQL language.
- Extend planner to generate optimized physical plans using Calcite's optimization frameworks.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Map;
import java.util.Objects;
import lombok.Value;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
Expand Down Expand Up @@ -176,13 +177,18 @@ private FrameworkConfig buildFrameworkConfig() {

SchemaPlus defaultSchema = findSchemaByPath(rootSchema, defaultNamespace);
return Frameworks.newConfigBuilder()
.parserConfig(SqlParser.Config.DEFAULT)
.parserConfig(buildParserConfig())
.defaultSchema(defaultSchema)
.traitDefs((List<RelTraitDef>) null)
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
.build();
}

private SqlParser.Config buildParserConfig() {
// Preserve identifier case for lowercase OpenSearch index names
return SqlParser.Config.DEFAULT.withUnquotedCasing(Casing.UNCHANGED);
}

private SchemaPlus findSchemaByPath(SchemaPlus rootSchema, String defaultPath) {
if (defaultPath == null) {
return rootSchema;
Expand Down
101 changes: 63 additions & 38 deletions api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@

package org.opensearch.sql.api;

import lombok.RequiredArgsConstructor;
import org.antlr.v4.runtime.tree.ParseTree;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.CalciteRelNodeVisitor;
import org.opensearch.sql.common.antlr.Parser;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.ppl.antlr.PPLSyntaxParser;
Expand All @@ -28,36 +32,32 @@
* such as Spark or command-line tools, abstracting away Calcite internals.
*/
public class UnifiedQueryPlanner {
/** The parser instance responsible for converting query text into a parse tree. */
private final Parser parser;

/** Unified query context containing CalcitePlanContext with all configuration. */
private final UnifiedQueryContext context;

/** AST-to-RelNode visitor that builds logical plans from the parsed AST. */
private final CalciteRelNodeVisitor relNodeVisitor =
new CalciteRelNodeVisitor(new EmptyDataSourceService());
/** Planning strategy selected at construction time based on query type. */
private final PlanningStrategy strategy;

/**
* Constructs a UnifiedQueryPlanner with a unified query context.
*
* @param context the unified query context containing CalcitePlanContext
*/
public UnifiedQueryPlanner(UnifiedQueryContext context) {
this.parser = buildQueryParser(context.getPlanContext().queryType);
this.context = context;
this.strategy =
context.getPlanContext().queryType == QueryType.SQL
? new CalciteNativeStrategy(context)
: new CustomVisitorStrategy(context);
}

/**
* Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate
* optimal physical plan to fully unify query execution and leverage Calcite's optimizer.
*
* @param query the raw query string in PPL or other supported syntax
* @param query the raw query string in PPL or SQL syntax
* @return a logical plan representing the query
*/
public RelNode plan(String query) {
try {
return preserveCollation(analyze(parse(query)));
return strategy.plan(query);
} catch (SyntaxCheckException e) {
// Re-throw syntax error without wrapping
throw e;
Expand All @@ -66,38 +66,63 @@ public RelNode plan(String query) {
}
}

private Parser buildQueryParser(QueryType queryType) {
if (queryType == QueryType.PPL) {
return new PPLSyntaxParser();
}
throw new IllegalArgumentException("Unsupported query type: " + queryType);
/** Strategy interface for language-specific planning logic. */
private interface PlanningStrategy {
RelNode plan(String query) throws Exception;
}

private UnresolvedPlan parse(String query) {
ParseTree cst = parser.parse(query);
AstStatementBuilder astStmtBuilder =
new AstStatementBuilder(
new AstBuilder(query, context.getSettings()),
AstStatementBuilder.StatementBuilderContext.builder().build());
Statement statement = cst.accept(astStmtBuilder);
/** ANSI SQL planning using Calcite's native SqlParser → SqlValidator → SqlToRelConverter. */
@RequiredArgsConstructor
private static class CalciteNativeStrategy implements PlanningStrategy {
private final UnifiedQueryContext context;

if (statement instanceof Query) {
return ((Query) statement).getPlan();
@Override
public RelNode plan(String query) throws Exception {
try (Planner planner = Frameworks.getPlanner(context.getPlanContext().config)) {
SqlNode parsed = planner.parse(query);
SqlNode validated = planner.validate(parsed);
RelRoot relRoot = planner.rel(validated);
return relRoot.project();
}
}
throw new UnsupportedOperationException(
"Only query statements are supported but got " + statement.getClass().getSimpleName());
}

private RelNode analyze(UnresolvedPlan ast) {
return relNodeVisitor.analyze(ast, context.getPlanContext());
}
/** AST-based planning via ANTLR parser → UnresolvedPlan → CalciteRelNodeVisitor. */
@RequiredArgsConstructor
private static class CustomVisitorStrategy implements PlanningStrategy {
private final UnifiedQueryContext context;
private final PPLSyntaxParser parser = new PPLSyntaxParser();
private final CalciteRelNodeVisitor relNodeVisitor =
new CalciteRelNodeVisitor(new EmptyDataSourceService());

@Override
public RelNode plan(String query) {
UnresolvedPlan ast = parse(query);
RelNode logical = relNodeVisitor.analyze(ast, context.getPlanContext());
return preserveCollation(logical);
}

private UnresolvedPlan parse(String query) {
ParseTree cst = parser.parse(query);
AstStatementBuilder astStmtBuilder =
new AstStatementBuilder(
new AstBuilder(query, context.getSettings()),
AstStatementBuilder.StatementBuilderContext.builder().build());
Statement statement = cst.accept(astStmtBuilder);

if (statement instanceof Query) {
return ((Query) statement).getPlan();
}
throw new UnsupportedOperationException(
"Only query statements are supported but got " + statement.getClass().getSimpleName());
}

private RelNode preserveCollation(RelNode logical) {
RelNode calcitePlan = logical;
RelCollation collation = logical.getTraitSet().getCollation();
if (!(logical instanceof Sort) && collation != RelCollations.EMPTY) {
calcitePlan = LogicalSort.create(logical, collation, null, null);
private RelNode preserveCollation(RelNode logical) {
RelCollation collation = logical.getTraitSet().getCollation();
if (!(logical instanceof Sort) && collation != RelCollations.EMPTY) {
return LogicalSort.create(logical, collation, null, null);
}
return logical;
}
return calcitePlan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ public void testMissingQueryType() {
UnifiedQueryContext.builder().catalog("opensearch", testSchema).build();
}

@Test(expected = IllegalArgumentException.class)
public void testUnsupportedQueryType() {
@Test
public void testSqlQueryType() {
UnifiedQueryContext context =
UnifiedQueryContext.builder()
.language(QueryType.SQL) // only PPL is supported for now
.language(QueryType.SQL)
.catalog("opensearch", testSchema)
.build();
new UnifiedQueryPlanner(context);
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
assertNotNull("SQL planner should be created", planner);
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Loading
Loading