[FLINK-40055] Serialize managed cluster config in the target Flink version's YAML dialect #1152

Open
gerkElznik wants to merge 1 commit into apache:main from gerkElznik:FLINK-40055

@gerkElznik

What is the purpose of the change

Fixes FLINK-40055: the operator serializes managed-deployment config values in its own YAML dialect rather than the target deployment's. When the operator itself runs on standard config.yaml, list-typed options (e.g. pipeline.jars) are written into Flink 1.x deployments as YAML flow lists (['local://…']), which the legacy parser cannot read — the JobManager crashloops with URISyntaxException. Discovered via the CI failures on #1126, which this bug blocks (that PR makes config.yaml the operator's default format; today the bug is reachable via a conf-override dir or custom mount).

Root cause (full analysis on the Jira): the YAML dialect is process-global in flink-core. Both the no-arg Configuration() constructor and clone() take their dialect from GlobalConfiguration.isStandardYaml(). AbstractFlinkService#removeOperatorConfigs then bakes that dialect into every value via a toMap()/setString round-trip, before FlinkConfMountDecorator gets to write the file, even though the decorator already picks the correct file name and format for the target Flink version.
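To make the failure mode concrete, here is a minimal, self-contained sketch of the mismatch. It does not use Flink's real serializer or parser: renderLegacy, renderStandard and readJarsLegacy are hypothetical stand-ins that only approximate the two dialects (a ';'-joined scalar versus a single-quoted YAML flow list), and the jar path is made up for the example.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Simplified stand-ins for the two dialects; not Flink's actual serializer or parser. */
public class DialectMismatchSketch {

    /** Legacy flink-conf.yaml style: list elements joined with ';' into one scalar. */
    static String renderLegacy(List<String> values) {
        return String.join(";", values);
    }

    /** Standard config.yaml style: a YAML flow list with single-quoted elements. */
    static String renderStandard(List<String> values) {
        return values.stream()
                .map(v -> "'" + v.replace("'", "''") + "'")
                .collect(Collectors.joining(", ", "[", "]"));
    }

    /** Legacy-style read: split the raw scalar on ';' and parse every piece as a URI. */
    static List<URI> readJarsLegacy(String raw) throws URISyntaxException {
        List<URI> jars = new ArrayList<>();
        for (String part : raw.split(";")) {
            jars.add(new URI(part.trim()));
        }
        return jars;
    }

    public static void main(String[] args) throws URISyntaxException {
        // Hypothetical jar location, chosen only for this example.
        List<String> pipelineJars = List.of("local:///opt/flink/usrlib/job.jar");

        // Written in the target's (legacy) dialect: the legacy-style reader accepts it.
        System.out.println(readJarsLegacy(renderLegacy(pipelineJars)));

        // Written in the operator's (standard) dialect: the legacy-style reader rejects it,
        // because the whole flow list is treated as one URI string.
        try {
            readJarsLegacy(renderStandard(pipelineJars));
        } catch (URISyntaxException e) {
            System.out.println("legacy reader failed: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the value is correct in memory and breaks at the moment it is stringified in the wrong dialect, which is why the fix targets the serialization boundaries.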

The fix acts only at the serialization boundaries and leaves in-process parse semantics untouched. Stamping the target dialect onto the long-lived resource configs was prototyped and rejected: the same flag governs how raw string values are parsed, so it would change read behavior for every consumer, and any downstream clone() silently resets it.

Brief change log

  • AbstractFlinkService#removeOperatorConfigs — copy via the copy constructor and removeKey directly instead of the toMap()/setString round-trip, so typed values survive untouched to the write boundaries that already render per target version.
  • AbstractFlinkService#runJar — serialize the session-job REST config map in the receiving cluster's dialect (new configToMapWithVersionDialect, a throwaway dialect-stamped copy for the JarRunRequestBody only).
  • FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion) — the Flink-2.0 dialect threshold becomes one shared public static (the existing instance method delegates), so the rule lives in a single place.
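The shape of the three changes above can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: a plain Map<String, Object> stands in for Flink's Configuration, the FlinkVersion enum is a reduced local copy, the "kubernetes.operator." prefix is passed in as a parameter, and toMapWithVersionDialect and render are hypothetical helpers that approximate the two dialects.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Stand-in types only; the real code works on Flink's Configuration. */
public class SerializationBoundarySketch {

    /** Reduced local copy of the version enum, ordered oldest to newest. */
    enum FlinkVersion {
        v1_20, v2_0, v2_2;

        boolean isEqualOrNewer(FlinkVersion other) {
            return compareTo(other) >= 0;
        }
    }

    /** Single place for the dialect rule: standard YAML from Flink 2.0 onwards. */
    static boolean useStandardYamlConfig(FlinkVersion version) {
        return version.isEqualOrNewer(FlinkVersion.v2_0);
    }

    /** Copy and remove keys directly, so typed values are never stringified here. */
    static Map<String, Object> removeOperatorConfigs(
            Map<String, Object> config, String operatorPrefix) {
        Map<String, Object> copy = new LinkedHashMap<>(config);
        copy.keySet().removeIf(key -> key.startsWith(operatorPrefix));
        return copy;
    }

    /** Stringify only at the write boundary, in the receiving cluster's dialect. */
    static Map<String, String> toMapWithVersionDialect(
            Map<String, Object> config, FlinkVersion target) {
        boolean standard = useStandardYamlConfig(target);
        Map<String, String> rendered = new LinkedHashMap<>();
        config.forEach((key, value) -> rendered.put(key, render(value, standard)));
        return rendered;
    }

    /** Approximate rendering: flow list for standard YAML, ';'-joined scalar for legacy. */
    static String render(Object value, boolean standard) {
        if (!(value instanceof List)) {
            return String.valueOf(value);
        }
        List<?> list = (List<?>) value;
        if (standard) {
            return list.stream()
                    .map(v -> "'" + v + "'")
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        return list.stream().map(v -> String.valueOf(v)).collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("kubernetes.operator.reconcile.interval", "60 s");
        // Hypothetical jar location, chosen only for this example.
        config.put("pipeline.jars", List.of("local:///opt/flink/usrlib/job.jar"));

        Map<String, Object> cleaned = removeOperatorConfigs(config, "kubernetes.operator.");
        System.out.println(toMapWithVersionDialect(cleaned, FlinkVersion.v1_20));
        System.out.println(toMapWithVersionDialect(cleaned, FlinkVersion.v2_2));
    }
}
```

In this sketch the rendered output depends only on the target version passed at the boundary, never on any process-wide setting, which mirrors the intent described in the change log.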

Verifying this change

This change added tests and can be verified as follows:

  • Unit test (AbstractFlinkServiceTest#removeOperatorConfigsKeepsTypedValuesTest): pins the process-global standard-yaml flag (the condition CI never exercises otherwise), asserts operator keys are removed, typed pipeline.jars survives, and the same config serializes as the legacy scalar for v1_20 and the standard flow list for v2_0.
  • All four operator-format × deployment-version combinations in a kind cluster with a from-source operator image: the v1_20 application deployment that crashloops without the fix (operator on config.yaml) reaches RUNNING with it; v2_2 deployments work under both operator formats.
  • Config invariance: the generated flink-config-* ConfigMap content is identical regardless of the operator's own format (only per-run identity values differ) — the operator's config format is invisible to managed deployments.
  • In-place operator config-format migration (chaos run): helm-flipped the operator chart legacy↔standard under RUNNING v1 and v2 jobs, killing JobManager pods after each flip to force a config re-read from the mounted ConfigMap: no job disruption, no JobManager pod churn, byte-identical generated configs, and every freshly started JobManager parsed its config.
  • mvn test for the touched suites: AbstractFlinkServiceTest 40/40, FlinkConfMountDecoratorTest 5/5, FlinkConfigManagerTest 9/9.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency): no
  • The public API (CustomResourceDescriptors): no
  • Core observer or reconciler logic that is regularly executed: yes. removeOperatorConfigs runs on every cluster submission; behavior is unchanged for string values, and typed values now pass through unconverted (previously they were stringified in the operator's dialect).

Documentation

  • Does this introduce a new feature? no
  • If yes, how is the feature documented? not applicable (bug fix; no user-facing configuration change)

…rsion's YAML dialect

The operator serialized managed-deployment config values in its own YAML
dialect: removeOperatorConfigs round-tripped the config through toMap(),
baking the operator process' dialect (GlobalConfiguration.isStandardYaml())
into every value. With the operator itself running on standard config.yaml,
list-typed options such as pipeline.jars were written into Flink 1.x
deployments as YAML flow lists ("['local://…']"), which the legacy parser
cannot read -> the JobManager crashloops.

Fix at the serialization boundaries, leaving in-process parse semantics
untouched:
- removeOperatorConfigs copies the config and removes operator keys directly
  instead of a toMap()/setString round-trip, so typed values survive to the
  boundaries that already render per target version (FlinkConfMountDecorator).
- runJar serializes the session-job REST config map in the receiving
  cluster's dialect via configToMapWithVersionDialect.
- The Flink-2.0 dialect threshold moves to a shared static
  FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion).

Generated-by: Claude Code (Claude Opus 4.8)
Co-Authored-By: Claude <noreply@anthropic.com>