Skip to content

Commit a96ec21

Browse files
author
Jiyong Wang
committed
[FLINK-40005][cdc-cli] CliExecutor application mode reads pipeline definition from local file path
In application mode, CliExecutor.main received args[0] as the pipeline definition file PATH but passed it to YamlPipelineDefinitionParser.parse(String, Configuration), which treats the String as YAML content. The path string itself was parsed as YAML and failed validation with: Missing required field "source" in top-level configuration. Read the file content with the local JVM file API (java.nio Files.readAllBytes) instead of Flink's FileSystem — the cluster default FileSystem is often S3 (checkpoints/HA on S3), which cannot resolve the local file shipped next to the JobManager — then parse the content. Regression introduced by #3643 (FLINK-35360, Support Yarn application mode for yaml job).
1 parent 504a4b4 commit a96ec21

2 files changed

Lines changed: 87 additions & 1 deletion

File tree

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535

3636
import org.apache.commons.cli.CommandLine;
3737

38+
import java.nio.charset.StandardCharsets;
39+
import java.nio.file.Files;
40+
import java.nio.file.Paths;
3841
import java.util.List;
3942

4043
/** Executor for doing the composing and submitting logic for {@link CliFrontend}. */
@@ -108,7 +111,18 @@ public PipelineExecution.ExecutionInfo deployWithNoOpComposer() throws Exception
108111
// The main class for running application mode
109112
public static void main(String[] args) throws Exception {
110113
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
111-
PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration());
114+
// args[0] is the pipeline definition FILE PATH. In application mode the deployment
115+
// executors set ApplicationConfiguration.APPLICATION_ARGS = commandLine.getArgList(), i.e.
116+
// the pipeline definition file path (shipped into the JobManager container). Read its
117+
// content with the local JVM file API instead of Flink's FileSystem: the cluster default
118+
// FileSystem may be S3, in which case FileSystem.get(localPath) would not find the local
119+
// file. Then parse the content via the String overload. Without this, the String overload
120+
// would treat the path itself as YAML content and fail with:
121+
// Missing required field "source" in top-level configuration.
122+
String pipelineDefContent =
123+
new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
124+
PipelineDef pipelineDef =
125+
pipelineDefinitionParser.parse(pipelineDefContent, new Configuration());
112126
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
113127
FlinkPipelineComposer flinkPipelineComposer =
114128
FlinkPipelineComposer.ofApplicationCluster(env);
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.cli;
19+
20+
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
21+
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.composer.definition.PipelineDef;
23+
import org.apache.flink.core.fs.Path;
24+
25+
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.net.URL;
30+
31+
import static org.assertj.core.api.Assertions.assertThat;
32+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
33+
34+
/**
35+
* Tests covering how {@link CliExecutor#main(String[])} (the application-mode entrypoint) loads the
36+
* pipeline definition.
37+
*
38+
* <p>In application mode the deployment executors set {@code
39+
* ApplicationConfiguration.APPLICATION_ARGS = commandLine.getArgList()}, so {@code args[0]} is the
40+
* pipeline definition FILE PATH. It must be parsed via the {@link
41+
* YamlPipelineDefinitionParser#parse(Path, Configuration)} overload (read the file). Using the
42+
* {@link YamlPipelineDefinitionParser#parse(String, Configuration)} overload would treat the path
43+
* string itself as YAML content and fail with {@code Missing required field "source"}.
44+
*/
45+
class CliExecutorTest {
46+
47+
/** The fixed behavior: parse the pipeline definition from a file path (Path overload). */
48+
@Test
49+
void testParsePipelineDefinitionFromFilePath() throws Exception {
50+
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
51+
Path pipelineDefPath = new Path(resource.toURI());
52+
PipelineDef pipelineDef =
53+
new YamlPipelineDefinitionParser().parse(pipelineDefPath, new Configuration());
54+
assertThat(pipelineDef).isNotNull();
55+
assertThat(pipelineDef.getSource()).isNotNull();
56+
}
57+
58+
/**
59+
* Reproduces the application-mode bug: passing the path STRING to the content overload makes
60+
* the parser treat the path as YAML content, yielding a scalar node without a {@code source}.
61+
*/
62+
@Test
63+
void testParsingFilePathAsYamlContentFails() {
64+
String pipelineDefPath = "/opt/flink/config/pipeline.yaml";
65+
assertThatThrownBy(
66+
() ->
67+
new YamlPipelineDefinitionParser()
68+
.parse(pipelineDefPath, new Configuration()))
69+
.isInstanceOf(IllegalArgumentException.class)
70+
.hasMessageContaining("Missing required field \"source\"");
71+
}
72+
}

0 commit comments

Comments
 (0)