Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@
<td>Integer</td>
<td>The port the health probe will use to expose the status.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.ingress.manage</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.jm-deployment-recovery.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<td>Integer</td>
<td>The port the health probe will use to expose the status.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.ingress.manage</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.label.selector</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ void registerDeploymentController() {
observerFactory,
statusRecorder,
eventRecorder,
canaryResourceManager);
canaryResourceManager,
configManager);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class FlinkOperatorConfiguration {
Duration slowRequestThreshold;
int reportedExceptionEventsMaxCount;
int reportedExceptionEventsMaxStackTraceLength;
boolean manageIngress;

public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
Expand Down Expand Up @@ -203,6 +204,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);

boolean manageIngress =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);

return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
Expand Down Expand Up @@ -234,7 +238,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
snapshotResourcesEnabled,
slowRequestThreshold,
reportedExceptionEventsMaxCount,
reportedExceptionEventsMaxStackTraceLength);
reportedExceptionEventsMaxStackTraceLength,
manageIngress);
}

private static GenericRetry getRetryConfig(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,4 +679,12 @@ public static String operatorConfigKey(String key) {
.defaultValue(10)
.withDescription(
"Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Boolean> OPERATOR_MANAGE_INGRESS =
operatorConfig("ingress.manage")
.booleanType()
.defaultValue(true)
.withDescription(
"Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class FlinkDeploymentController
private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
private final EventRecorder eventRecorder;
private final CanaryResourceManager<FlinkDeployment> canaryResourceManager;
private final FlinkConfigManager flinkConfigManager;

public FlinkDeploymentController(
Set<FlinkResourceValidator> validators,
Expand All @@ -77,14 +79,16 @@ public FlinkDeploymentController(
FlinkDeploymentObserverFactory observerFactory,
StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
EventRecorder eventRecorder,
CanaryResourceManager<FlinkDeployment> canaryResourceManager) {
CanaryResourceManager<FlinkDeployment> canaryResourceManager,
FlinkConfigManager flinkConfigManager) {
this.validators = validators;
this.ctxFactory = ctxFactory;
this.reconcilerFactory = reconcilerFactory;
this.observerFactory = observerFactory;
this.statusRecorder = statusRecorder;
this.eventRecorder = eventRecorder;
this.canaryResourceManager = canaryResourceManager;
this.flinkConfigManager = flinkConfigManager;
}

@Override
Expand Down Expand Up @@ -184,7 +188,9 @@ public List<EventSource<?, FlinkDeployment>> prepareEventSources(
List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
}
if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
eventSources.add(
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void deploy(
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);

IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient());
IngressUtils.reconcileIngress(
ctx, spec, deployConfig, ctx.getKubernetesClient(), eventRecorder);
}

private void setJobIdIfNecessary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public void deploy(
setOwnerReference(cr, deployConfig);
ctx.getFlinkService().submitSessionCluster(deployConfig);
cr.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient());
IngressUtils.reconcileIngress(
ctx, spec, deployConfig, ctx.getKubernetesClient(), eventRecorder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@
import static org.apache.flink.kubernetes.operator.utils.EventSourceUtils.LABEL_COMPONENT_INGRESS;

/** Ingress utilities. */
@Slf4j
public class IngressUtils {

private static final Pattern NAME_PTN =
Expand All @@ -68,13 +70,29 @@ public class IngressUtils {
private static final String REST_SVC_NAME_SUFFIX = "-rest";

private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class);
public static final String INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET =
"Ingress management is turned off but ingress set in spec";
public static final String INGRESS_MANAGEMENT = "IngressManagement";

public static void reconcileIngress(
FlinkResourceContext<?> ctx,
FlinkDeploymentSpec spec,
Configuration effectiveConfig,
KubernetesClient client) {
KubernetesClient client,
EventRecorder eventRecorder) {
if (!ctx.getOperatorConfig().isManageIngress()) {
if (spec.getIngress() != null) {
Comment thread
gyfora marked this conversation as resolved.
eventRecorder.triggerEvent(
ctx.getResource(),
EventRecorder.Type.Warning,
INGRESS_MANAGEMENT,
INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET,
EventRecorder.Component.Operator,
client);
}

return;
}
var objectMeta = ctx.getResource().getMetadata();
if (spec.getIngress() != null) {
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public TestingFlinkDeploymentController(
new FlinkDeploymentObserverFactory(eventRecorder),
statusRecorder,
eventRecorder,
canaryResourceManager);
canaryResourceManager,
new FlinkConfigManager(Configuration.fromMap(Map.of())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS;
Expand All @@ -38,9 +39,11 @@

import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand All @@ -57,7 +60,20 @@ class IngressUtilsTest {

private FlinkResourceContext<?> createResourceContext(FlinkDeployment appCluster) {
testingJosdkContext = new TestingJosdkContext<>(client);
return new FlinkDeploymentContext(appCluster, testingJosdkContext, null, null, null, null);
return new FlinkDeploymentContext(
appCluster,
testingJosdkContext,
null,
new FlinkConfigManager(Configuration.fromMap(new HashMap<>())),
null,
null);
}

private FlinkResourceContext<?> createResourceContext(
FlinkDeployment appCluster, FlinkConfigManager configManager) {
testingJosdkContext = new TestingJosdkContext<>(client);
return new FlinkDeploymentContext(
appCluster, testingJosdkContext, null, configManager, null, null);
}

@Test
Expand All @@ -69,7 +85,7 @@ void testIngress() {

// no ingress when ingressDomain is empty
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
assertNull(
client.network()
Expand All @@ -93,7 +109,7 @@ void testIngress() {
builder.template("{{name}}.{{namespace}}.example.com");
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
Ingress ingress = null;
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress ingressV1beta1 = null;
if (IngressUtils.ingressInNetworkingV1(client)) {
Expand Down Expand Up @@ -142,7 +158,7 @@ void testIngress() {
builder.annotations(Map.of("nginx.ingress.kubernetes.io/rewrite-target", "/$2"));
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -199,7 +215,7 @@ void testIngress() {
builder.className("nginx");
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -283,7 +299,7 @@ public void testIngressTls() {
builder.tls(new ArrayList<>());
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
Ingress ingress = null;
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress ingressV1beta1 = null;
if (IngressUtils.ingressInNetworkingV1(client)) {
Expand Down Expand Up @@ -320,7 +336,7 @@ public void testIngressTls() {
builder.tls(List.of(ingressTlsSpecSecretOnly));
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -358,7 +374,7 @@ public void testIngressTls() {
builder.tls(List.of(ingressTlsSpecHostsOnly));
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -396,7 +412,7 @@ public void testIngressTls() {
builder.tls(List.of(ingressTlsSpecSingleTLSWithHost));
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -438,7 +454,7 @@ public void testIngressTls() {
builder.tls(List.of(ingressTlsSpecSingleTLSWithHosts));
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -485,7 +501,7 @@ public void testIngressTls() {
List.of(ingressTlsSpecMultipleTLSWithHosts1, ingressTlsSpecMultipleTLSWithHosts2));
appCluster.getSpec().setIngress(builder.build());
IngressUtils.reconcileIngress(
createResourceContext(appCluster), appCluster.getSpec(), config, client);
createResourceContext(appCluster), appCluster.getSpec(), config, client, null);
if (IngressUtils.ingressInNetworkingV1(client)) {
ingress =
client.network()
Expand Down Expand Up @@ -549,7 +565,39 @@ void testDeletesIngress() {
io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class,
List.of(ingress)));

IngressUtils.reconcileIngress(context, appCluster.getSpec(), null, client);
IngressUtils.reconcileIngress(context, appCluster.getSpec(), null, client, null);

var ingressV1beta1 =
client.network()
.v1beta1()
.ingresses()
.inNamespace(appCluster.getMetadata().getNamespace())
.withName(appCluster.getMetadata().getName())
.get();
assertThat(ingressV1beta1).isNull();
}

@Test
void skipIngressReconciliationIfFeatureFlagOff() {
List<Event> events = new ArrayList<>();
EventRecorder eventRecorder =
new EventRecorder((a, event) -> events.add(event), (a, b) -> {});
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
FlinkConfigManager manager =
new FlinkConfigManager(
Configuration.fromMap(Map.of(OPERATOR_MANAGE_INGRESS.key(), "false")));
var context = createResourceContext(appCluster, manager);
context.getOperatorConfig();
Configuration config =
new FlinkConfigManager(new Configuration())
.getDeployConfig(appCluster.getMetadata(), appCluster.getSpec());

IngressSpec.IngressSpecBuilder builder = IngressSpec.builder();
builder.template("{{name}}.{{namespace}}.example.com");
builder.tls(new ArrayList<>());
appCluster.getSpec().setIngress(builder.build());

IngressUtils.reconcileIngress(context, appCluster.getSpec(), config, client, eventRecorder);

var ingressV1beta1 =
client.network()
Expand All @@ -559,5 +607,6 @@ void testDeletesIngress() {
.withName(appCluster.getMetadata().getName())
.get();
assertThat(ingressV1beta1).isNull();
assertThat(events).hasSize(1);
}
}