[FLINK-35746][Kubernetes-Operator] Add getJobConfiguration and getJobCheckpointConfiguration #1000

nishita-09 wants to merge 10 commits into
Conversation
…CheckpointConfiguration
@nishita-09 since you haven't touched this for some time, @Dennis-Mircea will take over the work at this point so we can move this issue forward. Hope that is okay.

I was actually waiting on a review from you on this. If it's not a problem I can continue prioritising this. @gyfora
gyfora
left a comment
I have added some notes; overall I think there is too much logic in the config manager. We should move the getting/accessing of the configs to the observers, and the FlinkConfigManager should then simply always apply the runtime config when it is available.
This is going to be very critical logic, so the exception handling matters: it's better to fall back to the previous behaviour (simply use the previous logic if the runtime config cannot be accessed), so that part is good (it has to go to the observer too).
Once we have restructured this, we should add detailed test coverage for the core logic and error handling :)
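A minimal sketch of the fallback idea on the observer side, assuming a hypothetical fetchRuntimeConfig helper that wraps the REST calls (this is only an illustration of the error handling, not the PR's implementation):

```java
// Rough observer-side sketch: try to fetch the runtime config, but on any failure
// fall back to the previously used, spec-derived configuration.
private Configuration resolveEffectiveConfig(FlinkResourceContext<?> ctx) {
    Configuration conf = ctx.getObserveConfig(); // spec + defaults, i.e. the previous behaviour
    try {
        Map<String, String> runtime = fetchRuntimeConfig(ctx); // assumed REST-backed helper
        runtime.forEach(conf::setString);
    } catch (Exception e) {
        // Keep using the spec-derived config instead of failing the observe cycle.
        LOG.warn("Could not fetch runtime config, falling back to spec-derived config", e);
    }
    return conf;
}
```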
```java
var jobStatus = resource.getStatus().getJobStatus();
if (jobStatus == null || jobStatus.getJobId() == null) {
    LOG.debug("No job ID available for runtime configuration fetch");
    return;
}

JobID jobId = JobID.fromHexString(jobStatus.getJobId());
JobStatus jobState = jobStatus.getState();

// Only fetch runtime config if job is in a running state
if (jobState == null || !jobState.equals(JobStatus.RUNNING)) {
    LOG.debug(
            "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch",
            jobId,
            jobState);
    return;
}

// Check if JobManager is ready (only for FlinkDeployment)
if (resource instanceof FlinkDeployment) {
    FlinkDeployment deployment = (FlinkDeployment) resource;
    if (deployment.getStatus().getJobManagerDeploymentStatus()
            != JobManagerDeploymentStatus.READY) {
        LOG.debug(
                "JobManager is not ready, skipping runtime config fetch for job {}",
                jobId);
        return;
    }
```
In general I think it's not the right approach to have this logic here. We should move this to the observers, where job state is already tracked and we know exactly when the REST API can be accessed.
So instead of accessing the REST API here, expose methods for setting/invalidating/getting the runtime config in the ConfigManager. Then the observer could check whether the config manager already has a cached config for the current job id, and if not, update it. In the same way, the reconciler could invalidate runtime configs after a cancel/upgrade.
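A rough sketch of that ConfigManager surface (the method names match the ones later added in this PR; the Caffeine cache, key format and timeout are illustrative assumptions):

```java
// Sketch of the runtime-config methods on FlinkConfigManager.
private final Cache<String, Map<String, String>> runtimeConfigCache =
        Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(5)).build();

private static String cacheKey(String namespace, String name, String jobId) {
    return namespace + "/" + name + "/" + jobId;
}

public void putRuntimeConfig(String namespace, String name, String jobId, Map<String, String> conf) {
    runtimeConfigCache.put(cacheKey(namespace, name, jobId), Map.copyOf(conf));
}

public Optional<Map<String, String>> getRuntimeConfig(String namespace, String name, String jobId) {
    return Optional.ofNullable(runtimeConfigCache.getIfPresent(cacheKey(namespace, name, jobId)));
}

public void invalidateRuntimeConfig(String namespace, String name) {
    // Called by the reconciler after cancel/upgrade so the next job fetches a fresh config.
    runtimeConfigCache.asMap().keySet().removeIf(k -> k.startsWith(namespace + "/" + name + "/"));
}
```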
Sure @gyfora thanks for the review. Will update this logic to move it to observers.
```java
if (jobState == null || !jobState.equals(JobStatus.RUNNING)) {
    LOG.debug(
            "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch",
            jobId,
            jobState);
    return;
}
```
Please see my other comment relating to the observer logic, but overall, once the runtime config is cached for the job id we should apply it, even if the job is failing/restarting/etc. The config manager doesn't need this logic after that.
Thanks @gyfora for the review. Will take a look at this and complete this by EOW.
…rom FlinkConfigManager to observer
Integrated upstream changes including Blue/Green deployment features, Flink 2.2 support, and various improvements while preserving runtime configuration override implementation.
@gyfora Please do review the changes. I have moved most of the runtime config update logic to the observer, where the configurations are available. I also avoided hardcoding and handled errors: if the runtime config is not available/accessible, the fallback is the earlier config. What I can do is at least add a warning or throw an error when a new config is added and is not updated here. Do let me know if you find a better way to get this done. Thanks!! Appreciate your help.
I agree that this manual mapping of configs is very fragile, and it duplicates a lot of logic from core Flink. A different approach was taken in https://github.com/apache/flink-kubernetes-operator/pull/1080/changes; it uses a different endpoint to get the JobManagerJobConfiguration.
Can we simply use this instead of all the runtimeConfig/checkpointConfig calls? I think when the user programmatically sets a config in the StreamExecEnv, those actually end up in the JobManagerJobConfiguration. cc @Dennis-Mircea
```java
executorService.scheduleWithFixedDelay(
        runtimeConfigCache::cleanUp,
        cacheTimeout.toMillis(),
        cacheTimeout.toMillis(),
        TimeUnit.MILLISECONDS);
```
Is this necessary? I think we probably don't need it for the other cache either, as we are continuously using the cache itself.
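For reference, a minimal sketch assuming a Caffeine cache (as used elsewhere in the operator): with an expireAfterWrite policy, expired entries are evicted lazily during normal cache operations, so a scheduled cleanUp() is optional.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.Map;

// Expiry alone is usually enough: Caffeine performs maintenance (including eviction of
// expired entries) as a side effect of ordinary reads and writes, without a scheduled task.
Cache<String, Map<String, String>> runtimeConfigCache =
        Caffeine.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(5)) // illustrative timeout
                .build();
```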
```java
        .build();
runtimeConfigCache.put(cacheKey, Collections.unmodifiableMap(config));
LOG.debug(
        "Cached runtime configuration for job {} (resource {}/{})", jobId, namespace, name);
```
no need to put namespace/name in the log as it's already part of the MDC
```java
        .keySet()
        .removeIf(
                key -> namespace.equals(key.getNamespace()) && name.equals(key.getName()));
LOG.debug("Invalidated runtime configuration cache for resource {}/{}", namespace, name);
```
no need to put namespace/name in the log as it's already part of the MDC
```diff
 @Override
 protected Configuration createObserveConfig() {
-    return configManager.getObserveConfig(getResource());
+    Configuration conf = configManager.getObserveConfig(getResource());
```
should getObserveConfig automatically apply the overrides in the configmanager?
Here I have added it as a subsequent step: it specifically applies the overrides after getting the default configs. We could incorporate this into getObserveConfig as well.
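If it were folded into getObserveConfig, the shape could be roughly as below (the internal spec-config lookup is only indicated; applyCachedRuntimeConfig is the method this PR adds, and the signature is simplified):

```java
// Sketch: getObserveConfig returning the spec-derived config with the cached runtime
// overrides already applied, so callers don't need a separate step.
public Configuration getObserveConfig(AbstractFlinkResource<?, ?> resource) {
    Configuration conf = getSpecDerivedObserveConfig(resource); // assumed existing lookup
    applyCachedRuntimeConfig(resource, conf);                   // no-op when nothing is cached
    return conf;
}
```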
```java
if (jobId == null) {
    return;
}

LOG.warn(
        "Failed to fetch checkpoint configuration for job {}: {}",
        jobId,
        e.getMessage());
```
pass exception instead of message
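e.g. (with SLF4J the throwable goes last, so the stack trace is logged):

```java
LOG.warn("Failed to fetch checkpoint configuration for job {}", jobId, e);
```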
```java
if (runtimeConfig.isEmpty() && fetchFailed) {
    LOG.warn(
            "All runtime config REST fetches failed for job {}, will retry next cycle",
```
I think in general we can also remove all job ids from the logs; I don't think they add any info.
```java
 * org.apache.flink.configuration.ConfigOption} keys. Both sides use Flink's own constants for
 * compile-time safety.
 */
private enum CheckpointConfigMapping {
```
We should move all the mapping logic for the Execution + Checkpoint config to a new utility class.
```java
if (configurationInfo != null) {
    if (configurationInfo.getExecutionConfigInfo() != null) {
        jobConfig.put(
                "parallelism.default",
```
We should use the config options
Yes @gyfora, I was going to do it for this as well, like it was done for the checkpoint configs. Thanks!!
```java
                .getExecutionConfigInfo()
                .getParallelism()));
jobConfig.put(
        "pipeline.object-reuse",
```
We should use the config options
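Something along these lines could address both points: the mapping lives in one utility class (as suggested above) and uses Flink's ConfigOption constants instead of raw strings; the class and method names here are illustrative:

```java
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;

import java.util.HashMap;
import java.util.Map;

// Illustrative utility: the REST-field -> ConfigOption mapping lives in one place and uses
// Flink's own constants, so a key rename is caught at compile time instead of at runtime.
public final class FlinkRuntimeConfigurationUtils {

    private FlinkRuntimeConfigurationUtils() {}

    // Hypothetical helper taking the values already parsed from the job config REST response.
    public static Map<String, String> mapExecutionConfig(int parallelism, boolean objectReuse) {
        Map<String, String> conf = new HashMap<>();
        conf.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism));
        conf.put(PipelineOptions.OBJECT_REUSE.key(), String.valueOf(objectReuse));
        return conf;
    }
}
```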
@gyfora For checkpoint configurations, the programmatically set config does not end up in the JobManagerJobConfiguration. That still shows the default configs set in the deployment, while the checkpoint/config endpoint is the place where the configurations are correctly reflected when user code enables or overrides checkpointing. CheckpointConfigHandler serves /jobs/:jobid/checkpoints/config, while JobManagerJobConfigurationHandler serves /jobs/:jobid/jobmanager/config.
…rom FlinkConfigManager to observer
Okay, that sounds suboptimal for our use-case here. We can start with the manual config mapping for now, but we should try to fix this in Flink and have a REST endpoint that simply returns the effective job config.
Agreed @gyfora. Sure, I will clean this up a bit more and have it ready for review in some time. Thanks.
@gyfora I have updated the logic: moved the core logic from the config manager to the JobStatusObserver, with utility classes in a separate file. I have also added the jobManagerConfig configurations to the runtime config, and removed the job ids and namespaces from the logs. Kindly review this and do let me know if you find any other concern. Thank you!
@gyfora Please do review this; I addressed all the comments and made the changes accordingly. Thank you!!
gyfora
left a comment
I wonder whether, instead of the chained logic in the observer calling the individual getConfig methods, we should have a single getRuntimeConfiguration method in the Flink service that calls the 3 REST endpoints and merges the results. We can later replace this with a better endpoint easily without touching the observer.
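A sketch of that shape, reusing the three getters this PR adds to FlinkService (their exact signatures are assumed here) and merging in the layering order described in the change log:

```java
// Sketch: a single FlinkService entry point that performs the three REST calls and merges
// the results; later layers override earlier ones (JM config -> job config -> checkpoint config).
public Map<String, String> getRuntimeConfiguration(JobID jobId, Configuration conf) throws Exception {
    Map<String, String> merged = new HashMap<>(getJobManagerConfiguration(jobId, conf)); // base layer
    merged.putAll(getJobConfiguration(jobId, conf));            // execution config overrides
    merged.putAll(getJobCheckpointConfiguration(jobId, conf));  // programmatic checkpoint settings win
    return merged;
}
```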
```java
} catch (Exception e) {
    LOG.warn("Failed to fetch JobManager configuration via REST API", e);
    return new HashMap<>();
}
```
This catch is probably not required here as we already catch outside of this method; this way we just hide errors unnecessarily.
```java
    return jobConfig;

} catch (Exception e) {
    LOG.warn("Failed to fetch job configuration via REST API", e);
```
This catch is probably not required here as we already catch outside of this method; this way we just hide errors unnecessarily.
```java
return FlinkRuntimeConfigurationUtils.mapCheckpointConfiguration(
        checkpointConfigInfo);

} catch (Exception e) {
```
This catch is probably not required here as we already catch outside of this method; this way we just hide errors unnecessarily.
```java
if (e.getCause() instanceof RestClientException restException
        && restException.getHttpResponseStatus() == HttpResponseStatus.NOT_FOUND) {
    throw new RuntimeException("Job not found: " + jobId, e);
```
I don't think we need the complex error handling here
```java
}

configManager.putRuntimeConfig(namespace, name, jobId, runtimeConfig);
ctx.clearObserveConfig();
```
Why clear the observeConfig here? By having this in place, it re-runs the entire config generation pipeline (cache lookup, key building, ObjectMapper serialization, cloning, spec config application) just to rebuild an identical base config the next time getObserveConfig() is called.
I'd say it makes more sense to apply the runtime config right here:
```diff
-ctx.clearObserveConfig();
+var currentObserveConfig = ctx.getObserveConfig();
+if (currentObserveConfig != null) {
+    runtimeConfig.forEach(currentObserveConfig::setString);
+}
```
Added the above suggestion, thanks @Dennis-Mircea
```java
        throws Exception {
    var resource = ctx.getResource();
    ctx.getConfigManager()
            .invalidateRuntimeConfig(
```
I see no explicit reason to do this, as the cache TTL will be hit anyway. The existing LoadingCache for observeConfig doesn't do explicit invalidation either (it relies on key-based natural misses + TTL), so the runtimeConfigCache doesn't need it for consistency.
That said, if we want to call invalidateRuntimeConfig in all valid scenarios, we should also include reconcileOtherChanges as one of those cases.
@Dennis-Mircea I was thinking of some scenarios. While I do understand that in the current state of things we would not need this explicitly, since the TTL would take care of it eventually, I feel this could be a real use case where the same jobId is used in recovery/cancellation to run the job with updated config; ensuring invalidation whenever cancel/resubmit is called should be a good guardrail.
I totally agree with the point that if we call invalidateRuntimeConfig in cancelJob, it needs to be included in the other valid scenarios as well.
By calling invalidateRuntimeConfig for each concrete case, I'll focus even more on adding tests to cover all of these scenarios, assuming that @gyfora agrees this is the most suitable approach.
…erface for all runtime configs, invalidateConfig in flinkResourceReconciler
```java
 */
private void fetchAndCacheRuntimeConfig(
        FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
    if (clusterJobStatus.getJobState() != JobStatus.RUNNING) {
```
I think this condition is too restrictive. I'd rather go with something like this:

```java
if (clusterJobStatus.getJobState().isGloballyTerminalState()) {
    return;
}
```

Or not have this condition at all. @gyfora what do you say?
```java
 * @param jobId Job ID string
 * @return Cached runtime config if present
 */
public Optional<Map<String, String>> getRuntimeConfig(
```
Currently, within a single cycle, the cache is hit in:
- `getObserveConfig()` -> `applyCachedRuntimeConfig()` -> `getRuntimeConfig(ns, name, jobId)`
- `fetchAndCacheRuntimeConfig()` -> `configManager.getRuntimeConfig(ns, name, jobId).isPresent()`

So there are 2 hits per cycle at this point (and maybe more in the future) for this runtimeConfig. For that reason, I'd suggest introducing a runtimeConfig variable as part of FlinkResourceContext that follows the same pattern as observeConfig. Having a runtimeConfig variable per context will reduce the cache hits to 1.
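A sketch of what that could look like in FlinkResourceContext, mirroring how observeConfig is resolved lazily and memoised per reconcile cycle (the field and accessors are illustrative):

```java
// Illustrative per-context memoisation: resolve the runtime config once per reconcile cycle
// and reuse it, instead of hitting the FlinkConfigManager cache several times.
private Map<String, String> runtimeConfig;

public Map<String, String> getRuntimeConfig() {
    if (runtimeConfig == null) {
        runtimeConfig =
                getConfigManager()
                        .getRuntimeConfig(namespace, name, jobId) // assumed accessors on the context
                        .orElse(Map.of());
    }
    return runtimeConfig;
}
```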
What is the purpose of the change
This pull request adds a new capability to the operator that ensures the configuration used for observation and health checks is consistent with the actual resultant configuration set both programmatically (in the user's `main()` method) and via the deployment spec.

Today, the operator infers running application/cluster settings solely from the K8s deployment spec and operator defaults. While this is mostly fine, many configs can be programmatically changed by the user in their app's main method (e.g., `env.enableCheckpointing(3000, AT_LEAST_ONCE)`), which takes precedence at runtime but is invisible to the operator. This leads to incorrect health check windows, missed in-place scaling opportunities, and inconsistent behavior in checkpoint stall detection.

Additionally, JM-level configuration (like `jobmanager.scheduler`, `state.savepoints.dir`, `high-availability.*`) set via operator defaults can drift if defaults are changed while a job is still running: the operator's reconstructed config diverges from the actual JM runtime config.

Brief change log
- Added `getJobManagerConfiguration`, `getJobConfiguration`, and `getJobCheckpointConfiguration` to the `FlinkService` interface, implemented in `AbstractFlinkService` using the Flink REST API endpoints (`/jobs/:jobid/jobmanager/config`, `/jobs/:jobid/config`, `/jobs/:jobid/checkpoints/config`).
- Added a `FlinkRuntimeConfigurationUtils` utility class with a compile-time safe `CheckpointConfigMapping` enum to map checkpoint REST API JSON fields to Flink configuration keys.
- Extended `FlinkConfigManager` with `putRuntimeConfig`, `getRuntimeConfig`, `invalidateRuntimeConfig`, and `applyCachedRuntimeConfig` methods. The cache is keyed by `{namespace, name, jobId}` and expires after a configurable timeout.
- Runtime config is fetched by the `JobStatusObserver` when a job reaches RUNNING and the cache is empty. The fetch uses a 3-layer strategy: JM config (base layer) -> job execution config (override) -> checkpoint config (programmatic overrides win). REST calls are made once per job lifecycle and cached.
- Cached runtime config is applied in `FlinkResourceContext.getObserveConfig()` so all downstream consumers (health evaluator, reconciler, scaler) see the actual runtime values.
- The cache is invalidated in `ApplicationReconciler` and `SessionJobReconciler` on cancel/upgrade, ensuring fresh config is fetched when a new job starts.

Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
[WIP]
(or)
This change added tests and can be verified as follows:
Unit tests (2126 pass, 0 failures):
- `AbstractFlinkServiceTest`: `testGetJobManagerConfiguration`, `testGetJobManagerConfigurationReturnsEmptyOnFailure`: verifies JM config REST endpoint parsing and graceful degradation.
- `AbstractFlinkServiceTest`: `testAllCheckpointConfigInfoFieldNamesCoveredByMapping`, `testAllExecutionConfigInfoFieldNamesCoveredByMapping`: reflection-based tests that fail at build time if Flink adds new `FIELD_NAME_*` constants not covered by our mapping.
- `AbstractFlinkServiceTest`: `testMapJobConfigurationMapsAllExpectedFields`, `testMapJobConfigurationHandlesNullGracefully`: verifies job config mapping correctness and null safety.
- `FlinkConfigManagerTest`: runtime config cache put/get/invalidate/apply tests.
- `JobStatusObserverTest`: verifies `fetchAndCacheRuntimeConfig` populates the cache on first observe, skips on cache hit, handles partial/full REST failures gracefully, and invalidates on job upgrade.

Integration validation
1. Deployed the job in application mode with checkpointing disabled in the deployment spec but enabled by the user in the job logic.
Here are the logs from `ClusterHealthEvaluator` which confirm that the operator now recognises the user-defined configs.

Before the change:

```
15:11:53,127 FlinkConfigManager [DEBUG][flink/basic-example] Invalidated runtime configuration cache
```

2. Checkpointing `interval=10000, EXACTLY_ONCE` in spec, app code overrides to `3000ms, AT_LEAST_ONCE`.
3. `jobmanager.scheduler: adaptive` set in operator defaults (NOT in K8s spec), then removed from defaults while the job keeps running.

Test Setup:
- `jobmanager.scheduler: adaptive`, `inplace-scaling.enabled: true`
- `stabilization.interval: 30s`, `metrics.window: 60s`
- `StateMachineExample.jar` at parallelism 2 (deliberately over-provisioned)
- `pipeline.jobvertex-parallelism-overrides`
- Job ID (unchanged throughout): `8d7ea2427c1eb8d7f3c491e4691bfd53`

Result: PASS
Phase 1 — Cache active before scaling (steady-state)
Cache with 54 entries is being applied every ~15s observe cycle. No REST calls.
Phase 2 — Autoscaler triggers in-place scaling
The autoscaler wrote `pipeline.jobvertex-parallelism-overrides`, the reconciler detected a `DiffType.SCALE` change (not UPGRADE), and called the Rescale API.

Phase 3 — Cache invalidation (new code in `AbstractFlinkResourceReconciler.scale()`)

Same millisecond as the in-place scale event. The stale cache (with the old parallelism) is removed.
Phase 4 — Cache re-population from the rescaled JM
Next observe cycle (~15s later): the observer sees an empty cache and calls `getRuntimeConfiguration()`, which queries all 3 REST endpoints. Fresh 54 entries are cached from the now-rescaled JM.
Phase 5 — Subsequent cycles use fresh cache
Back to steady-state with the new cache. No more REST calls.
Timeline Summary
- `FlinkConfigManager: Applying 54 cached runtime configuration overrides`
- `AuditUtils: SCALINGREPORT | Parallelism 2 -> 1`
- `AuditUtils: SPECCHANGED | SCALE change(s) detected`
- `AuditUtils: SCALING | In-place scaling triggered`
- `FlinkConfigManager: Invalidated runtime configuration cache`
- `JobStatusObserver: Fetching runtime configuration`
- `AbstractFlinkService: Fetching runtime configuration` (`getRuntimeConfiguration()`)
- `AbstractFlinkService: Fetched merged runtime configuration with 54 entries`
- `FlinkConfigManager: Cached runtime configuration with 54 entries`
- `FlinkConfigManager: Applying 54 cached runtime configuration overrides`

Does this pull request potentially affect one of the following parts:
- `CustomResourceDescriptors`: no

Documentation