Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.CustomResource;
Expand Down Expand Up @@ -447,64 +445,6 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
return defaultNonSSAResources();
}

/**
* If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect
* events from its own updates of dependent resources and then filter them.
*
* <p>Disable this if you want to react to your own dependent resource updates
*
* @return if special annotation should be used for dependent resource to filter events
* @since 4.5.0
*/
default boolean previousAnnotationForDependentResourcesEventFiltering() {
return true;
}

/**
* For dependent resources, the framework can add an annotation to filter out events resulting
* directly from the framework's operation. There are, however, some resources that do not follow
* the Kubernetes API conventions that changes in metadata should not increase the generation of
* the resource (as recorded in the {@code generation} field of the resource's {@code metadata}).
* For these resources, this convention is not respected and results in a new event for the
* framework to process. If that particular case is not handled correctly in the resource matcher,
* the framework will consider that the resource doesn't match the desired state and therefore
* triggers an update, which in turn, will re-add the annotation, thus starting the loop again,
* infinitely.
*
* <p>As a workaround, we automatically skip adding previous annotation for those well-known
* resources. Note that if you are sure that the matcher works for your use case, and it should in
* most instances, you can remove the resource type from the blocklist.
*
* <p>The consequence of adding a resource type to the set is that the framework will not use
* event filtering to prevent events, initiated by changes made by the framework itself as a
* result of its processing of dependent resources, to trigger the associated reconciler again.
*
* <p>Note that this method only takes effect if annotating dependent resources to prevent
* dependent resources events from triggering the associated reconciler again is activated as
* controlled by {@link #previousAnnotationForDependentResourcesEventFiltering()}
*
* @return a Set of resource classes where the previous version annotation won't be used.
*/
default Set<Class<? extends HasMetadata>> withPreviousAnnotationForDependentResourcesBlocklist() {
return Set.of(Deployment.class, StatefulSet.class);
}

/**
* If the event logic should parse the resourceVersion to determine the ordering of dependent
* resource events. This is typically not needed.
*
* <p>Disabled by default as Kubernetes does not support, and discourages, this interpretation of
* resourceVersions. Enable only if your api server event processing seems to lag the operator
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
}

/**
* {@link io.javaoperatorsdk.operator.api.reconciler.UpdateControl} patch resource or status can
* either use simple patches or SSA. Setting this to {@code true}, controllers will use SSA for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,8 @@ public class ConfigurationServiceOverrider {
private Duration reconciliationTerminationTimeout;
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
private Boolean previousAnnotationForDependentResources;
private Boolean parseResourceVersions;
private Boolean useSSAToPatchPrimaryResource;
private Boolean cloneSecondaryResourcesWhenGettingFromCache;
private Set<Class<? extends HasMetadata>> previousAnnotationUsageBlocklist;

@SuppressWarnings("rawtypes")
private DependentResourceFactory dependentResourceFactory;
Expand Down Expand Up @@ -168,31 +165,6 @@ public ConfigurationServiceOverrider withDefaultNonSSAResource(
return this;
}

public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(boolean value) {
this.previousAnnotationForDependentResources = value;
return this;
}

/**
* @param value true if internal algorithms can use metadata.resourceVersion as a numeric value.
* @return this
*/
public ConfigurationServiceOverrider withParseResourceVersions(boolean value) {
this.parseResourceVersions = value;
return this;
}

/**
* @deprecated use withParseResourceVersions
* @param value true if internal algorithms can use metadata.resourceVersion as a numeric value.
* @return this
*/
@Deprecated(forRemoval = true)
public ConfigurationServiceOverrider wihtParseResourceVersions(boolean value) {
this.parseResourceVersions = value;
return this;
}

public ConfigurationServiceOverrider withUseSSAToPatchPrimaryResource(boolean value) {
this.useSSAToPatchPrimaryResource = value;
return this;
Expand All @@ -204,12 +176,6 @@ public ConfigurationServiceOverrider withCloneSecondaryResourcesWhenGettingFromC
return this;
}

public ConfigurationServiceOverrider withPreviousAnnotationForDependentResourcesBlocklist(
Set<Class<? extends HasMetadata>> blocklist) {
this.previousAnnotationUsageBlocklist = blocklist;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, client) {
@Override
Expand Down Expand Up @@ -331,20 +297,6 @@ public Set<Class<? extends HasMetadata>> defaultNonSSAResources() {
defaultNonSSAResource, ConfigurationService::defaultNonSSAResources);
}

@Override
public boolean previousAnnotationForDependentResourcesEventFiltering() {
return overriddenValueOrDefault(
previousAnnotationForDependentResources,
ConfigurationService::previousAnnotationForDependentResourcesEventFiltering);
}

@Override
public boolean parseResourceVersionsForEventFilteringAndCaching() {
return overriddenValueOrDefault(
parseResourceVersions,
ConfigurationService::parseResourceVersionsForEventFilteringAndCaching);
}

@Override
public boolean useSSAToPatchPrimaryResource() {
return overriddenValueOrDefault(
Expand All @@ -357,14 +309,6 @@ public boolean cloneSecondaryResourcesWhenGettingFromCache() {
cloneSecondaryResourcesWhenGettingFromCache,
ConfigurationService::cloneSecondaryResourcesWhenGettingFromCache);
}

@Override
public Set<Class<? extends HasMetadata>>
withPreviousAnnotationForDependentResourcesBlocklist() {
return overriddenValueOrDefault(
previousAnnotationUsageBlocklist,
ConfigurationService::withPreviousAnnotationForDependentResourcesBlocklist);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public abstract class KubernetesDependentResource<R extends HasMetadata, P exten
private final boolean garbageCollected = this instanceof GarbageCollected;
private KubernetesDependentResourceConfig<R> kubernetesDependentResourceConfig;
private volatile Boolean useSSA;
private volatile Boolean usePreviousAnnotationForEventFiltering;

public KubernetesDependentResource() {}

Expand Down Expand Up @@ -158,14 +157,6 @@ protected void addMetadata(
} else {
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
}
} else if (usePreviousAnnotation(context)) { // set a new one
eventSource()
.orElseThrow()
.addPreviousAnnotation(
Optional.ofNullable(actualResource)
.map(r -> r.getMetadata().getResourceVersion())
.orElse(null),
target);
}
addReferenceHandlingMetadata(target, primary);
}
Expand All @@ -181,22 +172,6 @@ protected boolean useSSA(Context<P> context) {
return useSSA;
}

private boolean usePreviousAnnotation(Context<P> context) {
if (usePreviousAnnotationForEventFiltering == null) {
usePreviousAnnotationForEventFiltering =
context
.getControllerConfiguration()
.getConfigurationService()
.previousAnnotationForDependentResourcesEventFiltering()
&& !context
.getControllerConfiguration()
.getConfigurationService()
.withPreviousAnnotationForDependentResourcesBlocklist()
.contains(this.resourceType());
}
return usePreviousAnnotationForEventFiltering;
}

@Override
protected void handleDelete(P primary, R secondary, Context<P> context) {
if (secondary != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ControllerEventSource<T extends HasMetadata>

@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerEventSource(Controller<T> controller) {
super(NAME, controller.getCRClient(), controller.getConfiguration(), false);
super(NAME, controller.getCRClient(), controller.getConfiguration());
this.controller = controller;

final var config = controller.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,32 +82,19 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>

public InformerEventSource(
InformerEventSourceConfiguration<R> configuration, EventSourceContext<P> context) {
this(
configuration,
configuration.getKubernetesClient().orElse(context.getClient()),
context
.getControllerConfiguration()
.getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching());
}

InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
this(configuration, client, false);
this(configuration, configuration.getKubernetesClient().orElse(context.getClient()));
}

// visible for testing
@SuppressWarnings({"unchecked", "rawtypes"})
private InformerEventSource(
InformerEventSourceConfiguration<R> configuration,
KubernetesClient client,
boolean parseResourceVersions) {
InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
super(
configuration.name(),
configuration
.getGroupVersionKind()
.map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind()))
.orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())),
configuration,
parseResourceVersions);
configuration);
// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (useSecondaryToPrimaryIndex()) {
Expand Down Expand Up @@ -209,7 +196,7 @@ private synchronized void onAddOrUpdate(
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
var res = temporaryResourceCache.getResourceFromCache(resourceID);
if (res.isEmpty()) {
return isEventKnownFromAnnotation(newObject, oldObject);
return false;
}
boolean resVersionsEqual =
newObject
Expand All @@ -224,24 +211,6 @@ private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
|| temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
}

private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
String previous = newObject.getMetadata().getAnnotations().get(PREVIOUS_ANNOTATION_KEY);
boolean known = false;
if (previous != null) {
String[] parts = previous.split(",");
if (id.equals(parts[0])) {
if (oldObject == null && parts.length == 1) {
known = true;
} else if (oldObject != null
&& parts.length == 2
&& oldObject.getMetadata().getResourceVersion().equals(parts[1])) {
known = true;
}
}
}
return known;
}

private void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
Expand Down Expand Up @@ -333,22 +302,6 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
&& (genericFilter == null || genericFilter.accept(resource));
}

/**
* Add an annotation to the resource so that the subsequent will be omitted
*
* @param resourceVersion null if there is no prior version
* @param target mutable resource that will be returned
*/
public R addPreviousAnnotation(String resourceVersion, R target) {
target
.getMetadata()
.getAnnotations()
.put(
PREVIOUS_ANNOTATION_KEY,
id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse(""));
return target;
}

private enum Operation {
ADD,
UPDATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,14 @@ public abstract class ManagedInformerEventSource<

private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
private InformerManager<R, C> cache;
private final boolean parseResourceVersions;
private ControllerConfiguration<R> controllerConfiguration;
private final C configuration;
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
protected TemporaryResourceCache<R> temporaryResourceCache;
protected MixedOperation client;

protected ManagedInformerEventSource(
String name, MixedOperation client, C configuration, boolean parseResourceVersions) {
protected ManagedInformerEventSource(String name, MixedOperation client, C configuration) {
super(configuration.getResourceClass(), name);
this.parseResourceVersions = parseResourceVersions;
this.client = client;
this.configuration = configuration;
}
Expand Down Expand Up @@ -102,7 +99,7 @@ public synchronized void start() {
if (isRunning()) {
return;
}
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
temporaryResourceCache = new TemporaryResourceCache<>(this);
this.cache = new InformerManager<>(client, configuration, this);
cache.setControllerConfiguration(controllerConfiguration);
cache.addIndexers(indexers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand Down Expand Up @@ -94,13 +93,9 @@ void clean() {
// keep up to the last million deletions for up to 10 minutes
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;

public TemporaryResourceCache(
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
boolean parseResourceVersions) {
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
this.managedInformerEventSource = managedInformerEventSource;
this.parseResourceVersions = parseResourceVersions;
}

public synchronized void onDeleteEvent(T resource, boolean unknownState) {
Expand Down Expand Up @@ -164,20 +159,12 @@ public synchronized void putResource(T newResource, String previousResourceVersi
}
}

/**
* @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()}
* is enabled and the resourceVersion of newResource is numerically greater than
* cachedResource, otherwise false
*/
public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
try {
if (parseResourceVersions
&& Long.parseLong(newResource.getMetadata().getResourceVersion())
> Long.parseLong(cachedResource.getMetadata().getResourceVersion())) {
return true;
}
return Long.parseLong(newResource.getMetadata().getResourceVersion())
> Long.parseLong(cachedResource.getMetadata().getResourceVersion());
} catch (NumberFormatException e) {
log.debug(
log.warn(
"Could not compare resourceVersions {} and {} for {}",
newResource.getMetadata().getResourceVersion(),
cachedResource.getMetadata().getResourceVersion(),
Expand Down
Loading
Loading