Skip to content

Commit df04506

Browse files
committed
Go VR Flink test on Flink 2.0
1 parent 357862a commit df04506

6 files changed

Lines changed: 49 additions & 15 deletions

File tree

.github/trigger_files/beam_PostCommit_Go_VR_Flink.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
33
"modification": 2,
44
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
5-
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
65
}

runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,13 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
271271
}
272272
configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class));
273273

274+
// Register Kryo serializer for ImmutableBiMap to fix serialization issues in Flink 2
275+
((org.apache.flink.api.common.serialization.SerializerConfigImpl)
276+
flinkStreamEnv.getConfig().getSerializerConfig())
277+
.registerTypeWithKryoSerializer(
278+
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.class,
279+
org.apache.beam.runners.flink.translation.types.ImmutableMapSerializer.class);
280+
274281
return flinkStreamEnv;
275282
}
276283

sdks/go/container/build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ docker {
3333
name containerImageName(
3434
name: project.docker_image_default_repo_prefix + "go_sdk",
3535
root: project.rootProject.hasProperty(["docker-repository-root"]) ?
36-
project.rootProject["docker-repository-root"] :
37-
project.docker_image_default_repo_root)
36+
project.rootProject["docker-repository-root"] :
37+
project.docker_image_default_repo_root,
38+
tag: project.rootProject.hasProperty(["docker-tag"]) ?
39+
project.rootProject["docker-tag"] : project.sdk_version)
3840
// tags used by dockerTag task
3941
tags containerImageTags()
4042
files "./build/"

sdks/go/run_with_go_version.sh

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,19 @@ if [ -z "$GOCMD" ] ; then
8383
# The download command isn't concurrency safe so we get an exclusive lock, without wait.
8484
# If we're first, we ensure the command is downloaded, releasing the lock afterwards.
8585
# This operation is cached on system and won't be re-downloaded at least.
86-
flock --exclusive --nonblock --conflict-exit-code 0 $LOCKFILE $GOBIN/$GOVERS download
86+
if command -v flock &> /dev/null; then
87+
flock --exclusive --nonblock --conflict-exit-code 0 $LOCKFILE $GOBIN/$GOVERS download
8788

88-
# Execute the script with the remaining arguments.
89-
# We get a shared lock for the ordinary go command execution.
90-
echo $GOBIN/$GOVERS $@
91-
flock --shared --timeout=10 $LOCKFILE $GOBIN/$GOVERS $@
89+
# Execute the script with the remaining arguments.
90+
# We get a shared lock for the ordinary go command execution.
91+
echo $GOBIN/$GOVERS $@
92+
flock --shared --timeout=10 $LOCKFILE $GOBIN/$GOVERS $@
93+
else
94+
echo "WARNING: flock not found, running without lock."
95+
$GOBIN/$GOVERS download
96+
echo $GOBIN/$GOVERS $@
97+
$GOBIN/$GOVERS $@
98+
fi
9299
else
93100
# Minor TODO: Figure out if we can pull out the GOCMD env variable after goPrepare
94101
# completion, and avoid this brittle GOBIN substitution.

sdks/go/test/build.gradle

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ task dataflowValidatesRunnerARM64() {
7979
task flinkValidatesRunner {
8080
group = "Verification"
8181

82-
// TODO(https://github.com/apache/beam/issues/37600) use project.ext.latestFlinkVersion after resolved
83-
def flinkVersion = '1.20'
82+
def flinkVersion = project.ext.latestFlinkVersion
8483

8584
dependsOn ":sdks:go:test:goBuild"
8685
dependsOn ":sdks:go:container:docker"
@@ -105,6 +104,7 @@ task flinkValidatesRunner {
105104
"--runner flink",
106105
"--flink_job_server_jar ${project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath}",
107106
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
107+
"--prebuild_go_docker_tag ${project(":sdks:go:container").containerImageTags().first()}",
108108
]
109109
exec {
110110
if (fork_java_home) {
@@ -133,6 +133,7 @@ task samzaValidatesRunner {
133133
"--runner samza",
134134
"--samza_job_server_jar ${project(":runners:samza:job-server").shadowJar.archivePath}",
135135
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
136+
"--prebuild_go_docker_tag ${project(":sdks:go:container").containerImageTags().first()}",
136137
]
137138
exec {
138139
if (fork_java_home) {
@@ -150,6 +151,7 @@ task sparkValidatesRunner {
150151
group = "Verification"
151152

152153
dependsOn ":sdks:go:test:goBuild"
154+
dependsOn ":sdks:go:container:docker"
153155
dependsOn ":sdks:java:container:${project.ext.currentJavaVersion}:docker"
154156
dependsOn ":runners:spark:3:job-server:shadowJar"
155157
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
@@ -161,6 +163,7 @@ task sparkValidatesRunner {
161163
"--runner spark",
162164
"--spark_job_server_jar ${project(":runners:spark:3:job-server").shadowJar.archivePath}",
163165
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
166+
"--prebuild_go_docker_tag ${project(":sdks:go:container").containerImageTags().first()}",
164167
]
165168
exec {
166169
if (fork_java_home) {
@@ -195,6 +198,7 @@ tasks.register("ulrValidatesRunner") {
195198
def options = [
196199
"--runner portable",
197200
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
201+
"--prebuild_go_docker_tag ${project(":sdks:go:container").containerImageTags().first()}",
198202
]
199203
exec {
200204
executable "sh"
@@ -227,6 +231,7 @@ task prismValidatesRunner {
227231
def options = [
228232
"--runner prism",
229233
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
234+
"--prebuild_go_docker_tag ${project(":sdks:go:container").containerImageTags().first()}",
230235
]
231236
exec {
232237
if (fork_java_home) {
@@ -265,11 +270,12 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
265270
"--expansion_jar=debeziumio:${debeziumIoExpJar}",
266271
"--expansion_jar=gcpio:${gcpIoExpJar}",
267272
"--bq_dataset=apache-beam-testing.beam_bigquery_io_test_temp",
268-
"--bt_instance=projects/apache-beam-testing/instances/beam-test"
273+
"--bt_instance=projects/apache-beam-testing/instances/beam-test",
269274
]
270275
pipelineOptions.addAll(pipelineOpts)
271276
def options = [
272277
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
278+
"--prebuild_go_docker_tag ${project(":sdks:go:container").containerImageTags().first()}",
273279
]
274280
options.addAll(scriptOpts)
275281
logger.info("Running the command: sh -c ./run_validatesrunner_tests.sh ${options.join(' ')}")

sdks/go/test/run_validatesrunner_tests.sh

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
# --tests -> A space-seperated list of targets for "go test", written with
3333
# beam/sdks/go as the working directory. Defaults to all packages in the
3434
# integration and regression directories.
35+
# --run -> To select which tests to run
3536
# --timeout -> Timeout for the go test command, on a per-package level.
3637
# --simultaneous -> Number of simultaneous packages to test.
3738
# Controls the -p flag for the go test command.
@@ -129,6 +130,11 @@ case $key in
129130
shift # past argument
130131
shift # past value
131132
;;
133+
--run)
134+
TEST_RUN="$2"
135+
shift # past argument
136+
shift # past value
137+
;;
132138
--timeout)
133139
TIMEOUT="$2"
134140
shift # past argument
@@ -229,8 +235,8 @@ case $key in
229235
shift # past argument
230236
shift # past value
231237
;;
232-
--java11_home)
233-
JAVA11_HOME="$2"
238+
--prebuild_go_docker_tag)
239+
PREBUILD_GO_DOCKER_TAG="$2"
234240
shift # past argument
235241
shift # past value
236242
;;
@@ -427,8 +433,12 @@ if [[ "$RUNNER" == "dataflow" ]]; then
427433
fi
428434
fi
429435
else
430-
TAG=dev
431-
./gradlew :sdks:go:container:docker -Pdocker-tag=$TAG
436+
if [[ -n {PREBUILD_GO_DOCKER_TAG} ]]; then
437+
TAG=$PREBUILD_GO_DOCKER_TAG
438+
else
439+
TAG=dev
440+
./gradlew :sdks:go:container:docker -Pdocker-tag=$TAG
441+
fi
432442
CONTAINER=apache/beam_go_sdk
433443
fi
434444

@@ -447,6 +457,9 @@ ARGS="$ARGS --environment_config=$CONTAINER:$TAG"
447457
ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test/$GCS_SUBFOLDER"
448458
ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test/$GCS_SUBFOLDER"
449459
ARGS="$ARGS --endpoint=$ENDPOINT"
460+
if [[ -n "$TEST_RUN" ]]; then
461+
ARGS="$ARGS -run $TEST_RUN"
462+
fi
450463
if [[ -n "$TEST_EXPANSION_ADDR" ]]; then
451464
ARGS="$ARGS --test_expansion_addr=$TEST_EXPANSION_ADDR"
452465
fi

0 commit comments

Comments
 (0)