Skip to content

Commit b6477ed

Browse files
mxm authored and stevenzwu committed
Flink: Add support for Flink 2.0
1 parent f5489ae commit b6477ed

72 files changed

Lines changed: 371 additions & 235 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/flink-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ jobs:
7373
strategy:
7474
matrix:
7575
jvm: [11, 17, 21]
76-
flink: ['1.18', '1.19', '1.20']
76+
flink: ['1.19', '1.20', '2.0']
7777
env:
7878
SPARK_LOCAL_IP: localhost
7979
steps:

dev/stage-binaries.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#
2020

2121
SCALA_VERSION=2.12
22-
FLINK_VERSIONS=1.18,1.19,1.20
22+
FLINK_VERSIONS=1.19,1.20,2.0
2323
SPARK_VERSIONS=3.4,3.5
2424
KAFKA_VERSIONS=3
2525

flink/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ if (flinkVersions.contains("1.19")) {
2929

3030
if (flinkVersions.contains("1.20")) {
3131
apply from: file("$projectDir/v1.20/build.gradle")
32+
}
33+
34+
if (flinkVersions.contains("2.0")) {
35+
apply from: file("$projectDir/v2.0/build.gradle")
3236
}

flink/v2.0/build.gradle

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
String flinkMajorVersion = '1.20'
20+
String flinkMajorVersion = '2.0'
2121
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
2222

2323
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
3232
implementation project(':iceberg-parquet')
3333
implementation project(':iceberg-hive-metastore')
3434

35-
compileOnly libs.flink120.avro
35+
compileOnly libs.flink20.avro
3636
// for dropwizard histogram metrics implementation
37-
compileOnly libs.flink120.metrics.dropwizard
38-
compileOnly libs.flink120.streaming.java
39-
compileOnly "${libs.flink120.streaming.java.get().module}:${libs.flink120.streaming.java.get().getVersion()}:tests"
40-
compileOnly libs.flink120.table.api.java.bridge
41-
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
42-
compileOnly libs.flink120.connector.base
43-
compileOnly libs.flink120.connector.files
37+
compileOnly libs.flink20.metrics.dropwizard
38+
compileOnly libs.flink20.streaming.java
39+
compileOnly "${libs.flink20.streaming.java.get().module}:${libs.flink20.streaming.java.get().getVersion()}:tests"
40+
compileOnly libs.flink20.table.api.java.bridge
41+
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink20.get()}"
42+
compileOnly libs.flink20.connector.base
43+
compileOnly libs.flink20.connector.files
4444

4545
compileOnly libs.hadoop3.hdfs
4646
compileOnly libs.hadoop3.common
@@ -68,13 +68,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
6868

6969
implementation libs.datasketches
7070

71-
testImplementation libs.flink120.connector.test.utils
72-
testImplementation libs.flink120.core
73-
testImplementation libs.flink120.runtime
74-
testImplementation(libs.flink120.test.utilsjunit) {
71+
testImplementation libs.flink20.connector.test.utils
72+
testImplementation libs.flink20.core
73+
testImplementation libs.flink20.runtime
74+
testImplementation(libs.flink20.test.utilsjunit) {
7575
exclude group: 'junit'
7676
}
77-
testImplementation(libs.flink120.test.utils) {
77+
testImplementation(libs.flink20.test.utils) {
7878
exclude group: "org.apache.curator", module: 'curator-test'
7979
exclude group: 'junit'
8080
}
@@ -169,7 +169,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
169169
}
170170

171171
// for dropwizard histogram metrics implementation
172-
implementation libs.flink120.metrics.dropwizard
172+
implementation libs.flink20.metrics.dropwizard
173173

174174
// for integration testing with the flink-runtime-jar
175175
// all of those dependencies are required because the integration test extends FlinkTestBase
@@ -179,13 +179,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
179179
integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
180180
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
181181
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
182-
integrationImplementation(libs.flink120.test.utils) {
182+
integrationImplementation(libs.flink20.test.utils) {
183183
exclude group: "org.apache.curator", module: 'curator-test'
184184
exclude group: 'junit'
185185
}
186186

187-
integrationImplementation libs.flink120.table.api.java.bridge
188-
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
187+
integrationImplementation libs.flink20.table.api.java.bridge
188+
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink20.get()}"
189189

190190
integrationImplementation libs.hadoop3.common
191191
integrationImplementation libs.hadoop3.hdfs
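
flink/v2.0/flink/src/main/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.java

Lines changed: 26 additions & 0 deletions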
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.flink.table.api.runtime.types;
20+
21+
/**
22+
* Override Flink's internal FlinkScalaKryoInstantiator to avoid loading the Scala extensions for
23+
* the KryoSerializer. This is a workaround until Kryo-related issues with the Scala extensions are
24+
* fixed. See: https://issues.apache.org/jira/browse/FLINK-37546
25+
*/
26+
public class FlinkScalaKryoInstantiator {}

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Set;
2929
import java.util.stream.Collectors;
3030
import org.apache.flink.annotation.Internal;
31-
import org.apache.flink.table.api.TableSchema;
3231
import org.apache.flink.table.catalog.AbstractCatalog;
3332
import org.apache.flink.table.catalog.CatalogBaseTable;
3433
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -37,7 +36,6 @@
3736
import org.apache.flink.table.catalog.CatalogPartition;
3837
import org.apache.flink.table.catalog.CatalogPartitionSpec;
3938
import org.apache.flink.table.catalog.CatalogTable;
40-
import org.apache.flink.table.catalog.CatalogTableImpl;
4139
import org.apache.flink.table.catalog.ObjectPath;
4240
import org.apache.flink.table.catalog.ResolvedCatalogTable;
4341
import org.apache.flink.table.catalog.TableChange;
@@ -53,6 +51,7 @@
5351
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
5452
import org.apache.flink.table.expressions.Expression;
5553
import org.apache.flink.table.factories.Factory;
54+
import org.apache.flink.table.legacy.api.TableSchema;
5655
import org.apache.flink.util.StringUtils;
5756
import org.apache.iceberg.CachingCatalog;
5857
import org.apache.iceberg.DataFile;
@@ -680,7 +679,11 @@ static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> pro
680679
// CatalogTableImpl to copy a new catalog table.
681680
// Let's re-loading table from Iceberg catalog when creating source/sink operators.
682681
// Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
683-
return new CatalogTableImpl(schema, partitionKeys, props, null);
682+
return CatalogTable.newBuilder()
683+
.schema(schema.toSchema())
684+
.partitionKeys(partitionKeys)
685+
.options(props)
686+
.build();
684687
}
685688

686689
static CatalogTable toCatalogTable(Table table) {

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import java.net.URL;
2222
import java.nio.file.Files;
2323
import java.nio.file.Paths;
24-
import java.util.List;
2524
import java.util.Locale;
2625
import java.util.Map;
26+
import java.util.Set;
27+
import org.apache.flink.configuration.ConfigOption;
2728
import org.apache.flink.configuration.GlobalConfiguration;
2829
import org.apache.flink.runtime.util.HadoopUtils;
2930
import org.apache.flink.table.catalog.Catalog;
@@ -34,8 +35,7 @@
3435
import org.apache.iceberg.catalog.Namespace;
3536
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3637
import org.apache.iceberg.relocated.com.google.common.base.Strings;
37-
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
38-
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
38+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
3939
import org.apache.iceberg.util.PropertyUtil;
4040

4141
/**
@@ -60,6 +60,8 @@
6060
*/
6161
public class FlinkCatalogFactory implements CatalogFactory {
6262

63+
public static final String FACTORY_IDENTIFIER = "iceberg";
64+
6365
// Can not just use "type", it conflicts with CATALOG_TYPE.
6466
public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
6567
public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
@@ -72,8 +74,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
7274
public static final String DEFAULT_DATABASE_NAME = "default";
7375
public static final String DEFAULT_CATALOG_NAME = "default_catalog";
7476
public static final String BASE_NAMESPACE = "base-namespace";
75-
public static final String TYPE = "type";
76-
public static final String PROPERTY_VERSION = "property-version";
7777

7878
/**
7979
* Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink
@@ -122,21 +122,23 @@ static CatalogLoader createCatalogLoader(
122122
}
123123

124124
@Override
125-
public Map<String, String> requiredContext() {
126-
Map<String, String> context = Maps.newHashMap();
127-
context.put(TYPE, "iceberg");
128-
context.put(PROPERTY_VERSION, "1");
129-
return context;
125+
public String factoryIdentifier() {
126+
return FACTORY_IDENTIFIER;
127+
}
128+
129+
@Override
130+
public Set<ConfigOption<?>> requiredOptions() {
131+
return ImmutableSet.<ConfigOption<?>>builder().build();
130132
}
131133

132134
@Override
133-
public List<String> supportedProperties() {
134-
return ImmutableList.of("*");
135+
public Set<ConfigOption<?>> optionalOptions() {
136+
return ImmutableSet.<ConfigOption<?>>builder().build();
135137
}
136138

137139
@Override
138-
public Catalog createCatalog(String name, Map<String, String> properties) {
139-
return createCatalog(name, properties, clusterHadoopConf());
140+
public Catalog createCatalog(Context context) {
141+
return createCatalog(context.getName(), context.getOptions(), clusterHadoopConf());
140142
}
141143

142144
protected Catalog createCatalog(

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Set;
2424
import org.apache.flink.configuration.ConfigOption;
2525
import org.apache.flink.configuration.Configuration;
26-
import org.apache.flink.table.api.TableSchema;
2726
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
2827
import org.apache.flink.table.catalog.ObjectIdentifier;
2928
import org.apache.flink.table.catalog.ObjectPath;
@@ -34,6 +33,7 @@
3433
import org.apache.flink.table.connector.source.DynamicTableSource;
3534
import org.apache.flink.table.factories.DynamicTableSinkFactory;
3635
import org.apache.flink.table.factories.DynamicTableSourceFactory;
36+
import org.apache.flink.table.legacy.api.TableSchema;
3737
import org.apache.flink.table.utils.TableSchemaUtils;
3838
import org.apache.flink.util.Preconditions;
3939
import org.apache.iceberg.catalog.TableIdentifier;
@@ -131,17 +131,16 @@ private static TableLoader createTableLoader(
131131

132132
mergedProps.forEach(flinkConf::setString);
133133

134-
String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
134+
String catalogName = flinkConf.get(FlinkCreateTableOptions.CATALOG_NAME);
135135
Preconditions.checkNotNull(
136136
catalogName,
137137
"Table property '%s' cannot be null",
138138
FlinkCreateTableOptions.CATALOG_NAME.key());
139139

140-
String catalogDatabase =
141-
flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
140+
String catalogDatabase = flinkConf.get(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
142141
Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
143142

144-
String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
143+
String catalogTable = flinkConf.get(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
145144
Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
146145

147146
org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import java.util.List;
2222
import java.util.Set;
2323
import org.apache.flink.table.api.DataTypes;
24-
import org.apache.flink.table.api.TableSchema;
2524
import org.apache.flink.table.catalog.Column;
2625
import org.apache.flink.table.catalog.ResolvedSchema;
26+
import org.apache.flink.table.legacy.api.TableSchema;
2727
import org.apache.flink.table.types.logical.LogicalType;
2828
import org.apache.flink.table.types.logical.RowType;
2929
import org.apache.flink.table.types.utils.TypeConversions;

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@
2323
import org.apache.flink.configuration.ReadableConfig;
2424
import org.apache.flink.streaming.api.datastream.DataStream;
2525
import org.apache.flink.streaming.api.datastream.DataStreamSink;
26-
import org.apache.flink.table.api.TableSchema;
27-
import org.apache.flink.table.api.constraints.UniqueConstraint;
2826
import org.apache.flink.table.connector.ChangelogMode;
2927
import org.apache.flink.table.connector.ProviderContext;
3028
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
3129
import org.apache.flink.table.connector.sink.DynamicTableSink;
3230
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
3331
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
3432
import org.apache.flink.table.data.RowData;
33+
import org.apache.flink.table.legacy.api.TableSchema;
34+
import org.apache.flink.table.legacy.api.constraints.UniqueConstraint;
3535
import org.apache.flink.types.RowKind;
3636
import org.apache.flink.util.Preconditions;
3737
import org.apache.iceberg.flink.sink.FlinkSink;

0 commit comments

Comments
 (0)