Skip to content

Commit 8483c33

Browse files
committed
Go VR Flink test on Flink 2.0
1 parent 9caf1a6 commit 8483c33

5 files changed

Lines changed: 27 additions & 2 deletions

File tree

.github/trigger_files/beam_PostCommit_Go_VR_Flink.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
33
"modification": 2,
44
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
5+
"pr": "37640"
56
}

runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
3737
import org.apache.flink.api.common.ExecutionConfig;
3838
import org.apache.flink.api.common.RuntimeExecutionMode;
39+
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
3940
import org.apache.flink.configuration.CheckpointingOptions;
4041
import org.apache.flink.configuration.Configuration;
4142
import org.apache.flink.configuration.CoreOptions;
@@ -270,6 +271,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
270271
flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
271272
}
272273
configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class));
274+
configureCustomKryoSerializers(flinkStreamEnv.getConfig());
273275

274276
return flinkStreamEnv;
275277
}
@@ -294,6 +296,14 @@ private static void configureWebUIOptions(
294296
}
295297
}
296298

299+
private static void configureCustomKryoSerializers(ExecutionConfig config) {
300+
SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig();
301+
// Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap
302+
serializerConfig.registerTypeWithKryoSerializer(
303+
org.apache.beam.sdk.schemas.Schema.class,
304+
com.esotericsoftware.kryo.serializers.JavaSerializer.class);
305+
}
306+
297307
private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters {
298308
private final Map<String, String> jobOptions;
299309

runners/flink/flink_runner.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,13 @@ dependencies {
230230
// configuration (https://issues.apache.org/jira/browse/BEAM-11732).
231231
permitUnusedDeclared "org.apache.flink:flink-clients:$flink_version"
232232

233+
// align with Flink's kryo version (groupId changed)
234+
if (flink_major.startsWith('2')) {
235+
implementation "com.esotericsoftware:kryo:5.6.2"
236+
} else {
237+
implementation "com.esotericsoftware.kryo:kryo:2.24.0"
238+
}
239+
233240
implementation "org.apache.flink:flink-streaming-java:$flink_version"
234241
testImplementation "org.apache.flink:flink-statebackend-rocksdb:$flink_version"
235242
testImplementation "org.apache.flink:flink-streaming-java:$flink_version:tests"

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
298298
configureStateBackend(options, flinkStreamEnv);
299299

300300
configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class));
301+
configureCustomKryoSerializers(flinkStreamEnv.getConfig());
301302

302303
return flinkStreamEnv;
303304
}
@@ -322,6 +323,13 @@ private static void configureWebUIOptions(
322323
}
323324
}
324325

326+
private static void configureCustomKryoSerializers(ExecutionConfig config) {
327+
// Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap
328+
config.registerTypeWithKryoSerializer(
329+
org.apache.beam.sdk.schemas.Schema.class,
330+
com.esotericsoftware.kryo.serializers.JavaSerializer.class);
331+
}
332+
325333
private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters {
326334
private final Map<String, String> jobOptions;
327335

sdks/go/test/build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() {
7979
task flinkValidatesRunner {
8080
group = "Verification"
8181

82-
// TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved
83-
def flinkVersion = '1.20'
82+
def flinkVersion = project.ext.latestFlinkVersion
8483

8584
dependsOn ":sdks:go:test:goBuild"
8685
dependsOn ":sdks:go:container:docker"

0 commit comments

Comments
 (0)