Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
Expand All @@ -49,6 +50,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -88,6 +90,17 @@ public class FlinkConfigManager {
private final Consumer<Set<String>> namespaceListener;
private volatile ConcurrentHashMap<FlinkVersion, List<String>> relevantFlinkVersionPrefixes;

private final Cache<RuntimeConfigCacheKey, Map<String, String>> runtimeConfigCache;

/** Cache key for runtime configuration overrides, scoped to a specific job instance. */
@Value
@Builder
private static class RuntimeConfigCacheKey {
String namespace;
String name;
String jobId;
Comment thread
nishita-09 marked this conversation as resolved.
}

protected static final Pattern FLINK_VERSION_PATTERN =
Pattern.compile(
VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.")
Expand Down Expand Up @@ -132,6 +145,14 @@ public Configuration load(Key k) {
}
});

this.runtimeConfigCache =
CacheBuilder.newBuilder()
.maximumSize(
defaultConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE))
.expireAfterAccess(cacheTimeout)
.build();

updateDefaultConfig(defaultConfig);
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(
Expand Down Expand Up @@ -351,6 +372,81 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return conf;
}

private RuntimeConfigCacheKey cacheKey(String namespace, String name, String jobId) {
return RuntimeConfigCacheKey.builder().namespace(namespace).name(name).jobId(jobId).build();
}

/**
* Store runtime configuration overrides in the cache. Called by observers after fetching
* configuration from the Flink REST API.
*
* @param namespace Resource namespace
* @param name Resource name
* @param jobId Job ID string
* @param config Runtime configuration key-value pairs
*/
public void putRuntimeConfig(
String namespace, String name, String jobId, Map<String, String> config) {
runtimeConfigCache.put(
cacheKey(namespace, name, jobId), Collections.unmodifiableMap(config));
LOG.debug("Cached runtime configuration with {} entries", config.size());
}

/**
* Get cached runtime configuration overrides for a specific job instance.
*
* @param namespace Resource namespace
* @param name Resource name
* @param jobId Job ID string
* @return Cached runtime config if present
*/
public Optional<Map<String, String>> getRuntimeConfig(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, within a single cycle, the cache is hit in:

  1. getObserveConfig() -> applyCachedRuntimeConfig() -> getRuntimeConfig(ns, name, jobId)
  2. fetchAndCacheRuntimeConfig() -> configManager.getRuntimeConfig(ns, name, jobId).isPresent()

So there are 2 hits per cycle at this point (and maybe in the future will be even more) for this runtimeConfig. For that, I'd suggest to introduce a runtimeConfig variable as part of FlinkResourceContext that will follow the same pattern as observeConfig. Having a runtimeConfig variable per context will reduce the cache hits to 1.

String namespace, String name, String jobId) {
return Optional.ofNullable(
runtimeConfigCache.getIfPresent(cacheKey(namespace, name, jobId)));
}

/**
* Invalidate all cached runtime configurations for a given resource. Should be called when a
* job is cancelled or an upgrade replaces the running job.
*
* @param namespace Resource namespace
* @param name Resource name
*/
public void invalidateRuntimeConfig(String namespace, String name) {
runtimeConfigCache
.asMap()
.keySet()
.removeIf(
key -> namespace.equals(key.getNamespace()) && name.equals(key.getName()));
LOG.debug("Invalidated runtime configuration cache");
}

/**
* Apply any cached runtime configuration overrides to the given configuration. Looks up the
* cache using the resource's current job ID from its status.
*
* @param resource Flink resource (FlinkDeployment or FlinkSessionJob)
* @param conf Configuration to apply overrides to
*/
public void applyCachedRuntimeConfig(AbstractFlinkResource<?, ?> resource, Configuration conf) {
var jobStatus = resource.getStatus().getJobStatus();
if (jobStatus == null || jobStatus.getJobId() == null) {
return;
}
getRuntimeConfig(
resource.getMetadata().getNamespace(),
resource.getMetadata().getName(),
jobStatus.getJobId())
.ifPresent(
config -> {
LOG.debug(
"Applying {} cached runtime configuration overrides",
config.size());
config.forEach(conf::setString);
});
}

private void addOperatorConfigsFromSpec(AbstractFlinkSpec spec, Configuration conf) {
// Observe config should include the latest operator related settings
if (spec.getFlinkConfiguration() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,18 @@ public Configuration getObserveConfig() {
if (observeConfig != null) {
return observeConfig;
}
return observeConfig = createObserveConfig();
observeConfig = createObserveConfig();
if (observeConfig != null) {
configManager.applyCachedRuntimeConfig(getResource(), observeConfig);
}
return observeConfig;
}

/**
* @return The config manager for this context.
*/
public FlinkConfigManager getConfigManager() {
return configManager;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public boolean observe(FlinkResourceContext<R> ctx) {
if (newJobStatusOpt.isPresent()) {
var newJobStatus = newJobStatusOpt.get();
updateJobStatus(ctx, newJobStatus);
fetchAndCacheRuntimeConfig(ctx, newJobStatus);
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
// see if the JM server is up, try to get the exceptions
if (!previousJobStatus.isGloballyTerminalState()) {
Expand All @@ -111,6 +112,46 @@ public boolean observe(FlinkResourceContext<R> ctx) {
return false;
}

/**
* Fetch runtime configuration from the Flink REST API and cache it in the config manager when
* the job is in RUNNING state and no cached config exists for the current job ID. After
* caching, the runtime values are applied directly to the already-memoized observe config so
* subsequent consumers see the merged values without a full config regeneration.
*/
private void fetchAndCacheRuntimeConfig(
FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
if (clusterJobStatus.getJobState() != JobStatus.RUNNING) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this condition restricts too much. I'd rather go with something like this:

if (clusterJobStatus.getJobState().isGloballyTerminalState()) {
    return;
}

Or not even have at all this condition. @gyfora what do you say?

return;
}

var resource = ctx.getResource();
var jobStatus = resource.getStatus().getJobStatus();
var namespace = resource.getMetadata().getNamespace();
var name = resource.getMetadata().getName();
var jobId = jobStatus.getJobId();
var configManager = ctx.getConfigManager();

if (configManager.getRuntimeConfig(namespace, name, jobId).isPresent()) {
LOG.debug("Runtime configuration already cached");
return;
}

LOG.debug("Fetching runtime configuration");
try {
var runtimeConfig =
ctx.getFlinkService()
.getRuntimeConfiguration(
ctx.getObserveConfig(), JobID.fromHexString(jobId));
configManager.putRuntimeConfig(namespace, name, jobId, runtimeConfig);
var currentObserveConfig = ctx.getObserveConfig();
if (currentObserveConfig != null) {
runtimeConfig.forEach(currentObserveConfig::setString);
}
} catch (Exception e) {
LOG.warn("Failed to fetch runtime configuration, will retry next cycle", e);
}
}

/**
* Observe the exceptions raised in the job manager and take appropriate action.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,12 @@ private boolean scale(FlinkResourceContext<CR> ctx, Configuration deployConfig)
var scaled = ctx.getFlinkService().scale(ctx, deployConfig);

if (scaled) {
ReconciliationUtils.updateStatusForDeployedSpec(ctx.getResource(), deployConfig, clock);
var resource = ctx.getResource();
ctx.getConfigManager()
.invalidateRuntimeConfig(
resource.getMetadata().getNamespace(),
resource.getMetadata().getName());
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
}

return scaled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,10 @@ private boolean triggerSnapshotIfNeeded(FlinkResourceContext<CR> ctx, SnapshotTy
protected void resubmitJob(FlinkResourceContext<CR> ctx, boolean requireHaMetadata)
throws Exception {
LOG.info("Resubmitting Flink job...");
ctx.getConfigManager()
.invalidateRuntimeConfig(
ctx.getResource().getMetadata().getNamespace(),
ctx.getResource().getMetadata().getName());
SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());

var upgradeStatePath =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,12 @@ private void setJobIdIfNecessary(
@Override
protected boolean cancelJob(FlinkResourceContext<FlinkDeployment> ctx, SuspendMode suspendMode)
throws Exception {
var resource = ctx.getResource();
ctx.getConfigManager()
.invalidateRuntimeConfig(
resource.getMetadata().getNamespace(), resource.getMetadata().getName());
var cancelTs = Instant.now();
var result =
ctx.getFlinkService()
.cancelJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
var result = ctx.getFlinkService().cancelJob(resource, suspendMode, ctx.getObserveConfig());
result.getSavepointPath()
.ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs));
return result.isPending();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,14 @@ public void deploy(
@Override
protected boolean cancelJob(FlinkResourceContext<FlinkSessionJob> ctx, SuspendMode suspendMode)
throws Exception {
var resource = ctx.getResource();
ctx.getConfigManager()
.invalidateRuntimeConfig(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see no explicit reason to do this as the cache TTL will be hit anyway. The existing LoadingCache for observeConfig doesn't do explicit invalidation either (it relies on key-based natural misses + TTL), so the runtimeConfigCache doesn't need it for consistency.

In this way, if we want to call invalidateRuntimeConfig in all valid scenarios, we should also include reconcileOtherChanges as one of those cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dennis-Mircea I was thinking of some scenarios, while I do understand in the current set of things we would not need this explicitly since TTL would take care of it eventually. I feel this could be a real use case in scenarios where same jobId is used in recovery/cancellation to run the job with updated config -> ensure invalidation of the job whenever cancel/resubmit is called should be good guardrail.
I totally agree with the point that if we call invalidateRuntimeConfig in cancelJob, it needs to be included in other valid scenarios as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By calling invalidateRuntimeConfig for each concrete case, I’ll focus even more on adding tests to cover all of these scenarios, and by assuming that @gyfora agrees this is the most suitable approach.

resource.getMetadata().getNamespace(), resource.getMetadata().getName());
var cancelTs = Instant.now();
var result =
ctx.getFlinkService()
.cancelSessionJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
.cancelSessionJob(resource, suspendMode, ctx.getObserveConfig());
result.getSavepointPath()
.ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs));
return result.isPending();
Expand Down
Loading