[FLINK-40005][cdc-cli] CliExecutor application mode resolves pipeline definition from local path, remote path or inline content#4451
Open
Zhile wants to merge 1 commit into
Conversation
Author
|
@MOBIN-F can you help to review? I'm not sure if this applies the original design. |
… definition from local path, remote path or inline content In application mode, CliExecutor.main received args[0] and passed it to YamlPipelineDefinitionParser.parse(String, Configuration), which treats the String as YAML content. The path string itself was parsed as YAML and failed validation with: Missing required field "source" in top-level configuration. The two native application deployment executors share CliExecutor.main but pass args[0] with different semantics: - K8SApplicationDeploymentExecutor sets APPLICATION_ARGS = commandLine.getArgList(), so args[0] is the pipeline definition FILE PATH (shipped into the JobManager container, e.g. mounted via a ConfigMap by the Flink Kubernetes Operator). - YarnApplicationDeploymentExecutor reads the file on the client side and sets APPLICATION_ARGS to the pipeline definition CONTENT. Resolve args[0] in three cases, in order: 1. An explicit-scheme path (s3://, hdfs://, oss://, file://) is read through Flink's FileSystem so the matching plugin resolves it. The scheme is explicit, so it is not at risk of being hijacked by the cluster default FileSystem. 2. A bare local file path is read with the local JVM file API (java.nio Files.readAllBytes) instead of Flink's FileSystem, whose cluster default may be S3/HDFS and would not resolve a local path shipped next to the JobManager. 3. Otherwise the value is already the pipeline definition content (YARN) and is parsed verbatim. This fixes K8S application mode without regressing YARN application mode, and also supports placing the pipeline definition file on remote storage. Also update the Kubernetes deployment docs (EN + ZH): the FlinkDeployment example now uses entryClass org.apache.flink.cdc.cli.CliExecutor without --use-mini-cluster, and the "native application mode is not supported" note is removed. Regression introduced by apache#3643 (FLINK-35360, Support Yarn application mode for yaml job).
a96ec21 to
0e10c04
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Fix [FLINK-40005]: in application mode,
CliExecutor.mainfailed to start a YAML pipeline withMissing required field "source" in top-level configuration.args[0]was passed straight toYamlPipelineDefinitionParser.parse(String, ...), which treats itsStringargument as YAML content. Whenargs[0]is a file path, the path itself is parsed as YAML, producing a document withoutsource/sinkand failing validation.The two native application deployment executors share
CliExecutor.mainbut passargs[0]with different semantics — the root of the problem:K8SApplicationDeploymentExecutorsetsAPPLICATION_ARGS = commandLine.getArgList(), soargs[0]is the pipeline definition file path (shipped into the JobManager container, e.g. mounted via a ConfigMap by the Flink Kubernetes Operator).YarnApplicationDeploymentExecutorreads the file on the client side and setsAPPLICATION_ARGSto the pipeline definition content.So a fix that only reads
args[0]as a path would fix K8S but regress YARN (whoseargs[0]is already content, not a path).Regression introduced by #3643 (FLINK-35360, Support Yarn application mode for yaml job).
Brief change log
CliExecutor.mainnow resolvesargs[0]viaresolvePipelineDef(...), in three cases:s3:///hdfs:///oss:///file://) — read through Flink'sFileSystemso the matching plugin resolves it.Files.readAllBytes) rather than Flink'sFileSystem, whose cluster default may be S3/HDFS and would not resolve a local path shipped next to the JobManager.FlinkDeploymentexample now usesentryClass: org.apache.flink.cdc.cli.CliExecutorwithout--use-mini-cluster, and the "native application mode is not supported" note is removed.Verifying this change
CliExecutorTestcovers all three resolution paths (local path / scheme path viafile://, same code path ass3:/// inline content) plus the original regression.Missing required field "source".Does this pull request potentially affect one of the following parts