()
.eq(FlinkApplication::getTracking, 1)
- .notIn(FlinkApplication::getDeployMode, FlinkDeployMode.getKubernetesMode()))
- .stream()
- .filter(application -> distributedTaskService.isLocalProcessing(application.getId()))
- .collect(Collectors.toList());
-
+ .notIn(FlinkApplication::getDeployMode, FlinkDeployMode.getKubernetesMode()));
applications.forEach(app -> {
Long appId = app.getId();
WATCHING_APPS.put(appId, app);
@@ -210,7 +202,7 @@ public void doStop() {
*
* 2) Normal information obtain, once every 5 seconds
*/
- @Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}")
+ @Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
public void start() {
Long timeMillis = System.currentTimeMillis();
if (lastWatchTime == null
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
index dadea616a3..8abab9c0f3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java
@@ -103,7 +103,7 @@ private void init() {
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
}
- @Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}")
+ @Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
private void start() {
Long timeMillis = System.currentTimeMillis();
if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index 14c896fcb4..5383629d7e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
@@ -28,7 +28,6 @@
import org.apache.streampark.console.core.metrics.spark.Job;
import org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
-import org.apache.streampark.console.core.service.DistributedTaskService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
@@ -63,7 +62,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
@Slf4j
@Component
@@ -78,9 +76,6 @@ public class SparkAppHttpWatcher {
@Autowired
private SparkApplicationInfoService applicationInfoService;
- @Autowired
- private DistributedTaskService distributedTaskService;
-
@Autowired
private AlertService alertService;
@@ -134,10 +129,7 @@ public void init() {
List applications = applicationManageService.list(
new LambdaQueryWrapper()
.eq(SparkApplication::getTracking, 1)
- .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue()))
- .stream()
- .filter(application -> distributedTaskService.isLocalProcessing(application.getId()))
- .collect(Collectors.toList());
+ .ne(SparkApplication::getState, SparkAppStateEnum.LOST.getValue()));
applications.forEach(app -> {
Long appId = app.getId();
@@ -161,7 +153,7 @@ public void doStop() {
*
* 2) Normal information obtain, once every 5 seconds
*/
- @Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}")
+ @Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
public void start() {
Long timeMillis = System.currentTimeMillis();
if (lastWatchTime == null
diff --git a/streampark-console/streampark-console-service/src/main/resources/config.yaml b/streampark-console/streampark-console-service/src/main/resources/config.yaml
index 7acf0e77aa..96f8426af4 100644
--- a/streampark-console/streampark-console-service/src/main/resources/config.yaml
+++ b/streampark-console/streampark-console-service/src/main/resources/config.yaml
@@ -64,8 +64,6 @@ streampark:
project:
# Number of projects allowed to be running at the same time , If there is no limit, -1 can be configured
max-build: 16
- # console heartbeat interval
- max-heartbeat-interval: 10s
# flink on yarn or spark on yarn, when the hadoop cluster enable kerberos authentication, it is necessary to set Kerberos authentication parameters.
security:
@@ -100,39 +98,3 @@ sso:
# Optional, change by authentication client
# Please replace and fill in your client config below when enabled SSO
-## flink catalog store config
-table:
- catalog-store:
- kind: jdbc
- jdbc:
- url: jdbc://mysql:127.0.0.1:3306/flink-test
- # The JDBC database url.
- table-name: t_flink_catalog
- ## catalog store table
- driver: com.mysql.cj.jdbc.Driver
- # The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.
- username: flinkuser
- # The JDBC user name. 'username' and 'password' must both be specified if any of them is specified.
- password: flinkpw
- # The JDBC password.
- max-retry-timeout: 600
-
-
-
-registry:
- type: jdbc
- heartbeat-refresh-interval: 1s
- session-timeout: 3s
-
-network:
- # network interface preferred like eth0, default: empty
- preferred-interface: ""
- # network interface restricted like docker0, default: empty
- restrict-interface: docker0
- # network IP gets priority, default inner outer
- priority-strategy: default
-
- # Add configurable job.state-watcher.fixed-delayed
-job:
- state-watcher:
- fixed-delayed: 1000
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index b415785b20..3197caf56c 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -487,7 +487,6 @@ create table if not exists `t_access_token` (
primary key(`id`)
);
-
-- ----------------------------
-- Table of t_external_link definition
-- ----------------------------
@@ -502,7 +501,6 @@ create table if not exists `t_external_link` (
primary key(`id`)
);
-
-- ----------------------------
-- table structure for t_yarn_queue
-- ----------------------------
@@ -518,21 +516,6 @@ create table if not exists `t_yarn_queue` (
primary key (`id`)
);
--- ----------------------------
--- Table structure for t_flink_catalog
--- ----------------------------
-create table if not exists t_flink_catalog (
- `id` BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
- `team_id` bigint not null,
- `user_id` bigint default null,
- `catalog_type` varchar(255) not NULL,
- `catalog_name` VARCHAR(255) NOT NULL,
- `configuration` text,
- `create_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
- `update_time` TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
- CONSTRAINT uniq_catalog_name UNIQUE (`catalog_name`)
-);
-
-- ----------------------------
-- Table structure for t_spark_env
-- ----------------------------
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DatabaseServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DatabaseServiceTest.java
deleted file mode 100644
index 99bf17ba4e..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DatabaseServiceTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.common.enums.CatalogType;
-import org.apache.streampark.console.base.exception.ApiAlertException;
-import org.apache.streampark.console.core.bean.DatabaseParam;
-import org.apache.streampark.console.core.bean.FlinkCatalogParams;
-import org.apache.streampark.console.core.entity.FlinkCatalog;
-import org.apache.streampark.console.core.service.container.MysqlBaseITCASE;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class DatabaseServiceTest extends MysqlBaseITCASE {
-
- @Autowired
- private FlinkCatalogService catalogService;
-
- @Autowired
- private DatabaseService databaseService;
-
- private FlinkCatalogParams flinkCatalog = new FlinkCatalogParams();
-
- @BeforeEach
- public void setup() {
- flinkCatalog.setCatalogName("flink-test");
- flinkCatalog.setTeamId(1L);
- flinkCatalog.setUserId(1L);
- flinkCatalog.setCatalogType(CatalogType.JDBC);
- FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
- new FlinkCatalogParams.FlinkJDBCCatalog();
- flinkJDBCCatalog.setBaseUrl(MYSQL_CONTAINER.getJdbcUrl());
- flinkJDBCCatalog.setType(CatalogType.JDBC.name().toLowerCase());
- flinkJDBCCatalog.setDefaultDatabase(MYSQL_CONTAINER.getDatabaseName());
- flinkJDBCCatalog.setUsername(MYSQL_CONTAINER.getUsername());
- flinkJDBCCatalog.setPassword(MYSQL_CONTAINER.getPassword());
- flinkCatalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
- catalogService.create(flinkCatalog, 1L);
- FlinkCatalog fc = catalogService.getCatalog("flink-test");
- flinkCatalog.setId(fc.getId());
- }
-
- @AfterEach
- void destroy() {
- FlinkCatalog fc = catalogService.getCatalog("flink-test");
- catalogService.remove(flinkCatalog.getId());
- }
-
- // @Test
- public void testDatabaseExists_Positive() {
- DatabaseParam dbParam = new DatabaseParam();
- dbParam.setCatalogId(flinkCatalog.getId());
- dbParam.setCatalogName(flinkCatalog.getCatalogName());
- dbParam.setName(MYSQL_CONTAINER.getDatabaseName());
-
- assertTrue(databaseService.databaseExists(dbParam));
- }
-
- // @Test
- public void testCreateDatabase_Negative() {
- DatabaseParam dbParam = new DatabaseParam();
- dbParam.setCatalogId(flinkCatalog.getId());
- dbParam.setName("new_db");
- dbParam.setIgnoreIfExits(false);
-
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> {
- databaseService.createDatabase(dbParam);
- });
- assertNull(exception.getMessage());
- }
-
- // @Test
- public void testListDatabases_Positive() {
- List databaseParamList = databaseService.listDatabases(flinkCatalog.getId());
- assertNotNull(databaseParamList);
- }
-
- // @Test
- public void testListDatabases_Negative_NoDatabases() {
-
- Exception exception =
- assertThrows(
- ApiAlertException.class,
- () -> {
- databaseService.listDatabases(null);
- });
-
- assertEquals(
- "The catalog can't be null. get catalog from database failed.", exception.getMessage());
- }
-
- // @Test
- public void testDropDatabase_Positive() {
- DatabaseParam dbParam = new DatabaseParam();
- dbParam.setCatalogId(flinkCatalog.getId());
- dbParam.setCatalogName(flinkCatalog.getCatalogName());
- dbParam.setName(MYSQL_CONTAINER.getDatabaseName());
- dbParam.setCascade(true);
- dbParam.setIgnoreIfExits(true);
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> {
- databaseService.dropDatabase(dbParam);
- });
- assertEquals(null, exception.getMessage());
- }
-
- // @Test
- public void testDropDatabase_Negative_NullDatabaseName() {
- DatabaseParam dbParam = new DatabaseParam();
- dbParam.setCatalogId(1L);
- dbParam.setName(null);
-
- Exception exception =
- assertThrows(
- ApiAlertException.class,
- () -> {
- databaseService.dropDatabase(dbParam);
- });
-
- assertEquals("Database name can not be null.", exception.getMessage());
- }
-}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
deleted file mode 100644
index 9149101b0e..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.console.core.bean.FlinkTaskItem;
-import org.apache.streampark.console.core.bean.SparkTaskItem;
-import org.apache.streampark.console.core.entity.DistributedTask;
-import org.apache.streampark.console.core.entity.FlinkApplication;
-import org.apache.streampark.console.core.entity.SparkApplication;
-import org.apache.streampark.console.core.enums.DistributedTaskEnum;
-import org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl;
-
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-@Slf4j
-class DistributedTaskServiceTest {
-
- private final DistributedTaskServiceImpl distributionTaskService = new DistributedTaskServiceImpl();
-
- private final String serverId = "testServer";
- private final Set allServers = new HashSet<>(Collections.singleton(serverId));
-
- // the number of virtual nodes for each server
- private final int numberOfReplicas = 2 << 16;
-
- @Test
- void testInit() {
- distributionTaskService.init(allServers, serverId);
- assert (distributionTaskService.getConsistentHashSize() == numberOfReplicas);
- }
-
- @Test
- void testIsLocalProcessing() {
- distributionTaskService.init(allServers, serverId);
- for (long i = 0; i < numberOfReplicas; i++) {
- assert (distributionTaskService.isLocalProcessing(i));
- }
- }
-
- @Test
- void testFlinkTaskAndApp() {
- FlinkApplication application = new FlinkApplication();
- application.setId(0L);
- try {
- DistributedTask distributedTask =
- distributionTaskService.getDistributedTaskByFlinkApp(application, false, DistributedTaskEnum.START);
- FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(distributedTask);
- FlinkApplication newApplication = distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem);
- assert (application.equals(newApplication));
- } catch (Exception e) {
- log.error("testFlinkTaskAndApp failed:", e);
- }
- }
-
- @Test
- void testSparkTaskAndApp() {
- SparkApplication application = new SparkApplication();
- application.setId(0L);
- try {
- DistributedTask distributedTask =
- distributionTaskService.getDistributedTaskBySparkApp(application, false, DistributedTaskEnum.START);
- SparkTaskItem sparkTaskItem = distributionTaskService.getSparkTaskItem(distributedTask);
- SparkApplication newApplication = distributionTaskService.getAppBySparkTaskItem(sparkTaskItem);
- assert (application.equals(newApplication));
- } catch (Exception e) {
- log.error("testSparkTaskAndApp failed:", e);
- }
- }
-
-}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
deleted file mode 100644
index b6089adf93..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkCatalogServiceTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.common.enums.CatalogType;
-import org.apache.streampark.console.SpringUnitTestBase;
-import org.apache.streampark.console.base.domain.RestRequest;
-import org.apache.streampark.console.core.bean.FlinkCatalogParams;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** CatalogService Tests */
-public class FlinkCatalogServiceTest extends SpringUnitTestBase {
-
- @Autowired
- private FlinkCatalogService catalogService;
-
- @AfterEach
- void cleanTestRecordsInDatabase() {
- catalogService.remove(new QueryWrapper<>());
- }
-
- @Test
- @Order(1)
- public void create() {
- FlinkCatalogParams catalog = new FlinkCatalogParams();
- catalog.setTeamId(1L);
- catalog.setCatalogType(CatalogType.JDBC);
- catalog.setCatalogName("catalog-name");
- FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
- new FlinkCatalogParams.FlinkJDBCCatalog();
- flinkJDBCCatalog.setType("jdbc");
- flinkJDBCCatalog.setDefaultDatabase("aa");
- flinkJDBCCatalog.setPassword("11");
- flinkJDBCCatalog.setUsername("user");
- flinkJDBCCatalog.setBaseUrl("url");
- catalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
-
- boolean create = catalogService.create(catalog, 1L);
- assertThat(create).isTrue();
- }
-
- @Test
- @Order(2)
- public void update() {
- FlinkCatalogParams catalog = new FlinkCatalogParams();
- catalog.setTeamId(1L);
- catalog.setCatalogType(CatalogType.JDBC);
- catalog.setCatalogName("catalog-name");
- FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
- new FlinkCatalogParams.FlinkJDBCCatalog();
- flinkJDBCCatalog.setType("jdbc");
- flinkJDBCCatalog.setDefaultDatabase("aa");
- flinkJDBCCatalog.setPassword("11");
- flinkJDBCCatalog.setUsername("user");
- flinkJDBCCatalog.setBaseUrl("url1");
- catalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
- RestRequest request = new RestRequest();
- catalogService.create(catalog, 1L);
-
- IPage catalogIPage = catalogService.page(catalog, request);
- FlinkCatalogParams catalogs = catalogIPage.getRecords().get(0);
- catalogs.getFlinkJDBCCatalog().setBaseUrl("url2");
- catalogService.update(catalogs, 2L);
-
- IPage catalogResult = catalogService.page(catalog, request);
-
- assertThat(
- catalogResult.getRecords().get(0).getFlinkJDBCCatalog().getBaseUrl().contains("url2"))
- .isTrue();
- assertThat(catalogResult.getRecords().get(0).getUserId().equals(2L)).isTrue();
- assertThat(catalogResult.getRecords().get(0).getCatalogType().equals(CatalogType.JDBC))
- .isTrue();
- }
-
- @Test
- @Order(3)
- public void remove() {
- FlinkCatalogParams catalog = new FlinkCatalogParams();
- catalog.setTeamId(1L);
- catalog.setCatalogType(CatalogType.JDBC);
- catalog.setCatalogName("catalog-name");
- FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
- new FlinkCatalogParams.FlinkJDBCCatalog();
- flinkJDBCCatalog.setType("jdbc");
- flinkJDBCCatalog.setDefaultDatabase("aa");
- flinkJDBCCatalog.setPassword("11");
- flinkJDBCCatalog.setUsername("user");
- flinkJDBCCatalog.setBaseUrl("url");
- catalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
- catalogService.create(catalog, 1L);
- RestRequest request = new RestRequest();
- IPage catalogIPage = catalogService.page(catalog, request);
- boolean deleted = catalogService.remove(catalogIPage.getRecords().get(0).getId());
- assertThat(deleted).isTrue();
- }
-}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
deleted file mode 100644
index 66e3c73a7c..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.common.util.SystemPropertyUtils;
-import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class RegistryServiceTest {
-
- private final RegistryServiceImpl registryService = new RegistryServiceImpl();
-
- @Test
- public void testRegister() {
- if (enableHA()) {
- try {
- registryService.registry();
- } catch (Exception e) {
- Assertions.assertEquals(1, registryService.getCurrentNodes().size());
- registryService.unRegister();
- }
- }
- }
-
- public boolean enableHA() {
- return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
- }
-
-}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/TableServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/TableServiceTest.java
deleted file mode 100644
index 38bfeb3d48..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/TableServiceTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.common.enums.CatalogType;
-import org.apache.streampark.console.base.exception.ApiAlertException;
-import org.apache.streampark.console.core.bean.FlinkCatalogParams;
-import org.apache.streampark.console.core.bean.FlinkDataType;
-import org.apache.streampark.console.core.bean.TableColumn;
-import org.apache.streampark.console.core.bean.TableParams;
-import org.apache.streampark.console.core.entity.FlinkCatalog;
-import org.apache.streampark.console.core.service.container.MysqlBaseITCASE;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Order;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TableServiceTest extends MysqlBaseITCASE {
-
- @Autowired
- private FlinkCatalogService catalogService;
- @Autowired
- private TableService tableService;
- private FlinkCatalogParams flinkCatalog = new FlinkCatalogParams();
- private final String FLINK_TABLE = "flink_test";
- private final String FLINK_NEW_TABLE = "flink_test_new";
-
- @BeforeEach
- public void setUp() {
- flinkCatalog.setCatalogName("flink-test");
- flinkCatalog.setTeamId(1L);
- flinkCatalog.setUserId(1L);
- flinkCatalog.setCatalogType(CatalogType.JDBC);
- FlinkCatalogParams.FlinkJDBCCatalog flinkJDBCCatalog =
- new FlinkCatalogParams.FlinkJDBCCatalog();
- flinkJDBCCatalog.setBaseUrl(MYSQL_CONTAINER.getJdbcUrl());
- flinkJDBCCatalog.setType(CatalogType.JDBC.name().toLowerCase());
- flinkJDBCCatalog.setDefaultDatabase(MYSQL_CONTAINER.getDatabaseName());
- flinkJDBCCatalog.setUsername(MYSQL_CONTAINER.getUsername());
- flinkJDBCCatalog.setPassword(MYSQL_CONTAINER.getPassword());
- flinkCatalog.setFlinkJDBCCatalog(flinkJDBCCatalog);
- catalogService.create(flinkCatalog, 1L);
- FlinkCatalog fc = catalogService.getCatalog("flink-test");
- flinkCatalog.setId(fc.getId());
- }
-
- @AfterEach
- void destroy() {
- FlinkCatalog fc = catalogService.getCatalog("flink-test");
- catalogService.remove(flinkCatalog.getId());
- }
-
- // @Test
- @Order(1)
- public void testTableExists_Positive() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
-
- assertTrue(tableService.tableExists(tableParams));
- }
-
- // @Test
- @Order(2)
- public void testTableExists_Negative_CatalogNotFound() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(-100L);
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
-
- Exception exception =
- assertThrows(ApiAlertException.class, () -> tableService.tableExists(tableParams));
- assertEquals("Catalog is not exit.", exception.getMessage());
- }
-
- // @Test
- @Order(3)
- public void testCreateTable_Positive() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
- tableParams.setDescription("Test table");
- List partitionKeyList = new ArrayList();
- partitionKeyList.add("A");
- tableParams.setPartitionKey(partitionKeyList);
- Map optionMap = new HashMap<>();
- optionMap.put("A", "A");
- tableParams.setTableOptions(optionMap);
- FlinkDataType flinkDataType = new FlinkDataType("INT", true, 8, 16);
- tableParams.setTableColumns(
- Collections.singletonList(
- new TableColumn(1, "A", flinkDataType, "table test", true, "1", 1)));
-
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class, () -> tableService.createTable(tableParams));
- assertEquals(null, exception.getMessage());
- }
-
- // @Test
- @Order(4)
- public void testCreateTable_Negative_TableNameNull() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(null);
- tableParams.setDescription("Test table");
- FlinkDataType flinkDataType = new FlinkDataType("INT", true, 8, 16);
- tableParams.setTableColumns(
- Collections.singletonList(
- new TableColumn(1, "A", flinkDataType, "table test", false, "1", 1)));
-
- Exception exception =
- assertThrows(ApiAlertException.class, () -> tableService.createTable(tableParams));
- assertEquals("Table name can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(5)
- public void testAddColumn_Positive() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
- tableParams.setDescription("Test table");
- FlinkDataType flinkDataType = new FlinkDataType("INT", true, 8, 16);
- tableParams.setTableColumns(
- Collections.singletonList(
- new TableColumn(1, "B", flinkDataType, "table test", false, "1", 1)));
-
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class, () -> tableService.addColumn(tableParams));
- assertEquals(null, exception.getMessage());
- }
-
- // @Test
- @Order(6)
- public void testAddColumn_Negative_TableNameNull() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(null);
- tableParams.setDescription("Test table");
- FlinkDataType flinkDataType = new FlinkDataType("INT", true, 8, 16);
- tableParams.setTableColumns(
- Collections.singletonList(
- new TableColumn(1, "A", flinkDataType, "table test", false, "1", 1)));
-
- Exception exception =
- assertThrows(ApiAlertException.class, () -> tableService.addColumn(tableParams));
- assertEquals("Table name can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(7)
- public void testDropColumn() {
- String catalogName = flinkCatalog.getCatalogName();
- String databaseName = MYSQL_CONTAINER.getDatabaseName();
- String tableName = FLINK_TABLE;
- String columnName = "A";
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> tableService.dropColumn(catalogName, databaseName, tableName, columnName));
- assertEquals(null, exception.getMessage());
- }
-
- // @Test
- @Order(8)
- public void testDropColumn_Negative_TableNameNull() {
- String catalogName = "flink-test";
- String databaseName = "test_db";
- String columnName = "old_column";
-
- Exception exception =
- assertThrows(
- ApiAlertException.class,
- () -> tableService.dropColumn(catalogName, databaseName, null, columnName));
- assertEquals("Table name can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(9)
- public void testAddOption() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
- tableParams.setTableOptions(Collections.singletonMap("key", "value"));
-
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class, () -> tableService.addOption(tableParams));
- assertEquals(
- "Unsupported table change type: org.apache.flink.table.catalog.TableChange$SetOption",
- exception.getMessage());
- }
-
- // @Test
- @Order(10)
- public void testAddOption_Negative_OptionsNull() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
-
- Exception exception =
- assertThrows(ApiAlertException.class, () -> tableService.addOption(tableParams));
- assertEquals("Table options can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(12)
- public void testDropTable_Negative_TableNameNull() {
- String catalogName = "test_catalog";
- String databaseName = "test_db";
-
- Exception exception =
- assertThrows(
- ApiAlertException.class, () -> tableService.dropTable(catalogName, databaseName, null));
- assertEquals("Table name can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(13)
- public void testRenameTable() {
- String catalogName = flinkCatalog.getCatalogName();
- String databaseName = MYSQL_CONTAINER.getDatabaseName();
- Exception exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> tableService.renameTable(catalogName, databaseName, FLINK_TABLE, FLINK_NEW_TABLE));
- assertEquals(null, exception.getMessage());
- }
-
- // @Test
- @Order(14)
- public void testRenameTable_Negative_FromTableNameNull() {
- String catalogName = "test_catalog";
- String databaseName = "test_db";
- String toTableName = "new_table";
-
- Exception exception =
- assertThrows(
- ApiAlertException.class,
- () -> tableService.renameTable(catalogName, databaseName, null, toTableName));
- assertEquals("From table name can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(15)
- public void testListTables_Positive() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setCatalogName(flinkCatalog.getCatalogName());
- tableParams.setDatabaseName(MYSQL_CONTAINER.getDatabaseName());
- tableParams.setName(FLINK_TABLE);
-
- List result = tableService.listTables(tableParams);
- assertNotNull(result);
- assertEquals(1, result.size());
- }
-
- // @Test
- @Order(16)
- public void testListTables_Negative_DatabaseNameNull() {
- TableParams tableParams = new TableParams();
- tableParams.setCatalogId(flinkCatalog.getId());
- tableParams.setDatabaseName(null);
- tableParams.setName("test_table");
-
- Exception exception =
- assertThrows(ApiAlertException.class, () -> tableService.listTables(tableParams));
- assertEquals("Database name can not be null.", exception.getMessage());
- }
-
- // @Test
- @Order(17)
- public void testListColumns_Positive() {
- String catalogName = flinkCatalog.getCatalogName();
- String databaseName = MYSQL_CONTAINER.getDatabaseName();
- TableParams result = tableService.listColumns(catalogName, databaseName, FLINK_TABLE);
- assertNotNull(result);
- }
-
- // @Test
- @Order(17)
- public void testListColumns_Negative_DatabaseNameNull() {
- String catalogName = "test_catalog";
- String tableName = "test_table";
-
- Exception exception =
- assertThrows(
- ApiAlertException.class, () -> tableService.listColumns(catalogName, null, tableName));
- assertEquals("Database name can not be null.", exception.getMessage());
- }
-}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MySqlContainer.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MySqlContainer.java
deleted file mode 100644
index 0a29307c74..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MySqlContainer.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service.container;
-
-import org.testcontainers.containers.ContainerLaunchException;
-import org.testcontainers.containers.JdbcDatabaseContainer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class MySqlContainer extends JdbcDatabaseContainer {
-
- public static final String IMAGE = "mysql";
-
- public static final String MYSQL_VERSION = "8.0";
- public static final Integer MYSQL_PORT = 3306;
-
- private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
- private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
- private static final String MYSQL_ROOT_USER = "root";
-
- private String databaseName = "test";
- private String username = "test";
- private String password = "test";
-
- public MySqlContainer() {
- this(MYSQL_VERSION);
- }
-
- public MySqlContainer(String version) {
- super(DockerImageName.parse(IMAGE + ":" + version));
- addExposedPort(MYSQL_PORT);
- }
-
- @Override
- protected Set getLivenessCheckPorts() {
- return new HashSet<>(getMappedPort(MYSQL_PORT));
- }
-
- @Override
- protected void configure() {
- optionallyMapResourceParameterAsVolume(
- MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
-
- if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
- optionallyMapResourceParameterAsVolume(
- SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
- }
-
- addEnv("MYSQL_DATABASE", databaseName);
- addEnv("MYSQL_USER", username);
- if (password != null && !password.isEmpty()) {
- addEnv("MYSQL_PASSWORD", password);
- addEnv("MYSQL_ROOT_PASSWORD", password);
- } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
- addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
- } else {
- throw new ContainerLaunchException("Empty password can be used only with the root user");
- }
- setStartupAttempts(3);
- }
-
- @Override
- public String getDriverClassName() {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- return "com.mysql.cj.jdbc.Driver";
- } catch (ClassNotFoundException e) {
- return "com.mysql.jdbc.Driver";
- }
- }
-
- public String getJdbcUrl(String databaseName) {
- return "jdbc:mysql://" + getHost() + ":" + getDatabasePort();
- }
-
- @Override
- public String getJdbcUrl() {
- return getJdbcUrl(databaseName);
- }
-
- public int getDatabasePort() {
- return getMappedPort(MYSQL_PORT);
- }
-
- @Override
- protected String constructUrlForConnection(String queryString) {
- String url = super.constructUrlForConnection(queryString);
-
- if (!url.contains("useSSL=")) {
- String separator = url.contains("?") ? "&" : "?";
- url = url + separator + "useSSL=false";
- }
-
- if (!url.contains("allowPublicKeyRetrieval=")) {
- url = url + "&allowPublicKeyRetrieval=true";
- }
-
- return url;
- }
-
- @Override
- public String getDatabaseName() {
- return databaseName;
- }
-
- @Override
- public String getUsername() {
- return username;
- }
-
- @Override
- public String getPassword() {
- return password;
- }
-
- @Override
- protected String getTestQueryString() {
- return "SELECT 1";
- }
-
- @SuppressWarnings("unchecked")
- public MySqlContainer withConfigurationOverride(String s) {
- parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
- return this;
- }
-
- @SuppressWarnings("unchecked")
- public MySqlContainer withSetupSQL(String sqlPath) {
- parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
- return this;
- }
-
- @Override
- public MySqlContainer withDatabaseName(final String databaseName) {
- this.databaseName = databaseName;
- return this;
- }
-
- @Override
- public MySqlContainer withUsername(final String username) {
- this.username = username;
- return this;
- }
-
- @Override
- public MySqlContainer withPassword(final String password) {
- this.password = password;
- return this;
- }
-}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MysqlBaseITCASE.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MysqlBaseITCASE.java
deleted file mode 100644
index f00bfa1f87..0000000000
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MysqlBaseITCASE.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service.container;
-
-import org.apache.streampark.console.SpringUnitTestBase;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.util.stream.Stream;
-
-public class MysqlBaseITCASE extends SpringUnitTestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(MysqlBaseITCASE.class);
- protected static final MySqlContainer MYSQL_CONTAINER =
- createMySqlContainer("docker/server/my.cnf");
-
- @BeforeAll
- public static void startContainers() {
- LOG.info("Starting containers...");
- Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
- LOG.info("Containers are started.");
- }
-
- @AfterAll
- public static void stopContainers() {
- LOG.info("Stopping containers...");
- if (MYSQL_CONTAINER != null) {
- MYSQL_CONTAINER.stop();
- }
- LOG.info("Containers are stopped.");
- }
-
- protected static MySqlContainer createMySqlContainer(String configPath) {
- return (MySqlContainer) new MySqlContainer(MySqlContainer.MYSQL_VERSION)
- .withConfigurationOverride(configPath)
- .withSetupSQL("docker/setup.sql")
- .withDatabaseName("flink-test")
- .withUsername("flinkuser")
- .withPassword("flinkpw")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
- }
-}
diff --git a/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml b/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
deleted file mode 100644
index e66514e536..0000000000
--- a/streampark-console/streampark-console-service/src/test/resources/application-integration-test.yml
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-logging:
- level:
- root: info
-
-spring:
- datasource:
- driver-class-name: org.h2.Driver
- url: jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript from 'classpath:db/schema-h2.sql'
- username: sa
- password: sa
- sql:
- init:
- data-locations: classpath:db/data-h2.sql
- continue-on-error: true
- username: sa
- password: sa
- mode: always
-server:
- port: 6666
-
-streampark:
- workspace:
- local: /tmp
- # remote: hdfs://hdfscluster/streampark
- # hadoop-user-name: root
-
-job:
- state-watcher:
- fixed-delayed: 1000
-
-table:
- catalog-store:
- kind: jdbc
- jdbc:
- url: jdbc://mysql:127.0.0.1:3306/flink-test
- # The JDBC database url.
- table-name: t_flink_catalog
- ## catalog store table
- driver: com.mysql.cj.jdbc.Driver
- # The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.
- username: flinkuser
- # The JDBC user name. 'username' and 'password' must both be specified if any of them is specified.
- password: flinkpw
- # The JDBC password.
- max-retry-timeout: 600
diff --git a/streampark-console/streampark-console-service/src/test/resources/application-test.yml b/streampark-console/streampark-console-service/src/test/resources/application-test.yml
index 9b3d1e1ce3..d702d2adb3 100644
--- a/streampark-console/streampark-console-service/src/test/resources/application-test.yml
+++ b/streampark-console/streampark-console-service/src/test/resources/application-test.yml
@@ -35,41 +35,3 @@ spring:
username: sa
password: sa
mode: always
-## flink catalog store config
-table:
- catalog-store:
- kind: jdbc
- jdbc:
- url: jdbc://mysql:127.0.0.1:3306/flink-test
- # The JDBC database url.
- table-name: t_flink_catalog
- ## catalog store table
- driver: com.mysql.cj.jdbc.Driver
- # The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.
- username: flinkuser
- # The JDBC user name. 'username' and 'password' must both be specified if any of them is specified.
- password: flinkpw
- # The JDBC password.
- max-retry-timeout: 600
-
-streampark:
- # console heartbeat interval
- max-heartbeat-interval: 10s
-
-registry:
- # default using jdbc as registry
- type: jdbc
- heartbeat-refresh-interval: 1s
- session-timeout: 3s
-
-network:
- # network interface preferred like eth0, default: empty
- preferred-interface: ""
- # network interface restricted like docker0, default: empty
- restrict-interface: docker0
- # network IP gets priority, default inner outer
- priority-strategy: default
-
-job:
- state-watcher:
- fixed-delayed: 1000
diff --git a/streampark-console/streampark-console-service/src/test/resources/docker/server/my.cnf b/streampark-console/streampark-console-service/src/test/resources/docker/server/my.cnf
deleted file mode 100644
index 953ffe8f91..0000000000
--- a/streampark-console/streampark-console-service/src/test/resources/docker/server/my.cnf
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# For advice on how to change settings please see
-# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
-
-[mysqld]
-#
-# Remove leading # and set to the amount of RAM for the most important data
-# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
-# innodb_buffer_pool_size = 128M
-#
-# Remove leading # to turn on a very important data integrity option: logging
-# changes to the binary log between backups.
-# log_bin
-#
-# Remove leading # to set options mainly useful for reporting servers.
-# The server defaults are faster for transactions and fast SELECTs.
-# Adjust sizes as needed, experiment to find the optimal values.
-# join_buffer_size = 128M
-# sort_buffer_size = 2M
-# read_rnd_buffer_size = 2M
-skip-host-cache
-skip-name-resolve
-#datadir=/var/lib/mysql
-#socket=/var/lib/mysql/mysql.sock
-secure-file-priv=/var/lib/mysql
-user=mysql
-
-# Disabling symbolic-links is recommended to prevent assorted security risks
-symbolic-links=0
-
-#log-error=/var/log/mysqld.log
-#pid-file=/var/run/mysqld/mysqld.pid
-
-# ----------------------------------------------
-# Enable the binlog for replication & CDC
-# ----------------------------------------------
-
-# Enable binary replication log and set the prefix, expiration, and log format.
-# The prefix is arbitrary, expiration can be short for integration tests but would
-# be longer on a production system. Row-level info is required for ingest to work.
-# Server ID is required, but this will vary on production systems.
-server-id = 223344
-log_bin = mysql-bin
-binlog_format = row
-# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception
-# message when the binlog expires in the server.
-binlog_expire_logs_seconds = 1
-max_binlog_size = 4096
-
-# enable gtid mode
-gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/streampark-console/streampark-console-service/src/test/resources/docker/setup.sql b/streampark-console/streampark-console-service/src/test/resources/docker/setup.sql
deleted file mode 100644
index ee1bf53f50..0000000000
--- a/streampark-console/streampark-console-service/src/test/resources/docker/setup.sql
+++ /dev/null
@@ -1,41 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- In production you would almost certainly limit the replication user must be on the follower (slave) machine,
--- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
--- However, in this database we'll grant 2 users different privileges:
---
--- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
--- 2) 'mysqluser' - all privileges
---
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
-GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-
--- ----------------------------------------------------------------------------------------------------------------
--- DATABASE: emptydb
--- ----------------------------------------------------------------------------------------------------------------
-CREATE DATABASE emptydb;
-USE flink-test;
-create table flink_test (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `catalog_name` varchar(255) NOT NULL,
- `configuration` text,
- `create_time` datetime DEFAULT NULL,
- `update_time` datetime DEFAULT NULL,
- PRIMARY KEY (`id`) USING BTREE,
- UNIQUE INDEX `uniq_catalog_name` (`catalog_name`) USING BTREE
-)
-
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 1543627153..44f95abaa9 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -37,8 +37,6 @@
streampark-flink-proxy
streampark-flink-packer
streampark-flink-kubernetes
- streampark-flink-catalog-store
- streampark-flink-connector-plugin
streampark-flink-cdcclient
diff --git a/streampark-flink/streampark-flink-catalog-store/pom.xml b/streampark-flink/streampark-flink-catalog-store/pom.xml
deleted file mode 100644
index 8105e29d08..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/pom.xml
+++ /dev/null
@@ -1,169 +0,0 @@
-
-
-
- 4.0.0
-
- org.apache.streampark
- streampark-flink
- 2.2.0-SNAPSHOT
-
-
- streampark-flink-catalog-store
- StreamPark : Flink Catalog Store
-
-
- 19.0
- 2.15.3
-
-
-
-
- org.apache.flink
- flink-annotations
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-table-common
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-shaded-jackson
-
-
- org.apache.commons
- commons-compress
-
-
-
-
- org.apache.flink
- flink-table-api-java
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-shaded-jackson
-
-
- org.apache.commons
- commons-compress
-
-
-
-
- org.apache.flink
- flink-sql-gateway-api
- ${flink.version}
-
-
- org.apache.flink
- flink-shaded-jackson
-
-
-
-
- org.apache.flink
- flink-table-api-java
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-shaded-jackson
-
-
-
-
- org.apache.flink
- flink-table-api-java-uber
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-shaded-jackson
-
-
-
-
- org.apache.flink
- flink-shaded-jackson
- ${flink.shaded.jackson.version}-${flink.shaded.version19}
- provided
-
-
- mysql
- mysql-connector-java
- test
-
-
- org.assertj
- assertj-core
- test
-
-
- org.testcontainers
- mysql
- ${testcontainer.version}
- test
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
-
-
-
- shade
-
- package
-
- true
- ${project.basedir}/target/dependency-reduced-pom.xml
-
-
- org.apache.streampark:*
- org.apache.flink:flink-sql-gateway-api
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
-
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
deleted file mode 100644
index 11cb6906ef..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
-
-/** Serialization utils */
-public final class JacksonUtils {
-
- private JacksonUtils() {
- }
-
- private static final ObjectMapper MAPPER;
-
- static {
- MAPPER = new ObjectMapper();
- MAPPER.registerModule(new SimpleModule());
- MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
- MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
- }
-
- public static T read(String json, Class clazz) throws JsonProcessingException {
- return MAPPER.readValue(json, clazz);
- }
-
- public static T read(String json, TypeReference typeReference) throws JsonProcessingException {
- return MAPPER.readValue(json, typeReference);
- }
-
- public static String write(Object object) throws JsonProcessingException {
- return MAPPER.writeValueAsString(object);
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
deleted file mode 100644
index feeb76841a..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog;
-
-import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.table.catalog.AbstractCatalogStore;
-import org.apache.flink.table.catalog.CatalogDescriptor;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * Catalog Store for Jdbc.
- */
-public class JdbcCatalogStore extends AbstractCatalogStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogStore.class);
- private final JdbcConnectionProvider jdbcConnectionProvider;
- private transient Connection connection;
- private transient Statement statement;
- private transient ResultSet resultSet;
-
- private final String catalogTableName;
-
- public JdbcCatalogStore(JdbcConnectionProvider jdbcConnectionProvider, String catalogTableName) {
- this.jdbcConnectionProvider = jdbcConnectionProvider;
- this.catalogTableName = catalogTableName;
- }
-
- @Override
- public void open() {
- try {
- this.connection = jdbcConnectionProvider.getOrEstablishConnection();
- this.statement = this.connection.createStatement();
- super.open();
- } catch (SQLException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- closeResultSetAndStatement();
- try {
- jdbcConnectionProvider.close();
- } catch (Exception e) {
- throw new CatalogException(e);
- }
- super.close();
- }
-
- @Override
- public void storeCatalog(String catalogName, CatalogDescriptor catalogDescriptor) throws CatalogException {
- checkOpenState();
- try {
- if (!contains(catalogName)) {
- statement.executeUpdate(String.format(
- "insert into %s (catalog_name,configuration,create_time,update_time) values ('%s','%s',now(),now())",
- this.catalogTableName, catalogName,
- JacksonUtils.write(catalogDescriptor.getConfiguration().toMap())));
- } else {
- LOG.error("catalog {} is exist.", catalogName);
- }
- } catch (SQLException | JsonProcessingException e) {
- throw new CatalogException(String.format("Store catalog %s failed!", catalogName), e);
- }
- }
-
- @Override
- public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
- checkOpenState();
- try {
- int effectRow = statement.executeUpdate(
- String.format("delete from %s where catalog_name='%s'", this.catalogTableName, catalogName));
-
- if (effectRow == 0 && !ignoreIfNotExists) {
- throw new CatalogException(String.format("Remove catalog %s failed!", catalogName));
- }
- } catch (SQLException e) {
- LOG.error("Remove catalog {} failed!", catalogName, e);
- throw new CatalogException(String.format("Remove catalog %s failed!", catalogName));
- }
- }
-
- @Override
- public Optional getCatalog(String catalogName) throws CatalogException {
- checkOpenState();
- try {
- resultSet = statement
- .executeQuery(
- String.format("select * from %s where catalog_name='%s'", this.catalogTableName, catalogName));
- while (resultSet.next()) {
- return Optional.of(CatalogDescriptor.of(catalogName,
- Configuration.fromMap(JacksonUtils.read(resultSet.getString("configuration"), Map.class))));
- }
- } catch (SQLException | JsonProcessingException e) {
- throw new CatalogException(String.format("Get catalog %s failed!", catalogName), e);
- }
- return Optional.empty();
- }
-
- @Override
- public Set listCatalogs() throws CatalogException {
- checkOpenState();
- Set catalogs = new HashSet<>();
- try {
- resultSet = statement.executeQuery(String.format("select * from %s;", this.catalogTableName));
- while (resultSet.next()) {
- catalogs.add(resultSet.getString("catalog_name"));
- }
- return catalogs;
- } catch (SQLException e) {
- throw new CatalogException("List catalogs failed!", e);
- }
- }
-
- @Override
- public boolean contains(String catalogName) throws CatalogException {
- checkOpenState();
- try {
- resultSet = statement.executeQuery(
- String.format("select * from %s where catalog_name='%s';", this.catalogTableName, catalogName));
- while (resultSet.next()) {
- resultSet.getString("catalog_name");
- return true;
- }
- } catch (SQLException e) {
- throw new CatalogException(String.format("Catalog %s is contains failed!", catalogName), e);
- }
- return false;
- }
-
- private void closeResultSetAndStatement() {
- try {
- if (resultSet != null && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (statement != null && !statement.isClosed()) {
- statement.close();
- }
- resultSet = null;
- statement = null;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
deleted file mode 100644
index 38aa6e5c72..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog;
-
-import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
-import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
-import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.catalog.CatalogStore;
-import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.DRIVER;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.IDENTIFIER;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.MAX_RETRY_TIMEOUT;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.PASSWORD;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.TABLE_NAME;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.URL;
-import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.USERNAME;
-
-/** Catalog Store Factory for Jdbc. */
-public class JdbcCatalogStoreFactory implements CatalogStoreFactory {
-
- private JdbcConnectionProvider jdbcConnectionProvider;
- private transient String catalogTableName;
-
- @Override
- public CatalogStore createCatalogStore() {
- return new JdbcCatalogStore(jdbcConnectionProvider, this.catalogTableName);
- }
-
- @Override
- public void open(Context context) {
- FactoryUtil.FactoryHelper factoryHelper =
- createCatalogStoreFactoryHelper(this, context);
- factoryHelper.validate();
-
- ReadableConfig options = factoryHelper.getOptions();
- JdbcConnectionOptions jdbcConnectionOptions =
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl(options.get(URL))
- .withDriverName(options.get(DRIVER))
- .withUsername(options.get(USERNAME))
- .withPassword(options.get(PASSWORD))
- .withConnectionCheckTimeoutSeconds(options.get(MAX_RETRY_TIMEOUT))
- .build();
-
- this.catalogTableName = options.get(TABLE_NAME);
- this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions);
- }
-
- @Override
- public void close() {
- this.jdbcConnectionProvider.closeConnection();
- }
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public Set> requiredOptions() {
- Set> options = new HashSet<>();
- options.add(URL);
- options.add(DRIVER);
- options.add(USERNAME);
- options.add(PASSWORD);
- options.add(TABLE_NAME);
- return Collections.unmodifiableSet(options);
- }
-
- @Override
- public Set> optionalOptions() {
- Set> options = new HashSet<>();
- options.add(MAX_RETRY_TIMEOUT);
- return Collections.unmodifiableSet(options);
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
deleted file mode 100644
index 85a324bb51..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Catalog Store Options for Jdbc.
- */
-public class JdbcCatalogStoreFactoryOptions {
-
- public static final String IDENTIFIER = "jdbc";
-
- public static final ConfigOption URL =
- ConfigOptions.key("url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The JDBC database url.");
- public static final ConfigOption TABLE_NAME =
- ConfigOptions.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The name of JDBC table to connect.");
- public static final ConfigOption DRIVER =
- ConfigOptions.key("driver")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.");
- public static final ConfigOption USERNAME =
- ConfigOptions.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The JDBC user name. 'username' and 'password' must both be specified if any of them is specified.");
-
- public static final ConfigOption PASSWORD =
- ConfigOptions.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The JDBC password.");
-
- public static final ConfigOption MAX_RETRY_TIMEOUT =
- ConfigOptions.key("max-retry-timeout")
- .intType()
- .defaultValue(60)
- .withDescription(
- "Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.");
-
- private JdbcCatalogStoreFactoryOptions() {
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
deleted file mode 100644
index 62f460d4e1..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog.connections;
-
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-
-public class JdbcConnectionOptions implements Serializable {
-
- public static final String USER_KEY = "user";
- public static final String PASSWORD_KEY = "password";
-
- private static final long serialVersionUID = 1L;
-
- protected final String url;
- @Nullable
- protected final String driverName;
- protected final int connectionCheckTimeoutSeconds;
- @Nonnull
- protected final Properties properties;
-
- protected JdbcConnectionOptions(
- String url,
- @Nullable String driverName,
- int connectionCheckTimeoutSeconds,
- @Nonnull Properties properties) {
- Preconditions.checkArgument(
- connectionCheckTimeoutSeconds > 0,
- "Connection check timeout seconds shouldn't be smaller than 1");
- this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
- this.driverName = driverName;
- this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
- this.properties =
- Preconditions.checkNotNull(properties, "Connection properties must be non-null");
- }
-
- public String getDbURL() {
- return url;
- }
-
- @Nullable
- public String getDriverName() {
- return driverName;
- }
-
- public Optional getUsername() {
- return Optional.ofNullable(properties.getProperty(USER_KEY));
- }
-
- public Optional getPassword() {
- return Optional.ofNullable(properties.getProperty(PASSWORD_KEY));
- }
-
- public int getConnectionCheckTimeoutSeconds() {
- return connectionCheckTimeoutSeconds;
- }
-
- @Nonnull
- public Properties getProperties() {
- return properties;
- }
-
- @Nonnull
- public static Properties getBriefAuthProperties(String user, String password) {
- final Properties result = new Properties();
- if (Objects.nonNull(user)) {
- result.put(USER_KEY, user);
- }
- if (Objects.nonNull(password)) {
- result.put(PASSWORD_KEY, password);
- }
- return result;
- }
-
- /** Builder for {@link JdbcConnectionOptions}. */
- public static class JdbcConnectionOptionsBuilder {
-
- private String url;
- private String driverName;
- private int connectionCheckTimeoutSeconds = 60;
- private final Properties properties = new Properties();
-
- public JdbcConnectionOptionsBuilder withUrl(String url) {
- this.url = url;
- return this;
- }
-
- public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
- this.driverName = driverName;
- return this;
- }
-
- public JdbcConnectionOptionsBuilder withProperty(String propKey, String propVal) {
- Preconditions.checkNotNull(propKey, "Connection property key mustn't be null");
- Preconditions.checkNotNull(propVal, "Connection property value mustn't be null");
- this.properties.put(propKey, propVal);
- return this;
- }
-
- public JdbcConnectionOptionsBuilder withUsername(String username) {
- if (Objects.nonNull(username)) {
- this.properties.put(USER_KEY, username);
- }
- return this;
- }
-
- public JdbcConnectionOptionsBuilder withPassword(String password) {
- if (Objects.nonNull(password)) {
- this.properties.put(PASSWORD_KEY, password);
- }
- return this;
- }
-
- /**
- * Set the maximum timeout between retries, default is 60 seconds.
- *
- * @param connectionCheckTimeoutSeconds the timeout seconds, shouldn't smaller than 1
- * second.
- */
- public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(
- int connectionCheckTimeoutSeconds) {
- this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
- return this;
- }
-
- public JdbcConnectionOptions build() {
- return new JdbcConnectionOptions(
- url, driverName, connectionCheckTimeoutSeconds, properties);
- }
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
deleted file mode 100644
index c49cb1fc7d..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog.connections;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.Properties;
-
-/** JDBC connection provider. */
-@PublicEvolving
-public interface JdbcConnectionProvider extends Serializable, AutoCloseable {
-
- /**
- * Get existing connection.
- *
- * @return existing connection
- */
- @Nullable
- Connection getConnection();
-
- /**
- * Get existing connection properties.
- *
- * @return existing connection properties
- */
- @Nonnull
- default Properties getProperties() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Check whether possible existing connection is valid or not through {@link
- * Connection#isValid(int)}.
- *
- * @return true if existing connection is valid
- * @throws SQLException sql exception throw from {@link Connection#isValid(int)}
- */
- boolean isConnectionValid() throws SQLException;
-
- /**
- * Get existing connection or establish an new one if there is none.
- *
- * @return existing connection or newly established connection
- * @throws SQLException sql exception
- * @throws ClassNotFoundException driver class not found
- */
- Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
-
- /** Close possible existing connection. */
- void closeConnection();
-
- /**
- * Close possible existing connection and establish an new one.
- *
- * @return newly established connection
- * @throws SQLException sql exception
- * @throws ClassNotFoundException driver class not found
- */
- Connection reestablishConnection() throws SQLException, ClassNotFoundException;
-
- default void close() throws Exception {
- closeConnection();
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
deleted file mode 100644
index e78c0bfbe8..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog.connections;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Enumeration;
-import java.util.Properties;
-
-/** Simple JDBC connection provider. */
-@NotThreadSafe
-@PublicEvolving
-public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
-
- private static final long serialVersionUID = 1L;
-
- private final JdbcConnectionOptions jdbcOptions;
-
- private transient Driver loadedDriver;
- private transient Connection connection;
-
- static {
- // Load DriverManager first to avoid deadlock between DriverManager's
- // static initialization block and specific driver class's static
- // initialization block when two different driver classes are loading
- // concurrently using Class.forName while DriverManager is uninitialized
- // before.
- //
- // This could happen in JDK 8 but not above as driver loading has been
- // moved out of DriverManager's static initialization block since JDK 9.
- DriverManager.getDrivers();
- }
-
- public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
- this.jdbcOptions = jdbcOptions;
- }
-
- @Override
- public Connection getConnection() {
- return connection;
- }
-
- @Nonnull
- @Override
- public Properties getProperties() {
- return jdbcOptions.getProperties();
- }
-
- @Override
- public boolean isConnectionValid() throws SQLException {
- return connection != null
- && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
- }
-
- private Driver loadDriver(String driverName) throws SQLException, ClassNotFoundException {
- Preconditions.checkNotNull(driverName);
- Enumeration drivers = DriverManager.getDrivers();
- while (drivers.hasMoreElements()) {
- Driver driver = drivers.nextElement();
- if (driver.getClass().getName().equals(driverName)) {
- return driver;
- }
- }
- // We could reach here for reasons:
- // * Class loader hell of DriverManager(see JDK-8146872).
- // * driver is not installed as a service provider.
- Class> clazz =
- Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
- try {
- return (Driver) clazz.newInstance();
- } catch (Exception ex) {
- throw new SQLException("Fail to create driver of class " + driverName, ex);
- }
- }
-
- private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
- if (loadedDriver == null) {
- loadedDriver = loadDriver(jdbcOptions.getDriverName());
- }
- return loadedDriver;
- }
-
- @Override
- public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
- if (isConnectionValid()) {
- return connection;
- }
- if (jdbcOptions.getDriverName() == null) {
- connection = DriverManager.getConnection(jdbcOptions.getDbURL(), getProperties());
- } else {
- Driver driver = getLoadedDriver();
- connection = driver.connect(jdbcOptions.getDbURL(), getProperties());
- if (connection == null) {
- // Throw same exception as DriverManager.getConnection when no driver found to match
- // caller expectation.
- throw new SQLException(
- "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
- }
- }
- return connection;
- }
-
- @Override
- public void closeConnection() {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.warn("JDBC connection close failed.", e);
- } finally {
- connection = null;
- }
- }
- }
-
- @Override
- public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
- closeConnection();
- return getOrEstablishConnection();
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
deleted file mode 100644
index 9bcdf4448d..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.streampark.catalog.JdbcCatalogStoreFactory
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
deleted file mode 100644
index 59842e1b4e..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog;
-
-import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
-
-import org.apache.flink.table.catalog.CatalogStore;
-import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-
-public class JdbcCatalogStoreFactoryTest extends MysqlBaseITCASE {
-
- @org.junit.Test
- public void testCatalogStoreFactoryDiscovery() {
-
- String factoryIdentifier = JdbcCatalogStoreFactoryOptions.IDENTIFIER;
- Map options = new HashMap<>();
- options.put("url", MYSQL_CONTAINER.getJdbcUrl());
- options.put("table-name", "t_mysql_catalog");
- options.put("driver", MYSQL_CONTAINER.getDriverClassName());
- options.put("username", MYSQL_CONTAINER.getUsername());
- options.put("password", MYSQL_CONTAINER.getPassword());
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- final FactoryUtil.DefaultCatalogStoreContext discoveryContext =
- new FactoryUtil.DefaultCatalogStoreContext(options, null, classLoader);
- final CatalogStoreFactory factory =
- FactoryUtil.discoverFactory(
- classLoader, CatalogStoreFactory.class, factoryIdentifier);
- factory.open(discoveryContext);
-
- CatalogStore catalogStore = factory.createCatalogStore();
- assertThat(catalogStore instanceof JdbcCatalogStore).isTrue();
-
- factory.close();
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
deleted file mode 100644
index 6267688632..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog;
-
-import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
-import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
-import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
-import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.catalog.CatalogDescriptor;
-import org.apache.flink.table.catalog.CatalogStore;
-import org.apache.flink.table.catalog.CommonCatalogOptions;
-import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-
-import org.assertj.core.api.ThrowableAssert;
-
-import java.util.Set;
-
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
-
-public class JdbcCatalogStoreTest extends MysqlBaseITCASE {
-
- private static final String DUMMY = "dummy";
- private static final CatalogDescriptor DUMMY_CATALOG;
-
- static {
- Configuration conf = new Configuration();
- conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY);
- conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE, "dummy_db");
-
- DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf);
- }
- @org.junit.Test
- public void testNotOpened() {
- CatalogStore catalogStore = initCatalogStore();
-
- assertCatalogStoreNotOpened(catalogStore::listCatalogs);
- assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
- assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
- assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG));
- assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true));
- }
-
- @org.junit.Test
- public void testStore() {
- CatalogStore catalogStore = initCatalogStore();
- catalogStore.open();
-
- catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
-
- Set storedCatalogs = catalogStore.listCatalogs();
- assertThat(storedCatalogs.size()).isEqualTo(1);
- assertThat(storedCatalogs.contains(DUMMY)).isTrue();
- }
-
- @org.junit.Test
- public void testRemoveExisting() {
- CatalogStore catalogStore = initCatalogStore();
- catalogStore.open();
-
- catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
- assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
-
- catalogStore.removeCatalog(DUMMY, false);
- assertThat(catalogStore.listCatalogs().size()).isEqualTo(0);
- assertThat(catalogStore.contains(DUMMY)).isFalse();
- }
-
- @org.junit.Test
- public void testRemoveNonExisting() {
- CatalogStore catalogStore = initCatalogStore();
- catalogStore.open();
-
- catalogStore.removeCatalog(DUMMY, true);
-
- assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false))
- .isInstanceOf(CatalogException.class)
- .hasMessageContaining(
- "Remove catalog " + DUMMY + " failed!");
- }
-
- @org.junit.Test
- public void testClosed() {
- CatalogStore catalogStore = initCatalogStore();
-
- catalogStore.open();
-
- catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
- assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
-
- catalogStore.close();
-
- assertCatalogStoreNotOpened(catalogStore::listCatalogs);
- assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
- assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
- assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG));
- assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true));
- }
-
- private void assertCatalogStoreNotOpened(
- ThrowableAssert.ThrowingCallable shouldRaiseThrowable) {
- assertThatThrownBy(shouldRaiseThrowable)
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("CatalogStore is not opened yet.");
- }
-
- private JdbcCatalogStore initCatalogStore() {
- JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl(MYSQL_CONTAINER.getJdbcUrl())
- .withDriverName(MYSQL_CONTAINER.getDriverClassName())
- .withUsername(MYSQL_CONTAINER.getUsername())
- .withPassword(MYSQL_CONTAINER.getPassword())
- .build();
-
- JdbcConnectionProvider jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions);
- return new JdbcCatalogStore(jdbcConnectionProvider, "t_mysql_catalog");
-
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
deleted file mode 100644
index 94a5c2f7e7..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog.mysql;
-
-import org.testcontainers.containers.ContainerLaunchException;
-import org.testcontainers.containers.JdbcDatabaseContainer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class MySqlContainer extends JdbcDatabaseContainer {
-
- public static final String IMAGE = "mysql";
-
- public static final String MYSQL_VERSION = "8.0";
- public static final Integer MYSQL_PORT = 3306;
-
- private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
- private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
- private static final String MYSQL_ROOT_USER = "root";
-
- private String databaseName = "test";
- private String username = "test";
- private String password = "test";
-
- public MySqlContainer() {
- this(MYSQL_VERSION);
- }
-
- public MySqlContainer(String version) {
- super(DockerImageName.parse(IMAGE + ":" + version));
- addExposedPort(MYSQL_PORT);
- }
-
- @Override
- protected Set getLivenessCheckPorts() {
- return new HashSet<>(getMappedPort(MYSQL_PORT));
- }
-
- @Override
- protected void configure() {
- optionallyMapResourceParameterAsVolume(
- MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
-
- if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
- optionallyMapResourceParameterAsVolume(
- SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
- }
-
- addEnv("MYSQL_DATABASE", databaseName);
- addEnv("MYSQL_USER", username);
- if (password != null && !password.isEmpty()) {
- addEnv("MYSQL_PASSWORD", password);
- addEnv("MYSQL_ROOT_PASSWORD", password);
- } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
- addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
- } else {
- throw new ContainerLaunchException(
- "Empty password can be used only with the root user");
- }
- setStartupAttempts(3);
- }
-
- @Override
- public String getDriverClassName() {
- try {
- Class.forName("com.mysql.cj.jdbc.Driver");
- return "com.mysql.cj.jdbc.Driver";
- } catch (ClassNotFoundException e) {
- return "com.mysql.jdbc.Driver";
- }
- }
-
- public String getJdbcUrl(String databaseName) {
- String additionalUrlParams = constructUrlParameters("?", "&");
- return "jdbc:mysql://"
- + getHost()
- + ":"
- + getDatabasePort()
- + "/"
- + databaseName
- + additionalUrlParams;
- }
-
- @Override
- public String getJdbcUrl() {
- return getJdbcUrl(databaseName);
- }
-
- public int getDatabasePort() {
- return getMappedPort(MYSQL_PORT);
- }
-
- @Override
- protected String constructUrlForConnection(String queryString) {
- String url = super.constructUrlForConnection(queryString);
-
- if (!url.contains("useSSL=")) {
- String separator = url.contains("?") ? "&" : "?";
- url = url + separator + "useSSL=false";
- }
-
- if (!url.contains("allowPublicKeyRetrieval=")) {
- url = url + "&allowPublicKeyRetrieval=true";
- }
-
- return url;
- }
-
- @Override
- public String getDatabaseName() {
- return databaseName;
- }
-
- @Override
- public String getUsername() {
- return username;
- }
-
- @Override
- public String getPassword() {
- return password;
- }
-
- @Override
- protected String getTestQueryString() {
- return "SELECT 1";
- }
-
- @SuppressWarnings("unchecked")
- public MySqlContainer withConfigurationOverride(String s) {
- parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
- return this;
- }
-
- @SuppressWarnings("unchecked")
- public MySqlContainer withSetupSQL(String sqlPath) {
- parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
- return this;
- }
-
- @Override
- public MySqlContainer withDatabaseName(final String databaseName) {
- this.databaseName = databaseName;
- return this;
- }
-
- @Override
- public MySqlContainer withUsername(final String username) {
- this.username = username;
- return this;
- }
-
- @Override
- public MySqlContainer withPassword(final String password) {
- this.password = password;
- return this;
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
deleted file mode 100644
index 799335b96d..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.catalog.mysql;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.util.stream.Stream;
-
-public class MysqlBaseITCASE {
-
- private static final Logger LOG = LoggerFactory.getLogger(MysqlBaseITCASE.class);
- protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer("docker/server/my.cnf");
-
- @org.junit.BeforeClass
- public static void startContainers() {
- LOG.info("Starting containers...");
- Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
- LOG.info("Containers are started.");
- }
-
- @org.junit.AfterClass
- public static void stopContainers() {
- LOG.info("Stopping containers...");
- if (MYSQL_CONTAINER != null) {
- MYSQL_CONTAINER.stop();
- }
- LOG.info("Containers are stopped.");
- }
-
- protected static MySqlContainer createMySqlContainer(String configPath) {
- return (MySqlContainer) new MySqlContainer(MySqlContainer.MYSQL_VERSION)
- .withConfigurationOverride(configPath)
- .withSetupSQL("docker/setup.sql")
- .withDatabaseName("flink-test")
- .withUsername("flinkuser")
- .withPassword("flinkpw")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
- }
-}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
deleted file mode 100644
index 953ffe8f91..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# For advice on how to change settings please see
-# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
-
-[mysqld]
-#
-# Remove leading # and set to the amount of RAM for the most important data
-# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
-# innodb_buffer_pool_size = 128M
-#
-# Remove leading # to turn on a very important data integrity option: logging
-# changes to the binary log between backups.
-# log_bin
-#
-# Remove leading # to set options mainly useful for reporting servers.
-# The server defaults are faster for transactions and fast SELECTs.
-# Adjust sizes as needed, experiment to find the optimal values.
-# join_buffer_size = 128M
-# sort_buffer_size = 2M
-# read_rnd_buffer_size = 2M
-skip-host-cache
-skip-name-resolve
-#datadir=/var/lib/mysql
-#socket=/var/lib/mysql/mysql.sock
-secure-file-priv=/var/lib/mysql
-user=mysql
-
-# Disabling symbolic-links is recommended to prevent assorted security risks
-symbolic-links=0
-
-#log-error=/var/log/mysqld.log
-#pid-file=/var/run/mysqld/mysqld.pid
-
-# ----------------------------------------------
-# Enable the binlog for replication & CDC
-# ----------------------------------------------
-
-# Enable binary replication log and set the prefix, expiration, and log format.
-# The prefix is arbitrary, expiration can be short for integration tests but would
-# be longer on a production system. Row-level info is required for ingest to work.
-# Server ID is required, but this will vary on production systems.
-server-id = 223344
-log_bin = mysql-bin
-binlog_format = row
-# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception
-# message when the binlog expires in the server.
-binlog_expire_logs_seconds = 1
-max_binlog_size = 4096
-
-# enable gtid mode
-gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
deleted file mode 100644
index 1b22107deb..0000000000
--- a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
+++ /dev/null
@@ -1,41 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- In production you would almost certainly limit the replication user must be on the follower (slave) machine,
--- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
--- However, in this database we'll grant 2 users different privileges:
---
--- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
--- 2) 'mysqluser' - all privileges
---
-GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
-CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
-GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-
--- ----------------------------------------------------------------------------------------------------------------
--- DATABASE: emptydb
--- ----------------------------------------------------------------------------------------------------------------
-CREATE DATABASE emptydb;
-USE flink-test;
-create table t_mysql_catalog (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `catalog_name` varchar(255) NOT NULL,
- `configuration` text,
- `create_time` datetime DEFAULT NULL,
- `update_time` datetime DEFAULT NULL,
- PRIMARY KEY (`id`) USING BTREE,
- UNIQUE INDEX `uniq_catalog_name` (`catalog_name`) USING BTREE
-)
-
diff --git a/streampark-flink/streampark-flink-connector-plugin/pom.xml b/streampark-flink/streampark-flink-connector-plugin/pom.xml
deleted file mode 100644
index 6a8a2702de..0000000000
--- a/streampark-flink/streampark-flink-connector-plugin/pom.xml
+++ /dev/null
@@ -1,96 +0,0 @@
-
-
-
- 4.0.0
-
- org.apache.streampark
- streampark-flink
- 2.2.0-SNAPSHOT
-
-
- streampark-flink-connector-plugin
- StreamPark : Flink Connector Plugin
-
- 1.18.1
- 0.8.2
-
-
-
- org.apache.flink
- flink-connector-jdbc
- ${flink.connector.version}
-
-
- org.apache.flink
- flink-sql-connector-hive-2.3.9_${scala.binary.version}
- ${flink.version}
-
-
- org.apache.paimon
- paimon-flink-1.18
- ${paimon.version}
-
-
-
- org.apache.streampark
- streampark-flink-catalog-store
- ${project.version}
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
-
-
-
- shade
-
- package
-
- true
- ${project.basedir}/target/dependency-reduced-pom.xml
-
-
- org.apache.streampark:*
- org.apache.flink:*
- org.apache.paimon:*
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/streampark-flink/streampark-flink-connector-plugin/src/main/java/org/apache/streampark/package-info.java b/streampark-flink/streampark-flink-connector-plugin/src/main/java/org/apache/streampark/package-info.java
deleted file mode 100644
index 96017d83a3..0000000000
--- a/streampark-flink/streampark-flink-connector-plugin/src/main/java/org/apache/streampark/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides compilers for Java code.
- */
-package org.apache.streampark;
diff --git a/streampark-flink/streampark-flink-connector-plugin/src/test/java/org/apache/streampark/package-info.java b/streampark-flink/streampark-flink-connector-plugin/src/test/java/org/apache/streampark/package-info.java
deleted file mode 100644
index 96017d83a3..0000000000
--- a/streampark-flink/streampark-flink-connector-plugin/src/test/java/org/apache/streampark/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides compilers for Java code.
- */
-package org.apache.streampark;
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 093a11b2e0..21b96b41a0 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -88,7 +88,7 @@ object KubernetesRetriever extends Logger {
.getClusterClientFactory(flinkConfig)
.createClusterDescriptor(flinkConfig)
.asInstanceOf[KubernetesClusterDescriptor]
- .autoClose(clusterProvider =>
+ .using(clusterProvider =>
Try {
clusterProvider
.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
@@ -113,7 +113,7 @@ object KubernetesRetriever extends Logger {
KubernetesRetriever
.newK8sClient()
- .autoClose(client => {
+ .using(client => {
client
.apps()
.deployments()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 2df17d6c06..a0421fc0da 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -35,7 +35,7 @@ object KubernetesDeploymentHelper extends Logger {
private[this] def getPods(nameSpace: String, deploymentName: String): List[Pod] = {
KubernetesRetriever
.newK8sClient()
- .autoClose(client => {
+ .using(client => {
Try {
client.pods
.inNamespace(nameSpace)
@@ -72,7 +72,7 @@ object KubernetesDeploymentHelper extends Logger {
private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Unit = {
KubernetesRetriever
.newK8sClient()
- .autoClose(client => {
+ .using(client => {
val map = client.apps.deployments.inNamespace(nameSpace)
map.withLabel("app", deploymentName).delete
map.withName(deploymentName).delete()
@@ -82,7 +82,7 @@ object KubernetesDeploymentHelper extends Logger {
private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Unit = {
KubernetesRetriever
.newK8sClient()
- .autoClose(client => {
+ .using(client => {
val map = client.configMaps().inNamespace(nameSpace)
map.withLabel("app", deploymentName).delete
map.withName(deploymentName).delete()
@@ -106,7 +106,7 @@ object KubernetesDeploymentHelper extends Logger {
def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
KubernetesRetriever
.newK8sClient()
- .autoClose(client => {
+ .using(client => {
val path = KubernetesDeploymentHelper.getJobLog(jobId)
val file = new File(path)
val log = client.apps.deployments
@@ -121,7 +121,7 @@ object KubernetesDeploymentHelper extends Logger {
def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = {
KubernetesRetriever
.newK8sClient()
- .autoClose(client =>
+ .using(client =>
Try {
val podName = getPods(nameSpace, jobName).head.getMetadata.getName
val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index 25b11a6433..a39880ff41 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
-import org.apache.streampark.common.util.Implicits.AutoCloseImplicits
+import org.apache.streampark.common.util.Implicits._
import org.apache.streampark.common.util.Logger
import org.apache.flink.client.program.ClusterClient
@@ -27,7 +27,7 @@ object IngressController extends Logger {
private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
- private lazy val clusterVersion = new DefaultKubernetesClient().autoClose(client => {
+ private lazy val clusterVersion = new DefaultKubernetesClient().using(client => {
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
})
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 48f9d647b0..d4984949d7 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -31,7 +31,7 @@ class IngressStrategyV1 extends IngressStrategy {
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- new DefaultKubernetesClient().autoClose(client =>
+ new DefaultKubernetesClient().using(client =>
Try {
Option(
Try(
@@ -49,8 +49,8 @@ class IngressStrategyV1 extends IngressStrategy {
val newPath = Option(path).filter(_.nonEmpty).map(_.replaceAll("\\/+$", "")).getOrElse("")
s"http://$host$newPath"
}
- .getOrElse(clusterClient.autoClose(_.getWebInterfaceURL))
- case None => clusterClient.autoClose(_.getWebInterfaceURL)
+ .getOrElse(clusterClient.using(_.getWebInterfaceURL))
+ case None => clusterClient.using(_.getWebInterfaceURL)
}
} match {
case Success(value) => value
@@ -75,7 +75,7 @@ class IngressStrategyV1 extends IngressStrategy {
}
override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
- new DefaultKubernetesClient().autoClose(client => {
+ new DefaultKubernetesClient().using(client => {
val ownerReference = getOwnerReference(nameSpace, clusterId, client)
val ingressBackendRestServicePort =
touchIngressBackendRestPort(client, clusterId, nameSpace)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index f8d33d8a28..28bd547558 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -33,7 +33,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- new DefaultKubernetesClient().autoClose(client => {
+ new DefaultKubernetesClient().using(client => {
Try {
Option(
Try(
@@ -47,8 +47,8 @@ class IngressStrategyV1beta1 extends IngressStrategy {
.map(ingress => ingress.getSpec.getRules.head)
.map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
.map { case (host, path) => s"http://$host$path" }
- .getOrElse(clusterClient.autoClose(_.getWebInterfaceURL))
- case None => clusterClient.autoClose(_.getWebInterfaceURL)
+ .getOrElse(clusterClient.using(_.getWebInterfaceURL))
+ case None => clusterClient.using(_.getWebInterfaceURL)
}
} match {
case Success(value) => value
@@ -70,7 +70,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
}
override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
- new DefaultKubernetesClient().autoClose(client => {
+ new DefaultKubernetesClient().using(client => {
val ownerReference = getOwnerReference(nameSpace, clusterId, client)
val ingress = new IngressBuilder()
.withNewMetadata()
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 402ab1e964..3f5fc9f7d3 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -93,7 +93,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
val uploadFile =
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
if (fsOperator.exists(uploadFile)) {
- new FileInputStream(originFile).autoClose(inputStream => {
+ new FileInputStream(originFile).using(inputStream => {
if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
fsOperator.upload(originFile.getAbsolutePath, uploadFile)
}
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkYarnBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkYarnBuildPipeline.scala
index 9daeab677e..2c92067b6f 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkYarnBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkYarnBuildPipeline.scala
@@ -96,7 +96,7 @@ class SparkYarnBuildPipeline(request: SparkYarnBuildRequest)
val uploadFile =
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
if (fsOperator.exists(uploadFile)) {
- new FileInputStream(originFile).autoClose(inputStream => {
+ new FileInputStream(originFile).using(inputStream => {
if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
fsOperator.upload(originFile.getAbsolutePath, uploadFile)
}
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index bbd96110b5..8d6c8cd12e 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -216,12 +216,12 @@ object FlinkShimsProxy extends Logger {
def getObject[T](loader: ClassLoader, obj: Object): T = {
val arrayOutputStream = new ByteArrayOutputStream
new ObjectOutputStream(arrayOutputStream)
- .autoClose(out => {
+ .using(out => {
out.writeObject(obj)
val byteArrayInputStream =
new ByteArrayInputStream(arrayOutputStream.toByteArray)
new ClassLoaderObjectInputStream(loader, byteArrayInputStream)
- .autoClose(_.readObject())
+ .using(_.readObject())
})
.asInstanceOf[T]
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
index 47a8451649..1ef0bf33d2 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -115,11 +115,6 @@
provided
-
- org.apache.streampark
- streampark-flink-catalog-store
- ${project.version}
-
@@ -139,7 +134,6 @@
org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}
- org.apache.streampark:streampark-flink-catalog-store
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
index 9fb04838c8..e0cceaf1e3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
@@ -114,12 +114,6 @@
${flink.version}
provided
-
-
- org.apache.streampark
- streampark-flink-catalog-store
- ${project.version}
-
@@ -139,7 +133,6 @@
org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}
- org.apache.streampark:streampark-flink-catalog-store
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index 86268f8c7c..f0a7933f04 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -190,12 +190,12 @@ object SparkShimsProxy extends Logger {
def getObject[T](loader: ClassLoader, obj: Object): T = {
val arrayOutputStream = new ByteArrayOutputStream
new ObjectOutputStream(arrayOutputStream)
- .autoClose(objectOutputStream => {
+ .using(objectOutputStream => {
objectOutputStream.writeObject(obj)
val byteArrayInputStream =
new ByteArrayInputStream(arrayOutputStream.toByteArray)
new ClassLoaderObjectInputStream(loader, byteArrayInputStream)
- .autoClose(_.readObject())
+ .using(_.readObject())
})
.asInstanceOf[T]
}