From 42cf92d8dea4e1718d5a4f5f07e5a4e04a424cf5 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 24 Apr 2026 11:46:17 -0400 Subject: [PATCH] Go VR Flink test on Flink 2.0 --- .github/trigger_files/beam_PostCommit_Go_VR_Flink.json | 1 + .../beam/runners/flink/FlinkExecutionEnvironments.java | 10 ++++++++++ runners/flink/flink_runner.gradle | 7 +++++++ .../beam/runners/flink/FlinkExecutionEnvironments.java | 8 ++++++++ sdks/go/test/build.gradle | 3 +-- sdks/go/test/integration/integration.go | 2 +- 6 files changed, 28 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json index ed3d846bc7b0..939c43396fda 100644 --- a/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Go_VR_Flink.json @@ -2,4 +2,5 @@ "comment": "Modify this file in a trivial way to cause this test suite to run", "modification": 2, "https://github.com/apache/beam/pull/32440": "testing datastream optimizations", + "pr": "37640" } diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 0d48526e1d0a..4a1c1cb0c5e1 100644 --- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -36,6 +36,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -270,6 +271,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval()); } configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + configureCustomKryoSerializers(flinkStreamEnv.getConfig()); return flinkStreamEnv; } @@ -294,6 +296,14 @@ private static void configureWebUIOptions( } } + private static void configureCustomKryoSerializers(ExecutionConfig config) { + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); + // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap + serializerConfig.registerTypeWithKryoSerializer( + org.apache.beam.sdk.schemas.Schema.class, + com.esotericsoftware.kryo.serializers.JavaSerializer.class); + } + private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters { private final Map jobOptions; diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 98e7d547b4d3..837561ec71b7 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -230,6 +230,13 @@ dependencies { // configuration (https://issues.apache.org/jira/browse/BEAM-11732). permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version" + // align with Flink's kryo version (groupId changed) + if (flink_major.startsWith('2')) { + implementation "com.esotericsoftware:kryo:5.6.2" + } else { + implementation "com.esotericsoftware.kryo:kryo:2.24.0" + } + implementation "org.apache.flink:flink-streaming-java:$flink_version" testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version" testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index a0e5908cc99d..de92dd94605a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -298,6 +298,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( configureStateBackend(options, flinkStreamEnv); configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + configureCustomKryoSerializers(flinkStreamEnv.getConfig()); return flinkStreamEnv; } @@ -322,6 +323,13 @@ private static void configureWebUIOptions( } } + private static void configureCustomKryoSerializers(ExecutionConfig config) { + // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap + config.registerTypeWithKryoSerializer( + org.apache.beam.sdk.schemas.Schema.class, + com.esotericsoftware.kryo.serializers.JavaSerializer.class); + } + private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters { private final Map jobOptions; diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index 9302f25f8fce..11cf8edf2af6 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() { task flinkValidatesRunner { group = "Verification" - // TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved - def flinkVersion = '1.20' + def flinkVersion = project.ext.latestFlinkVersion dependsOn ":sdks:go:test:goBuild" dependsOn ":sdks:go:container:docker" diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 9d04f08de4fb..0c73b0843df6 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -199,7 +199,7 @@ var flinkFilters = []string{ "TestTestStreamToGBK", "TestTestStreamTimersEventTime", - "TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup)) + "TestTimers_EventTime_WithNoOutputTimestamp", // Encounter error: TimestampCombiner moved element from TIMESTAMP_MAX_VALUE to earlier time (end of global window) for window GlobalWindow "TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers. // no support for BundleFinalizer