File tree Expand file tree Collapse file tree
2.0/src/main/java/org/apache/beam/runners/flink
src/main/java/org/apache/beam/runners/flink Expand file tree Collapse file tree Original file line number Diff line number Diff line change 3636import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .net .HostAndPort ;
3737import org .apache .flink .api .common .ExecutionConfig ;
3838import org .apache .flink .api .common .RuntimeExecutionMode ;
39+ import org .apache .flink .api .common .serialization .SerializerConfigImpl ;
3940import org .apache .flink .configuration .CheckpointingOptions ;
4041import org .apache .flink .configuration .Configuration ;
4142import org .apache .flink .configuration .CoreOptions ;
@@ -270,6 +271,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
270271 flinkStreamEnv .getConfig ().setAutoWatermarkInterval (options .getAutoWatermarkInterval ());
271272 }
272273 configureWebUIOptions (flinkStreamEnv .getConfig (), options .as (PipelineOptions .class ));
274+ configureCustomKryoSerializers (flinkStreamEnv .getConfig ());
273275
274276 return flinkStreamEnv ;
275277 }
@@ -294,6 +296,14 @@ private static void configureWebUIOptions(
294296 }
295297 }
296298
299+ private static void configureCustomKryoSerializers (ExecutionConfig config ) {
300+ SerializerConfigImpl serializerConfig = (SerializerConfigImpl ) config .getSerializerConfig ();
301+ // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap
302+ serializerConfig .registerTypeWithKryoSerializer (
303+ org .apache .beam .sdk .schemas .Schema .class ,
304+ com .esotericsoftware .kryo .serializers .JavaSerializer .class );
305+ }
306+
297307 private static class GlobalJobParametersImpl extends ExecutionConfig .GlobalJobParameters {
298308 private final Map <String , String > jobOptions ;
299309
Original file line number Diff line number Diff line change @@ -230,6 +230,13 @@ dependencies {
230230 // configuration (https://issues.apache.org/jira/browse/BEAM-11732).
231231 permitUnusedDeclared " org.apache.flink:flink-clients:$flink_version "
232232
233+ // align with Flink's kryo version
234+ def kryo_version = ' 2.24.0'
235+ if (flink_major. startsWith(' 2' )) {
236+ kryo_version = ' 5.6.2'
237+ }
238+ implementation " com.esotericsoftware.kryo:kryo:$kryo_version "
239+
233240 implementation " org.apache.flink:flink-streaming-java:$flink_version "
234241 testImplementation " org.apache.flink:flink-statebackend-rocksdb:$flink_version "
235242 testImplementation " org.apache.flink:flink-streaming-java:$flink_version :tests"
Original file line number Diff line number Diff line change @@ -298,6 +298,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
298298 configureStateBackend (options , flinkStreamEnv );
299299
300300 configureWebUIOptions (flinkStreamEnv .getConfig (), options .as (PipelineOptions .class ));
301+ configureCustomKryoSerializers (flinkStreamEnv .getConfig ());
301302
302303 return flinkStreamEnv ;
303304 }
@@ -322,6 +323,13 @@ private static void configureWebUIOptions(
322323 }
323324 }
324325
326+ private static void configureCustomKryoSerializers (ExecutionConfig config ) {
327+ // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap
328+ config .registerTypeWithKryoSerializer (
329+ org .apache .beam .sdk .schemas .Schema .class ,
330+ com .esotericsoftware .kryo .serializers .JavaSerializer .class );
331+ }
332+
325333 private static class GlobalJobParametersImpl extends ExecutionConfig .GlobalJobParameters {
326334 private final Map <String , String > jobOptions ;
327335
Original file line number Diff line number Diff line change @@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() {
7979task flinkValidatesRunner {
8080 group = " Verification"
8181
82- // TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved
83- def flinkVersion = ' 1.20'
82+ def flinkVersion = project. ext. latestFlinkVersion
8483
8584 dependsOn " :sdks:go:test:goBuild"
8685 dependsOn " :sdks:go:container:docker"
You can’t perform that action at this time.
0 commit comments