Skip to content

Commit 5d6f021

Browse files
Extract unified query context for shared config management (#4933) (#4970)
* Extract UnifiedQueryPlanner.Builder to UnifiedQueryContext * Remove backward compatibility code * Refactor unified query context with setting setter * Initialize unified query context with default system limits * Refactor setting map read * Update javadoc and rename queryType to language * Address AI comments * Reuse context in test base class * Remove session in doc --------- (cherry picked from commit 297074c) Signed-off-by: Chen Dai <daichen@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 61d9c18 commit 5d6f021

7 files changed

Lines changed: 343 additions & 215 deletions

File tree

api/README.md

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,34 @@ Together, these components enable a complete workflow: parse PPL queries into lo
1717

1818
## Usage
1919

20-
### UnifiedQueryPlanner
20+
### UnifiedQueryContext
21+
22+
`UnifiedQueryContext` is a reusable abstraction shared across unified query components (planner, compiler, etc.). It bundles `CalcitePlanContext` and `Settings` into a single object, centralizing configuration for all unified query operations.
2123

22-
Use the declarative, fluent builder API to initialize the `UnifiedQueryPlanner`.
24+
Create a context with catalog configuration, query type, and optional settings:
2325

2426
```java
25-
UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder()
27+
UnifiedQueryContext context = UnifiedQueryContext.builder()
2628
.language(QueryType.PPL)
27-
.catalog("opensearch", schema)
29+
.catalog("opensearch", opensearchSchema)
30+
.catalog("spark_catalog", sparkSchema)
2831
.defaultNamespace("opensearch")
2932
.cacheMetadata(true)
33+
.setting("plugins.query.size_limit", 200)
3034
.build();
35+
```
36+
37+
### UnifiedQueryPlanner
3138

32-
RelNode plan = planner.plan("source = opensearch.test");
39+
Use `UnifiedQueryPlanner` to parse and analyze PPL queries into Calcite logical plans. The planner accepts a `UnifiedQueryContext` and can be reused for multiple queries.
40+
41+
```java
42+
// Create planner with context
43+
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
44+
45+
// Plan multiple queries (context is reused)
46+
RelNode plan1 = planner.plan("source = logs | where status = 200");
47+
RelNode plan2 = planner.plan("source = metrics | stats avg(cpu)");
3348
```
3449

3550
### UnifiedQueryTranspiler
@@ -46,25 +61,28 @@ String sql = transpiler.toSql(plan);
4661

4762
### Complete Workflow Example
4863

49-
Combining both components to transpile PPL queries into target database SQL:
64+
Combining all components to transpile PPL queries into target database SQL:
5065

5166
```java
52-
// Step 1: Initialize planner
53-
UnifiedQueryPlanner planner = UnifiedQueryPlanner.builder()
67+
// Step 1: Create reusable context (shared across components)
68+
UnifiedQueryContext context = UnifiedQueryContext.builder()
5469
.language(QueryType.PPL)
5570
.catalog("catalog", schema)
5671
.defaultNamespace("catalog")
5772
.build();
5873

59-
// Step 2: Parse PPL query into logical plan
74+
// Step 2: Create planner with context
75+
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
76+
77+
// Step 3: Plan PPL query into logical plan
6078
RelNode plan = planner.plan("source = employees | where age > 30");
6179

62-
// Step 3: Initialize transpiler with target dialect
80+
// Step 4: Create transpiler with target dialect
6381
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
6482
.dialect(SparkSqlDialect.DEFAULT)
6583
.build();
6684

67-
// Step 4: Transpile to target SQL
85+
// Step 5: Transpile to target SQL
6886
String sparkSql = transpiler.toSql(plan);
6987
// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30
7088
```
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api;
7+
8+
import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
9+
import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
10+
import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;
11+
12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Objects;
16+
import lombok.Value;
17+
import org.apache.calcite.jdbc.CalciteSchema;
18+
import org.apache.calcite.plan.RelTraitDef;
19+
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
20+
import org.apache.calcite.schema.Schema;
21+
import org.apache.calcite.schema.SchemaPlus;
22+
import org.apache.calcite.sql.parser.SqlParser;
23+
import org.apache.calcite.tools.FrameworkConfig;
24+
import org.apache.calcite.tools.Frameworks;
25+
import org.apache.calcite.tools.Programs;
26+
import org.opensearch.sql.calcite.CalcitePlanContext;
27+
import org.opensearch.sql.calcite.SysLimit;
28+
import org.opensearch.sql.common.setting.Settings;
29+
import org.opensearch.sql.executor.QueryType;
30+
31+
/**
32+
* A reusable abstraction shared across unified query components (planner, compiler, etc.). This
33+
* centralizes configuration for catalog schemas, query type, execution limits, and other settings,
34+
* enabling consistent behavior across all unified query operations.
35+
*/
36+
@Value
37+
public class UnifiedQueryContext {
38+
39+
/** CalcitePlanContext containing Calcite framework configuration and query type. */
40+
CalcitePlanContext planContext;
41+
42+
/** Settings containing execution limits and feature flags used by parsers and planners. */
43+
Settings settings;
44+
45+
/** Creates a new builder for UnifiedQueryContext. */
46+
public static Builder builder() {
47+
return new Builder();
48+
}
49+
50+
/** Builder that constructs UnifiedQueryContext. */
51+
public static class Builder {
52+
private QueryType queryType;
53+
private final Map<String, Schema> catalogs = new HashMap<>();
54+
private String defaultNamespace;
55+
private boolean cacheMetadata = false;
56+
57+
/**
58+
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
59+
* to avoid coupling with OpenSearchSettings.
60+
*/
61+
private final Map<Settings.Key, Object> settings =
62+
new HashMap<Settings.Key, Object>(
63+
Map.of(
64+
QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
65+
PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
66+
PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit()));
67+
68+
/**
69+
* Sets the query language frontend to be used.
70+
*
71+
* @param queryType the {@link QueryType}, such as PPL
72+
* @return this builder instance
73+
*/
74+
public Builder language(QueryType queryType) {
75+
this.queryType = queryType;
76+
return this;
77+
}
78+
79+
/**
80+
* Registers a catalog with the specified name and its associated schema. The schema can be a
81+
* flat or nested structure (e.g., catalog → schema → table), depending on how data is
82+
* organized.
83+
*
84+
* @param name the name of the catalog to register
85+
* @param schema the schema representing the structure of the catalog
86+
* @return this builder instance
87+
*/
88+
public Builder catalog(String name, Schema schema) {
89+
catalogs.put(name, schema);
90+
return this;
91+
}
92+
93+
/**
94+
* Sets the default namespace path for resolving unqualified table names.
95+
*
96+
* @param namespace dot-separated path (e.g., "spark_catalog.default" or "opensearch")
97+
* @return this builder instance
98+
*/
99+
public Builder defaultNamespace(String namespace) {
100+
this.defaultNamespace = namespace;
101+
return this;
102+
}
103+
104+
/**
105+
* Enables or disables catalog metadata caching in the root schema.
106+
*
107+
* @param cache whether to enable metadata caching
108+
* @return this builder instance
109+
*/
110+
public Builder cacheMetadata(boolean cache) {
111+
this.cacheMetadata = cache;
112+
return this;
113+
}
114+
115+
/**
116+
* Sets a specific setting value by name.
117+
*
118+
* @param name the setting key name (e.g., "plugins.query.size_limit")
119+
* @param value the setting value
120+
* @throws IllegalArgumentException if the setting name is not recognized
121+
*/
122+
public Builder setting(String name, Object value) {
123+
Settings.Key key =
124+
Settings.Key.of(name)
125+
.orElseThrow(() -> new IllegalArgumentException("Unknown setting name: " + name));
126+
settings.put(key, value);
127+
return this;
128+
}
129+
130+
/**
131+
* Builds a {@link UnifiedQueryContext} with the configuration.
132+
*
133+
* @return a new instance of {@link UnifiedQueryContext}
134+
*/
135+
public UnifiedQueryContext build() {
136+
Objects.requireNonNull(queryType, "Must specify language before build");
137+
138+
Settings settings = buildSettings();
139+
CalcitePlanContext planContext =
140+
CalcitePlanContext.create(
141+
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
142+
return new UnifiedQueryContext(planContext, settings);
143+
}
144+
145+
private Settings buildSettings() {
146+
return new Settings() {
147+
@Override
148+
@SuppressWarnings("unchecked")
149+
public <T> T getSettingValue(Key key) {
150+
return (T) settings.get(key);
151+
}
152+
153+
@Override
154+
public List<?> getSettings() {
155+
return List.copyOf(settings.entrySet());
156+
}
157+
};
158+
}
159+
160+
@SuppressWarnings({"rawtypes"})
161+
private FrameworkConfig buildFrameworkConfig() {
162+
SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheMetadata).plus();
163+
catalogs.forEach(rootSchema::add);
164+
165+
SchemaPlus defaultSchema = findSchemaByPath(rootSchema, defaultNamespace);
166+
return Frameworks.newConfigBuilder()
167+
.parserConfig(SqlParser.Config.DEFAULT)
168+
.defaultSchema(defaultSchema)
169+
.traitDefs((List<RelTraitDef>) null)
170+
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
171+
.build();
172+
}
173+
174+
private SchemaPlus findSchemaByPath(SchemaPlus rootSchema, String defaultPath) {
175+
if (defaultPath == null) {
176+
return rootSchema;
177+
}
178+
179+
SchemaPlus current = rootSchema;
180+
for (String part : defaultPath.split("\\.")) {
181+
current = current.getSubSchema(part);
182+
if (current == null) {
183+
throw new IllegalArgumentException("Invalid default catalog path: " + defaultPath);
184+
}
185+
}
186+
return current;
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)