
[FLINK-35746][Kubernetes-Operator] Add getJobConfiguration and getJobCheckpointConfiguration#1000

Open
nishita-09 wants to merge 10 commits into
apache:main from
nishita-09:improve-observer-config

Conversation

@nishita-09
Contributor

@nishita-09 nishita-09 commented Jul 27, 2025

What is the purpose of the change

This pull request adds a new capability to the operator that keeps the configuration used for observation and health checks consistent with the actual effective configuration, whether it was set programmatically (in the user's main() method) or via the deployment spec.

Today, the operator infers running application/cluster settings solely from the K8s deployment spec and operator defaults. While this is mostly fine, many configs can be changed programmatically in the user's main method (e.g., env.enableCheckpointing(3000, AT_LEAST_ONCE)); such changes take precedence at runtime but are invisible to the operator. This leads to incorrect health-check windows, missed in-place scaling opportunities, and inconsistent behavior in checkpoint stall detection.

Additionally, JM-level configuration (like jobmanager.scheduler, state.savepoints.dir, high-availability.*) set via operator defaults can drift if the defaults are changed while a job is still running: the operator's reconstructed config diverges from the actual JM runtime config.

Brief change log

  • Add getJobManagerConfiguration, getJobConfiguration, and getJobCheckpointConfiguration to FlinkService interface, implemented in AbstractFlinkService using Flink REST API endpoints (/jobs/:jobid/jobmanager/config, /jobs/:jobid/config, /jobs/:jobid/checkpoints/config).
  • Add FlinkRuntimeConfigurationUtils utility class with compile-time safe CheckpointConfigMapping enum to map checkpoint REST API JSON fields to Flink configuration keys.
  • Add Guava-backed runtime configuration cache in FlinkConfigManager with putRuntimeConfig, getRuntimeConfig, invalidateRuntimeConfig, and applyCachedRuntimeConfig methods. Cache is keyed by {namespace, name, jobId} and expires after a configurable timeout.
  • Fetch runtime configuration in JobStatusObserver when a job reaches RUNNING and cache is empty. The fetch uses a 3-layer strategy: JM config (base layer) -> job execution config (override) -> checkpoint config (programmatic overrides win). REST calls are made once per job lifecycle and cached.
  • Apply cached runtime configuration overrides in FlinkResourceContext.getObserveConfig() so all downstream consumers (health evaluator, reconciler, scaler) see the actual runtime values.
  • Invalidate the runtime config cache in ApplicationReconciler and SessionJobReconciler on cancel/upgrade, ensuring fresh config is fetched when a new job starts.
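The 3-layer merge strategy above can be sketched as follows. This is a minimal illustration using plain maps; the actual implementation merges Flink `Configuration` objects, and the method name and sample keys here are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RuntimeConfigMergeSketch {

    /**
     * Merge the three REST-derived layers. Later layers win, so programmatic
     * checkpoint overrides take precedence over the job execution config,
     * which in turn overrides the JobManager base config.
     */
    static Map<String, String> mergeLayers(
            Map<String, String> jmConfig,
            Map<String, String> jobConfig,
            Map<String, String> checkpointConfig) {
        Map<String, String> merged = new LinkedHashMap<>(jmConfig);
        merged.putAll(jobConfig);
        merged.putAll(checkpointConfig);
        return merged;
    }

    public static void main(String[] args) {
        // Base layer: what the JobManager was started with (spec + defaults).
        Map<String, String> jm = Map.of(
                "execution.checkpointing.interval", "10000ms",
                "jobmanager.scheduler", "adaptive");
        // Job execution config layer.
        Map<String, String> job = Map.of("parallelism.default", "2");
        // Checkpoint config layer: programmatic overrides win.
        Map<String, String> ckpt = Map.of(
                "execution.checkpointing.interval", "3000ms");

        Map<String, String> merged = mergeLayers(jm, job, ckpt);
        System.out.println(merged.get("execution.checkpointing.interval")); // 3000ms
        System.out.println(merged.get("jobmanager.scheduler"));             // adaptive
    }
}
```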

Verifying this change

This change added tests and can be verified as follows:

  • Unit tests (2126 pass, 0 failures):

    • AbstractFlinkServiceTest: testGetJobManagerConfiguration, testGetJobManagerConfigurationReturnsEmptyOnFailure: verifies JM config REST endpoint parsing and graceful degradation.
    • AbstractFlinkServiceTest: testAllCheckpointConfigInfoFieldNamesCoveredByMapping, testAllExecutionConfigInfoFieldNamesCoveredByMapping: reflection-based tests that fail at build time if Flink adds new FIELD_NAME_* constants not covered by our mapping.
    • AbstractFlinkServiceTest: testMapJobConfigurationMapsAllExpectedFields, testMapJobConfigurationHandlesNullGracefully: verifies job config mapping correctness and null safety.
    • FlinkConfigManagerTest: runtime config cache put/get/invalidate/apply tests.
    • JobStatusObserverTest: verifies fetchAndCacheRuntimeConfig populates cache on first observe, skips on cache hit, handles partial/full REST failures gracefully, and invalidates on job upgrade.
  • Integration validation

  • 1. Deployed the job in application mode with checkpointing disabled in the deployment spec but enabled programmatically in the user's job logic.

  • Here are the logs from ClusterHealthEvaluator, which confirm that the operator now recognises the user-defined configs.

  • Before the change:

    (screenshot: Screenshot 2025-07-24 at 5 09 16 PM)
  • After the change:
    (screenshot: Screenshot 2025-07-24 at 5 13 06 PM)
  • 2. Initial cache population - 53 entries from 3 REST endpoints cached in ~80ms:
    15:09:55,712 JobStatusObserver    [DEBUG][flink/basic-example] Fetching runtime configuration
    15:09:55,712 AbstractFlinkService  [DEBUG][flink/basic-example] Fetching JobManager configuration
    15:09:55,764 AbstractFlinkService  [DEBUG][flink/basic-example] Fetched 40 JobManager configuration entries
    15:09:55,764 AbstractFlinkService  [DEBUG][flink/basic-example] Fetching job configuration
    15:09:55,776 AbstractFlinkService  [DEBUG][flink/basic-example] Fetched 2 job configuration entries
    15:09:55,776 AbstractFlinkService  [DEBUG][flink/basic-example] Fetching checkpoint configuration
    15:09:55,791 FlinkConfigManager   [DEBUG][flink/basic-example] Cached runtime configuration with 53 entries
  • 3. Cache hit on subsequent observe cycles - zero re-fetches
       15:10:10,946 JobStatusObserver  [DEBUG][flink/basic-example] Runtime configuration already cached
       15:10:26,125 JobStatusObserver  [DEBUG][flink/basic-example] Runtime configuration already cached
  • 4. Cache invalidation on job suspension:
    15:11:53,127 FlinkConfigManager [DEBUG][flink/basic-example] Invalidated runtime configuration cache
  • 5. Cache re-population after job resume (new job ID, fresh REST fetch):
    15:13:11,133 JobStatusObserver    [DEBUG][flink/basic-example] Fetching runtime configuration
    15:13:11,133 AbstractFlinkService  [DEBUG][flink/basic-example] Fetching JobManager configuration
    15:13:11,139 AbstractFlinkService  [DEBUG][flink/basic-example] Fetched 40 JobManager configuration entries
    15:13:11,156 FlinkConfigManager   [DEBUG][flink/basic-example] Cached runtime configuration with 53 entries
  • 6. Parallelism change triggers invalidation + re-population:
    15:14:53,150 FlinkConfigManager  [DEBUG][flink/basic-example] Invalidated runtime configuration cache
    15:15:32,508 JobStatusObserver   [DEBUG][flink/basic-example] Fetching runtime configuration
  • 7. Programmatic config override detection - deployed with interval=10000, EXACTLY_ONCE in spec, app code overrides to 3000ms, AT_LEAST_ONCE:
    15:29:00,908 FlinkRuntimeConfigurationUtils [DEBUG][flink/programmatic-ckpt-test] Mapped 15 checkpoint
      configuration entries: {execution.checkpointing.mode=AT_LEAST_ONCE,
      execution.checkpointing.interval=3000ms, execution.checkpointing.timeout=30000ms,
      execution.checkpointing.min-pause=1000ms, ...}
    15:29:00,908 FlinkConfigManager [DEBUG][flink/programmatic-ckpt-test] Cached runtime configuration with 55 entries

    | Config | Deployment Spec | App Code Override (detected) |
    |--------|----------------|---------------------------|
    | `execution.checkpointing.interval` | 10000ms | **3000ms** |
    | `execution.checkpointing.mode` | EXACTLY_ONCE | **AT_LEAST_ONCE** |
    | `execution.checkpointing.timeout` | 600000ms | **30000ms** |
    | `execution.checkpointing.min-pause` | 0ms (default) | **1000ms** |
  **A/B comparison:** `ClusterHealthEvaluator` computes `window = max(interval * 2, timeout * 2)`:
   - Before (main branch, spec values): `max(10000*2, 600000*2)` = **~20 min**
   - After (feature branch, runtime values): `max(3000*2, 30000*2)` = **~1 min** (20x faster detection)
  • 8. Platform config drift - jobmanager.scheduler: adaptive set in operator defaults (NOT in K8s spec), then removed from defaults while job keeps running:
    16:03:35,205 AbstractFlinkService [DEBUG][flink/scheduler-test] Fetching JobManager configuration
    16:03:35,230 AbstractFlinkService [DEBUG][flink/scheduler-test] Fetched 40 JobManager configuration entries
    16:03:35,262 FlinkConfigManager  [DEBUG][flink/scheduler-test] Cached runtime configuration with 54 entries
    16:03:35,263 FlinkConfigManager  [DEBUG][flink/scheduler-test] Applying 54 cached runtime configuration overrides
| Source | `jobmanager.scheduler` |
|--------|----------------------|
| JM REST endpoint (actual runtime) | `adaptive` |
| K8s spec `flinkConfiguration` | _(not set)_ |
| Operator defaults (after drift) | _(not set)_ |

| Branch | `observeConfig` scheduler value | `supportsInPlaceScaling` |
|--------|-------------------------------|--------------------------|
| **Main** (reconstructed) | `Default` (wrong) | `false` - unnecessary restart |
| **Feature** (JM cache) | `Adaptive` (correct) | `true` - in-place scaling works |
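The A/B window numbers in item 7 follow directly from the `max(interval * 2, timeout * 2)` formula quoted from `ClusterHealthEvaluator` in the description; a quick arithmetic check in plain Java (the method name here is illustrative):

```java
public class HealthWindowCheck {

    /** Health-check window as described above: max(interval * 2, timeout * 2). */
    static long windowMs(long checkpointIntervalMs, long checkpointTimeoutMs) {
        return Math.max(checkpointIntervalMs * 2, checkpointTimeoutMs * 2);
    }

    public static void main(String[] args) {
        // Before: deployment-spec values (interval 10000ms, timeout 600000ms).
        System.out.println(windowMs(10_000, 600_000)); // 1200000 ms, i.e. ~20 min
        // After: runtime values (interval 3000ms, timeout 30000ms).
        System.out.println(windowMs(3_000, 30_000));   // 60000 ms, i.e. ~1 min
    }
}
```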

Test Setup:

  • FlinkDeployment with jobmanager.scheduler: adaptive, inplace-scaling.enabled: true
  • Autoscaler enabled with stabilization.interval: 30s, metrics.window: 60s
  • Job: StateMachineExample.jar at parallelism 2 (deliberately over-provisioned)
  • The autoscaler detects low utilization and writes pipeline.jobvertex-parallelism-overrides

Job ID (unchanged throughout): 8d7ea2427c1eb8d7f3c491e4691bfd53

Result: PASS

Phase 1 — Cache active before scaling (steady-state)

13:14:05,974 FlinkConfigManager [DEBUG][default/inplace-scale-test] Applying 54 cached runtime configuration overrides
13:14:05,984 JobStatusObserver  [DEBUG][default/inplace-scale-test] Runtime configuration already cached
13:14:21,069 FlinkConfigManager [DEBUG][default/inplace-scale-test] Applying 54 cached runtime configuration overrides
13:14:21,075 JobStatusObserver  [DEBUG][default/inplace-scale-test] Runtime configuration already cached

Cache with 54 entries is being applied every ~15s observe cycle. No REST calls.

Phase 2 — Autoscaler triggers in-place scaling

13:14:21,164 AuditUtils         [INFO ][default/inplace-scale-test] >>> Event[Job]       | Info    | SCALINGREPORT   | Scaling execution enabled, begin scaling vertices:{ Vertex ID 20ba6b65f97481d5570070de90e4e791 | Parallelism 2 -> 1 | Processing capacity 258514.72 -> 103405.89 | Target data rate 2003.49}{ Vertex ID bc764cd8ddf7a0cff126f51c16239658 | Parallelism 2 -> 1 | Processing capacity 44651.35 -> 17860.54 | Target data rate 2667.92}
13:14:21,177 AuditUtils         [INFO ][default/inplace-scale-test] >>> Event[Job]       | Info    | SPECCHANGED     | SCALE change(s) detected (Diff: FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides : null -> 20ba6b65f97481d5570070de90e4e791:1,bc764cd8ddf7a0cff126f51c16239658:1]), starting reconciliation.
13:14:21,276 AuditUtils         [INFO ][default/inplace-scale-test] >>> Event[Job]       | Info    | SCALING         | In-place scaling triggered

The autoscaler wrote pipeline.jobvertex-parallelism-overrides, the reconciler detected a
DiffType.SCALE change (not UPGRADE), and called the Rescale API.

Phase 3 — Cache invalidation (new code in AbstractFlinkResourceReconciler.scale())

13:14:21,276 FlinkConfigManager [DEBUG][default/inplace-scale-test] Invalidated runtime configuration cache

Same millisecond as the in-place scale event. The stale cache (with old parallelism) is removed.

Phase 4 — Cache re-population from the rescaled JM

13:14:36,306 JobStatusObserver  [DEBUG][default/inplace-scale-test] Fetching runtime configuration
13:14:36,306 AbstractFlinkService [DEBUG][default/inplace-scale-test] Fetching runtime configuration
13:14:36,317 AbstractFlinkService [DEBUG][default/inplace-scale-test] Fetched merged runtime configuration with 54 entries
13:14:36,317 FlinkConfigManager [DEBUG][default/inplace-scale-test] Cached runtime configuration with 54 entries

Next observe cycle (~15s later): observer sees empty cache, calls getRuntimeConfiguration() which
queries all 3 REST endpoints. Fresh 54 entries cached from the now-rescaled JM.

Phase 5 — Subsequent cycles use fresh cache

13:14:51,490 FlinkConfigManager [DEBUG][default/inplace-scale-test] Applying 54 cached runtime configuration overrides
13:14:51,506 JobStatusObserver  [DEBUG][default/inplace-scale-test] Runtime configuration already cached

Back to steady-state with the new cache. No more REST calls.

Timeline Summary

Time Log Line Phase
13:14:21,069 FlinkConfigManager: Applying 54 cached runtime configuration overrides Cache active (stale)
13:14:21,164 AuditUtils: SCALINGREPORT | Parallelism 2 -> 1 Autoscaler decision
13:14:21,177 AuditUtils: SPECCHANGED | SCALE change(s) detected DiffType.SCALE reconciliation
13:14:21,276 AuditUtils: SCALING | In-place scaling triggered Rescale API called
13:14:21,276 FlinkConfigManager: Invalidated runtime configuration cache Stale cache cleared
13:14:36,306 JobStatusObserver: Fetching runtime configuration Re-fetch triggered
13:14:36,306 AbstractFlinkService: Fetching runtime configuration getRuntimeConfiguration()
13:14:36,317 AbstractFlinkService: Fetched merged runtime configuration with 54 entries Fresh config from rescaled JM
13:14:36,317 FlinkConfigManager: Cached runtime configuration with 54 entries New cache stored
13:14:51,490 FlinkConfigManager: Applying 54 cached runtime configuration overrides Steady-state with fresh cache

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@gyfora
Contributor

gyfora commented Apr 7, 2026

@nishita-09 since you haven't touched this for some time, @Dennis-Mircea will take over the work at this point so we can move this issue forward. Hope that is okay.

@nishita-09
Contributor Author

nishita-09 commented Apr 7, 2026

@nishita-09 since you haven't touched this for some time, @Dennis-Mircea will take over the work at this point so we can move this issue forward. Hope that is okay.

I was actually waiting on a review from you on this. If it's not a problem, I can continue prioritising this. @gyfora
https://issues.apache.org/jira/browse/FLINK-35746?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=18004944

Contributor

@gyfora gyfora left a comment


I have added some notes. Overall I think there is too much logic in the ConfigManager. We should move the getting/accessing of the configs to the Observers; then the FlinkConfigManager should simply always apply them when available.

This is going to be very critical logic, so the exception handling is critical. It's better to fall back to the previous behaviour (simply use the previous logic if the runtime config cannot be accessed), so that part is good (it has to go to the observer too).

Once we have restructured this, we should add detailed test coverage for the core logic and error handling :)

Comment on lines +450 to +477
var jobStatus = resource.getStatus().getJobStatus();
if (jobStatus == null || jobStatus.getJobId() == null) {
    LOG.debug("No job ID available for runtime configuration fetch");
    return;
}

JobID jobId = JobID.fromHexString(jobStatus.getJobId());
JobStatus jobState = jobStatus.getState();

// Only fetch runtime config if job is in a running state
if (jobState == null || !jobState.equals(JobStatus.RUNNING)) {
    LOG.debug(
            "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch",
            jobId,
            jobState);
    return;
}

// Check if JobManager is ready (only for FlinkDeployment)
if (resource instanceof FlinkDeployment) {
    FlinkDeployment deployment = (FlinkDeployment) resource;
    if (deployment.getStatus().getJobManagerDeploymentStatus()
            != JobManagerDeploymentStatus.READY) {
        LOG.debug(
                "JobManager is not ready, skipping runtime config fetch for job {}",
                jobId);
        return;
    }
Contributor


In general I think it's not the right approach to have this logic here. We should move it to the observers, where job state is tracked already and we know exactly when the REST API can be accessed.

So instead of accessing the REST API here, expose methods for setting/invalidating/getting the runtime config in the ConfigManager. Then the observer could check whether the ConfigManager already has a cached config for the current job ID, and if not, update it. The same way, the reconciler could invalidate runtime configs after a cancel/upgrade.
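The observer/ConfigManager split suggested here could look roughly like the following sketch, with a plain `ConcurrentHashMap` standing in for the ConfigManager's cache; all names (`observe`, `invalidate`, the key shape) are hypothetical, not the PR's actual API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class ObserverCacheSketch {

    // Stand-in for the ConfigManager's runtime-config cache, keyed by job ID.
    private final Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();
    int restFetches = 0; // counts REST round trips for the demo below

    /** Observer path: fetch via REST only when nothing is cached for this job ID. */
    Map<String, String> observe(String jobId, Supplier<Map<String, String>> restFetch) {
        return cache.computeIfAbsent(jobId, id -> {
            restFetches++;
            return restFetch.get();
        });
    }

    /** Reconciler path: drop the cached config on cancel/upgrade. */
    void invalidate(String jobId) {
        cache.remove(jobId);
    }

    public static void main(String[] args) {
        ObserverCacheSketch sketch = new ObserverCacheSketch();
        Supplier<Map<String, String>> rest =
                () -> Map.of("execution.checkpointing.interval", "3000ms");

        sketch.observe("job-1", rest); // cache miss: one REST fetch
        sketch.observe("job-1", rest); // cache hit: no extra fetch
        System.out.println(sketch.restFetches); // 1

        sketch.invalidate("job-1");    // e.g. on upgrade
        sketch.observe("job-1", rest); // fresh fetch for the new job
        System.out.println(sketch.restFetches); // 2
    }
}
```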

Contributor Author


Sure @gyfora thanks for the review. Will update this logic to move it to observers.

Comment on lines +460 to +466
if (jobState == null || !jobState.equals(JobStatus.RUNNING)) {
    LOG.debug(
            "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch",
            jobId,
            jobState);
    return;
}
Contributor


Please see my other comment regarding the observer logic, but overall, once the runtime config is cached for the job ID we should apply it, even if the job is failing/restarting/etc. The ConfigManager doesn't need this logic after that.


nishita.pattanayak added 2 commits April 8, 2026 22:24
Integrated upstream changes including Blue/Green deployment features,
Flink 2.2 support, and various improvements while preserving runtime
configuration override implementation.
@nishita-09
Contributor Author

nishita-09 commented Apr 8, 2026

I have added some notes, overall I think there is too much logic in the configmanager. We should move the getting/accessing of the configs to the Observers, then the FlinkConfigManager should simply always apply when available.
Sure @gyfora , will make this change.
This is going to be very critical logic, so the exception handling is critical, it's better to fall back to the previous behaviour (simply use the previous logic if the runtime config cannot be accessed) so that part is good (has to go to the observer too).
Yes makes sense, even I was of the same opinion. Will take this up.
Once we restructured this, we should add detailed test coverage for the core logic and error handling :)

Thanks @gyfora for the review. Will take a look at this and complete this by EOW.

@gyfora Please review the changes. I have moved most of the update-runtime-config logic to the observer, where the configurations are available. I have also avoided hardcoding and handled errors: in case the runtime config is not available/accessible, the fallback is the earlier config.
One of my major concerns is that this is very restrictive, as we explicitly enumerate the programmatically set configs in the runtime config. If another config is added in the future, explicit effort is required to update it here as well.
The fundamental constraint is that Flink's REST API exposes the per-job checkpoint config through CheckpointConfigInfo, which uses custom JSON field names (mode, interval, timeout...) that don't match Flink config keys (execution.checkpointing.mode, execution.checkpointing.interval, execution.checkpointing.timeout...).

What I can do is at least add a warning or throw an error when a new config is added and not updated here. Do let me know if you find a better way to get this done. Thanks! Appreciate your help.
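One way to get that warning/error is the reflection-based guard already described in the test section: enumerate every `FIELD_NAME_*` constant on the REST message class and fail if the mapping doesn't cover it. A self-contained sketch; the nested message class and the mapped-fields set below are stand-ins, not the real Flink `CheckpointConfigInfo`:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;

public class MappingCoverageGuard {

    // Stand-in for a REST message class such as CheckpointConfigInfo.
    static class FakeCheckpointConfigInfo {
        public static final String FIELD_NAME_PROCESSING_MODE = "mode";
        public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
        public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
    }

    // JSON field names our (hypothetical) mapping knows how to translate.
    static final Set<String> MAPPED_FIELDS = Set.of("mode", "interval", "timeout");

    /** Fail loudly if a FIELD_NAME_* constant is not covered by the mapping. */
    static void assertAllFieldsMapped(Class<?> restMessageClass, Set<String> mapped)
            throws IllegalAccessException {
        for (Field f : restMessageClass.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())
                    && f.getName().startsWith("FIELD_NAME_")) {
                String jsonField = (String) f.get(null);
                if (!mapped.contains(jsonField)) {
                    throw new AssertionError("Unmapped REST field: " + jsonField);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        assertAllFieldsMapped(FakeCheckpointConfigInfo.class, MAPPED_FIELDS);
        System.out.println("all REST fields covered");
    }
}
```

Run against the real message classes, this turns "Flink added a new field" into a build-time failure rather than a silent gap in the runtime config.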

@nishita-09 nishita-09 requested a review from gyfora April 8, 2026 18:58
@nishita-09 nishita-09 marked this pull request as ready for review April 8, 2026 20:06
Contributor

@gyfora gyfora left a comment


I agree that this manual mapping of configs is very fragile, and it duplicates a lot of logic from core Flink. A different approach was taken in https://github.com/apache/flink-kubernetes-operator/pull/1080/changes: it uses a different endpoint to get the JobManagerJobConfiguration.

Can we simply use this instead of all the runtimeConfig/checkpointConfig handling? I think when the user programmatically sets a config in the StreamExecEnv, those actually end up in the JobManagerJobConfiguration. cc @Dennis-Mircea

Comment on lines +164 to +168
executorService.scheduleWithFixedDelay(
runtimeConfigCache::cleanUp,
cacheTimeout.toMillis(),
cacheTimeout.toMillis(),
TimeUnit.MILLISECONDS);
Contributor


Is this necessary? I think we probably don't need it for the other cache either as we are continuously using the cache itself

.build();
runtimeConfigCache.put(cacheKey, Collections.unmodifiableMap(config));
LOG.debug(
"Cached runtime configuration for job {} (resource {}/{})", jobId, namespace, name);
Contributor


no need to put namespace/name in the log as it's already part of the MDC

.keySet()
.removeIf(
key -> namespace.equals(key.getNamespace()) && name.equals(key.getName()));
LOG.debug("Invalidated runtime configuration cache for resource {}/{}", namespace, name);
Contributor


no need to put namespace/name in the log as it's already part of the MDC

@Override
protected Configuration createObserveConfig() {
return configManager.getObserveConfig(getResource());
Configuration conf = configManager.getObserveConfig(getResource());
Contributor


should getObserveConfig automatically apply the overrides in the configmanager?

Contributor Author

@nishita-09 nishita-09 Apr 9, 2026


Here I have added it as a subsequent step that specifically applies the overrides after getting the default configs. We can incorporate this into getObserveConfig as well.

Comment on lines +133 to +135
if (jobId == null) {
return;
}
Contributor


why would jobId be null?

LOG.warn(
"Failed to fetch checkpoint configuration for job {}: {}",
jobId,
e.getMessage());
Contributor


pass exception instead of message


if (runtimeConfig.isEmpty() && fetchFailed) {
LOG.warn(
"All runtime config REST fetches failed for job {}, will retry next cycle",
Contributor


I think in general we can remove all job ids also from the logs, I don't think that adds any info

* org.apache.flink.configuration.ConfigOption} keys. Both sides use Flink's own constants for
* compile-time safety.
*/
private enum CheckpointConfigMapping {
Contributor


We should move all the mapping logic for the Execution + Checkpoint config to a new utility class.

if (configurationInfo != null) {
if (configurationInfo.getExecutionConfigInfo() != null) {
jobConfig.put(
"parallelism.default",
Contributor


We should use the config options

Contributor Author


Yes @gyfora, I was going to do this here as well, like it was done for the checkpoint configs. Thanks!

.getExecutionConfigInfo()
.getParallelism()));
jobConfig.put(
"pipeline.object-reuse",
Contributor


We should use the config options

@nishita-09
Contributor Author

https://github.com/apache/flink-kubernetes-operator/pull/1080/changes

I agree that this manual mapping of configs is very fragile , and it duplicates a lot of logic from core Flink. A different approach was taken in https://github.com/apache/flink-kubernetes-operator/pull/1080/changes it uses a different endpoint to get the JobManagerJobConfiguration.

Can we simply use this instead of all runtimeConfig/checkpointConfig ? I think when the user programmatically sets a config in the StreamExecEnv those actually end up in the JobManagerJobConfiguration. cc @Dennis-Mircea

@gyfora For checkpoint configurations, the programmatically set config does not end up in JobManagerJobConfiguration. That endpoint still shows the default configs set in the deployment, while the checkpoint/config endpoint is where the configurations are correctly reflected.

When user code calls env.enableCheckpointing(30000), it writes CheckpointingOptions.CHECKPOINTING_INTERVAL = 30s into the Configuration object inside StreamExecutionEnvironment. This is the same Configuration object that gets serialized into the JobGraph and eventually becomes the CheckpointCoordinatorConfiguration used by the CheckpointCoordinator at runtime.

CheckpointConfigHandler serves /jobs/:jobid/checkpoints/config. It reads from the execution graph, which gets its values from the CheckpointCoordinatorConfiguration, which was built from the JobGraph (which includes the programmatic overrides).

JobManagerJobConfigurationHandler serves /jobs/:jobid/jobmanager/config. It reads from a pre-baked Configuration object passed at construction time, i.e., the JM's initial configuration. The user's main() method hasn't run yet when this handler is constructed, and even after it runs, it only modifies the StreamExecutionEnvironment's internal Configuration, not the JM-level Configuration.
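The snapshot-vs-live distinction between the two handlers can be illustrated with a tiny simulation: one "handler" captures an immutable copy of the config at construction time (like the JM config handler), while the other reads values that a later programmatic override has changed (like the checkpoint config handler reading the execution graph). Purely illustrative plain Java; no real Flink classes involved:

```java
import java.util.HashMap;
import java.util.Map;

public class HandlerSnapshotSketch {
    public static void main(String[] args) {
        // JM startup configuration.
        Map<String, String> jmConfig = new HashMap<>();
        jmConfig.put("execution.checkpointing.interval", "10000ms");

        // "JM config handler": snapshot taken at construction time.
        Map<String, String> jmHandlerView = Map.copyOf(jmConfig);

        // Later, the user's main() applies a programmatic override; in real
        // Flink this lands in the JobGraph / CheckpointCoordinatorConfiguration.
        Map<String, String> executionGraphConfig = new HashMap<>(jmConfig);
        executionGraphConfig.put("execution.checkpointing.interval", "3000ms");

        // "Checkpoint config handler": reads the execution-graph values.
        System.out.println(jmHandlerView.get("execution.checkpointing.interval"));        // 10000ms
        System.out.println(executionGraphConfig.get("execution.checkpointing.interval")); // 3000ms
    }
}
```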

@gyfora
Contributor

gyfora commented Apr 9, 2026

https://github.com/apache/flink-kubernetes-operator/pull/1080/changes

I agree that this manual mapping of configs is very fragile , and it duplicates a lot of logic from core Flink. A different approach was taken in https://github.com/apache/flink-kubernetes-operator/pull/1080/changes it uses a different endpoint to get the JobManagerJobConfiguration.
Can we simply use this instead of all runtimeConfig/checkpointConfig ? I think when the user programmatically sets a config in the StreamExecEnv those actually end up in the JobManagerJobConfiguration. cc @Dennis-Mircea

@gyfora For checkpoints configurations , the programmatically set config does not end up in JobManagerJobConfiguration. This still shows the default configs set in the deployment, while the checkpoint/config endpoint is the place where the configurations correctly reflect.

When user code calls env.enableCheckpointing(30000), it writes CheckpointingOptions.CHECKPOINTING_INTERVAL = 30s into the Configuration object inside StreamExecutionEnvironment. This is the same Configuration object that gets serialized into the JobGraph and eventually becomes the CheckpointCoordinatorConfiguration used by the CheckpointCoordinator at runtime.

CheckpointConfigHandler - This handler serves /jobs/:jobid/checkpoints/config. It reads from the execution graph, which gets its values from the CheckpointCoordinatorConfiguration, which was built from the JobGraph (which includes the programmatic overrides)

JobManagerJobConfigurationHandler - This handler serves /jobs/:jobid/jobmanager/config. It reads from a pre-baked Configuration object passed at construction time, i.e., the JM's initial configuration. The Configuration object passed to this handler is the JM's original startup Configuration. The user's main() method hasn't run yet when this handler is constructed, and even after it runs, it only modifies the StreamExecutionEnvironment's internal Configuration and not the JM-level Configuration.

Okay, that sounds suboptimal for our use-case here. We can start with the manual config mapping for now, but we should try to fix this in Flink and have a REST endpoint that simply returns the effective job config.

@nishita-09
Contributor Author

https://github.com/apache/flink-kubernetes-operator/pull/1080/changes

I agree that this manual mapping of configs is very fragile , and it duplicates a lot of logic from core Flink. A different approach was taken in https://github.com/apache/flink-kubernetes-operator/pull/1080/changes it uses a different endpoint to get the JobManagerJobConfiguration.
Can we simply use this instead of all runtimeConfig/checkpointConfig ? I think when the user programmatically sets a config in the StreamExecEnv those actually end up in the JobManagerJobConfiguration. cc @Dennis-Mircea

@gyfora For checkpoints configurations , the programmatically set config does not end up in JobManagerJobConfiguration. This still shows the default configs set in the deployment, while the checkpoint/config endpoint is the place where the configurations correctly reflect.
When user code calls env.enableCheckpointing(30000), it writes CheckpointingOptions.CHECKPOINTING_INTERVAL = 30s into the Configuration object inside StreamExecutionEnvironment. This is the same Configuration object that gets serialized into the JobGraph and eventually becomes the CheckpointCoordinatorConfiguration used by the CheckpointCoordinator at runtime.
CheckpointConfigHandler - This handler serves /jobs/:jobid/checkpoints/config. It reads from the execution graph, which gets its values from the CheckpointCoordinatorConfiguration, which was built from the JobGraph (which includes the programmatic overrides)
JobManagerJobConfigurationHandler - This handler serves /jobs/:jobid/jobmanager/config. It reads from a pre-baked Configuration object passed at construction time, i.e., the JM's initial configuration. The Configuration object passed to this handler is the JM's original startup Configuration. The user's main() method hasn't run yet when this handler is constructed, and even after it runs, it only modifies the StreamExecutionEnvironment's internal Configuration and not the JM-level Configuration.

Okay, that sounds suboptimal for our use case here. We can start with the manual config mapping for now, but we should try to fix this in Flink and have a REST endpoint that simply returns the effective job config.

Agreed @gyfora. Sure, I will clean this up a bit more and have it ready for review in some time. Thanks.

@nishita-09 nishita-09 requested a review from gyfora April 9, 2026 18:20
@nishita-09
Contributor Author

@gyfora I have updated the logic: moved the core logic from the config manager to the JobStatusObserver, with utility classes in a separate file. I have also added the jobManagerConfig configurations to the runtime config, and removed the jobIds and namespaces from the logs. Kindly review this and let me know if you have any other concerns. Thank you!

@nishita-09
Contributor Author

@gyfora Please review this; I have addressed all the comments and made the changes accordingly. Thank you!

Contributor

@gyfora gyfora left a comment

I wonder whether, instead of the chained logic in the observer calling the individual getConfig methods, we should have a single getRuntimeConfiguration method in the Flink service that calls the 3 REST endpoints and merges the results. This we can later replace with a better endpoint easily, without touching the observer.
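A minimal sketch of what that single entry point could look like. The method names and the merge precedence (checkpoint config overriding the other two) are assumptions; the per-endpoint fetches are stubbed out as plain maps, where in the real service they would be the calls to /jobs/:jobid/jobmanager/config, /jobs/:jobid/config and /jobs/:jobid/checkpoints/config.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a single getRuntimeConfiguration entry point that
// merges the three per-endpoint results; later sources win on key conflicts.
public class RuntimeConfigMergeSketch {

    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... sources) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> source : sources) {
            merged.putAll(source); // later sources override earlier ones
        }
        return merged;
    }

    static Map<String, String> getRuntimeConfiguration(
            Map<String, String> jobManagerConfig,
            Map<String, String> jobConfig,
            Map<String, String> checkpointConfig) {
        // Checkpoint config last, so programmatic overrides take precedence.
        return merge(jobManagerConfig, jobConfig, checkpointConfig);
    }
}
```

With this shape, swapping the three REST calls for a future "effective job config" endpoint only touches the service method, not the observer.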

Comment on lines +906 to +909
} catch (Exception e) {
LOG.warn("Failed to fetch JobManager configuration via REST API", e);
return new HashMap<>();
}
Contributor

This catch is probably not required here, as we already catch outside of this method; this way we just hide errors unnecessarily.

return jobConfig;

} catch (Exception e) {
LOG.warn("Failed to fetch job configuration via REST API", e);
Contributor

This catch is probably not required here, as we already catch outside of this method; this way we just hide errors unnecessarily.

return FlinkRuntimeConfigurationUtils.mapCheckpointConfiguration(
checkpointConfigInfo);

} catch (Exception e) {
Contributor

This catch is probably not required here, as we already catch outside of this method; this way we just hide errors unnecessarily.


if (e.getCause() instanceof RestClientException restException
&& restException.getHttpResponseStatus() == HttpResponseStatus.NOT_FOUND) {
throw new RuntimeException("Job not found: " + jobId, e);
Contributor

I don't think we need the complex error handling here

}

configManager.putRuntimeConfig(namespace, name, jobId, runtimeConfig);
ctx.clearObserveConfig();
Contributor

Why clear the observeConfig here? With this in place, the entire config generation pipeline (cache lookup, key building, ObjectMapper serialization, cloning, spec config application) is re-run just to produce an identical base config when getObserveConfig() is next called.

I'd say it makes more sense to apply the runtime config support here:

Suggested change
ctx.clearObserveConfig();
var currentObserveConfig = ctx.getObserveConfig();
if (currentObserveConfig != null) {
    runtimeConfig.forEach(currentObserveConfig::setString);
}

Contributor Author

Added the above suggestion, thanks @Dennis-Mircea

throws Exception {
var resource = ctx.getResource();
ctx.getConfigManager()
.invalidateRuntimeConfig(
Contributor

I see no explicit reason to do this, as the cache TTL will be hit anyway. The existing LoadingCache for observeConfig doesn't do explicit invalidation either (it relies on key-based natural misses + TTL), so the runtimeConfigCache doesn't need it for consistency.

That said, if we do want to call invalidateRuntimeConfig in all valid scenarios, we should also include reconcileOtherChanges as one of those cases.
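To make the natural-miss + TTL behaviour concrete: the operator actually uses a LoadingCache, but this minimal hand-rolled sketch (all names illustrative) shows why stale entries age out without explicit invalidation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;

// Minimal TTL-cache illustration: entries older than ttlMillis are treated as
// misses on lookup, so explicit invalidation is usually unnecessary.
public class TtlCacheSketch {

    record Entry(String value, long writtenAt) {}

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable clock, handy for testing

    TtlCacheSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(String key, String value) {
        entries.put(key, new Entry(value, clock.getAsLong()));
    }

    Optional<String> get(String key) {
        Entry e = entries.get(key);
        if (e == null || clock.getAsLong() - e.writtenAt() > ttlMillis) {
            entries.remove(key); // natural miss: the stale entry simply expires
            return Optional.empty();
        }
        return Optional.of(e.value());
    }
}
```

After the TTL elapses, the next lookup misses and the caller re-fetches from the REST API, which is exactly the behaviour the comment above relies on.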

Contributor Author

@Dennis-Mircea I was thinking of some scenarios. While I do understand that in the current state of things we would not need this explicitly, since the TTL would take care of it eventually, I feel this could be a real use case where the same jobId is reused during recovery/cancellation to run the job with an updated config; ensuring invalidation whenever cancel/resubmit is called should be a good guardrail.
I totally agree with the point that if we call invalidateRuntimeConfig in cancelJob, it needs to be included in the other valid scenarios as well.

Contributor

If we call invalidateRuntimeConfig for each concrete case, I'd focus even more on adding tests to cover all of these scenarios, assuming that @gyfora agrees this is the most suitable approach.

*/
private void fetchAndCacheRuntimeConfig(
FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
if (clusterJobStatus.getJobState() != JobStatus.RUNNING) {
Contributor

I think this condition restricts too much. I'd rather go with something like this:

if (clusterJobStatus.getJobState().isGloballyTerminalState()) {
    return;
}

Or not have this condition at all. @gyfora, what do you say?

* @param jobId Job ID string
* @return Cached runtime config if present
*/
public Optional<Map<String, String>> getRuntimeConfig(
Contributor

Currently, within a single cycle, the cache is hit in:

  1. getObserveConfig() -> applyCachedRuntimeConfig() -> getRuntimeConfig(ns, name, jobId)
  2. fetchAndCacheRuntimeConfig() -> configManager.getRuntimeConfig(ns, name, jobId).isPresent()

So there are 2 hits per cycle (and maybe more in the future) for this runtimeConfig. Given that, I'd suggest introducing a runtimeConfig variable as part of FlinkResourceContext that follows the same pattern as observeConfig. Having a runtimeConfig variable per context will reduce the cache hits to 1.
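The per-context caching pattern suggested above could look roughly like this sketch (illustrative names only, mirroring how observeConfig is lazily resolved once per reconcile cycle):

```java
import java.util.Map;
import java.util.function.Supplier;

// Sketch of lazy per-context caching: the loader (e.g. the configManager cache
// lookup) runs at most once per context instance, i.e. once per cycle.
public class ResourceContextSketch {

    private Map<String, String> runtimeConfig; // null until first access
    private final Supplier<Map<String, String>> loader;

    ResourceContextSketch(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    Map<String, String> getRuntimeConfig() {
        if (runtimeConfig == null) {
            runtimeConfig = loader.get(); // single cache hit per context
        }
        return runtimeConfig;
    }
}
```

Since a new context is created per reconcile cycle, the field resets naturally each cycle while repeated accesses within the cycle reuse the first result.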

