diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 5667d88852..db57a55382 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -170,6 +170,12 @@ Integer The port the health probe will use to expose the status. + +
kubernetes.operator.ingress.manage
+ true + Boolean + Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created. +
kubernetes.operator.jm-deployment-recovery.enabled
true diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html b/docs/layouts/shortcodes/generated/system_advanced_section.html index 7752c15d5f..1c9bc3bd72 100644 --- a/docs/layouts/shortcodes/generated/system_advanced_section.html +++ b/docs/layouts/shortcodes/generated/system_advanced_section.html @@ -68,6 +68,12 @@ Integer The port the health probe will use to expose the status. + +
kubernetes.operator.ingress.manage
+ true + Boolean + Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created. +
kubernetes.operator.label.selector
(none) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index e2cca03988..3e21ab456d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -199,7 +199,8 @@ void registerDeploymentController() { observerFactory, statusRecorder, eventRecorder, - canaryResourceManager); + canaryResourceManager, + configManager); registeredControllers.add(operator.register(controller, this::overrideControllerConfigs)); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index f65385ecc4..97068722aa 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -79,6 +79,7 @@ public class FlinkOperatorConfiguration { Duration slowRequestThreshold; int reportedExceptionEventsMaxCount; int reportedExceptionEventsMaxStackTraceLength; + boolean manageIngress; public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { Duration reconcileInterval = @@ -203,6 +204,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato operatorConfig.get( KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES); + boolean manageIngress = + operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS); + return new FlinkOperatorConfiguration( reconcileInterval, reconcilerMaxParallelism, @@ -234,7 +238,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato snapshotResourcesEnabled, slowRequestThreshold, reportedExceptionEventsMaxCount, - reportedExceptionEventsMaxStackTraceLength); + reportedExceptionEventsMaxStackTraceLength, + manageIngress); } private static GenericRetry getRetryConfig(Configuration conf) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 38108bd168..4f755372a1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -679,4 +679,12 @@ public static String operatorConfigKey(String key) { .defaultValue(10) .withDescription( "Maximum number of exception-related Kubernetes events emitted per reconciliation cycle."); + + @Documentation.Section(SECTION_ADVANCED) + public static final ConfigOption OPERATOR_MANAGE_INGRESS = + operatorConfig("ingress.manage") + .booleanType() + .defaultValue(true) + .withDescription( + "Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created."); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 165c313dc1..32a3109d12 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException; @@ -69,6 +70,7 @@ public class FlinkDeploymentController private final StatusRecorder statusRecorder; private final EventRecorder eventRecorder; private final CanaryResourceManager canaryResourceManager; + private final FlinkConfigManager flinkConfigManager; public FlinkDeploymentController( Set validators, @@ -77,7 +79,8 @@ public FlinkDeploymentController( FlinkDeploymentObserverFactory observerFactory, StatusRecorder statusRecorder, EventRecorder eventRecorder, - CanaryResourceManager canaryResourceManager) { + CanaryResourceManager canaryResourceManager, + FlinkConfigManager flinkConfigManager) { this.validators = validators; this.ctxFactory = ctxFactory; this.reconcilerFactory = reconcilerFactory; @@ -85,6 +88,7 @@ public FlinkDeploymentController( this.statusRecorder = statusRecorder; this.eventRecorder = eventRecorder; this.canaryResourceManager = canaryResourceManager; + this.flinkConfigManager = flinkConfigManager; } @Override @@ -184,7 +188,9 @@ public List> prepareEventSources( List> eventSources = new ArrayList<>(); eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context)); eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context)); - eventSources.add(EventSourceUtils.getIngressInformerEventSource(context)); + if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) { + eventSources.add(EventSourceUtils.getIngressInformerEventSource(context)); + } if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) { eventSources.add( EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context)); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index f2a1b71e60..180fb128c9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -190,7 +190,8 @@ public void deploy( status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING); status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); - IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient()); + IngressUtils.reconcileIngress( + ctx, spec, deployConfig, ctx.getKubernetesClient(), eventRecorder); } private void setJobIdIfNecessary( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 139a99935c..4c998fec16 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -106,7 +106,8 @@ public void deploy( setOwnerReference(cr, deployConfig); ctx.getFlinkService().submitSessionCluster(deployConfig); cr.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); - IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient()); + IngressUtils.reconcileIngress( + ctx, spec, deployConfig, ctx.getKubernetesClient(), eventRecorder); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java index 89036d0539..722270a1f6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java @@ -38,6 +38,7 @@ import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.NonDeletingOperation; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ import static org.apache.flink.kubernetes.operator.utils.EventSourceUtils.LABEL_COMPONENT_INGRESS; /** Ingress utilities. */ +@Slf4j public class IngressUtils { private static final Pattern NAME_PTN = @@ -68,13 +70,29 @@ public class IngressUtils { private static final String REST_SVC_NAME_SUFFIX = "-rest"; private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class); + public static final String INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET = + "Ingress management is turned off but ingress set in spec"; + public static final String INGRESS_MANAGEMENT = "IngressManagement"; public static void reconcileIngress( FlinkResourceContext ctx, FlinkDeploymentSpec spec, Configuration effectiveConfig, - KubernetesClient client) { + KubernetesClient client, + EventRecorder eventRecorder) { + if (!ctx.getOperatorConfig().isManageIngress()) { + if (spec.getIngress() != null) { + eventRecorder.triggerEvent( + ctx.getResource(), + EventRecorder.Type.Warning, + INGRESS_MANAGEMENT, + INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET, + EventRecorder.Component.Operator, + client); + } + return; + } var objectMeta = ctx.getResource().getMetadata(); if (spec.getIngress() != null) { HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 5eca939457..af04e0697b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -110,7 +110,8 @@ public TestingFlinkDeploymentController( new FlinkDeploymentObserverFactory(eventRecorder), statusRecorder, eventRecorder, - canaryResourceManager); + canaryResourceManager, + new FlinkConfigManager(Configuration.fromMap(Map.of()))); } @Override diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java index 78b6c3a13f..08124ced39 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; +import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.networking.v1.Ingress; import io.fabric8.kubernetes.api.model.networking.v1.IngressRule; import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS; @@ -38,9 +39,11 @@ import java.net.URL; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -57,7 +60,20 @@ class IngressUtilsTest { private FlinkResourceContext createResourceContext(FlinkDeployment appCluster) { testingJosdkContext = new TestingJosdkContext<>(client); - return new FlinkDeploymentContext(appCluster, testingJosdkContext, null, null, null, null); + return new FlinkDeploymentContext( + appCluster, + testingJosdkContext, + null, + new FlinkConfigManager(Configuration.fromMap(new HashMap<>())), + null, + null); + } + + private FlinkResourceContext createResourceContext( + FlinkDeployment appCluster, FlinkConfigManager configManager) { + testingJosdkContext = new TestingJosdkContext<>(client); + return new FlinkDeploymentContext( + appCluster, testingJosdkContext, null, configManager, null, null); } @Test @@ -69,7 +85,7 @@ void testIngress() { // no ingress when ingressDomain is empty IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { assertNull( client.network() @@ -93,7 +109,7 @@ void testIngress() { builder.template("{{name}}.{{namespace}}.example.com"); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); Ingress ingress = null; io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress ingressV1beta1 = null; if (IngressUtils.ingressInNetworkingV1(client)) { @@ -142,7 +158,7 @@ void testIngress() { builder.annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/$2")); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -199,7 +215,7 @@ void testIngress() { builder.className("nginx"); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -283,7 +299,7 @@ public void testIngressTls() { builder.tls(new ArrayList<>()); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); Ingress ingress = null; io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress ingressV1beta1 = null; if (IngressUtils.ingressInNetworkingV1(client)) { @@ -320,7 +336,7 @@ public void testIngressTls() { builder.tls(List.of(ingressTlsSpecSecretOnly)); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -358,7 +374,7 @@ public void testIngressTls() { builder.tls(List.of(ingressTlsSpecHostsOnly)); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -396,7 +412,7 @@ public void testIngressTls() { builder.tls(List.of(ingressTlsSpecSingleTLSWithHost)); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -438,7 +454,7 @@ public void testIngressTls() { builder.tls(List.of(ingressTlsSpecSingleTLSWithHosts)); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -485,7 +501,7 @@ public void testIngressTls() { List.of(ingressTlsSpecMultipleTLSWithHosts1, ingressTlsSpecMultipleTLSWithHosts2)); appCluster.getSpec().setIngress(builder.build()); IngressUtils.reconcileIngress( - createResourceContext(appCluster), appCluster.getSpec(), config, client); + createResourceContext(appCluster), appCluster.getSpec(), config, client, null); if (IngressUtils.ingressInNetworkingV1(client)) { ingress = client.network() @@ -549,7 +565,39 @@ void testDeletesIngress() { io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class, List.of(ingress))); - IngressUtils.reconcileIngress(context, appCluster.getSpec(), null, client); + IngressUtils.reconcileIngress(context, appCluster.getSpec(), null, client, null); + + var ingressV1beta1 = + client.network() + .v1beta1() + .ingresses() + .inNamespace(appCluster.getMetadata().getNamespace()) + .withName(appCluster.getMetadata().getName()) + .get(); + assertThat(ingressV1beta1).isNull(); + } + + @Test + void skipIngressReconciliationIfFeatureFlagOff() { + List events = new ArrayList<>(); + EventRecorder eventRecorder = + new EventRecorder((a, event) -> events.add(event), (a, b) -> {}); + FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); + FlinkConfigManager manager = + new FlinkConfigManager( + Configuration.fromMap(Map.of(OPERATOR_MANAGE_INGRESS.key(), "false"))); + var context = createResourceContext(appCluster, manager); + context.getOperatorConfig(); + Configuration config = + new FlinkConfigManager(new Configuration()) + .getDeployConfig(appCluster.getMetadata(), appCluster.getSpec()); + + IngressSpec.IngressSpecBuilder builder = IngressSpec.builder(); + builder.template("{{name}}.{{namespace}}.example.com"); + builder.tls(new ArrayList<>()); + appCluster.getSpec().setIngress(builder.build()); + + IngressUtils.reconcileIngress(context, appCluster.getSpec(), config, client, eventRecorder); var ingressV1beta1 = client.network() @@ -559,5 +607,6 @@ void testDeletesIngress() { .withName(appCluster.getMetadata().getName()) .get(); assertThat(ingressV1beta1).isNull(); + assertThat(events).hasSize(1); } }