Skip to content

Commit 7b9f657

Browse files
Add unified query compiler API (#4974) (#5024)
* Reuse context in test base class * Rename package from runtime to compiler * Refactor compiler to pass in context via constructor * Polish javadoc and test code * Add IT for OpenSearch integration with unified query API * Simplify IT and reuse test fixtures from api module * Polish doc * Fix connection close issue and close in context * Polish for PR review * Address AI comment regarding double closing context --------- (cherry picked from commit 46302b7) Signed-off-by: Chen Dai <daichen@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 48fa93f commit 7b9f657

10 files changed

Lines changed: 554 additions & 42 deletions

File tree

api/README.md

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,18 @@ This module provides a high-level integration layer for the Calcite-based query
44

55
## Overview
66

7-
This module provides two primary components:
7+
This module provides components organized into two main areas aligned with the [Unified Query API architecture](https://github.com/opensearch-project/sql/issues/4782):
8+
9+
### Unified Language Specification
810

911
- **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as intermediate representation.
1012
- **`UnifiedQueryTranspiler`**: Converts Calcite logical plans (`RelNode`) into SQL strings for various target databases using different SQL dialects.
1113

12-
Together, these components enable a complete workflow: parse PPL queries into logical plans, then transpile those plans into target database SQL.
14+
### Unified Execution Runtime
15+
16+
- **`UnifiedQueryCompiler`**: Compiles Calcite logical plans (`RelNode`) into executable JDBC `PreparedStatement` objects for separation of compilation and execution.
17+
18+
Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, or compile and execute queries directly for testing and conformance validation.
1319

1420
### Experimental API Design
1521

@@ -59,40 +65,63 @@ UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
5965
String sql = transpiler.toSql(plan);
6066
```
6167

62-
### Complete Workflow Example
68+
Supported SQL dialects include:
69+
- `SparkSqlDialect.DEFAULT` - Apache Spark SQL
70+
- `PostgresqlSqlDialect.DEFAULT` - PostgreSQL
71+
- `MysqlSqlDialect.DEFAULT` - MySQL
72+
- And other Calcite-supported dialects
73+
74+
### UnifiedQueryCompiler
6375

64-
Combining all components to transpile PPL queries into target database SQL:
76+
Use `UnifiedQueryCompiler` to compile Calcite logical plans into executable JDBC statements. This separates compilation from execution and returns standard JDBC types.
6577

6678
```java
67-
// Step 1: Create reusable context (shared across components)
68-
UnifiedQueryContext context = UnifiedQueryContext.builder()
69-
.language(QueryType.PPL)
70-
.catalog("catalog", schema)
71-
.defaultNamespace("catalog")
72-
.build();
79+
UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);
7380

74-
// Step 2: Create planner with context
75-
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
81+
try (PreparedStatement statement = compiler.compile(plan)) {
82+
ResultSet rs = statement.executeQuery();
83+
while (rs.next()) {
84+
// Standard JDBC ResultSet access
85+
}
86+
}
87+
```
7688

77-
// Step 3: Plan PPL query into logical plan
78-
RelNode plan = planner.plan("source = employees | where age > 30");
89+
### Complete Workflow Examples
7990

80-
// Step 4: Create transpiler with target dialect
81-
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
82-
.dialect(SparkSqlDialect.DEFAULT)
83-
.build();
91+
Combining all components for a complete PPL query workflow:
8492

85-
// Step 5: Transpile to target SQL
86-
String sparkSql = transpiler.toSql(plan);
87-
// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30
93+
```java
94+
// Step 1: Create reusable context (shared across all components)
95+
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
96+
.language(QueryType.PPL)
97+
.catalog("catalog", schema)
98+
.defaultNamespace("catalog")
99+
.build()) {
100+
101+
// Step 2: Create planner with context
102+
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
103+
104+
// Step 3: Plan PPL query into logical plan
105+
RelNode plan = planner.plan("source = employees | where age > 30");
106+
107+
// Option A: Transpile to target SQL
108+
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
109+
.dialect(SparkSqlDialect.DEFAULT)
110+
.build();
111+
String sparkSql = transpiler.toSql(plan);
112+
// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30
113+
114+
// Option B: Compile and execute directly
115+
UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);
116+
try (PreparedStatement statement = compiler.compile(plan)) {
117+
ResultSet rs = statement.executeQuery();
118+
while (rs.next()) {
119+
// Process results with standard JDBC
120+
}
121+
}
122+
}
88123
```
89124

90-
Supported SQL dialects include:
91-
- `SparkSqlDialect.DEFAULT` - Apache Spark SQL
92-
- `PostgresqlSqlDialect.DEFAULT` - PostgreSQL
93-
- `MysqlSqlDialect.DEFAULT` - MySQL
94-
- And other Calcite-supported dialects
95-
96125
## Development & Testing
97126

98127
A set of unit tests is provided to validate planner behavior.

api/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
plugins {
77
id 'java-library'
8+
id 'java-test-fixtures'
89
id "io.freefair.lombok"
910
id 'jacoco'
1011
id 'com.diffplug.spotless'
@@ -13,10 +14,14 @@ plugins {
1314
dependencies {
1415
api project(':ppl')
1516

17+
testImplementation testFixtures(project(':api'))
1618
testImplementation group: 'junit', name: 'junit', version: '4.13.2'
1719
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}"
1820
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}"
1921
testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.41.0'
22+
23+
testFixturesApi group: 'junit', name: 'junit', version: '4.13.2'
24+
testFixturesApi group: 'org.hamcrest', name: 'hamcrest', version: "${hamcrest_version}"
2025
}
2126

2227
spotless {

api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,26 @@
3434
* enabling consistent behavior across all unified query operations.
3535
*/
3636
@Value
37-
public class UnifiedQueryContext {
37+
public class UnifiedQueryContext implements AutoCloseable {
3838

3939
/** CalcitePlanContext containing Calcite framework configuration and query type. */
4040
CalcitePlanContext planContext;
4141

4242
/** Settings containing execution limits and feature flags used by parsers and planners. */
4343
Settings settings;
4444

45+
/**
46+
* Closes the underlying resource managed by this context.
47+
*
48+
* @throws Exception if an error occurs while closing the connection
49+
*/
50+
@Override
51+
public void close() throws Exception {
52+
if (planContext != null && planContext.connection != null) {
53+
planContext.connection.close();
54+
}
55+
}
56+
4557
/** Creates a new builder for UnifiedQueryContext. */
4658
public static Builder builder() {
4759
return new Builder();
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api.compiler;
7+
8+
import java.sql.Connection;
9+
import java.sql.PreparedStatement;
10+
import lombok.NonNull;
11+
import org.apache.calcite.interpreter.Bindables;
12+
import org.apache.calcite.plan.RelOptTable;
13+
import org.apache.calcite.rel.RelHomogeneousShuttle;
14+
import org.apache.calcite.rel.RelNode;
15+
import org.apache.calcite.rel.core.TableScan;
16+
import org.apache.calcite.rel.logical.LogicalTableScan;
17+
import org.apache.calcite.tools.RelRunner;
18+
import org.opensearch.sql.api.UnifiedQueryContext;
19+
20+
/**
21+
* {@code UnifiedQueryCompiler} compiles Calcite logical plans ({@link RelNode}) into executable
22+
* JDBC statements, separating query compilation from execution.
23+
*/
24+
public class UnifiedQueryCompiler {
25+
26+
/** Unified query context containing CalcitePlanContext with all configuration. */
27+
private final UnifiedQueryContext context;
28+
29+
/**
30+
* Constructs a UnifiedQueryCompiler with a unified query context.
31+
*
32+
* @param context the unified query context containing CalcitePlanContext
33+
*/
34+
public UnifiedQueryCompiler(UnifiedQueryContext context) {
35+
this.context = context;
36+
}
37+
38+
/**
39+
* Compiles a Calcite logical plan into an executable {@link PreparedStatement}. Similar to {@code
40+
* CalciteToolsHelper.OpenSearchRelRunners.run()} but does not close the connection, leaving
41+
* resource management to {@link UnifiedQueryContext}.
42+
*
43+
* @param plan the logical plan to compile (must not be null)
44+
* @return a compiled PreparedStatement ready for execution
45+
* @throws IllegalStateException if compilation fails
46+
*/
47+
public PreparedStatement compile(@NonNull RelNode plan) {
48+
try {
49+
// Apply shuttle to convert LogicalTableScan to BindableTableScan
50+
final RelHomogeneousShuttle shuttle =
51+
new RelHomogeneousShuttle() {
52+
@Override
53+
public RelNode visit(TableScan scan) {
54+
final RelOptTable table = scan.getTable();
55+
if (scan instanceof LogicalTableScan
56+
&& Bindables.BindableTableScan.canHandle(table)) {
57+
return Bindables.BindableTableScan.create(scan.getCluster(), table);
58+
}
59+
return super.visit(scan);
60+
}
61+
};
62+
RelNode transformedPlan = plan.accept(shuttle);
63+
64+
Connection connection = context.getPlanContext().connection;
65+
final RelRunner runner = connection.unwrap(RelRunner.class);
66+
return runner.prepareStatement(transformedPlan);
67+
} catch (Exception e) {
68+
throw new IllegalStateException("Failed to compile logical plan", e);
69+
}
70+
}
71+
}

api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package org.opensearch.sql.api;
77

88
import static org.junit.Assert.assertEquals;
9+
import static org.junit.Assert.assertFalse;
910
import static org.junit.Assert.assertNotNull;
11+
import static org.junit.Assert.assertTrue;
1012
import static org.opensearch.sql.common.setting.Settings.Key.*;
1113

1214
import org.junit.Test;
@@ -79,4 +81,19 @@ public void testInvalidDefaultNamespace() {
7981
.defaultNamespace("nonexistent")
8082
.build();
8183
}
84+
85+
@Test
86+
public void testContextClose() throws Exception {
87+
// Create a separate context for this test to avoid affecting other tests
88+
UnifiedQueryContext testContext =
89+
UnifiedQueryContext.builder()
90+
.language(QueryType.PPL)
91+
.catalog("opensearch", testSchema)
92+
.defaultNamespace("opensearch")
93+
.build();
94+
95+
assertFalse(testContext.getPlanContext().connection.isClosed());
96+
testContext.close();
97+
assertTrue(testContext.getPlanContext().connection.isClosed());
98+
}
8299
}

api/src/test/java/org/opensearch/sql/api/UnifiedQueryTestBase.java

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,37 @@
55

66
package org.opensearch.sql.api;
77

8+
import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
9+
import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
10+
811
import java.util.List;
912
import java.util.Map;
13+
import lombok.Builder;
14+
import lombok.Singular;
15+
import org.apache.calcite.DataContext;
16+
import org.apache.calcite.linq4j.Enumerable;
17+
import org.apache.calcite.linq4j.Linq4j;
1018
import org.apache.calcite.rel.type.RelDataType;
1119
import org.apache.calcite.rel.type.RelDataTypeFactory;
20+
import org.apache.calcite.schema.ScannableTable;
21+
import org.apache.calcite.schema.Schema;
22+
import org.apache.calcite.schema.Statistic;
23+
import org.apache.calcite.schema.Statistics;
1224
import org.apache.calcite.schema.Table;
1325
import org.apache.calcite.schema.impl.AbstractSchema;
14-
import org.apache.calcite.schema.impl.AbstractTable;
26+
import org.apache.calcite.sql.SqlCall;
27+
import org.apache.calcite.sql.SqlNode;
1528
import org.apache.calcite.sql.type.SqlTypeName;
29+
import org.junit.After;
1630
import org.junit.Before;
1731
import org.opensearch.sql.executor.QueryType;
1832

1933
/** Base class for unified query tests providing common test schema and utilities. */
2034
public abstract class UnifiedQueryTestBase {
2135

36+
/** Default catalog name */
37+
protected static final String DEFAULT_CATALOG = "catalog";
38+
2239
/** Test schema containing sample tables for testing */
2340
protected AbstractSchema testSchema;
2441

@@ -41,23 +58,74 @@ protected Map<String, Table> getTableMap() {
4158
context =
4259
UnifiedQueryContext.builder()
4360
.language(QueryType.PPL)
44-
.catalog("catalog", testSchema)
61+
.catalog(DEFAULT_CATALOG, testSchema)
4562
.build();
4663
planner = new UnifiedQueryPlanner(context);
4764
}
4865

66+
@After
67+
public void tearDown() throws Exception {
68+
if (context != null) {
69+
context.close();
70+
}
71+
}
72+
73+
/** Creates employees table with sample data for testing */
4974
protected Table createEmployeesTable() {
50-
return new AbstractTable() {
51-
@Override
52-
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
53-
return typeFactory.createStructType(
54-
List.of(
55-
typeFactory.createSqlType(SqlTypeName.INTEGER),
56-
typeFactory.createSqlType(SqlTypeName.VARCHAR),
57-
typeFactory.createSqlType(SqlTypeName.INTEGER),
58-
typeFactory.createSqlType(SqlTypeName.VARCHAR)),
59-
List.of("id", "name", "age", "department"));
60-
}
61-
};
75+
return SimpleTable.builder()
76+
.col("id", INTEGER)
77+
.col("name", VARCHAR)
78+
.col("age", INTEGER)
79+
.col("department", VARCHAR)
80+
.row(new Object[] {1, "Alice", 25, "Engineering"})
81+
.row(new Object[] {2, "Bob", 35, "Sales"})
82+
.row(new Object[] {3, "Charlie", 45, "Engineering"})
83+
.row(new Object[] {4, "Diana", 28, "Marketing"})
84+
.build();
85+
}
86+
87+
/** Reusable scannable table with builder pattern for easy table creation */
88+
@Builder
89+
protected static class SimpleTable implements ScannableTable {
90+
@Singular("col")
91+
private final Map<String, SqlTypeName> schema;
92+
93+
@Singular private final List<Object[]> rows;
94+
95+
@Override
96+
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
97+
RelDataTypeFactory.Builder builder = typeFactory.builder();
98+
schema.forEach(builder::add);
99+
return builder.build();
100+
}
101+
102+
@Override
103+
public Enumerable<Object[]> scan(DataContext root) {
104+
return Linq4j.asEnumerable(rows);
105+
}
106+
107+
@Override
108+
public Statistic getStatistic() {
109+
return Statistics.UNKNOWN;
110+
}
111+
112+
@Override
113+
public Schema.TableType getJdbcTableType() {
114+
return Schema.TableType.TABLE;
115+
}
116+
117+
@Override
118+
public boolean isRolledUp(String column) {
119+
return false;
120+
}
121+
122+
@Override
123+
public boolean rolledUpColumnValidInsideAgg(
124+
String column,
125+
SqlCall call,
126+
SqlNode parent,
127+
org.apache.calcite.config.CalciteConnectionConfig config) {
128+
return false;
129+
}
62130
}
63131
}

0 commit comments

Comments
 (0)