-
Notifications
You must be signed in to change notification settings - Fork 520
[FLINK-35746][Kubernetes-Operator] Add getJobConfiguration and getJobCheckpointConfiguration #1000
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d7cd880
e87421c
b962c9e
40e1cd7
cd28e19
dfcfe3d
a54a5eb
2279a2c
cc8457b
812a3f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| import org.apache.flink.configuration.ConfigOption; | ||
| import org.apache.flink.configuration.Configuration; | ||
| import org.apache.flink.configuration.GlobalConfiguration; | ||
| import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; | ||
| import org.apache.flink.kubernetes.operator.api.FlinkDeployment; | ||
| import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; | ||
| import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; | ||
|
|
@@ -49,6 +50,7 @@ | |
|
|
||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
|
|
@@ -88,6 +90,17 @@ public class FlinkConfigManager { | |
| private final Consumer<Set<String>> namespaceListener; | ||
| private volatile ConcurrentHashMap<FlinkVersion, List<String>> relevantFlinkVersionPrefixes; | ||
|
|
||
| private final Cache<RuntimeConfigCacheKey, Map<String, String>> runtimeConfigCache; | ||
|
|
||
| /** Cache key for runtime configuration overrides, scoped to a specific job instance. */ | ||
| @Value | ||
| @Builder | ||
| private static class RuntimeConfigCacheKey { | ||
| String namespace; | ||
| String name; | ||
| String jobId; | ||
| } | ||
|
|
||
| protected static final Pattern FLINK_VERSION_PATTERN = | ||
| Pattern.compile( | ||
| VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.") | ||
|
|
@@ -132,6 +145,14 @@ public Configuration load(Key k) { | |
| } | ||
| }); | ||
|
|
||
| this.runtimeConfigCache = | ||
| CacheBuilder.newBuilder() | ||
| .maximumSize( | ||
| defaultConfig.get( | ||
| KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE)) | ||
| .expireAfterAccess(cacheTimeout) | ||
| .build(); | ||
|
|
||
| updateDefaultConfig(defaultConfig); | ||
| ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); | ||
| executorService.scheduleWithFixedDelay( | ||
|
|
@@ -351,6 +372,81 @@ public Configuration getObserveConfig(FlinkDeployment deployment) { | |
| return conf; | ||
| } | ||
|
|
||
| private RuntimeConfigCacheKey cacheKey(String namespace, String name, String jobId) { | ||
| return RuntimeConfigCacheKey.builder().namespace(namespace).name(name).jobId(jobId).build(); | ||
| } | ||
|
|
||
| /** | ||
| * Store runtime configuration overrides in the cache. Called by observers after fetching | ||
| * configuration from the Flink REST API. | ||
| * | ||
| * @param namespace Resource namespace | ||
| * @param name Resource name | ||
| * @param jobId Job ID string | ||
| * @param config Runtime configuration key-value pairs | ||
| */ | ||
| public void putRuntimeConfig( | ||
| String namespace, String name, String jobId, Map<String, String> config) { | ||
| runtimeConfigCache.put( | ||
| cacheKey(namespace, name, jobId), Collections.unmodifiableMap(config)); | ||
| LOG.debug("Cached runtime configuration with {} entries", config.size()); | ||
| } | ||
|
|
||
| /** | ||
| * Get cached runtime configuration overrides for a specific job instance. | ||
| * | ||
| * @param namespace Resource namespace | ||
| * @param name Resource name | ||
| * @param jobId Job ID string | ||
| * @return Cached runtime config if present | ||
| */ | ||
| public Optional<Map<String, String>> getRuntimeConfig( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, within a single cycle, the cache is hit in:
So there are 2 hits per cycle at this point (and maybe in the future will be even more) for this |
||
| String namespace, String name, String jobId) { | ||
| return Optional.ofNullable( | ||
| runtimeConfigCache.getIfPresent(cacheKey(namespace, name, jobId))); | ||
| } | ||
|
|
||
| /** | ||
| * Invalidate all cached runtime configurations for a given resource. Should be called when a | ||
| * job is cancelled or an upgrade replaces the running job. | ||
| * | ||
| * @param namespace Resource namespace | ||
| * @param name Resource name | ||
| */ | ||
| public void invalidateRuntimeConfig(String namespace, String name) { | ||
| runtimeConfigCache | ||
| .asMap() | ||
| .keySet() | ||
| .removeIf( | ||
| key -> namespace.equals(key.getNamespace()) && name.equals(key.getName())); | ||
| LOG.debug("Invalidated runtime configuration cache"); | ||
| } | ||
|
|
||
| /** | ||
| * Apply any cached runtime configuration overrides to the given configuration. Looks up the | ||
| * cache using the resource's current job ID from its status. | ||
| * | ||
| * @param resource Flink resource (FlinkDeployment or FlinkSessionJob) | ||
| * @param conf Configuration to apply overrides to | ||
| */ | ||
| public void applyCachedRuntimeConfig(AbstractFlinkResource<?, ?> resource, Configuration conf) { | ||
| var jobStatus = resource.getStatus().getJobStatus(); | ||
| if (jobStatus == null || jobStatus.getJobId() == null) { | ||
| return; | ||
| } | ||
| getRuntimeConfig( | ||
| resource.getMetadata().getNamespace(), | ||
| resource.getMetadata().getName(), | ||
| jobStatus.getJobId()) | ||
| .ifPresent( | ||
| config -> { | ||
| LOG.debug( | ||
| "Applying {} cached runtime configuration overrides", | ||
| config.size()); | ||
| config.forEach(conf::setString); | ||
| }); | ||
| } | ||
|
|
||
| private void addOperatorConfigsFromSpec(AbstractFlinkSpec spec, Configuration conf) { | ||
| // Observe config should include the latest operator related settings | ||
| if (spec.getFlinkConfiguration() != null) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,7 @@ public boolean observe(FlinkResourceContext<R> ctx) { | |
| if (newJobStatusOpt.isPresent()) { | ||
| var newJobStatus = newJobStatusOpt.get(); | ||
| updateJobStatus(ctx, newJobStatus); | ||
| fetchAndCacheRuntimeConfig(ctx, newJobStatus); | ||
| ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus()); | ||
| // see if the JM server is up, try to get the exceptions | ||
| if (!previousJobStatus.isGloballyTerminalState()) { | ||
|
|
@@ -111,6 +112,46 @@ public boolean observe(FlinkResourceContext<R> ctx) { | |
| return false; | ||
| } | ||
|
|
||
| /** | ||
| * Fetch runtime configuration from the Flink REST API and cache it in the config manager when | ||
| * the job is in RUNNING state and no cached config exists for the current job ID. After | ||
| * caching, the runtime values are applied directly to the already-memoized observe config so | ||
| * subsequent consumers see the merged values without a full config regeneration. | ||
| */ | ||
| private void fetchAndCacheRuntimeConfig( | ||
| FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) { | ||
| if (clusterJobStatus.getJobState() != JobStatus.RUNNING) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this condition restricts too much. I'd rather go with something like this: if (clusterJobStatus.getJobState().isGloballyTerminalState()) {
return;
}Or not even have at all this condition. @gyfora what do you say? |
||
| return; | ||
| } | ||
|
|
||
| var resource = ctx.getResource(); | ||
| var jobStatus = resource.getStatus().getJobStatus(); | ||
| var namespace = resource.getMetadata().getNamespace(); | ||
| var name = resource.getMetadata().getName(); | ||
| var jobId = jobStatus.getJobId(); | ||
| var configManager = ctx.getConfigManager(); | ||
|
|
||
| if (configManager.getRuntimeConfig(namespace, name, jobId).isPresent()) { | ||
| LOG.debug("Runtime configuration already cached"); | ||
| return; | ||
| } | ||
|
|
||
| LOG.debug("Fetching runtime configuration"); | ||
| try { | ||
| var runtimeConfig = | ||
| ctx.getFlinkService() | ||
| .getRuntimeConfiguration( | ||
| ctx.getObserveConfig(), JobID.fromHexString(jobId)); | ||
| configManager.putRuntimeConfig(namespace, name, jobId, runtimeConfig); | ||
| var currentObserveConfig = ctx.getObserveConfig(); | ||
| if (currentObserveConfig != null) { | ||
| runtimeConfig.forEach(currentObserveConfig::setString); | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to fetch runtime configuration, will retry next cycle", e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Observe the exceptions raised in the job manager and take appropriate action. | ||
| * | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -111,10 +111,14 @@ public void deploy( | |
| @Override | ||
| protected boolean cancelJob(FlinkResourceContext<FlinkSessionJob> ctx, SuspendMode suspendMode) | ||
| throws Exception { | ||
| var resource = ctx.getResource(); | ||
| ctx.getConfigManager() | ||
| .invalidateRuntimeConfig( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see no explicit reason to do this as the cache TTL will be hit anyway. The existing In this way, if we want to call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Dennis-Mircea I was thinking of some scenarios, while I do understand in the current set of things we would not need this explicitly since TTL would take care of it eventually. I feel this could be a real use case in scenarios where same jobId is used in recovery/cancellation to run the job with updated config -> ensure invalidation of the job whenever cancel/resubmit is called should be good guardrail.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By calling |
||
| resource.getMetadata().getNamespace(), resource.getMetadata().getName()); | ||
| var cancelTs = Instant.now(); | ||
| var result = | ||
| ctx.getFlinkService() | ||
| .cancelSessionJob(ctx.getResource(), suspendMode, ctx.getObserveConfig()); | ||
| .cancelSessionJob(resource, suspendMode, ctx.getObserveConfig()); | ||
| result.getSavepointPath() | ||
| .ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs)); | ||
| return result.isPending(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.