Skip to content

Commit 56e80e6

Browse files
authored
[Optimization-4053] Support flink table planner loader (#4277)
1 parent e4a6388 commit 56e80e6

30 files changed

Lines changed: 8818 additions & 108 deletions

File tree

dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/sql/SqlSelect.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,7 @@ private SqlNode addPermission(SqlNode where, String tableName, String tableAlias
132132
if (permissionsMap != null) {
133133
String permissionsStatement = permissionsMap.get(tableName);
134134
if (permissionsStatement != null && !"".equals(permissionsStatement)) {
135-
permissions = (SqlBasicCall)
136-
CustomTableEnvironmentContext.get().getParser().parseExpression(permissionsStatement);
135+
permissions = (SqlBasicCall) CustomTableEnvironmentContext.get().parseSql(permissionsStatement);
137136
}
138137
}
139138

dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public SqlNode parseSql(String sql) {
204204
return getParser().parseSql(sql);
205205
}
206206

207-
private static Executor lookupExecutor(
207+
protected static Executor lookupExecutor(
208208
ClassLoader classLoader, String executorIdentifier, StreamExecutionEnvironment executionEnvironment) {
209209
try {
210210
final ExecutorFactory executorFactory =
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.executor;
21+
22+
import org.apache.flink.api.common.RuntimeExecutionMode;
23+
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.configuration.ExecutionOptions;
25+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.table.api.EnvironmentSettings;
27+
import org.apache.flink.table.api.TableConfig;
28+
import org.apache.flink.table.catalog.CatalogManager;
29+
import org.apache.flink.table.catalog.FunctionCatalog;
30+
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
31+
import org.apache.flink.table.delegation.Executor;
32+
import org.apache.flink.table.delegation.Planner;
33+
import org.apache.flink.table.factories.PlannerFactoryUtil;
34+
import org.apache.flink.table.module.ModuleManager;
35+
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
/**
40+
* PlannerTableEnvironmentImpl is used to load flink-table-planner_2.12 independently without using flink-table-planner-loader.
41+
*
42+
*/
43+
public class PlannerTableEnvironmentImpl extends CustomTableEnvironmentImpl {
44+
45+
private static final Logger log = LoggerFactory.getLogger(PlannerTableEnvironmentImpl.class);
46+
47+
public PlannerTableEnvironmentImpl(
48+
CatalogManager catalogManager,
49+
ModuleManager moduleManager,
50+
FunctionCatalog functionCatalog,
51+
TableConfig tableConfig,
52+
StreamExecutionEnvironment executionEnvironment,
53+
Planner planner,
54+
Executor executor,
55+
boolean isStreamingMode,
56+
ClassLoader userClassLoader) {
57+
super(
58+
catalogManager,
59+
moduleManager,
60+
functionCatalog,
61+
tableConfig,
62+
executionEnvironment,
63+
planner,
64+
executor,
65+
isStreamingMode,
66+
userClassLoader);
67+
}
68+
69+
public static PlannerTableEnvironmentImpl create(
70+
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
71+
return create(
72+
executionEnvironment, EnvironmentSettings.newInstance().build(), TableConfig.getDefault(), classLoader);
73+
}
74+
75+
public static PlannerTableEnvironmentImpl createBatch(
76+
StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
77+
Configuration configuration = new Configuration();
78+
configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
79+
TableConfig tableConfig = new TableConfig();
80+
tableConfig.addConfiguration(configuration);
81+
return create(executionEnvironment, EnvironmentSettings.inBatchMode(), tableConfig, classLoader);
82+
}
83+
84+
public static PlannerTableEnvironmentImpl create(
85+
StreamExecutionEnvironment executionEnvironment,
86+
EnvironmentSettings settings,
87+
TableConfig tableConfig,
88+
ClassLoader classLoader) {
89+
90+
// temporary solution until FLINK-15635 is fixed
91+
// final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
92+
93+
final ModuleManager moduleManager = new ModuleManager();
94+
95+
final CatalogManager catalogManager = CatalogManager.newBuilder()
96+
.classLoader(classLoader)
97+
.config(tableConfig.getConfiguration())
98+
.defaultCatalog(
99+
settings.getBuiltInCatalogName(),
100+
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
101+
.executionConfig(executionEnvironment.getConfig())
102+
.build();
103+
104+
final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
105+
106+
final Executor executor = lookupExecutor(classLoader, settings.getExecutor(), executionEnvironment);
107+
108+
final Planner planner = PlannerFactoryUtil.createPlanner(
109+
settings.getPlanner(), executor, tableConfig, catalogManager, functionCatalog);
110+
111+
return new PlannerTableEnvironmentImpl(
112+
catalogManager,
113+
moduleManager,
114+
functionCatalog,
115+
tableConfig,
116+
executionEnvironment,
117+
planner,
118+
executor,
119+
settings.isStreamingMode(),
120+
classLoader);
121+
}
122+
}

dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/sql/SqlSelect.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,7 @@ private SqlNode addPermission(SqlNode where, String tableName, String tableAlias
132132
if (permissionsMap != null) {
133133
String permissionsStatement = permissionsMap.get(tableName);
134134
if (permissionsStatement != null && !"".equals(permissionsStatement)) {
135-
permissions = (SqlBasicCall)
136-
CustomTableEnvironmentContext.get().getParser().parseExpression(permissionsStatement);
135+
permissions = (SqlBasicCall) CustomTableEnvironmentContext.get().parseSql(permissionsStatement);
137136
}
138137
}
139138

0 commit comments

Comments
 (0)