Skip to content

Commit 52a596f

Browse files
committed
Register Beam Schema with JavaSerializer
1 parent df04506 commit 52a596f

1 file changed

Lines changed: 11 additions & 7 deletions

File tree

runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
3737
import org.apache.flink.api.common.ExecutionConfig;
3838
import org.apache.flink.api.common.RuntimeExecutionMode;
39+
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
3940
import org.apache.flink.configuration.CheckpointingOptions;
4041
import org.apache.flink.configuration.Configuration;
4142
import org.apache.flink.configuration.CoreOptions;
@@ -270,13 +271,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
270271
flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
271272
}
272273
configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class));
273-
274-
// Register Kryo serializer for ImmutableBiMap to fix serialization issues in Flink 2
275-
((org.apache.flink.api.common.serialization.SerializerConfigImpl)
276-
flinkStreamEnv.getConfig().getSerializerConfig())
277-
.registerTypeWithKryoSerializer(
278-
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.class,
279-
org.apache.beam.runners.flink.translation.types.ImmutableMapSerializer.class);
274+
configureCustomKryoSerializers(flinkStreamEnv.getConfig());
280275

281276
return flinkStreamEnv;
282277
}
@@ -301,6 +296,15 @@ private static void configureWebUIOptions(
301296
}
302297
}
303298

299+
// Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap in Flink 2
300+
// TODO:
301+
private static void configureCustomKryoSerializers(ExecutionConfig config) {
302+
SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig();
303+
serializerConfig.registerTypeWithKryoSerializer(
304+
org.apache.beam.sdk.schemas.Schema.class,
305+
com.esotericsoftware.kryo.serializers.JavaSerializer.class);
306+
}
307+
304308
private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters {
305309
private final Map<String, String> jobOptions;
306310

0 commit comments

Comments
 (0)