Skip to content

Commit 0e10c04

Browse files
author
Jiyong Wang
committed
[FLINK-40005][cdc-cli] CliExecutor application mode resolves pipeline definition from local path, remote path or inline content
In application mode, CliExecutor.main received args[0] and passed it to YamlPipelineDefinitionParser.parse(String, Configuration), which treats the String as YAML content. The path string itself was parsed as YAML and failed validation with: Missing required field "source" in top-level configuration. The two native application deployment executors share CliExecutor.main but pass args[0] with different semantics: - K8SApplicationDeploymentExecutor sets APPLICATION_ARGS = commandLine.getArgList(), so args[0] is the pipeline definition FILE PATH (shipped into the JobManager container, e.g. mounted via a ConfigMap by the Flink Kubernetes Operator). - YarnApplicationDeploymentExecutor reads the file on the client side and sets APPLICATION_ARGS to the pipeline definition CONTENT. Resolve args[0] in three cases, in order: 1. An explicit-scheme path (s3://, hdfs://, oss://, file://) is read through Flink's FileSystem so the matching plugin resolves it. The scheme is explicit, so it is not at risk of being hijacked by the cluster default FileSystem. 2. A bare local file path is read with the local JVM file API (java.nio Files.readAllBytes) instead of Flink's FileSystem, whose cluster default may be S3/HDFS and would not resolve a local path shipped next to the JobManager. 3. Otherwise the value is already the pipeline definition content (YARN) and is parsed verbatim. This fixes K8S application mode without regressing YARN application mode, and also supports placing the pipeline definition file on remote storage. Also update the Kubernetes deployment docs (EN + ZH): the FlinkDeployment example now uses entryClass org.apache.flink.cdc.cli.CliExecutor without --use-mini-cluster, and the "native application mode is not supported" note is removed. Regression introduced by #3643 (FLINK-35360, Support Yarn application mode for yaml job).
1 parent 504a4b4 commit 0e10c04

4 files changed

Lines changed: 213 additions & 19 deletions

File tree

docs/content.zh/docs/deployment/kubernetes.md

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,9 @@ spec:
241241
imagePullPolicy: Always
242242
job:
243243
args:
244-
- '--use-mini-cluster'
245244
- /opt/flink/flink-cdc-{{< param Version >}}/conf/mysql-to-doris.yaml
246-
entryClass: org.apache.flink.cdc.cli.CliFrontend
247-
jarURI: 'local:///opt/flink/flink-cdc-{{< param Version >}}/lib/flink-cdc-dist-{{< param Version >}}.jar'
245+
entryClass: org.apache.flink.cdc.cli.CliExecutor
246+
jarURI: 'local:///opt/flink/lib/flink-cdc-dist-{{< param Version >}}.jar'
248247
parallelism: 1
249248
state: running
250249
upgradeMode: savepoint
@@ -276,7 +275,7 @@ spec:
276275
```
277276
{{< hint info >}}
278277
1. 由于Flink的类加载机制,参数`classloader.resolve-order`必须设置为`parent-first`。
279-
2. Flink CDC默认提交作业到远程Flink集群,在Operator模式下,您需要通过指定`--use-mini-cluster`参数在pod内部启动一个Standalone Flink集群
278+
2. `entryClass`必须设置为`org.apache.flink.cdc.cli.CliExecutor`,它是Flink **native application mode** 的入口类。Pipeline 定义文件路径通过`args`传入,由 JobManager 读取并构建作业图,作业随后在独立的 TaskManager 上执行
280279
{{< /hint >}}
281280

282281
### 提交Flink CDC作业
@@ -289,9 +288,4 @@ kubectl apply -f flink-cdc-pipeline-job.yaml
289288
```shell
290289
flinkdeployment.flink.apache.org/flink-cdc-pipeline-job created
291290
```
292-
如您需要查看日志、暴露Flink Web UI等,请参考:[Flink Kubernetes Operator文档](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/)。
293-
294-
295-
{{< hint info >}}
296-
请注意,目前不支持使用**native application mode**提交作业。
297-
{{< /hint >}}
291+
如您需要查看日志、暴露Flink Web UI等,请参考:[Flink Kubernetes Operator文档](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/)。

docs/content/docs/deployment/kubernetes.md

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,8 @@ spec:
244244
imagePullPolicy: Always
245245
job:
246246
args:
247-
- '--use-mini-cluster'
248247
- /opt/flink/flink-cdc-{{< param Version >}}/conf/mysql-to-doris.yaml
249-
entryClass: org.apache.flink.cdc.cli.CliFrontend
248+
entryClass: org.apache.flink.cdc.cli.CliExecutor
250249
jarURI: 'local:///opt/flink/lib/flink-cdc-dist-{{< param Version >}}.jar'
251250
parallelism: 1
252251
state: running
@@ -279,7 +278,7 @@ spec:
279278
```
280279
{{< hint info >}}
281280
1. Due to Flink's class loader, the parameter of `classloader.resolve-order` must be `parent-first`.
282-
2. Flink CDC submits a job to a remote Flink cluster by default, you should start a Standalone Flink cluster in the pod by `--use-mini-cluster` in Operator mode.
281+
2. The `entryClass` must be `org.apache.flink.cdc.cli.CliExecutor`, which is the entrypoint of Flink **native application mode**. The pipeline definition file path is passed through `args`; the JobManager reads it, builds the job graph, and the job is executed on dedicated TaskManagers.
283282
{{< /hint >}}
284283

285284
### Submit a Flink CDC Job
@@ -292,8 +291,4 @@ After successful submission, the return information is as follows:
292291
```shell
293292
flinkdeployment.flink.apache.org/flink-cdc-pipeline-job created
294293
```
295-
If you want to trace the logs or expose the Flink Web UI, please refer to: [Flink Kubernetes Operator documentation](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/)。
296-
297-
{{< hint info >}}
298-
Please note that submitting with **native application mode** is not supported for now.
299-
{{< /hint >}}
294+
If you want to trace the logs or expose the Flink Web UI, please refer to: [Flink Kubernetes Operator documentation](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/)。

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,20 @@
3030
import org.apache.flink.cdc.composer.flink.deployment.K8SApplicationDeploymentExecutor;
3131
import org.apache.flink.cdc.composer.flink.deployment.YarnApplicationDeploymentExecutor;
3232
import org.apache.flink.configuration.DeploymentOptions;
33+
import org.apache.flink.core.fs.FSDataInputStream;
34+
import org.apache.flink.core.fs.FileSystem;
3335
import org.apache.flink.core.fs.Path;
3436
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3537

3638
import org.apache.commons.cli.CommandLine;
3739

40+
import java.io.ByteArrayOutputStream;
41+
import java.net.URI;
42+
import java.net.URISyntaxException;
43+
import java.nio.charset.StandardCharsets;
44+
import java.nio.file.Files;
45+
import java.nio.file.InvalidPathException;
46+
import java.nio.file.Paths;
3847
import java.util.List;
3948

4049
/** Executor for doing the composing and submitting logic for {@link CliFrontend}. */
@@ -108,14 +117,90 @@ public PipelineExecution.ExecutionInfo deployWithNoOpComposer() throws Exception
108117
// The main class for running application mode
109118
public static void main(String[] args) throws Exception {
110119
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
111-
PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration());
120+
PipelineDef pipelineDef =
121+
pipelineDefinitionParser.parse(resolvePipelineDef(args[0]), new Configuration());
112122
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
113123
FlinkPipelineComposer flinkPipelineComposer =
114124
FlinkPipelineComposer.ofApplicationCluster(env);
115125
PipelineExecution execution = flinkPipelineComposer.compose(pipelineDef);
116126
execution.execute();
117127
}
118128

129+
/**
130+
* Resolves the application-mode entrypoint argument into the pipeline definition YAML content.
131+
*
132+
* <p>The two native application deployment executors share this single entrypoint but pass
133+
* {@code args[0]} with different semantics:
134+
*
135+
* <ul>
136+
* <li>{@link org.apache.flink.cdc.composer.flink.deployment.K8SApplicationDeploymentExecutor}
137+
* sets {@code APPLICATION_ARGS = commandLine.getArgList()}, so {@code args[0]} is the
138+
* pipeline definition FILE PATH shipped into the JobManager container (e.g. mounted via a
139+
* ConfigMap by the Flink Kubernetes Operator).
140+
* <li>{@link
141+
* org.apache.flink.cdc.composer.flink.deployment.YarnApplicationDeploymentExecutor} reads
142+
* the file on the client side and sets {@code APPLICATION_ARGS} to the pipeline
143+
* definition CONTENT, because it does not ship the file into the YARN container.
144+
* </ul>
145+
*
146+
* <p>Three cases are handled, in order:
147+
*
148+
* <ol>
149+
* <li>An explicit-scheme path (e.g. {@code s3://}, {@code hdfs://}, {@code oss://}, {@code
150+
* file://}) is read through Flink's FileSystem so the matching plugin resolves it. The
151+
* scheme is explicit, so — unlike a bare local path — it is not at risk of being hijacked
152+
* by the cluster default FileSystem.
153+
* <li>A bare local file path (e.g. shipped into the JobManager container / mounted by a
154+
* ConfigMap by the Flink Kubernetes Operator) is read with the local JVM file API rather
155+
* than Flink's FileSystem, whose cluster default may be S3/HDFS and would not resolve a
156+
* local path.
157+
* <li>Otherwise the value is already the pipeline definition CONTENT (the YARN application
158+
* executor reads the file on the client side and passes the content) and is used
159+
* verbatim. Without distinguishing these, the parser's String overload would treat a file
160+
* path as YAML content and fail with: Missing required field "source".
161+
* </ol>
162+
*/
163+
@VisibleForTesting
164+
static String resolvePipelineDef(String pipelineDefPathOrContent) throws Exception {
165+
// Case 1: explicit-scheme path -> read through Flink's FileSystem (plugin-aware).
166+
URI uri = tryParseUri(pipelineDefPathOrContent);
167+
if (uri != null && uri.getScheme() != null) {
168+
Path remotePath = new Path(pipelineDefPathOrContent);
169+
FileSystem fileSystem = remotePath.getFileSystem();
170+
try (FSDataInputStream in = fileSystem.open(remotePath);
171+
ByteArrayOutputStream out = new ByteArrayOutputStream()) {
172+
byte[] buffer = new byte[4096];
173+
int bytesRead;
174+
while ((bytesRead = in.read(buffer)) != -1) {
175+
out.write(buffer, 0, bytesRead);
176+
}
177+
return new String(out.toByteArray(), StandardCharsets.UTF_8);
178+
}
179+
}
180+
181+
// Case 2: bare local file path -> read with the local JVM file API to avoid the cluster
182+
// default FileSystem (which may be S3/HDFS) hijacking a local path.
183+
try {
184+
java.nio.file.Path localPath = Paths.get(pipelineDefPathOrContent);
185+
if (Files.isRegularFile(localPath)) {
186+
return new String(Files.readAllBytes(localPath), StandardCharsets.UTF_8);
187+
}
188+
} catch (InvalidPathException ignored) {
189+
// Not a valid local path; fall through.
190+
}
191+
192+
// Case 3: not a path -> the YARN application executor already passes the CONTENT.
193+
return pipelineDefPathOrContent;
194+
}
195+
196+
private static URI tryParseUri(String value) {
197+
try {
198+
return new URI(value);
199+
} catch (URISyntaxException e) {
200+
return null;
201+
}
202+
}
203+
119204
@VisibleForTesting
120205
void setComposer(PipelineComposer composer) {
121206
this.composer = composer;
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.cli;
19+
20+
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
21+
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.composer.definition.PipelineDef;
23+
24+
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import java.net.URI;
29+
import java.net.URL;
30+
import java.nio.file.Paths;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
34+
35+
/**
36+
* Tests covering how {@link CliExecutor#main(String[])} (the application-mode entrypoint) loads the
37+
* pipeline definition through {@link CliExecutor#resolvePipelineDef(String)}.
38+
*
39+
* <p>The two native application deployment executors share this single entrypoint but pass {@code
40+
* args[0]} with different semantics, so {@code resolvePipelineDef} must handle both:
41+
*
42+
* <ul>
43+
* <li><b>Kubernetes</b> ({@code K8SApplicationDeploymentExecutor}) passes the pipeline definition
44+
* FILE PATH (shipped into the JobManager container, e.g. mounted by a ConfigMap), so it must
45+
* be read from the file system.
46+
* <li><b>YARN</b> ({@code YarnApplicationDeploymentExecutor}) reads the file on the client side
47+
* and passes the pipeline definition CONTENT, which must be used verbatim.
48+
* </ul>
49+
*/
50+
class CliExecutorTest {
51+
52+
/**
53+
* Kubernetes application mode: {@code args[0]} is a file path, so {@code resolvePipelineDef}
54+
* reads the file content, which then parses into a valid pipeline definition.
55+
*/
56+
@Test
57+
void testResolvePipelineDefFromFilePath() throws Exception {
58+
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
59+
String pipelineDefPath = Paths.get(resource.toURI()).toString();
60+
61+
String content = CliExecutor.resolvePipelineDef(pipelineDefPath);
62+
assertThat(content).contains("source:").contains("type: mysql");
63+
64+
PipelineDef pipelineDef =
65+
new YamlPipelineDefinitionParser().parse(content, new Configuration());
66+
assertThat(pipelineDef.getSource()).isNotNull();
67+
}
68+
69+
/**
70+
* Explicit-scheme path (here {@code file://}, the same code path as {@code s3://}, {@code
71+
* hdfs://}, {@code oss://}): read through Flink's FileSystem so the matching plugin resolves
72+
* it.
73+
*/
74+
@Test
75+
void testResolvePipelineDefFromSchemePath() throws Exception {
76+
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
77+
String schemePath = resource.toURI().toString();
78+
assertThat(new URI(schemePath).getScheme()).isNotNull();
79+
80+
String content = CliExecutor.resolvePipelineDef(schemePath);
81+
assertThat(content).contains("source:").contains("type: mysql");
82+
83+
PipelineDef pipelineDef =
84+
new YamlPipelineDefinitionParser().parse(content, new Configuration());
85+
assertThat(pipelineDef.getSource()).isNotNull();
86+
}
87+
88+
/**
89+
* YARN application mode: {@code args[0]} is already the pipeline definition content (read on
90+
* the client side), so {@code resolvePipelineDef} returns it verbatim and it parses correctly.
91+
*/
92+
@Test
93+
void testResolvePipelineDefFromInlineContent() throws Exception {
94+
String pipelineDefContent = "source:\n type: mysql\n\nsink:\n type: kafka\n";
95+
96+
String resolved = CliExecutor.resolvePipelineDef(pipelineDefContent);
97+
assertThat(resolved).isEqualTo(pipelineDefContent);
98+
99+
PipelineDef pipelineDef =
100+
new YamlPipelineDefinitionParser().parse(resolved, new Configuration());
101+
assertThat(pipelineDef.getSource()).isNotNull();
102+
}
103+
104+
/**
105+
* The FLINK-40005 root cause: passing a file PATH straight to the String (content) overload
106+
* makes the parser treat the path as YAML content, yielding a scalar node without a {@code
107+
* source}. {@link CliExecutor#resolvePipelineDef(String)} avoids this for the Kubernetes path
108+
* by reading the file first.
109+
*/
110+
@Test
111+
void testParsingFilePathAsYamlContentFails() {
112+
String pipelineDefPath = "/opt/flink/config/pipeline.yaml";
113+
assertThatThrownBy(
114+
() ->
115+
new YamlPipelineDefinitionParser()
116+
.parse(pipelineDefPath, new Configuration()))
117+
.isInstanceOf(IllegalArgumentException.class)
118+
.hasMessageContaining("Missing required field \"source\"");
119+
}
120+
}

0 commit comments

Comments
 (0)