[FLINK-40055] Serialize managed cluster config in the target Flink version's YAML dialect #1152
Open
gerkElznik wants to merge 1 commit into
…rsion's YAML dialect
The operator serialized managed-deployment config values in its own YAML
dialect: removeOperatorConfigs round-tripped the config through toMap(),
baking the operator process' dialect (GlobalConfiguration.isStandardYaml())
into every value. With the operator itself running on standard config.yaml,
list-typed options such as pipeline.jars were written into Flink 1.x
deployments as YAML flow lists ("['local://…']"), which the legacy parser
cannot read -> the JobManager crashloops.
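The dialect mismatch described above can be sketched in a few lines of plain Java. This is a simplified model, not the operator's or Flink's code: the class and method names are hypothetical, Flink versions are reduced to a major number, the jar path is an invented example value, and quoting is naive (no escaping). It only illustrates that the same list value has two renderings and that the choice should follow the target version rather than the operator process.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the two config dialects.
final class DialectSketch {

    // Assumption: the version is modeled as a major number; the PR only
    // states that the dialect threshold sits at Flink 2.0.
    static boolean useStandardYaml(int flinkMajorVersion) {
        return flinkMajorVersion >= 2;
    }

    // Legacy flink-conf.yaml style: list entries joined into one scalar with ';'.
    static String renderLegacy(List<String> values) {
        return String.join(";", values);
    }

    // Standard config.yaml style: a YAML flow list with single-quoted entries.
    // Naive quoting, no escaping of embedded quotes.
    static String renderStandard(List<String> values) {
        return values.stream()
                .map(v -> "'" + v + "'")
                .collect(Collectors.joining(", ", "[", "]"));
    }

    // Pick the rendering from the TARGET version, not from the writer's own format.
    static String renderFor(int flinkMajorVersion, List<String> values) {
        return useStandardYaml(flinkMajorVersion)
                ? renderStandard(values)
                : renderLegacy(values);
    }

    public static void main(String[] args) {
        // Invented example path, for illustration only.
        List<String> jars = List.of("local:///opt/flink/usrlib/job.jar");
        System.out.println(renderFor(1, jars)); // local:///opt/flink/usrlib/job.jar
        System.out.println(renderFor(2, jars)); // ['local:///opt/flink/usrlib/job.jar']
    }
}
```

In this model the bug corresponds to calling renderStandard for a 1.x target: a legacy parser would then treat the whole bracketed string as one URI.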
Fix at the serialization boundaries, leaving in-process parse semantics
untouched:
- removeOperatorConfigs copies the config and removes operator keys directly
instead of a toMap()/setString round-trip, so typed values survive to the
boundaries that already render per target version (FlinkConfMountDecorator).
- runJar serializes the session-job REST config map in the receiving
cluster's dialect via configToMapWithVersionDialect.
- The Flink-2.0 dialect threshold moves to a shared static
FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion).
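The difference between the old round-trip and the copy-and-remove approach in the list above can be illustrated with a plain-Java model. Everything here is a sketch under stated assumptions: a Map stands in for Flink's Configuration, the key prefix for operator-only options is assumed to be "kubernetes.operator.", a static boolean stands in for the process-global dialect flag, and all method names are hypothetical rather than the operator's actual API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a Map stands in for Flink's Configuration.
final class RemoveOperatorConfigsSketch {

    // Assumption: operator-only options share this key prefix.
    static final String OPERATOR_PREFIX = "kubernetes.operator.";

    // Stand-in for the process-global dialect of the operator JVM.
    static boolean processUsesStandardYaml = true;

    // Render a value as a string in the given dialect (naive quoting).
    static String stringify(Object value, boolean standardYaml) {
        if (value instanceof List) {
            List<String> parts = new ArrayList<>();
            for (Object item : (List<?>) value) {
                parts.add(standardYaml ? "'" + item + "'" : String.valueOf(item));
            }
            return standardYaml
                    ? "[" + String.join(", ", parts) + "]"
                    : String.join(";", parts);
        }
        return String.valueOf(value);
    }

    // Old shape: a string round-trip bakes the operator's dialect into every value.
    static Map<String, Object> removeViaRoundTrip(Map<String, Object> config) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : config.entrySet()) {
            if (!e.getKey().startsWith(OPERATOR_PREFIX)) {
                out.put(e.getKey(), stringify(e.getValue(), processUsesStandardYaml));
            }
        }
        return out;
    }

    // New shape: copy, then remove operator keys; values stay typed.
    static Map<String, Object> removeViaCopy(Map<String, Object> config) {
        Map<String, Object> out = new LinkedHashMap<>(config);
        out.keySet().removeIf(k -> k.startsWith(OPERATOR_PREFIX));
        return out;
    }

    // Serialization boundary: stringify in the dialect of the target version.
    static Map<String, String> toStringMapForVersion(Map<String, Object> config, int flinkMajor) {
        boolean standard = flinkMajor >= 2; // threshold at Flink 2.0, per the PR
        Map<String, String> out = new LinkedHashMap<>();
        config.forEach((k, v) -> out.put(k, stringify(v, standard)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = new LinkedHashMap<>();
        cfg.put("kubernetes.operator.reconcile.interval", "60 s");
        cfg.put("pipeline.jars", List.of("local:///a.jar")); // invented example value
        System.out.println(removeViaRoundTrip(cfg));
        System.out.println(toStringMapForVersion(removeViaCopy(cfg), 1));
    }
}
```

Because removeViaCopy keeps pipeline.jars as a list, the choice of dialect is deferred to the point where the target version is known, which is the property the fix relies on.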
Generated-by: Claude Code (Claude Opus 4.8)
Co-Authored-By: Claude <noreply@anthropic.com>
What is the purpose of the change
Fixes FLINK-40055: the operator serializes managed-deployment config values in its own YAML dialect rather than the target deployment's. When the operator itself runs on standard config.yaml, list-typed options (e.g. pipeline.jars) are written into Flink 1.x deployments as YAML flow lists (['local://…']), which the legacy parser cannot read, so the JobManager crashloops with URISyntaxException. Discovered via the CI failures on #1126, which this bug blocks (that PR makes config.yaml the operator's default format; today the bug is reachable via a conf-override dir or custom mount).
Root cause (full analysis on the Jira): the YAML dialect is process-global in flink-core (Configuration()'s no-arg constructor and clone() take GlobalConfiguration.isStandardYaml()), and AbstractFlinkService#removeOperatorConfigs bakes that dialect into every value via a toMap()/setString round-trip before FlinkConfMountDecorator, which already correctly picks the file name and format per target Flink version, gets to write the file.
The fix acts only at the serialization boundaries and leaves in-process parse semantics untouched. Stamping the target dialect onto the long-lived resource configs was prototyped and rejected: the same flag governs how raw string values are parsed, so it changes read behavior for every consumer, and any downstream clone() silently resets it.
Brief change log
- AbstractFlinkService#removeOperatorConfigs: copy via the copy constructor and removeKey directly instead of the toMap()/setString round-trip, so typed values survive untouched to the write boundaries that already render per target version.
- AbstractFlinkService#runJar: serialize the session-job REST config map in the receiving cluster's dialect (new configToMapWithVersionDialect, a throwaway dialect-stamped copy for the JarRunRequestBody only).
- FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion): the Flink-2.0 dialect threshold becomes one shared public static method (the existing instance method delegates), so the rule lives in a single place.
Verifying this change
This change added tests and can be verified as follows:
- AbstractFlinkServiceTest#removeOperatorConfigsKeepsTypedValuesTest: pins the process-global standard-yaml flag (the condition CI never exercises otherwise), asserts operator keys are removed, typed pipeline.jars survives, and the same config serializes as the legacy scalar for v1_20 and the standard flow list for v2_0.
- A v1_20 application deployment that crashloops without the fix (operator on config.yaml) reaches RUNNING with it; v2_2 deployments work under both operator formats.
- flink-config-* ConfigMap content is identical regardless of the operator's own format (only per-run identity values differ): the operator's config format is invisible to managed deployments.
- mvn test for the touched suites: AbstractFlinkServiceTest 40/40, FlinkConfMountDecoratorTest 5/5, FlinkConfigManagerTest 9/9.
Does this pull request potentially affect one of the following parts:
- CustomResourceDescriptors: no
- removeOperatorConfigs runs on every cluster submission; behavior is unchanged for string values, and typed values now pass through unconverted (previously stringified in the operator's dialect).
Documentation