Skip to content

Commit 4b8ae10

Browse files
authored
[Beam SQL] Implement Catalog and CatalogManager (#35223)
* beam sql catalog * api adjustment * cover more naming syntax (quotes, backticks, none) * spotless * fix * add documentation and cleanup * rename to dropCatalog; mark BeamSqlCli @internal; rename to EmptyCatalogManager * use registrars instead; remove initialize() method from Catalog * cleanup
1 parent c9958d2 commit 4b8ae10

26 files changed

Lines changed: 1307 additions & 44 deletions
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
1-
{}
1+
{
2+
"comment": "Modify this file in a trivial way to cause this test suite to run",
3+
"modification": 1
4+
}

sdks/java/extensions/sql/src/main/codegen/config.fmpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@ data: {
2525
"org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlCreate"
2626
"org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlDrop"
2727
"org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName"
28+
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateCatalog"
2829
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateExternalTable"
2930
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateFunction"
31+
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropCatalog"
3032
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes"
33+
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetCatalog"
3134
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
3235
"org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
3336
"org.apache.beam.sdk.schemas.Schema"
@@ -41,6 +44,7 @@ data: {
4144
"JAR"
4245
"LOCATION"
4346
"TBLPROPERTIES"
47+
"PROPERTIES"
4448
]
4549

4650
# List of keywords from "keywords" section that are not reserved.
@@ -364,6 +368,7 @@ data: {
364368
"JAR"
365369
"LOCATION"
366370
"TBLPROPERTIES"
371+
"PROPERTIES"
367372
]
368373

369374
# List of non-reserved keywords to add;
@@ -385,6 +390,7 @@ data: {
385390
# Return type of method implementation should be 'SqlNode'.
386391
# Example: SqlShowDatabases(), SqlShowTables().
387392
statementParserMethods: [
393+
"SqlSetCatalog(Span.of(), null)"
388394
"SqlSetOptionBeam(Span.of(), null)"
389395
]
390396

@@ -416,6 +422,7 @@ data: {
416422
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
417423
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
418424
createStatementParserMethods: [
425+
"SqlCreateCatalog"
419426
"SqlCreateExternalTable"
420427
"SqlCreateFunction"
421428
"SqlCreateTableNotSupportedMessage"
@@ -425,6 +432,7 @@ data: {
425432
# Each must accept arguments "(SqlParserPos pos)".
426433
dropStatementParserMethods: [
427434
"SqlDropTable"
435+
"SqlDropCatalog"
428436
]
429437

430438
# Binary operators tokens

sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,125 @@ Schema.Field Field() :
136136
}
137137
}
138138

139+
SqlNodeList PropertyList() :
140+
{
141+
SqlNodeList list = new SqlNodeList(getPos());
142+
SqlNode property;
143+
}
144+
{
145+
property = Property() { list.add(property); }
146+
(
147+
<COMMA> property = Property() { list.add(property); }
148+
)*
149+
{
150+
return list;
151+
}
152+
}
153+
154+
155+
SqlNode Property() :
156+
{
157+
SqlNode key;
158+
SqlNode value;
159+
}
160+
{
161+
key = StringLiteral()
162+
<EQ>
163+
value = StringLiteral()
164+
{
165+
SqlNodeList pair = new SqlNodeList(getPos());
166+
pair.add(key);
167+
pair.add(value);
168+
return pair;
169+
}
170+
}
171+
172+
/**
173+
* CREATE CATALOG ( IF NOT EXISTS )? catalog_name
174+
* TYPE type_name
175+
* ( PROPERTIES '(' key = value ( ',' key = value )* ')' )?
176+
*/
177+
SqlCreate SqlCreateCatalog(Span s, boolean replace) :
178+
{
179+
final boolean ifNotExists;
180+
final SqlNode catalogName;
181+
final SqlNode type;
182+
SqlNodeList properties = null;
183+
}
184+
{
185+
186+
<CATALOG> {
187+
s.add(this);
188+
}
189+
190+
ifNotExists = IfNotExistsOpt()
191+
(
192+
catalogName = StringLiteral()
193+
|
194+
catalogName = SimpleIdentifier()
195+
)
196+
<TYPE>
197+
(
198+
type = StringLiteral()
199+
|
200+
type = SimpleIdentifier()
201+
)
202+
[ <PROPERTIES> <LPAREN> properties = PropertyList() <RPAREN> ]
203+
204+
{
205+
return new SqlCreateCatalog(
206+
s.end(this),
207+
replace,
208+
ifNotExists,
209+
catalogName,
210+
type,
211+
properties);
212+
}
213+
}
214+
215+
/**
216+
* SET CATALOG catalog_name
217+
*/
218+
SqlCall SqlSetCatalog(Span s, String scope) :
219+
{
220+
final SqlNode catalogName;
221+
}
222+
{
223+
<SET> {
224+
s.add(this);
225+
}
226+
<CATALOG>
227+
(
228+
catalogName = StringLiteral()
229+
|
230+
catalogName = SimpleIdentifier()
231+
)
232+
{
233+
return new SqlSetCatalog(
234+
s.end(this),
235+
scope,
236+
catalogName);
237+
}
238+
}
239+
240+
241+
SqlDrop SqlDropCatalog(Span s, boolean replace) :
242+
{
243+
final boolean ifExists;
244+
final SqlNode catalogName;
245+
}
246+
{
247+
<CATALOG> ifExists = IfExistsOpt()
248+
(
249+
catalogName = StringLiteral()
250+
|
251+
catalogName = SimpleIdentifier()
252+
)
253+
{
254+
return new SqlDropCatalog(s.end(this), ifExists, catalogName);
255+
}
256+
}
257+
139258
/**
140259
* Note: This example is probably out of sync with the code.
141260
*

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.beam.sdk.extensions.sql;
1919

2020
import org.apache.beam.sdk.Pipeline;
21+
import org.apache.beam.sdk.annotations.Internal;
2122
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
2223
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
2324
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
2425
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
26+
import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
2527
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
2628
import org.apache.beam.sdk.options.PipelineOptions;
2729
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -30,19 +32,25 @@
3032
@SuppressWarnings({
3133
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
3234
})
35+
@Internal
3336
public class BeamSqlCli {
3437
private BeamSqlEnv env;
3538
/** The store which persists all the table meta data. */
3639
private MetaStore metaStore;
3740

38-
public BeamSqlCli metaStore(MetaStore metaStore) {
39-
return metaStore(metaStore, false, PipelineOptionsFactory.create());
41+
public BeamSqlCli catalogManager(CatalogManager catalogManager) {
42+
return build(BeamSqlEnv.builder(catalogManager), false, PipelineOptionsFactory.create());
4043
}
4144

42-
public BeamSqlCli metaStore(
43-
MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) {
45+
public BeamSqlCli metaStore(MetaStore metaStore) {
4446
this.metaStore = metaStore;
45-
BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
47+
return build(BeamSqlEnv.builder(metaStore), false, PipelineOptionsFactory.create());
48+
}
49+
50+
public BeamSqlCli build(
51+
BeamSqlEnv.BeamSqlEnvBuilder builder,
52+
boolean autoLoadUdfUdaf,
53+
PipelineOptions pipelineOptions) {
4654
if (autoLoadUdfUdaf) {
4755
builder.autoLoadUserDefinedFunctions();
4856
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
3232
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
3333
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
34+
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
3435
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
3536
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
36-
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
3737
import org.apache.beam.sdk.transforms.Combine;
3838
import org.apache.beam.sdk.transforms.PTransform;
3939
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -136,9 +136,9 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
136136
public PCollection<Row> expand(PInput input) {
137137
TableProvider inputTableProvider =
138138
new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input));
139-
InMemoryMetaStore metaTableProvider = new InMemoryMetaStore();
140-
metaTableProvider.registerProvider(inputTableProvider);
141-
BeamSqlEnvBuilder sqlEnvBuilder = BeamSqlEnv.builder(metaTableProvider);
139+
InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
140+
catalogManager.registerTableProvider(PCOLLECTION_NAME, inputTableProvider);
141+
BeamSqlEnvBuilder sqlEnvBuilder = BeamSqlEnv.builder(catalogManager);
142142

143143
// TODO: validate duplicate functions.
144144
registerFunctions(sqlEnvBuilder);
@@ -147,7 +147,7 @@ public PCollection<Row> expand(PInput input) {
147147
// the same names are reused.
148148
if (autoLoading()) {
149149
sqlEnvBuilder.autoLoadUserDefinedFunctions();
150-
ServiceLoader.load(TableProvider.class).forEach(metaTableProvider::registerProvider);
150+
ServiceLoader.load(TableProvider.class).forEach(catalogManager::registerTableProvider);
151151
}
152152

153153
tableProviderMap().forEach(sqlEnvBuilder::addSchema);

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.impl;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
2022
import java.util.Collection;
2123
import java.util.Collections;
2224
import java.util.HashMap;
2325
import java.util.Map;
2426
import java.util.Set;
2527
import org.apache.beam.sdk.extensions.sql.meta.Table;
28+
import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog;
29+
import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
2630
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
2731
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.linq4j.tree.Expression;
2832
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.type.RelProtoDataType;
@@ -37,7 +41,8 @@
3741
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
3842
public class BeamCalciteSchema implements Schema {
3943
private JdbcConnection connection;
40-
private TableProvider tableProvider;
44+
private @Nullable TableProvider tableProvider;
45+
private @Nullable CatalogManager catalogManager;
4146
private Map<String, BeamCalciteSchema> subSchemas;
4247

4348
BeamCalciteSchema(JdbcConnection jdbcConnection, TableProvider tableProvider) {
@@ -46,8 +51,22 @@ public class BeamCalciteSchema implements Schema {
4651
this.subSchemas = new HashMap<>();
4752
}
4853

54+
/**
55+
* Creates a {@link BeamCalciteSchema} representing a {@link CatalogManager}. This will typically
56+
* be the root node of a pipeline.
57+
*/
58+
BeamCalciteSchema(JdbcConnection jdbcConnection, CatalogManager catalogManager) {
59+
this.connection = jdbcConnection;
60+
this.catalogManager = catalogManager;
61+
this.subSchemas = new HashMap<>();
62+
}
63+
4964
public TableProvider getTableProvider() {
50-
return tableProvider;
65+
return resolveMetastore();
66+
}
67+
68+
public @Nullable CatalogManager getCatalogManager() {
69+
return catalogManager;
5170
}
5271

5372
public Map<String, String> getPipelineOptions() {
@@ -87,7 +106,7 @@ public Expression getExpression(SchemaPlus parentSchema, String name) {
87106

88107
@Override
89108
public Set<String> getTableNames() {
90-
return tableProvider.getTables().keySet();
109+
return resolveMetastore().getTables().keySet();
91110
}
92111

93112
@Override
@@ -103,12 +122,12 @@ public Set<String> getTypeNames() {
103122
@Override
104123
public org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.Table getTable(
105124
String name) {
106-
Table table = tableProvider.getTable(name);
125+
Table table = resolveMetastore().getTable(name);
107126
if (table == null) {
108127
return null;
109128
}
110129
return new BeamCalciteTable(
111-
tableProvider.buildBeamSqlTable(table),
130+
resolveMetastore().buildBeamSqlTable(table),
112131
getPipelineOptions(),
113132
connection.getPipelineOptions());
114133
}
@@ -125,17 +144,36 @@ public Collection<Function> getFunctions(String name) {
125144

126145
@Override
127146
public Set<String> getSubSchemaNames() {
128-
return tableProvider.getSubProviders();
147+
return resolveMetastore().getSubProviders();
129148
}
130149

150+
/**
151+
* If this is the root schema (in other words, a {@link CatalogManager}), the sub schema will be a
152+
* {@link Catalog}'s metastore.
153+
*
154+
* <p>Otherwise, the sub-schema is derived from the {@link TableProvider} implementation.
155+
*/
131156
@Override
132157
public Schema getSubSchema(String name) {
133158
if (!subSchemas.containsKey(name)) {
134-
TableProvider subProvider = tableProvider.getSubProvider(name);
135-
BeamCalciteSchema subSchema =
136-
subProvider == null ? null : new BeamCalciteSchema(connection, subProvider);
159+
BeamCalciteSchema subSchema;
160+
if (tableProvider != null) {
161+
@Nullable TableProvider subProvider = tableProvider.getSubProvider(name);
162+
subSchema = subProvider != null ? new BeamCalciteSchema(connection, subProvider) : null;
163+
} else {
164+
@Nullable Catalog catalog = checkStateNotNull(catalogManager).getCatalog(name);
165+
subSchema = catalog != null ? new BeamCalciteSchema(connection, catalog.metaStore()) : null;
166+
}
137167
subSchemas.put(name, subSchema);
138168
}
169+
139170
return subSchemas.get(name);
140171
}
172+
173+
public TableProvider resolveMetastore() {
174+
if (tableProvider != null) {
175+
return tableProvider;
176+
}
177+
return checkStateNotNull(catalogManager).currentCatalog().metaStore();
178+
}
141179
}

0 commit comments

Comments
 (0)