diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json index ed3d846bc7b0..939c43396fda 100644 --- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json @@ -2,4 +2,5 @@ "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 2, "https://github.com/apache/beam/pull/32440": "testing datastream optimizations", + "pr": "37640" } diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 0d48526e1d0a..4a1c1cb0c5e1 100644 --- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -36,6 +36,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -270,6 +271,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval()); } configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + configureCustomKryoSerializers(flinkStreamEnv.getConfig()); return flinkStreamEnv; } @@ -294,6 +296,14 @@ private static void configureWebUIOptions( } } + private static void configureCustomKryoSerializers(ExecutionConfig config) { + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); + // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap + serializerConfig.registerTypeWithKryoSerializer( + org.apache.beam.sdk.schemas.Schema.class, + com.esotericsoftware.kryo.serializers.JavaSerializer.class); + } + private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters { private final Map jobOptions; diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 98e7d547b4d3..837561ec71b7 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -230,6 +230,13 @@ dependencies { // configuration (https://issues.apache.org/jira/browse/BEAM-11732). permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version" + // align with Flink's kryo version (groupId changed) + if (flink_major.startsWith('2')) { + implementation "com.esotericsoftware:kryo:5.6.2" + } else { + implementation "com.esotericsoftware.kryo:kryo:2.24.0" + } + implementation "org.apache.flink:flink-streaming-java:$flink_version" testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version" testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index a0e5908cc99d..de92dd94605a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -298,6 +298,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( configureStateBackend(options, flinkStreamEnv); configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + configureCustomKryoSerializers(flinkStreamEnv.getConfig()); return flinkStreamEnv; } @@ -322,6 +323,13 @@ private static void configureWebUIOptions( } } + private static void configureCustomKryoSerializers(ExecutionConfig config) { + // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap + config.registerTypeWithKryoSerializer( + org.apache.beam.sdk.schemas.Schema.class, + com.esotericsoftware.kryo.serializers.JavaSerializer.class); + } + private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters { private final Map jobOptions; diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index 9302f25f8fce..11cf8edf2af6 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() { task flinkValidatesRunner { group = "Verification" - // TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved - def flinkVersion = '1.20' + def flinkVersion = project.ext.latestFlinkVersion dependsOn ":sdks:go:test:goBuild" dependsOn ":sdks:go:container:docker"