diff --git a/Dockerfile b/Dockerfile index c80d0686b5..f7cfe972d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,7 +25,7 @@ WORKDIR /app COPY . . -RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS -Dfabric8.httpclient.impl="$HTTP_CLIENT" +RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-autoscaler,flink-kubernetes-webhook,flink-kubernetes-operator-bluegreen-agent -DskipTests=$SKIP_TESTS -Dfabric8.httpclient.impl="$HTTP_CLIENT" RUN cd /app/tools/license; mkdir jars; cd jars; \ cp /app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . && \ @@ -42,9 +42,12 @@ ENV OPERATOR_VERSION=1.15-SNAPSHOT ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar ENV KUBERNETES_STANDALONE_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION.jar +ENV AGENT_JAR=flink-kubernetes-operator-bluegreen-agent-$OPERATOR_VERSION.jar +ENV ARTIFACTS_DIR=$FLINK_HOME/artifacts ENV OPERATOR_LIB=$FLINK_HOME/operator-lib RUN mkdir -p $OPERATOR_LIB +RUN mkdir -p $ARTIFACTS_DIR WORKDIR /flink-kubernetes-operator RUN groupadd --system --gid=9999 flink && \ @@ -60,6 +63,7 @@ COPY --chown=flink:flink --from=build /app/tools/license/licenses-output/NOTICE COPY --chown=flink:flink --from=build /app/tools/license/licenses-output/licenses ./licenses COPY --chown=flink:flink --from=build /app/LICENSE ./LICENSE COPY --chown=flink:flink docker-entrypoint.sh / +COPY --chown=flink:flink --from=build /app/flink-kubernetes-operator-bluegreen-agent/target/$AGENT_JAR $ARTIFACTS_DIR/bluegreen-agent.jar ARG SKIP_OS_UPDATE=true diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateContext.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateContext.java new file mode 100644 index 0000000000..d67ac63c1b --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateContext.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.bluegreen; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; + +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.ACTIVE_DEPLOYMENT_TYPE; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.DEPLOYMENT_DELETION_DELAY; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.IS_FIRST_DEPLOYMENT; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.TRANSITION_STAGE; + +/** Base functionality of the Context used for Gate implementations. */ +@Data +@AllArgsConstructor +public class GateContext implements Serializable { + + /** GateContext enum. */ + private final BlueGreenDeploymentType activeBlueGreenDeploymentType; + + private final GateOutputMode outputMode; + private final int deploymentTeardownDelayMs; + private final TransitionStage gateStage; + private final boolean isFirstDeployment; + + public static GateContext create( + Map data, BlueGreenDeploymentType currentBlueGreenDeploymentType) { + var nextActiveDeploymentType = + BlueGreenDeploymentType.valueOf(data.get(ACTIVE_DEPLOYMENT_TYPE.getLabel())); + + var deploymentDeletionDelaySec = + Integer.parseInt(data.get(DEPLOYMENT_DELETION_DELAY.getLabel())); + + var outputMode = + currentBlueGreenDeploymentType == nextActiveDeploymentType + ? GateOutputMode.ACTIVE + : GateOutputMode.STANDBY; + + var isFirstDeployment = Boolean.parseBoolean(data.get(IS_FIRST_DEPLOYMENT.getLabel())); + + return new GateContext( + nextActiveDeploymentType, + outputMode, + deploymentDeletionDelaySec, + TransitionStage.valueOf(data.get(TRANSITION_STAGE.getLabel())), + isFirstDeployment); + } +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateContextOptions.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateContextOptions.java new file mode 100644 index 0000000000..07112012df --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateContextOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.bluegreen; + +import lombok.Getter; + +/** Options values for the GateContext. */ +public enum GateContextOptions { + IS_FIRST_DEPLOYMENT("is-first-deployment"), + ACTIVE_DEPLOYMENT_TYPE("active-deployment-type"), + DEPLOYMENT_DELETION_DELAY("deployment-deletion-delay-ms"), + TRANSITION_STAGE("stage"); + + @Getter private final String label; + + private GateContextOptions(String label) { + this.label = label; + } +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateKubernetesService.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateKubernetesService.java new file mode 100644 index 0000000000..f60cd2d33c --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateKubernetesService.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.bluegreen; + +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; + +/** Simple Kubernetes service proxy for Gate operations. */ +public class GateKubernetesService implements Serializable { + + private static final Logger logger = LoggerFactory.getLogger(GateKubernetesService.class); + + @Getter private final KubernetesClient kubernetesClient; + + private final String namespace; + private final String configMapName; + + public GateKubernetesService(String namespace, String configMapName) { + Preconditions.checkNotNull(namespace); + Preconditions.checkNotNull(configMapName); + + try { + kubernetesClient = new KubernetesClientBuilder().build(); + } catch (Exception e) { + logger.error("Error instantiating Kubernetes Client", e); + throw e; + } + + this.namespace = namespace; + this.configMapName = configMapName; + } + + public void setInformers(ResourceEventHandler resourceEventHandler) { + kubernetesClient + .configMaps() + .inNamespace(namespace) + .withName(configMapName) + .inform(resourceEventHandler, 0); + } + + public void updateConfigMapEntries(Map kvps) { + var configMap = parseConfigMap(); + + kvps.forEach((key, value) -> configMap.getData().put(key, value)); + + try { + kubernetesClient.configMaps().inNamespace(namespace).resource(configMap).update(); + } catch (Exception e) { + logger.error("Failed to UPDATE the ConfigMap", e); + throw e; + } + } + + public ConfigMap parseConfigMap() { + try { + return kubernetesClient + .configMaps() + .inNamespace(namespace) + .withName(configMapName) + .get(); + } catch (Exception e) { + logger.error("Failed to GET the ConfigMap", e); + throw e; + } + } +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateOutputMode.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateOutputMode.java new file mode 100644 index 0000000000..6efab11a2b --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/GateOutputMode.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.bluegreen; + +/** Gate output enum values. */ +public enum GateOutputMode { + ACTIVE, + STANDBY +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionMode.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionMode.java new file mode 100644 index 0000000000..15f9285dc0 --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionMode.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.bluegreen; + +/** Possible transition modes supported by the `FlinkBlueGreenDeploymentController`. */ +public enum TransitionMode { + /** + * FLIP-503: simple transition that deletes the previous deployment as soon as the new one is + * RUNNING/STABLE. + */ + BASIC, + + /** + * FLIP-504: advanced coordination between deployment stages during transition. Not supported + * until FLIP-504 is implemented. + */ + ADVANCED; +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionStage.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionStage.java new file mode 100644 index 0000000000..acd95ce84a --- /dev/null +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/TransitionStage.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.bluegreen; + +/** Enumeration of the various stages for _ALL_ Blue/Green deployments. */ +public enum TransitionStage { + CLEAR_TO_TEARDOWN, + FAILING, + INITIALIZING, + RUNNING, + TRANSITIONING; +} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java index a515b0a91b..870014541a 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.api.spec; import org.apache.flink.annotation.Experimental; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; @@ -42,5 +43,7 @@ public class FlinkBlueGreenDeploymentSpec { @Nullable private IngressSpec ingress; + private TransitionMode transitionMode; + private FlinkDeploymentTemplateSpec template; } diff --git a/flink-kubernetes-operator-bluegreen-agent/pom.xml b/flink-kubernetes-operator-bluegreen-agent/pom.xml new file mode 100644 index 0000000000..6724bab96b --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-agent/pom.xml @@ -0,0 +1,92 @@ + + + 4.0.0 + + + org.apache.flink + flink-kubernetes-operator-parent + 1.15-SNAPSHOT + .. + + + flink-kubernetes-operator-bluegreen-agent + Flink Kubernetes BlueGreen Deployment Agent + + + 1.14.17 + + + + + + net.bytebuddy + byte-buddy + ${bytebuddy.version} + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-core + ${flink.version} + provided + + + org.junit.jupiter + junit-jupiter + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + shade + + + + + org.apache.flink.kubernetes.operator.bluegreen.agent.GateInjectionAgent + true + true + + + + + + + + + + + diff --git a/flink-kubernetes-operator-bluegreen-agent/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionAgent.java b/flink-kubernetes-operator-bluegreen-agent/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionAgent.java new file mode 100644 index 0000000000..3c2eeaf0d2 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-agent/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionAgent.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.agent; + +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.implementation.MethodDelegation; + +import java.io.File; +import java.lang.instrument.Instrumentation; +import java.util.jar.JarFile; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +/** + * Java agent that transparently injects BlueGreen gate operators into every Flink Application-mode + * job, without requiring any user code changes. + * + *

Intercept point: {@code StreamExecutionEnvironment.execute(StreamGraph)}. This is a stable + * public API method present from Flink 1.14 through 2.x. It is called regardless of whether the + * user invokes {@code env.execute()}, {@code env.execute(String)}, or {@code + * env.execute(StreamGraph)} directly. + * + *

In production the agent JAR is injected into the JobManager pod by the BlueGreen controller + * via an init container — users do not need to modify their Dockerfile or set {@code + * env.java.opts.jobmanager} manually. + */ +public class GateInjectionAgent { + + public static void premain(String agentArgs, Instrumentation inst) throws Exception { + // The instrumented StreamExecutionEnvironment bytecode will reference + // GateInjectionInterceptor by name. Make the agent JAR visible to the + // system classloader so that reference resolves at runtime. + String jarPath = + GateInjectionAgent.class + .getProtectionDomain() + .getCodeSource() + .getLocation() + .getPath(); + inst.appendToSystemClassLoaderSearch(new JarFile(new File(jarPath))); + + System.err.println("[BlueGreen Agent] premain started, registering transformer"); + + new AgentBuilder.Default() + // Instrument both the base class and its Application-mode subclass. + // + // StreamContextEnvironment (flink-clients) overrides execute(StreamGraph) + // and is the concrete type used in Application mode and client-side execution. + // StreamExecutionEnvironment.execute(StreamGraph) is never reached via virtual + // dispatch when the runtime type is StreamContextEnvironment, so we must + // intercept the override directly. + // + // StreamExecutionEnvironment is kept as a fallback for local/test execution + // where LocalStreamEnvironment does NOT override execute(StreamGraph). + .type( + named( + "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment") + .or( + named( + "org.apache.flink.client.program.StreamContextEnvironment"))) + .transform( + (builder, typeDescription, classLoader, module, protectionDomain) -> + builder.method( + named("execute") + .and(takesArguments(1)) + .and( + takesArgument( + 0, + named( + "org.apache.flink.streaming.api.graph.StreamGraph")))) + .intercept( + MethodDelegation.to( + GateInjectionInterceptor.class))) + .installOn(inst); + + System.err.println( + "[BlueGreen Agent] transformer registered for StreamExecutionEnvironment + StreamContextEnvironment"); + } +} diff --git a/flink-kubernetes-operator-bluegreen-agent/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionInterceptor.java b/flink-kubernetes-operator-bluegreen-agent/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionInterceptor.java new file mode 100644 index 0000000000..1bc222ca70 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-agent/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionInterceptor.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.agent; + +import org.apache.flink.configuration.Configuration; + +import net.bytebuddy.implementation.bind.annotation.Argument; +import net.bytebuddy.implementation.bind.annotation.RuntimeType; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.implementation.bind.annotation.This; + +import java.lang.reflect.Method; +import java.util.concurrent.Callable; + +/** + * Byte Buddy interceptor invoked in place of {@code + * StreamExecutionEnvironment.execute(StreamGraph)}. + * + *

When {@code bluegreen.gate.injection.enabled=true}: + * + *

    + *
  1. Loads {@code GateInjectorExecutor} from the user code classloader (thread context + * CL at the time execute() is called). This ensures that the injected {@code + * WatermarkGateProcessFunction} instances are created by the user classloader and can be + * deserialized on TaskManagers from the user JAR. + *
  2. Calls {@code GateInjectorExecutor.injectGates(StreamGraph, Configuration)} reflectively, + * mutating the graph in-place before the original execute() runs. + *
+ * + *

Failure contract: + * + *

    + *
  • If the agent cannot read the Flink configuration (unexpected agent bug), injection is + * skipped and the job continues — agent errors must never affect non-BlueGreen jobs. + *
  • If {@code bluegreen.gate.injection.enabled=true} and {@code GateInjectorExecutor} is not + * found, or its {@code injectGates} method is missing, the job fails with a clear + * message. This flag is only set for BlueGreen deployments, so a missing class means the + * bluegreen-client library is absent from the job JAR — a real misconfiguration. + *
+ * + *

Classloader safety: + * + *

    + *
  • {@code StreamGraph} and {@code Configuration} are in {@code flink-core} / {@code + * flink-streaming-java} — loaded by the system classloader. The user classloader delegates to + * the parent for those packages (no bundling), so both sides of the reflective call see the + * exact same {@code Class} objects. ✓ + *
+ */ +public class GateInjectionInterceptor { + + private static final String GATE_INJECTION_ENABLED = "bluegreen.gate.injection.enabled"; + private static final String INJECTOR_CLASS = + "org.apache.flink.kubernetes.operator.bluegreen.client.GateInjectorExecutor"; + + @RuntimeType + public static Object intercept( + @This Object envInstance, + @Argument(0) Object streamGraph, + @SuperCall Callable superCall) + throws Exception { + + // Read config from the live environment instance rather than re-loading from disk. + // GlobalConfiguration.loadConfiguration() reads flink-conf.yaml but misses keys + // injected at container start via FLINK_PROPERTIES (e.g. kubernetes.namespace) when + // the config volume is read-only and the entrypoint script cannot write them. + // Guard this read: if it fails for an unexpected reason (agent bug), skip injection + // rather than affecting the job. + Configuration config = null; + try { + config = + (Configuration) + envInstance + .getClass() + .getMethod("getConfiguration") + .invoke(envInstance); + } catch (Exception e) { + Throwable cause = (e.getCause() != null) ? e.getCause() : e; + System.err.println( + "[BlueGreen Agent] Could not read Flink configuration, skipping gate injection: " + + cause); + } + + if (config != null && config.getBoolean(GATE_INJECTION_ENABLED, false)) { + // bluegreen.gate.injection.enabled is only set by the operator for BlueGreen + // deployments. Any failure beyond this point is a real misconfiguration — fail the + // job with a clear message so the operator can reflect the error in the deployment + // status. + ClassLoader userCL = Thread.currentThread().getContextClassLoader(); + + Class injectorClass; + try { + injectorClass = userCL.loadClass(INJECTOR_CLASS); + } catch (ClassNotFoundException e) { + String msg = + "[BlueGreen Agent] Gate injection is enabled but '" + + INJECTOR_CLASS + + "' was not found on the job classpath. " + + "Ensure the bluegreen-client library is included in your job JAR."; + System.err.println(msg); + throw new RuntimeException(msg, e); + } + + // Locate injectGates by name to avoid any cross-CL type token issues. + Method injectGates = null; + for (Method m : injectorClass.getMethods()) { + if ("injectGates".equals(m.getName()) && m.getParameterCount() == 2) { + injectGates = m; + break; + } + } + + if (injectGates == null) { + String msg = + "[BlueGreen Agent] Gate injection is enabled but 'injectGates(StreamGraph, Configuration)' " + + "was not found on '" + + INJECTOR_CLASS + + "'. Verify the bluegreen-client version is compatible."; + System.err.println(msg); + throw new RuntimeException(msg); + } + + injectGates.invoke(null, streamGraph, config); + System.err.println("[BlueGreen Agent] Gate injection applied to StreamGraph"); + } + + return superCall.call(); + } +} diff --git a/flink-kubernetes-operator-bluegreen-agent/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionInterceptorTest.java b/flink-kubernetes-operator-bluegreen-agent/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionInterceptorTest.java new file mode 100644 index 0000000000..84dc3da1b3 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-agent/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/agent/GateInjectionInterceptorTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.agent; + +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link GateInjectionInterceptor}. */ +public class GateInjectionInterceptorTest { + + @BeforeEach + public void resetStub() { + org.apache.flink.kubernetes.operator.bluegreen.client.GateInjectorExecutor.injected = false; + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + } + + @Test + public void testIntercept_injectionDisabled_superCallInvoked() throws Exception { + Configuration config = new Configuration(); + // bluegreen.gate.injection.enabled not set — defaults to false + AtomicBoolean superCallInvoked = new AtomicBoolean(false); + + GateInjectionInterceptor.intercept( + new MockEnv(config), new Object(), superCallFlagWith(superCallInvoked)); + + assertTrue(superCallInvoked.get()); + assertFalse( + org.apache.flink.kubernetes.operator.bluegreen.client.GateInjectorExecutor + .injected); + } + + @Test + public void testIntercept_injectionEnabled_injectorFound_injectGatesCalled() throws Exception { + Configuration config = new Configuration(); + config.setString("bluegreen.gate.injection.enabled", "true"); + AtomicBoolean superCallInvoked = new AtomicBoolean(false); + + // Test classloader has GateInjectorExecutor on the path — set in @BeforeEach + GateInjectionInterceptor.intercept( + new MockEnv(config), new Object(), superCallFlagWith(superCallInvoked)); + + assertTrue(superCallInvoked.get(), "superCall must always be invoked"); + assertTrue( + org.apache.flink.kubernetes.operator.bluegreen.client.GateInjectorExecutor.injected, + "injectGates should have been called"); + } + + @Test + public void testIntercept_injectionEnabled_classNotFound_jobFails() { + Configuration config = new Configuration(); + config.setString("bluegreen.gate.injection.enabled", "true"); + + // Swap to a classloader that does not have GateInjectorExecutor + Thread.currentThread() + .setContextClassLoader( + new ClassLoader(null) { + @Override + public Class loadClass(String name) throws ClassNotFoundException { + throw new ClassNotFoundException(name); + } + }); + + assertThrows( + RuntimeException.class, + () -> + GateInjectionInterceptor.intercept( + new MockEnv(config), new Object(), () -> null), + "Missing bluegreen-client library must fail the job"); + } + + @Test + public void testIntercept_exceptionDuringConfigRead_superCallStillInvoked() throws Exception { + // An envInstance whose getConfiguration() throws — agent must never affect the job + Object badEnv = + new Object() { + @SuppressWarnings("unused") + public Configuration getConfiguration() { + throw new RuntimeException("simulated config failure"); + } + }; + AtomicBoolean superCallInvoked = new AtomicBoolean(false); + + GateInjectionInterceptor.intercept( + badEnv, new Object(), superCallFlagWith(superCallInvoked)); + + assertTrue( + superCallInvoked.get(), "superCall must still be invoked when config read fails"); + } + + // ==================== Helpers ==================== + + private static Callable superCallFlagWith(AtomicBoolean flag) { + return () -> { + flag.set(true); + return null; + }; + } + + /** + * Minimal environment stub with a {@code getConfiguration()} method the agent can reflect on. + */ + private static class MockEnv { + private final Configuration config; + + MockEnv(Configuration config) { + this.config = config; + } + + @SuppressWarnings("unused") + public Configuration getConfiguration() { + return config; + } + } +} diff --git a/flink-kubernetes-operator-bluegreen-agent/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectorExecutor.java b/flink-kubernetes-operator-bluegreen-agent/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectorExecutor.java new file mode 100644 index 0000000000..0ea892adf6 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-agent/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectorExecutor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import org.apache.flink.configuration.Configuration; + +/** + * Test stub that matches the class name and method signature the agent looks for. Tracks whether + * injectGates was called so tests can assert on it. + */ +public class GateInjectorExecutor { + + public static boolean injected = false; + + public static void injectGates(Object streamGraph, Configuration config) { + injected = true; + } +} diff --git a/flink-kubernetes-operator-bluegreen-client/README.MD b/flink-kubernetes-operator-bluegreen-client/README.MD new file mode 100644 index 0000000000..81334de989 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/README.MD @@ -0,0 +1,22 @@ + + +# Flink Kubernetes Client Code Client + +TBD \ No newline at end of file diff --git a/flink-kubernetes-operator-bluegreen-client/pom.xml b/flink-kubernetes-operator-bluegreen-client/pom.xml new file mode 100644 index 0000000000..23acd377b5 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/pom.xml @@ -0,0 +1,133 @@ + + + 4.0.0 + + + org.apache.flink + flink-kubernetes-operator-parent + 1.15-SNAPSHOT + .. + + + flink-kubernetes-operator-bluegreen-client + Flink Kubernetes BlueGreen Deployment Client + + + + + --add-opens=java.base/java.util=ALL-UNNAMED + + + + + + org.apache.flink + flink-kubernetes-operator-api + ${project.version} + compile + + + + org.apache.flink + flink-streaming-java + ${flink.version} + compile + + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + + org.apache.flink + flink-table-runtime + ${flink.version} + true + + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.version} + test + + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + + + + org.apache.flink + flink-runtime + ${flink.version} + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-streaming-java + ${flink.version} + test-jar + test + + + org.apache.flink + flink-metrics-core + ${flink.version} + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.projectlombok + lombok + ${lombok.version} + + + + + + + + diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectionPosition.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectionPosition.java new file mode 100644 index 0000000000..1fdbcd7e64 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectionPosition.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +/** Controls where the BlueGreen gate operator is injected into the StreamGraph. */ +public enum GateInjectionPosition { + /** Inject immediately after the source node. Preferred for fan-out DAGs. */ + AFTER_SOURCE, + /** Inject immediately before the sink node. Default. */ + BEFORE_SINK +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectorExecutor.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectorExecutor.java new file mode 100644 index 0000000000..661ed0ccbf --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateInjectorExecutor.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.PipelineExecutor; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.util.InstantiationUtil; + +import lombok.AllArgsConstructor; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * {@link PipelineExecutor} decorator that transparently injects a BlueGreen gate operator into the + * {@link StreamGraph} before delegating to the wrapped executor. + * + *

Can be used programmatically (wrap your executor) or automatically via the {@code + * flink-kubernetes-operator-bluegreen-agent}, which intercepts {@code + * StreamExecutionEnvironment.execute(StreamGraph)} and calls {@link #injectGates} without any user + * code changes. + */ +@AllArgsConstructor +public class GateInjectorExecutor implements PipelineExecutor { + + private final PipelineExecutor delegate; + private final Configuration config; + private final Logger logger; + + @Override + public CompletableFuture execute( + Pipeline pipeline, Configuration config, ClassLoader classLoader) throws Exception { + + if (pipeline instanceof StreamGraph) { + injectGateOperators((StreamGraph) pipeline, config, classLoader); + } + return delegate.execute(pipeline, config, classLoader); + } + + /** + * Entry point for Option A (Application mode): call this from the user entry-point after + * building the StreamGraph and before env.execute(graph). + * + *

{@code
+     * StreamGraph graph = env.getStreamGraph("My Job");
+     * GateInjectorExecutor.injectGates(graph, flinkConfig);
+     * env.execute(graph);
+     * }
+ */ + public static void injectGates(StreamGraph graph, Configuration config) { + injectGateOperators(graph, config, Thread.currentThread().getContextClassLoader()); + } + + private static void injectGateOperators( + StreamGraph graph, Configuration config, ClassLoader cl) { + // These keys are injected by the BlueGreen controller when it creates the deployment. + // On the initial deployment (no transition in progress) or in non-BlueGreen clusters + // they will be absent — skip injection rather than crash. + String activeDeploymentType = config.getString("bluegreen.active-deployment-type", null); + String configMapName = config.getString("bluegreen.configmap.name", null); + if (activeDeploymentType == null || configMapName == null) { + System.err.println( + "[BlueGreen] bluegreen.active-deployment-type or bluegreen.configmap.name" + + " not found in config — skipping gate injection"); + return; + } + + GateInjectionPosition position = + GateInjectionPosition.valueOf( + config.getString( + "bluegreen.gate.injection.position", + GateInjectionPosition.BEFORE_SINK.name())); + + switch (position) { + case AFTER_SOURCE: + { + List sources = + graph.getStreamNodes().stream() + .filter(n -> n.getInEdges().isEmpty()) + .collect(Collectors.toList()); + + // Only single-source DAGs are supported. Multiple sources imply independent + // event-time domains; a single ConfigMap watermark W derived from one source + // is meaningless for another — no safe multi-source topology exists. + // TODO: consider supporting fan-in (N sources, 1 sink) with per-source gate + // coordination once the watermark aggregation design is finalised. + if (sources.size() != 1) { + throw new IllegalStateException( + "bluegreen.gate.injection.position=AFTER_SOURCE requires exactly 1 source, " + + "found " + + sources.size() + + ": " + + sources.stream() + .map(StreamNode::getOperatorName) + .collect(Collectors.joining(", "))); + } + + StreamNode source = sources.get(0); + List.copyOf(source.getOutEdges()) + .forEach( + edge -> { + StreamNode downstream = + graph.getStreamNode(edge.getTargetId()); + injectGate( + graph, + config, + cl, + edge, + source, + downstream, + "BlueGreen-Gate[" + source.getOperatorName() + "]"); + }); + break; + } + case BEFORE_SINK: + { + List sinks = + graph.getStreamNodes().stream() + .filter(n -> n.getOutEdges().isEmpty()) + .collect(Collectors.toList()); + + // Only single-sink DAGs are supported. Fan-out (1 source → N sinks) is safe in + // principle — all branches share the same event-time domain and W is consistent + // — but distinguishing it from independent source-per-sink chains requires a + // full reachability traversal. For now we keep the invariant simple: exactly 1 + // sink. For fan-out DAGs, prefer AFTER_SOURCE instead, which naturally places a + // single gate before all branches. + // TODO: consider adding reachability-based fan-out detection to lift this + // restriction. + if (sinks.size() != 1) { + throw new IllegalStateException( + "bluegreen.gate.injection.position=BEFORE_SINK requires exactly 1 sink, " + + "found " + + sinks.size() + + ": " + + sinks.stream() + .map(StreamNode::getOperatorName) + .collect(Collectors.joining(", ")) + + ". For fan-out DAGs use bluegreen.gate.injection.position=AFTER_SOURCE instead."); + } + + StreamNode sink = sinks.get(0); + List.copyOf(sink.getInEdges()) + .forEach( + edge -> { + StreamNode upstream = + graph.getStreamNode(edge.getSourceId()); + injectGate( + graph, + config, + cl, + edge, + upstream, + sink, + "BlueGreen-Gate[" + sink.getOperatorName() + "]"); + }); + break; + } + } + } + + private static void injectGate( + StreamGraph graph, + Configuration config, + ClassLoader cl, + StreamEdge edge, + StreamNode upstream, + StreamNode downstream, + String gateName) { + + int gateId = + graph.getStreamNodes().stream().mapToInt(StreamNode::getId).max().getAsInt() + 1; + + // TypeInformation recovery is the known friction point (see design notes). + // Gate is a passthrough: we use GenericTypeInfo as a placeholder for addOperator(), + // then immediately override with the upstream serializer directly. + TypeInformation typeInfo = new GenericTypeInfo<>(Object.class); + + WatermarkGateProcessFunction gateFunction = buildGateFunction(config, upstream, cl); + ProcessOperator gateOperator = new ProcessOperator<>(gateFunction); + + graph.addOperator( + gateId, + downstream.getSlotSharingGroup(), + null, + SimpleOperatorFactory.of(gateOperator), + typeInfo, + typeInfo, + gateName); + + // Override with the correct serializer from upstream + graph.getStreamNode(gateId).setSerializersIn(upstream.getTypeSerializerOut()); + graph.getStreamNode(gateId).setSerializerOut(upstream.getTypeSerializerOut()); + + // Gate always matches the parallelism of the operator immediately downstream of it. + graph.setParallelism(gateId, downstream.getParallelism()); + // StreamGraph.setMaxParallelism(int,int) only applies values > 0. + // ExecutionConfig.getMaxParallelism() returns the job-level setting + // (set via env.setMaxParallelism()), or -1 when unset. Flink's + // scheduler normalises -1 → 128 internally; we do the same here + // so the gate node always gets a valid positive value. + int configuredMaxPar = graph.getExecutionConfig().getMaxParallelism(); + graph.setMaxParallelism(gateId, configuredMaxPar > 0 ? configuredMaxPar : 128); + + // Rewire: upstream ──✕──> downstream → upstream ──> gate ──> downstream + downstream.getInEdges().remove(edge); + upstream.getOutEdges().remove(edge); + graph.addEdge(upstream.getId(), gateId, 0); + graph.addEdge(gateId, downstream.getId(), 0); + } + + private static WatermarkGateProcessFunction buildGateFunction( + Configuration config, StreamNode upstream, ClassLoader cl) { + + GateStrategy strategy = + GateStrategy.valueOf( + config.getString("bluegreen.gate.strategy", GateStrategy.WATERMARK.name())); + + switch (strategy) { + case WATERMARK: + { + Map flinkConfigMap = new HashMap<>(); + flinkConfigMap.put( + "bluegreen.active-deployment-type", + config.getString("bluegreen.active-deployment-type", null)); + flinkConfigMap.put( + "kubernetes.namespace", config.getString("kubernetes.namespace", null)); + flinkConfigMap.put( + "bluegreen.configmap.name", + config.getString("bluegreen.configmap.name", null)); + WatermarkExtractor extractor = + buildWatermarkExtractor(config, upstream, cl); + return WatermarkGateProcessFunction.create(flinkConfigMap, extractor); + } + default: + throw new IllegalStateException("Unsupported gate strategy: " + strategy); + } + } + + private static WatermarkExtractor buildWatermarkExtractor( + Configuration config, StreamNode upstream, ClassLoader cl) { + + boolean isSqlJob = upstream.getTypeSerializerOut() instanceof RowDataSerializer; + + if (isSqlJob) { + int fieldIdx = config.getInteger("bluegreen.gate.watermark.field-index", -1); + // fieldIdx is a captured primitive — lambda is serializable via WatermarkExtractor + return (WatermarkExtractor) + record -> + (fieldIdx < 0 || ((RowData) record).isNullAt(fieldIdx)) + ? Long.MIN_VALUE + : ((RowData) record).getLong(fieldIdx); + } + + String extractorClass = config.getString("bluegreen.gate.watermark.extractor-class", null); + if (extractorClass != null) { + try { + Object instance = + Class.forName(extractorClass, true, cl) + .getDeclaredConstructor() + .newInstance(); + // Full serialization dry-run — surfaces non-serializable fields anywhere + // in the object graph before JobGraph submission, not at TaskManager distribution + InstantiationUtil.serializeObject(instance); + return (WatermarkExtractor) instance; + } catch (IOException e) { + throw new IllegalArgumentException( + extractorClass + " is not fully serializable: " + e.getMessage(), e); + } catch (ReflectiveOperationException e) { + throw new IllegalArgumentException( + "Could not instantiate extractor class: " + extractorClass, e); + } + } + + // No extractor configured — fall back to processing-time gating + return (WatermarkExtractor) record -> Long.MIN_VALUE; + } +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateProcessFunction.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateProcessFunction.java new file mode 100644 index 0000000000..09935a8634 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateProcessFunction.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateKubernetesService; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.TRANSITION_STAGE; +import static org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage.CLEAR_TO_TEARDOWN; + +/** Base class for ProcessFunction (streaming) based Gate implementations. */ +abstract class GateProcessFunction extends ProcessFunction implements Serializable { + private static final Logger logger = LoggerFactory.getLogger(GateProcessFunction.class); + + protected final BlueGreenDeploymentType blueGreenDeploymentType; + + // How long (ms) to wait after a write condition is first detected before writing to the + // ConfigMap. This window gives other subtasks a chance to observe the write via the informer, + // reducing duplicate K8s API calls. + private static final long WRITE_DEDUP_DELAY_MS = 500L; + + // Processing time (ms) when a pending write was first requested; -1 means no write pending. + private long pendingWriteSince = -1L; + // Set by notifyClearToTeardown() so that maybePerformPendingWrite() performs the teardown + // write even if the pending write was originally scheduled for a different operation. + private boolean pendingClearToTeardown = false; + + private GateKubernetesService gateKubernetesService; + protected GateContext baseContext; + private String namespace; + private String configMapName; + + protected abstract void onContextUpdate(GateContext baseContext, Map data); + + public GateProcessFunction( + BlueGreenDeploymentType blueGreenDeploymentType, + String namespace, + String configMapName) { + Preconditions.checkArgument( + blueGreenDeploymentType == BlueGreenDeploymentType.BLUE + || blueGreenDeploymentType == BlueGreenDeploymentType.GREEN, + "Invalid deployment type: " + blueGreenDeploymentType); + + this.blueGreenDeploymentType = blueGreenDeploymentType; + this.namespace = namespace; + this.configMapName = configMapName; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + setKubernetesEnvironment(); + processConfigMap(gateKubernetesService.parseConfigMap()); + } + + /** + * Records the current processing time as the start of a pending write window. No-op if a write + * is already pending. The actual write is performed by {@link #maybePerformPendingWrite} on the + * next element processed after the dedup delay has elapsed. + */ + protected final void scheduleWriteTimer(Context ctx) { + if (pendingWriteSince < 0) { + pendingWriteSince = ctx.timerService().currentProcessingTime(); + } + } + + protected abstract void processElementActive( + I value, ProcessFunction.Context ctx, Collector out) + throws IllegalAccessException; + + protected abstract void processElementStandby( + I value, ProcessFunction.Context ctx, Collector out) + throws IllegalAccessException; + + @Override + public void processElement(I value, ProcessFunction.Context ctx, Collector out) + throws Exception { + maybePerformPendingWrite(ctx); + switch (baseContext.getOutputMode()) { + case ACTIVE: + processElementActive(value, ctx, out); + break; + case STANDBY: + processElementStandby(value, ctx, out); + break; + default: + String error = "Invalid OutputMode caught"; + logger.error(error); + throw new IllegalStateException(error); + } + } + + private void maybePerformPendingWrite(Context ctx) throws Exception { + if (pendingWriteSince < 0) { + return; + } + long now = ctx.timerService().currentProcessingTime(); + if (now - pendingWriteSince < WRITE_DEDUP_DELAY_MS) { + return; + } + pendingWriteSince = -1L; + boolean handled = handleScheduledWrite(ctx); + if (!handled && pendingClearToTeardown) { + pendingClearToTeardown = false; + if (baseContext.getGateStage() != CLEAR_TO_TEARDOWN) { + logInfo("Writing " + CLEAR_TO_TEARDOWN + " to ConfigMap"); + gateKubernetesService.updateConfigMapEntries( + Map.of(TRANSITION_STAGE.getLabel(), CLEAR_TO_TEARDOWN.toString())); + logInfo(CLEAR_TO_TEARDOWN + " set!"); + } else { + logInfo(CLEAR_TO_TEARDOWN + " already set, skipping"); + } + } + } + + private void setKubernetesEnvironment() { + this.gateKubernetesService = new GateKubernetesService(namespace, configMapName); + + logInfo("Preparing Informers..."); + var resourceEventHandler = + new ResourceEventHandler() { + @Override + public void onAdd(ConfigMap obj) { + logger.warn("Unexpected ConfigMap added: " + obj); + } + + @Override + public void onUpdate(ConfigMap oldObj, ConfigMap newObj) { + if (!oldObj.equals(newObj)) { + var oldState = oldObj.getData().get(TRANSITION_STAGE.getLabel()); + var newState = newObj.getData().get(TRANSITION_STAGE.getLabel()); + + logInfo("Update notification 1: " + oldState + " to " + newState); + + processConfigMap(newObj); + } + } + + @Override + public void onDelete(ConfigMap obj, boolean deletedFinalStateUnknown) { + logger.error( + "ConfigMap deleted: " + + obj + + ", final state unknown: " + + deletedFinalStateUnknown); + } + }; + + gateKubernetesService.setInformers(resourceEventHandler); + logInfo("Informers set!"); + } + + private void processConfigMap(ConfigMap configMap) { + this.baseContext = GateContext.create(configMap.getData(), blueGreenDeploymentType); + + // Filtering the "custom" entries only + var baseKeys = + Arrays.stream(GateContextOptions.values()) + .map(GateContextOptions::getLabel) + .collect(Collectors.toSet()); + var allConfigMapKeys = new HashSet<>(configMap.getData().keySet()); + // Set difference: + allConfigMapKeys.removeAll(baseKeys); + + var filteredData = + configMap.getData().entrySet().stream() + .filter(kvp -> allConfigMapKeys.contains(kvp.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + onContextUpdate(baseContext, filteredData); + } + + protected final void notifyClearToTeardown(Context ctx) { + pendingClearToTeardown = true; + scheduleWriteTimer(ctx); + } + + /** + * Hook for subclasses to handle watermark-specific ConfigMap writes when the dedup window + * elapses. Returns {@code true} if the write was handled (base-class CLEAR_TO_TEARDOWN logic is + * skipped), {@code false} to fall through. + */ + protected boolean handleScheduledWrite(Context ctx) throws Exception { + return false; + } + + protected final void updateConfigMapCustomEntries(Map customEntries) + throws Exception { + // Validating only "custom" entries/keys can be updated + var keysToUpdate = customEntries.keySet(); + var baseContextKeys = + Arrays.stream(GateContextOptions.values()) + .map(GateContextOptions::getLabel) + .collect(Collectors.toCollection(HashSet::new)); + // Set intersection: + baseContextKeys.retainAll(keysToUpdate); + + if (!baseContextKeys.isEmpty()) { + var error = "Attempted to update read-only base keys" + baseContextKeys; + logger.error(error); + throw new IllegalAccessException(error); + } + logInfo("Updating custom entries: " + customEntries); + int maxRetries = 3; + for (int attempt = 0; attempt <= maxRetries; attempt++) { + try { + gateKubernetesService.updateConfigMapEntries(customEntries); + return; + } catch (KubernetesClientException e) { + if (e.getCode() == 409 && attempt < maxRetries) { + logInfo( + "ConfigMap update conflict on attempt " + + (attempt + 1) + + ", retrying..."); + } else { + throw e; + } + } + } + } + + // Temporary "utility" function for development + protected void logInfo(String message) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + logger.error("[BlueGreen Gate-" + subtaskIdx + "]:" + message); + } +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateStrategy.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateStrategy.java new file mode 100644 index 0000000000..b63f4e092d --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateStrategy.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +/** + * Selects which {@link GateProcessFunction} implementation is injected into the StreamGraph. + * Configured via {@code bluegreen.gate.strategy}. + * + *

Currently only {@link #WATERMARK} is supported. Additional strategies (e.g. processing-time + * only, count-based) can be added here as the gate architecture evolves. + */ +public enum GateStrategy { + /** Watermark-based cutoff using {@link WatermarkGateProcessFunction}. */ + WATERMARK +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkExtractor.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkExtractor.java new file mode 100644 index 0000000000..35eb90d1f5 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkExtractor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import java.io.Serializable; +import java.util.function.Function; + +/** + * Extracts a watermark (epoch-ms) from a record for use by {@link WatermarkGateProcessFunction}. + * + *

Implementations must be serializable so they can be shipped to TaskManagers as part of the + * operator state. Use a static lambda or a concrete class with only serializable fields. + */ +@FunctionalInterface +public interface WatermarkExtractor extends Function, Serializable {} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateContext.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateContext.java new file mode 100644 index 0000000000..2b85acfdea --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateContext.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +/** Watermark based functionality of the GateContext. */ +@Getter +public class WatermarkGateContext implements Serializable { + + static final String WATERMARK_TOGGLE_VALUE = "watermark-toggle-value"; + static final String WATERMARK_STAGE = "watermark-stage"; + + private final GateContext baseContext; + private final WatermarkGateStage watermarkGateStage; + + @Setter private Long watermarkToggleValue; + + public WatermarkGateContext( + GateContext baseContext, + WatermarkGateStage watermarkGateStage, + Long watermarkToggleValue) { + this.baseContext = baseContext; + this.watermarkGateStage = watermarkGateStage; + this.watermarkToggleValue = watermarkToggleValue; + } + + @Override + public boolean equals(Object o) { + if (o instanceof WatermarkGateContext) { + WatermarkGateContext wm = (WatermarkGateContext) o; + return this.baseContext.equals(wm.getBaseContext()) + && this.watermarkGateStage.equals(wm.getWatermarkGateStage()) + && Objects.equals(this.watermarkToggleValue, wm.getWatermarkToggleValue()); + } + return false; + } + + public static WatermarkGateContext create(GateContext baseContext, Map data) { + // Possible values: + // null -> indetermined/to be calculated by the active job + // 0 -> For first deployments (undetermined) + // Positive -> For valid transitions + Long watermarkToggleValue = null; + + if (baseContext.isFirstDeployment()) { + watermarkToggleValue = 0L; + } else if (data.containsKey(WATERMARK_TOGGLE_VALUE)) { + watermarkToggleValue = Long.parseLong(data.get(WATERMARK_TOGGLE_VALUE)); + } + + var watermarkState = + data.getOrDefault(WATERMARK_STAGE, WatermarkGateStage.WATERMARK_NOT_SET.toString()); + + return new WatermarkGateContext( + baseContext, WatermarkGateStage.valueOf(watermarkState), watermarkToggleValue); + } + + @Override + public String toString() { + return "WatermarkGateContext: {" + + ", watermarkToggleValue: " + + watermarkToggleValue + + ", watermarkGateStage: " + + watermarkGateStage + + ", gateStage:" + + baseContext.getGateStage() + + ", deploymentTeardownDelaySec: " + + baseContext.getDeploymentTeardownDelayMs() + + ", outputMode: " + + baseContext.getOutputMode() + + ", activeDeploymentType: " + + baseContext.getActiveBlueGreenDeploymentType() + + "}"; + } +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunction.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunction.java new file mode 100644 index 0000000000..f41c446ca1 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunction.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Map; +import java.util.function.Function; + +/** Watermark based GateProcessFunction (streaming). */ +public class WatermarkGateProcessFunction extends GateProcessFunction + implements Serializable { + + private final Function watermarkExtractor; + + private WatermarkGateContext currentWatermarkGateContext; + + WatermarkGateProcessFunction( + BlueGreenDeploymentType blueGreenDeploymentType, + String namespace, + String configMapName, + Function watermarkExtractor) { + super(blueGreenDeploymentType, namespace, configMapName); + + Preconditions.checkNotNull(watermarkExtractor); + + this.watermarkExtractor = watermarkExtractor; + } + + public static WatermarkGateProcessFunction create( + Map flinkConfig, Function watermarkExtractor) { + requireConfigKey(flinkConfig, "bluegreen.active-deployment-type"); + requireConfigKey(flinkConfig, "kubernetes.namespace"); + requireConfigKey(flinkConfig, "bluegreen.configmap.name"); + return new WatermarkGateProcessFunction( + BlueGreenDeploymentType.valueOf( + flinkConfig.get("bluegreen.active-deployment-type")), + flinkConfig.get("kubernetes.namespace"), + flinkConfig.get("bluegreen.configmap.name"), + watermarkExtractor); + } + + private static void requireConfigKey(Map config, String key) { + if (config.get(key) == null) { + throw new IllegalArgumentException( + "WatermarkGateProcessFunction requires config key '" + + key + + "' to be set. If using automatic injection this is set by the" + + " operator; if instantiating directly, provide it in the" + + " flinkConfig map."); + } + } + + @Override + protected void onContextUpdate(GateContext baseContext, Map data) { + var fetchedWatermarkContext = WatermarkGateContext.create(baseContext, data); + logInfo("Refreshing WatermarkGateContext with data: " + data); + + if (currentWatermarkGateContext == null) { + logInfo("currentWatermarkGateContext INITIALIZED: " + fetchedWatermarkContext); + currentWatermarkGateContext = fetchedWatermarkContext; + } else if (!currentWatermarkGateContext.equals(fetchedWatermarkContext)) { + logInfo("currentWatermarkGateContext UPDATED: " + fetchedWatermarkContext); + currentWatermarkGateContext = fetchedWatermarkContext; + } + } + + @Override + protected void processElementActive( + I value, ProcessFunction.Context ctx, Collector out) + throws IllegalAccessException { + Long wmToggleValue = currentWatermarkGateContext.getWatermarkToggleValue(); + if (wmToggleValue != null) { + Long extractedWatermark = watermarkExtractor.apply(value); + if (wmToggleValue <= extractedWatermark) { + // Normal + out.collect(value); + } else { + // Waiting for WM + logInfo( + " -- Waiting to Reach WM: " + + (wmToggleValue - extractedWatermark) + + " ms - "); + } + } else { + // Transitioning to Active + var currentGateStage = currentWatermarkGateContext.getBaseContext().getGateStage(); + if (currentGateStage == TransitionStage.TRANSITIONING) { + logInfo(" -- Waiting for WM to be set - "); + notifyWaitingForWatermark(ctx); + } else { + logInfo("Waiting for the TRANSITIONING state, current: " + currentGateStage); + } + } + } + + @Override + protected void processElementStandby( + I value, ProcessFunction.Context ctx, Collector out) + throws IllegalAccessException { + if (currentWatermarkGateContext.getWatermarkToggleValue() != null) { + var watermarkToggleValue = currentWatermarkGateContext.getWatermarkToggleValue(); + + if (getWatermarkBoundary(ctx.timerService()) <= watermarkToggleValue) { + if (watermarkToggleValue > watermarkExtractor.apply(value)) { + // Should still output the element + out.collect(value); + } else { + // Went past the Watermark toggle value: BLOCK ELEMENT + logInfo(" -- Past WM -- "); + } + } else { + // Went past the Watermark Boundary: BLOCK ELEMENT + logInfo(" -- Past WM Boundary -- "); + notifyClearToTeardown(ctx); + } + } else { + // This ACTIVE job is transitioning to STANDBY, output elements + out.collect(value); + // Set the watermark when the other new job is ready + updateWatermarkInConfigMap(ctx); + } + } + + private long getWatermarkBoundary(TimerService timerService) { + return timerService.currentWatermark() > 0 + ? timerService.currentWatermark() + : timerService.currentProcessingTime(); + } + + protected void updateWatermarkInConfigMap(Context ctx) { + scheduleWriteTimer(ctx); + } + + protected void notifyWaitingForWatermark(Context ctx) { + scheduleWriteTimer(ctx); + } + + @Override + protected boolean handleScheduledWrite(Context ctx) throws Exception { + var wmCtx = currentWatermarkGateContext; + + // Standby job: Active job has signalled it's waiting — compute and write the WM toggle. + if (wmCtx.getWatermarkGateStage() == WatermarkGateStage.WAITING_FOR_WATERMARK + && wmCtx.getWatermarkToggleValue() == null) { + var nextWatermarkToggleValue = + getWatermarkBoundary(ctx.timerService()) + + wmCtx.getBaseContext().getDeploymentTeardownDelayMs(); + // Set optimistically so subsequent elements on this subtask don't reschedule + // before the ConfigMap informer propagates. + currentWatermarkGateContext.setWatermarkToggleValue(nextWatermarkToggleValue); + logInfo("Updating the ConfigMap Watermark value to: " + nextWatermarkToggleValue); + updateConfigMapCustomEntries( + Map.of( + WatermarkGateContext.WATERMARK_TOGGLE_VALUE, + Long.toString(nextWatermarkToggleValue), + WatermarkGateContext.WATERMARK_STAGE, + WatermarkGateStage.WATERMARK_SET.toString())); + logInfo("Watermark updated!"); + return true; + } + + // Active job: signal that it is waiting for the Standby job to provide the WM toggle. + if (wmCtx.getBaseContext().getGateStage() == TransitionStage.TRANSITIONING + && wmCtx.getWatermarkToggleValue() == null + && wmCtx.getWatermarkGateStage() != WatermarkGateStage.WAITING_FOR_WATERMARK) { + logInfo("Setting " + WatermarkGateStage.WAITING_FOR_WATERMARK); + updateConfigMapCustomEntries( + Map.of( + WatermarkGateContext.WATERMARK_STAGE, + WatermarkGateStage.WAITING_FOR_WATERMARK.toString())); + logInfo(WatermarkGateStage.WAITING_FOR_WATERMARK + " set!"); + return true; + } + + return false; // fall through to base-class CLEAR_TO_TEARDOWN handling + } +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateStage.java b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateStage.java new file mode 100644 index 0000000000..56e610d600 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateStage.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +/** Watermark Gate specific stage values. */ +public enum WatermarkGateStage { + WATERMARK_NOT_SET, + WAITING_FOR_WATERMARK, + WATERMARK_SET +} diff --git a/flink-kubernetes-operator-bluegreen-client/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunctionTest.java b/flink-kubernetes-operator-bluegreen-client/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunctionTest.java new file mode 100644 index 0000000000..6ed39ab3b9 --- /dev/null +++ b/flink-kubernetes-operator-bluegreen-client/src/test/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunctionTest.java @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.bluegreen.client; + +import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for {@link WatermarkGateProcessFunction}. */ +public class WatermarkGateProcessFunctionTest { + + private static final String TEST_NAMESPACE = "test-namespace"; + private static final String TEST_CONFIGMAP_NAME = "test-configmap"; + private static final long TEST_WATERMARK_VALUE = 1000L; + private static final long TEST_TEARDOWN_DELAY = 5000L; + + private TestWatermarkGateProcessFunction watermarkGateFunction; + private Function watermarkExtractor; + private OneInputStreamOperatorTestHarness testHarness; + + @BeforeEach + void setUp() throws Exception { + watermarkExtractor = TestMessage::getTimestamp; + watermarkGateFunction = + new TestWatermarkGateProcessFunction( + BlueGreenDeploymentType.BLUE, + TEST_NAMESPACE, + TEST_CONFIGMAP_NAME, + watermarkExtractor); + + testHarness = ProcessFunctionTestHarnesses.forProcessFunction(watermarkGateFunction); + testHarness.open(); + } + + // ==================== Context Update Tests ==================== + + @Test + void testContextInitialization() throws Exception { + // isFirstDeployment=false so the watermark value is read from data, not forced to 0 + GateContext baseContext = createBaseContext(TransitionStage.RUNNING, false); + Map data = + createWatermarkData(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + + watermarkGateFunction.onContextUpdate(baseContext, data); + + assertNotNull(getWatermarkContext()); + assertEquals(TEST_WATERMARK_VALUE, getWatermarkContext().getWatermarkToggleValue()); + assertEquals( + WatermarkGateStage.WATERMARK_SET, getWatermarkContext().getWatermarkGateStage()); + } + + @Test + void testContextUpdate() throws Exception { + // isFirstDeployment=false so the watermark value is read from data, not forced to 0 + GateContext baseContext = createBaseContext(TransitionStage.RUNNING, false); + Map initialData = + createWatermarkData(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + watermarkGateFunction.onContextUpdate(baseContext, initialData); + + Map updatedData = + createWatermarkData(2000L, WatermarkGateStage.WAITING_FOR_WATERMARK); + watermarkGateFunction.onContextUpdate(baseContext, updatedData); + + assertEquals(2000L, getWatermarkContext().getWatermarkToggleValue()); + assertEquals( + WatermarkGateStage.WAITING_FOR_WATERMARK, + getWatermarkContext().getWatermarkGateStage()); + } + + @Test + void testConfigMapInformerCallback() throws Exception { + // Simulates a ConfigMap update arriving via the Kubernetes informer + Map configMapUpdate = + createWatermarkData(500L, WatermarkGateStage.WATERMARK_SET); + + watermarkGateFunction.simulateConfigMapUpdate(configMapUpdate); + + assertNotNull(getWatermarkContext()); + assertEquals(500L, getWatermarkContext().getWatermarkToggleValue()); + assertEquals( + WatermarkGateStage.WATERMARK_SET, getWatermarkContext().getWatermarkGateStage()); + + // Processing time=0, watermark boundary=0 <= toggle=500; message ts=600 > 500 + // → message passes the gate + TestMessage message = new TestMessage("test", 600L); + testHarness.processElement(message, 550L); + + assertEquals(1, testHarness.extractOutputValues().size()); + assertEquals(message, testHarness.extractOutputValues().get(0)); + } + + // ==================== Active Processing Tests ==================== + + @Test + void testProcessElementActiveWithWatermarkToggleValueNormal() throws Exception { + setupActiveContext(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + // message ts == toggle value → passes (wmToggle <= extractedWatermark) + TestMessage message = new TestMessage("test", TEST_WATERMARK_VALUE); + + testHarness.processElement(message, TEST_WATERMARK_VALUE - 100); + + assertEquals(1, testHarness.extractOutputValues().size()); + assertEquals(message, testHarness.extractOutputValues().get(0)); + } + + @Test + void testProcessElementActiveWaitingForWatermark() throws Exception { + setupActiveContext(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + // message ts < toggle value → gated, waits for watermark to advance + TestMessage message = new TestMessage("test", TEST_WATERMARK_VALUE - 100); + + testHarness.processElement(message, TEST_WATERMARK_VALUE - 200); + + assertEquals(0, testHarness.extractOutputValues().size()); + assertTrue( + watermarkGateFunction.getLogMessages().stream() + .anyMatch(msg -> msg.contains("Waiting to Reach WM"))); + } + + @Test + void testProcessElementActiveWithNullWatermarkToggleTransitioning() throws Exception { + // During TRANSITIONING the watermark toggle is not yet set (null); the active deployment + // should signal that it is waiting for the toggle value to be written to the ConfigMap. + setupActiveContext( + null, WatermarkGateStage.WATERMARK_NOT_SET, TransitionStage.TRANSITIONING); + TestMessage message = new TestMessage("test", TEST_WATERMARK_VALUE); + + testHarness.processElement(message, TEST_WATERMARK_VALUE - 100); + + assertEquals(0, testHarness.extractOutputValues().size()); + assertTrue(watermarkGateFunction.notifyWaitingForWatermarkCalled); + } + + // ==================== Standby Processing Tests ==================== + + @Test + void testProcessElementStandbyWithinWatermarkBoundary() throws Exception { + // Standby deployment: message ts < toggle value and processing time <= toggle value + // → still within valid range, element passes through + setupStandbyContext(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + TestMessage message = new TestMessage("test", TEST_WATERMARK_VALUE - 100); + + testHarness.processElement(message, TEST_WATERMARK_VALUE - 200); + + assertEquals(1, testHarness.extractOutputValues().size()); + assertEquals(message, testHarness.extractOutputValues().get(0)); + } + + @Test + void testProcessElementStandbyPastWatermarkToggle() throws Exception { + // Standby deployment: message ts > toggle value → past the cutoff, element is blocked + setupStandbyContext(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + TestMessage message = new TestMessage("test", TEST_WATERMARK_VALUE + 100); + + testHarness.processElement(message, TEST_WATERMARK_VALUE + 50); + + assertEquals(0, testHarness.extractOutputValues().size()); + assertTrue( + watermarkGateFunction.getLogMessages().stream() + .anyMatch(msg -> msg.contains("Past WM"))); + } + + @Test + void testProcessElementStandbyNullWatermarkToggle() throws Exception { + // Standby deployment: no toggle value yet → old active job is transitioning to standby, + // elements pass through and the watermark value is written to the ConfigMap. + setupStandbyContext(null, WatermarkGateStage.WATERMARK_NOT_SET); + TestMessage message = new TestMessage("test", TEST_WATERMARK_VALUE); + + testHarness.processElement(message, TEST_WATERMARK_VALUE - 100); + + assertEquals(1, testHarness.extractOutputValues().size()); + assertEquals(message, testHarness.extractOutputValues().get(0)); + assertTrue(watermarkGateFunction.updateWatermarkInConfigMapCalled); + } + + // ==================== Watermark Control Tests ==================== + + @Test + void testWatermarkProgression() throws Exception { + setupActiveContext(TEST_WATERMARK_VALUE, WatermarkGateStage.WATERMARK_SET); + TestMessage earlyMessage = new TestMessage("early", TEST_WATERMARK_VALUE - 200); + TestMessage lateMessage = new TestMessage("late", TEST_WATERMARK_VALUE + 100); + + // Early message ts < toggle value → gated + testHarness.processElement(earlyMessage, TEST_WATERMARK_VALUE - 300); + assertEquals(0, testHarness.extractOutputValues().size()); + + // Advance watermark; does not affect the toggle-value gate directly, but verifies + // the harness progresses without errors + testHarness.processWatermark(TEST_WATERMARK_VALUE - 50); + + // Late message ts > toggle value → passes + testHarness.processElement(lateMessage, TEST_WATERMARK_VALUE + 50); + assertEquals(1, testHarness.extractOutputValues().size()); + assertEquals(lateMessage, testHarness.extractOutputValues().get(0)); + } + + // ==================== Factory Method Tests ==================== + + @Test + void testCreateFromFlinkConfig() { + Map flinkConfig = new HashMap<>(); + flinkConfig.put("bluegreen.active-deployment-type", "BLUE"); + flinkConfig.put("kubernetes.namespace", "test-ns"); + flinkConfig.put("bluegreen.configmap.name", "test-cm"); + + WatermarkGateProcessFunction function = + WatermarkGateProcessFunction.create(flinkConfig, s -> (long) s.length()); + + assertNotNull(function); + } + + // ==================== Helper Methods ==================== + + private void setupActiveContext(Long watermarkToggleValue, WatermarkGateStage stage) + throws Exception { + setupActiveContext(watermarkToggleValue, stage, TransitionStage.RUNNING); + } + + private void setupActiveContext( + Long watermarkToggleValue, WatermarkGateStage stage, TransitionStage gateStage) + throws Exception { + GateContext baseContext = createBaseContext(gateStage, false); + Map data = createWatermarkData(watermarkToggleValue, stage); + watermarkGateFunction.onContextUpdate(baseContext, data); + } + + private void setupStandbyContext(Long watermarkToggleValue, WatermarkGateStage stage) + throws Exception { + // Recreate function as GREEN (standby) deployment + watermarkGateFunction = + new TestWatermarkGateProcessFunction( + BlueGreenDeploymentType.GREEN, + TEST_NAMESPACE, + TEST_CONFIGMAP_NAME, + watermarkExtractor); + + testHarness.close(); + testHarness = ProcessFunctionTestHarnesses.forProcessFunction(watermarkGateFunction); + testHarness.open(); + + GateContext baseContext = createBaseContext(TransitionStage.RUNNING, false); + Map data = createWatermarkData(watermarkToggleValue, stage); + watermarkGateFunction.onContextUpdate(baseContext, data); + } + + private GateContext createBaseContext(TransitionStage gateStage, boolean isFirstDeployment) { + Map data = new HashMap<>(); + data.put( + GateContextOptions.ACTIVE_DEPLOYMENT_TYPE.getLabel(), + BlueGreenDeploymentType.BLUE.toString()); + data.put(GateContextOptions.TRANSITION_STAGE.getLabel(), gateStage.toString()); + data.put( + GateContextOptions.DEPLOYMENT_DELETION_DELAY.getLabel(), + String.valueOf(TEST_TEARDOWN_DELAY)); + data.put( + GateContextOptions.IS_FIRST_DEPLOYMENT.getLabel(), + String.valueOf(isFirstDeployment)); + + return GateContext.create(data, BlueGreenDeploymentType.BLUE); + } + + private Map createWatermarkData( + Long watermarkToggleValue, WatermarkGateStage stage) { + Map data = new HashMap<>(); + if (watermarkToggleValue != null) { + data.put(WatermarkGateContext.WATERMARK_TOGGLE_VALUE, watermarkToggleValue.toString()); + } + data.put(WatermarkGateContext.WATERMARK_STAGE, stage.toString()); + return data; + } + + private WatermarkGateContext getWatermarkContext() { + return watermarkGateFunction.getCurrentWatermarkGateContext(); + } + + // ==================== Test Helper Classes ==================== + + private static class TestMessage { + private final String content; + private final long timestamp; + + public TestMessage(String content, long timestamp) { + this.content = content; + this.timestamp = timestamp; + } + + public String getContent() { + return content; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestMessage that = (TestMessage) o; + return timestamp == that.timestamp && content.equals(that.content); + } + + @Override + public int hashCode() { + return content.hashCode() + (int) timestamp; + } + + @Override + public String toString() { + return "TestMessage{content='" + content + "', timestamp=" + timestamp + '}'; + } + } + + /** Test implementation of WatermarkGateProcessFunction that captures method calls and state. */ + private static class TestWatermarkGateProcessFunction + extends WatermarkGateProcessFunction { + private final List logMessages = new ArrayList<>(); + private GateContext mockBaseContext; + + public boolean notifyWaitingForWatermarkCalled = false; + public boolean updateWatermarkInConfigMapCalled = false; + + TestWatermarkGateProcessFunction( + BlueGreenDeploymentType blueGreenDeploymentType, + String namespace, + String configMapName, + Function watermarkExtractor) { + super(blueGreenDeploymentType, namespace, configMapName, watermarkExtractor); + } + + @Override + public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { + // Build a mock ConfigMap context representing a non-first deployment in RUNNING state. + // isFirstDeployment=false ensures watermark values are read from data rather than + // defaulted to 0. + Map mockConfigMapData = new HashMap<>(); + mockConfigMapData.put( + GateContextOptions.ACTIVE_DEPLOYMENT_TYPE.getLabel(), + BlueGreenDeploymentType.BLUE.toString()); + mockConfigMapData.put( + GateContextOptions.TRANSITION_STAGE.getLabel(), + TransitionStage.RUNNING.toString()); + mockConfigMapData.put(GateContextOptions.DEPLOYMENT_DELETION_DELAY.getLabel(), "5000"); + mockConfigMapData.put(GateContextOptions.IS_FIRST_DEPLOYMENT.getLabel(), "false"); + + // Use the actual deployment type so GREEN functions get STANDBY output mode + mockBaseContext = GateContext.create(mockConfigMapData, blueGreenDeploymentType); + + // Set baseContext so processElement() can dispatch to active/standby + baseContext = mockBaseContext; + + // Simulate the parent's initialization by calling onContextUpdate + onContextUpdate(mockBaseContext, new HashMap<>()); + + // Skip the actual Kubernetes service initialization but keep the important logic + logInfo( + "Mock initialization completed - skipping Kubernetes service setup for testing"); + } + + @Override + protected void logInfo(String message) { + logMessages.add(message); + } + + /** Override to capture the call without invoking Kubernetes. */ + @Override + protected void notifyWaitingForWatermark(Context ctx) { + notifyWaitingForWatermarkCalled = true; + } + + /** Override to capture the call without invoking Kubernetes. */ + @Override + protected void updateWatermarkInConfigMap(Context ctx) { + updateWatermarkInConfigMapCalled = true; + } + + // Helper to access the private currentWatermarkGateContext field for assertions + public WatermarkGateContext getCurrentWatermarkGateContext() { + try { + java.lang.reflect.Field field = + WatermarkGateProcessFunction.class.getDeclaredField( + "currentWatermarkGateContext"); + field.setAccessible(true); + return (WatermarkGateContext) field.get(this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // Method to simulate ConfigMap updates from tests (mirrors the informer callback path) + public void simulateConfigMapUpdate(Map newData) { + if (mockBaseContext != null) { + onContextUpdate(mockBaseContext, newData); + } + } + + public List getLogMessages() { + return logMessages; + } + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java index a22574470d..cde4015038 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java @@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; @@ -93,7 +94,7 @@ public List> prepareEventSources( EventSourceContext context) { List> eventSources = new ArrayList<>(); - InformerEventSourceConfiguration config = + InformerEventSourceConfiguration flinkDeploymentConfig = InformerEventSourceConfiguration.from( FlinkDeployment.class, FlinkBlueGreenDeployment.class) .withSecondaryToPrimaryMapper( @@ -102,7 +103,17 @@ public List> prepareEventSources( .withFollowControllerNamespacesChanges(true) .build(); - eventSources.add(new InformerEventSource<>(config, context)); + InformerEventSourceConfiguration configMapConfig = + InformerEventSourceConfiguration.from( + ConfigMap.class, FlinkBlueGreenDeployment.class) + .withSecondaryToPrimaryMapper( + Mappers.fromOwnerReferences(context.getPrimaryResourceClass())) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) + .build(); + + eventSources.add(new InformerEventSource<>(flinkDeploymentConfig, context)); + eventSources.add(new InformerEventSource<>(configMapConfig, context)); if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) { eventSources.add(EventSourceUtils.getBlueGreenIngressInformerEventSource(context)); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java index 054b121f61..512db7975c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments; import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; +import io.fabric8.kubernetes.api.model.ObjectMeta; import io.javaoperatorsdk.operator.api.reconciler.Context; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -62,4 +63,9 @@ public BlueGreenDeploymentType getOppositeDeploymentType(BlueGreenDeploymentType ? BlueGreenDeploymentType.GREEN : BlueGreenDeploymentType.BLUE; } + + public String getConfigMapName() { + ObjectMeta bgMeta = bgDeployment.getMetadata(); + return bgMeta.getName() + "-configmap"; + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java index 9543fecabd..9929a4c1a9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java @@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.IngressUtils; +import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenTransitionUtils; import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils; import org.apache.flink.util.Preconditions; @@ -45,12 +46,16 @@ import java.time.Instant; import java.util.Objects; +import java.util.Optional; import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deleteFlinkDeployment; import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deployCluster; import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.isFlinkDeploymentReady; import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.suspendFlinkDeployment; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenTransitionUtils.updateTransitionStageFromJobStatus; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenTransitionUtils.validateAdvancedModeConfig; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.fetchSavepointInfo; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getDeploymentDeletionDelay; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getSpecDiff; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.hasSpecChanged; @@ -95,6 +100,13 @@ public UpdateControl initiateDeployment( isFirstDeployment, bgMeta); + Optional configError = validateAdvancedModeConfig(context); + if (configError.isPresent()) { + return markDeploymentFailing(context, configError.get()); + } + BlueGreenTransitionUtils.prepareTransitionMetadata( + context, nextBlueGreenDeploymentType, flinkDeployment, isFirstDeployment); + deployCluster(context, flinkDeployment); BlueGreenUtils.setAbortTimestamp(context); @@ -145,6 +157,11 @@ public UpdateControl checkAndInitiateDeployment( if (currentFlinkDeployment != null && isFlinkDeploymentReady(currentFlinkDeployment)) { if (specDiff == BlueGreenDiffType.TRANSITION) { + Optional configError = validateAdvancedModeConfig(context); + if (configError.isPresent()) { + return markDeploymentFailing(context, configError.get()); + } + boolean savepointTriggered = false; try { savepointTriggered = handleSavepoint(context, currentFlinkDeployment); @@ -487,6 +504,7 @@ public UpdateControl monitorTransition( } if (isFlinkDeploymentReady(transitionState.nextDeployment)) { + BlueGreenTransitionUtils.moveToFirstTransitionStage(context); return shouldWeDelete( context, transitionState.currentDeployment, @@ -583,6 +601,12 @@ private UpdateControl shouldWeDelete( FlinkDeployment nextDeployment, FlinkBlueGreenDeploymentState nextState) { + if (!BlueGreenTransitionUtils.isClearToTeardown(context)) { + // Wait until CLEAR_TO_TEARDOWN is set by the client + return UpdateControl.noUpdate() + .rescheduleAfter(getReconciliationReschedInterval(context)); + } + var deploymentStatus = context.getDeploymentStatus(); if (currentDeployment == null) { @@ -590,7 +614,7 @@ private UpdateControl shouldWeDelete( return finalizeBlueGreenDeployment(context, nextState); } - long deploymentDeletionDelayMs = BlueGreenUtils.getDeploymentDeletionDelay(context); + long deploymentDeletionDelayMs = getDeploymentDeletionDelay(context); long deploymentReadyTimestamp = instantStrToMillis(deploymentStatus.getDeploymentReadyTimestamp()); @@ -684,6 +708,7 @@ private UpdateControl abortDeployment( FlinkBlueGreenDeploymentState previousState = getPreviousState(nextState, context.getDeployments()); + BlueGreenTransitionUtils.rollbackActiveDeploymentType(context, previousState); context.getDeploymentStatus().setBlueGreenState(previousState); var error = @@ -828,6 +853,7 @@ public static UpdateControl patchStatusUpdateControl( } if (jobState != null) { + updateTransitionStageFromJobStatus(context, jobState); deploymentStatus.getJobStatus().setState(jobState); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java index ae7492d312..24716d401f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java @@ -23,15 +23,23 @@ import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.StatusDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; +import java.util.Optional; /** Utility methods for handling Kubernetes operations in Blue/Green deployments. */ public class BlueGreenKubernetesService { + private static final Logger LOG = LoggerFactory.getLogger(BlueGreenKubernetesService.class); + /** * Creates ObjectMeta for a dependent Kubernetes resource with proper owner references. * @@ -116,4 +124,59 @@ public static boolean deleteFlinkDeployment( return deletedStatus.size() == 1 && deletedStatus.get(0).getKind().equals("FlinkDeployment"); } + + // ==================== ConfigMap related methods ==================== + + public static void updateConfigMapEntry(BlueGreenContext context, String key, String value) { + FlinkBlueGreenDeployment bgDeployment = context.getBgDeployment(); + var josdkContext = context.getJosdkContext(); + ConfigMap configMap = getConfigMap(context); + String namespace = bgDeployment.getMetadata().getNamespace(); + configMap.getData().put(key, value); + josdkContext.getClient().configMaps().inNamespace(namespace).resource(configMap).update(); + } + + public static ConfigMap getConfigMap(BlueGreenContext context) { + Optional configMapOpt = + context.getJosdkContext().getSecondaryResources(ConfigMap.class).stream() + .filter(cm -> cm.getMetadata().getName().equals(context.getConfigMapName())) + .findFirst(); + + if (configMapOpt.isEmpty()) { + throw new RuntimeException( + "Expected Blue/Green ConfigMap " + context.getConfigMapName() + " not found"); + } + + return configMapOpt.get(); + } + + public static void upsertConfigMap(BlueGreenContext context, Map data) { + var bgDeployment = context.getBgDeployment(); + var bgMeta = bgDeployment.getMetadata(); + var configMap = new ConfigMapBuilder().addToData(data).build(); + var configMapMeta = getDependentObjectMeta(bgDeployment); + configMapMeta.setName(context.getConfigMapName()); + + // Set metadata BEFORE creating the resource reference + configMap.setMetadata(configMapMeta); + + var configMapResource = + context.getJosdkContext() + .getClient() + .configMaps() + .inNamespace(bgMeta.getNamespace()) + .resource(configMap); + + if (configMapResource.get() == null) { + LOG.info("Creating new Blue/Green ConfigMap for deploymentType: {}", bgMeta.getName()); + configMapResource.create(); + } else { + Map existingData = configMapResource.get().getData(); + LOG.warn( + "Found Blue/Green ConfigMap, existing data: {}, replaced by: {}", + existingData, + data); + configMapResource.update(); + } + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenTransitionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenTransitionUtils.java new file mode 100644 index 0000000000..638737a9a8 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenTransitionUtils.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils.bluegreen; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage; +import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; +import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec; +import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState; +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.PodTemplateSpec; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import lombok.SneakyThrows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.naming.OperationNotSupportedException; + +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.ACTIVE_DEPLOYMENT_TYPE; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.IS_FIRST_DEPLOYMENT; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.TRANSITION_STAGE; +import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.getConfigMap; +import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.updateConfigMapEntry; +import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.upsertConfigMap; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getDeploymentDeletionDelay; + +/** Utility class for Blue/Green transition stage operations. */ +public class BlueGreenTransitionUtils { + + private static final Logger LOG = LoggerFactory.getLogger(BlueGreenTransitionUtils.class); + + @SneakyThrows + public static TransitionMode getTransitionMode(BlueGreenContext context) { + TransitionMode transitionMode = context.getBgDeployment().getSpec().getTransitionMode(); + + if (transitionMode == null) { + throw new OperationNotSupportedException("Please specify the TransitionMode"); + } + + return transitionMode; + } + + public static void prepareTransitionMetadata( + BlueGreenContext context, + BlueGreenDeploymentType blueGreenDeploymentType, + FlinkDeployment flinkDeployment, + boolean isFirstDeployment) { + if (TransitionMode.ADVANCED != getTransitionMode(context)) { + return; + } + + var transitionDefaultMetadata = + Map.of( + IS_FIRST_DEPLOYMENT.getLabel(), + isFirstDeployment ? "true" : "false", + GateContextOptions.DEPLOYMENT_DELETION_DELAY.getLabel(), + String.valueOf(getDeploymentDeletionDelay(context)), + ACTIVE_DEPLOYMENT_TYPE.getLabel(), + blueGreenDeploymentType.toString(), + TRANSITION_STAGE.getLabel(), + TransitionStage.INITIALIZING.toString()); + + upsertConfigMap(context, transitionDefaultMetadata); + + // Preparing the FlinkConfiguration for the OutputDecider + flinkDeployment + .getSpec() + .getFlinkConfiguration() + .put( + "bluegreen." + ACTIVE_DEPLOYMENT_TYPE.getLabel(), + blueGreenDeploymentType.toString()); + flinkDeployment + .getSpec() + .getFlinkConfiguration() + .put("bluegreen.configmap.name", context.getConfigMapName()); + + // Auto-inject agent config and init container when a gate strategy is declared + ConfigObjectNode flinkConfig = flinkDeployment.getSpec().getFlinkConfiguration(); + if (flinkConfig.has("bluegreen.gate.strategy")) { + if (!flinkConfig.has("bluegreen.gate.injection.enabled")) { + flinkConfig.put("bluegreen.gate.injection.enabled", "true"); + } + + final String agentFlag = "-javaagent:/opt/flink/lib/bluegreen-agent.jar"; + String existingOpts = + flinkConfig.has("env.java.opts.jobmanager") + ? flinkConfig.get("env.java.opts.jobmanager").asText() + : ""; + if (!existingOpts.contains(agentFlag)) { + flinkConfig.put( + "env.java.opts.jobmanager", + existingOpts.isEmpty() ? agentFlag : existingOpts + " " + agentFlag); + } + + String operatorImage = System.getenv("OPERATOR_IMAGE"); + if (operatorImage != null && !operatorImage.isEmpty()) { + injectAgentInitContainer(flinkDeployment, operatorImage); + } else { + LOG.warn( + "[BlueGreen] OPERATOR_IMAGE env var not set — " + + "agent JAR init-container will not be injected"); + } + } + } + + public static void moveToFirstTransitionStage(BlueGreenContext context) { + if (TransitionMode.ADVANCED != getTransitionMode(context)) { + return; + } + + ConfigMap configMap = getConfigMap(context); + String stage = configMap.getData().get(TRANSITION_STAGE.getLabel()); + + if (stage.equals(TransitionStage.INITIALIZING.toString())) { + TransitionStage nextStage; + + if (context.getDeployments().getNumberOfDeployments() == 2) { + LOG.info("Stage INITIALIZING to TRANSITIONING"); + nextStage = TransitionStage.TRANSITIONING; + } else { + LOG.info( + "No transition between deployments detected, Stage INITIALIZING -> CLEAR_TO_TEARDOWN"); + nextStage = TransitionStage.CLEAR_TO_TEARDOWN; + } + + updateTransitionStage(context, nextStage); + } + } + + public static boolean isClearToTeardown(BlueGreenContext context) { + + if (TransitionMode.ADVANCED == getTransitionMode(context)) { + ConfigMap configMap = getConfigMap(context); + String stage = configMap.getData().get(TRANSITION_STAGE.getLabel()); + + if (!stage.equals(TransitionStage.CLEAR_TO_TEARDOWN.toString())) { + LOG.info("Waiting for CLEAR_TO_TEARDOWN, current stage: " + stage); + return false; + } + } + + return true; + } + + public static void rollbackActiveDeploymentType( + BlueGreenContext context, FlinkBlueGreenDeploymentState previousState) { + + if (TransitionMode.ADVANCED != getTransitionMode(context)) { + return; + } + + var previousDeploymentType = + previousState.name().contains("BLUE") + ? BlueGreenDeploymentType.BLUE + : BlueGreenDeploymentType.GREEN; + + updateConfigMapEntry( + context, ACTIVE_DEPLOYMENT_TYPE.getLabel(), previousDeploymentType.toString()); + } + + public static void updateTransitionStageFromJobStatus( + BlueGreenContext context, JobStatus jobStatus) { + + if (TransitionMode.ADVANCED != getTransitionMode(context)) { + return; + } + + // ConfigMap may not exist yet if validation failed before prepareTransitionMetadata ran + boolean configMapPresent = + context.getJosdkContext().getSecondaryResources(ConfigMap.class).stream() + .anyMatch( + cm -> + cm.getMetadata() + .getName() + .equals(context.getConfigMapName())); + if (!configMapPresent) { + return; + } + + TransitionStage transitionStage; + switch (jobStatus) { + case RUNNING: + transitionStage = TransitionStage.RUNNING; + break; + case FAILING: + transitionStage = TransitionStage.FAILING; + break; + case RECONCILING: + transitionStage = TransitionStage.INITIALIZING; + break; + default: + throw new RuntimeException("Unsupported JobStatus: " + jobStatus); + } + updateTransitionStage(context, transitionStage); + } + + public static void updateTransitionStage( + BlueGreenContext context, TransitionStage transitionStage) { + updateConfigMapEntry(context, TRANSITION_STAGE.getLabel(), transitionStage.toString()); + } + + /** + * Validates that all required gate configuration properties are present when using ADVANCED + * mode. Returns an error message if any required property is missing, or {@link + * Optional#empty()} if the configuration is valid. + * + *

Required properties for ADVANCED mode: + * + *

    + *
  • {@code bluegreen.gate.strategy} — must be set to a supported value (e.g. WATERMARK) + *
  • {@code bluegreen.gate.watermark.extractor-class} — required when strategy is WATERMARK + *
+ */ + public static Optional validateAdvancedModeConfig(BlueGreenContext context) { + if (TransitionMode.ADVANCED != getTransitionMode(context)) { + return Optional.empty(); + } + + ConfigObjectNode flinkConfig = + context.getBgDeployment().getSpec().getTemplate().getSpec().getFlinkConfiguration(); + + if (!flinkConfig.has("bluegreen.gate.strategy")) { + return Optional.of( + "[BlueGreen] ADVANCED mode requires 'bluegreen.gate.strategy' to be set " + + "in flinkConfiguration. Supported values: WATERMARK"); + } + + String strategy = flinkConfig.get("bluegreen.gate.strategy").asText(); + if ("WATERMARK".equals(strategy)) { + if (!flinkConfig.has("bluegreen.gate.watermark.extractor-class")) { + return Optional.of( + "[BlueGreen] Gate strategy WATERMARK requires " + + "'bluegreen.gate.watermark.extractor-class' to be set " + + "in flinkConfiguration."); + } + } else { + return Optional.of( + "[BlueGreen] Unknown gate strategy: '" + + strategy + + "'. Supported values: WATERMARK"); + } + + return Optional.empty(); + } + + private static void injectAgentInitContainer( + FlinkDeployment flinkDeployment, String operatorImage) { + var spec = flinkDeployment.getSpec(); + + if (spec.getJobManager() == null) { + spec.setJobManager(new JobManagerSpec()); + } + JobManagerSpec jmSpec = spec.getJobManager(); + + if (jmSpec.getPodTemplate() == null) { + jmSpec.setPodTemplate(new PodTemplateSpec()); + } + PodTemplateSpec podTemplate = jmSpec.getPodTemplate(); + if (podTemplate.getSpec() == null) { + podTemplate.setSpec(new PodSpec()); + } + PodSpec podSpec = podTemplate.getSpec(); + + podSpec.getVolumes() + .add( + new VolumeBuilder() + .withName("bluegreen-agent") + .withNewEmptyDir() + .endEmptyDir() + .build()); + + podSpec.getInitContainers() + .add( + new ContainerBuilder() + .withName("bluegreen-agent-init") + .withImage(operatorImage) + .withCommand( + "sh", + "-c", + "cp /opt/flink/artifacts/bluegreen-agent.jar" + + " /bluegreen-agent/bluegreen-agent.jar") + .withVolumeMounts( + new VolumeMountBuilder() + .withName("bluegreen-agent") + .withMountPath("/bluegreen-agent") + .build()) + .build()); + + VolumeMount agentMount = + new VolumeMountBuilder() + .withName("bluegreen-agent") + .withMountPath("/opt/flink/lib/bluegreen-agent.jar") + .withSubPath("bluegreen-agent.jar") + .build(); + + boolean foundMain = false; + for (Container c : podSpec.getContainers()) { + if ("flink-main-container".equals(c.getName())) { + c.getVolumeMounts().add(agentMount); + foundMain = true; + break; + } + } + if (!foundMain) { + podSpec.getContainers() + .add( + new ContainerBuilder() + .withName("flink-main-container") + .withVolumeMounts(agentMount) + .build()); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index a48f41d9c9..1692d474ce 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -497,9 +497,12 @@ public Optional getRetryInfo() { @Override public Set getSecondaryResources(Class aClass) { - // TODO: improve this, even if we only support FlinkDeployment as a secondary resource + KubernetesClient client = getClient(); + if (client == null) { + return new HashSet<>(); + } + if (aClass.getSimpleName().equals(FlinkDeployment.class.getSimpleName())) { - KubernetesClient client = getClient(); var hasMetadata = new HashSet<>( client.resources(FlinkDeployment.class) @@ -507,8 +510,12 @@ public Set getSecondaryResources(Class aClass) { .list() .getItems()); return (Set) hasMetadata; + } else if (aClass.getSimpleName().equals("ConfigMap")) { + var hasMetadata = + new HashSet<>(client.configMaps().inAnyNamespace().list().getItems()); + return (Set) hasMetadata; } else { - return null; + return new HashSet<>(); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/BlueGreenTestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/BlueGreenTestUtils.java new file mode 100644 index 0000000000..c6fa626177 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/BlueGreenTestUtils.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage; +import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; +import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; +import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec; +import org.apache.flink.kubernetes.operator.api.spec.JobSpec; +import org.apache.flink.kubernetes.operator.api.spec.JobState; +import org.apache.flink.kubernetes.operator.api.spec.Resource; +import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec; +import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.TRANSITION_STAGE; +import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD; +import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY; +import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL; +import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.SAMPLE_JAR; + +/** Shared utilities for Blue/Green testing. */ +public class BlueGreenTestUtils { + + public static final String SERVICE_ACCOUNT = "flink-operator"; + public static final String FLINK_VERSION = "latest"; + public static final String IMAGE = String.format("flink:%s", FLINK_VERSION); + public static final String IMAGE_POLICY = "IfNotPresent"; + private static final int DEFAULT_DELETION_DELAY_VALUE = 500; + + public static FlinkBlueGreenDeployment buildAdvancedDeployment( + String name, + String namespace, + FlinkVersion version, + String initialSavepointPath, + UpgradeMode upgradeMode) { + var deployment = new FlinkBlueGreenDeployment(); + deployment.setMetadata( + new ObjectMetaBuilder() + .withName(name) + .withNamespace(namespace) + .withCreationTimestamp(Instant.now().toString()) + .withUid(UUID.randomUUID().toString()) + .withResourceVersion("1") + .build()); + var bgDeploymentSpec = getTestFlinkDeploymentSpec(version); + + bgDeploymentSpec + .getTemplate() + .getSpec() + .setJob( + JobSpec.builder() + .jarURI(SAMPLE_JAR) + .parallelism(1) + .upgradeMode(upgradeMode) + .state(JobState.RUNNING) + .initialSavepointPath(initialSavepointPath) + .build()); + + bgDeploymentSpec.setTransitionMode(TransitionMode.ADVANCED); + deployment.setSpec(bgDeploymentSpec); + return deployment; + } + + public static ConfigMap getConfigMapFromSecondaryResources( + Context context, String deploymentName) { + var configMaps = context.getSecondaryResources(ConfigMap.class); + return configMaps.stream() + .filter(cm -> cm.getMetadata().getName().equals(deploymentName + "-configmap")) + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "ConfigMap not found for deployment: " + deploymentName)); + } + + public static TransitionStage getCurrentConfigMapStage( + Context context, String deploymentName) { + ConfigMap configMap = getConfigMapFromSecondaryResources(context, deploymentName); + String stage = configMap.getData().get(TRANSITION_STAGE.getLabel()); + return TransitionStage.valueOf(stage); + } + + public static void assertConfigMapProtocol(ConfigMap configMap) { + Map data = configMap.getData(); + if (!data.containsKey("IS_FIRST_DEPLOYMENT") + || !data.containsKey("ACTIVE_DEPLOYMENT_TYPE") + || !data.containsKey("TRANSITION_STAGE")) { + throw new AssertionError("ConfigMap missing required protocol fields"); + } + } + + public static boolean isValidTransitionStage(String stage) { + try { + TransitionStage.valueOf(stage); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + + public static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVersion version) { + Map conf = new HashMap<>(); + conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"); + conf.put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "test-savepoint-dir"); + conf.put(CheckpointingOptions.INCREMENTAL_CHECKPOINTS.key(), "true"); + conf.put(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), "10"); + conf.put( + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(), + "file:///test/test-checkpoint-dir"); + + var flinkDeploymentSpec = + FlinkDeploymentSpec.builder() + .image(IMAGE) + .imagePullPolicy(IMAGE_POLICY) + .serviceAccount(SERVICE_ACCOUNT) + .flinkVersion(version) + .flinkConfiguration(new ConfigObjectNode()) + .jobManager(new JobManagerSpec(new Resource(1.0, "2048m", "2G"), 1, null)) + .taskManager( + new TaskManagerSpec(new Resource(1.0, "2048m", "2G"), null, null)) + .build(); + + flinkDeploymentSpec.setFlinkConfiguration(conf); + + Map configuration = new HashMap<>(); + configuration.put(ABORT_GRACE_PERIOD.key(), "1"); + configuration.put(RECONCILIATION_RESCHEDULING_INTERVAL.key(), "500"); + configuration.put( + DEPLOYMENT_DELETION_DELAY.key(), String.valueOf(DEFAULT_DELETION_DELAY_VALUE)); + + var flinkDeploymentTemplateSpec = + FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build(); + + return new FlinkBlueGreenDeploymentSpec( + configuration, null, TransitionMode.BASIC, flinkDeploymentTemplateSpec); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java index 528b61727d..31c1a1c4c9 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java @@ -25,6 +25,9 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage; import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; @@ -44,6 +47,7 @@ import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.networking.v1.Ingress; import io.fabric8.kubernetes.client.KubernetesClient; @@ -64,12 +68,16 @@ import java.util.UUID; import java.util.stream.Stream; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.ACTIVE_DEPLOYMENT_TYPE; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.IS_FIRST_DEPLOYMENT; +import static org.apache.flink.kubernetes.operator.api.bluegreen.GateContextOptions.TRANSITION_STAGE; import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD; import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY; import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL; import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.SAMPLE_JAR; import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.TEST_DEPLOYMENT_NAME; import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.TEST_NAMESPACE; +import static org.apache.flink.kubernetes.operator.controller.BlueGreenTestUtils.getTestFlinkDeploymentSpec; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -501,8 +509,13 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce rs.reconciledStatus.getBlueGreenState()); assertEquals( customValue, - getFlinkConfigurationValue( - rs.deployment.getSpec().getTemplate().getSpec(), CUSTOM_CONFIG_FIELD)); + rs.deployment + .getSpec() + .getTemplate() + .getSpec() + .getFlinkConfiguration() + .get(CUSTOM_CONFIG_FIELD) + .asText()); // Simulating the Blue deployment doesn't start correctly (status will remain the same) Long reschedDelayMs = 0L; @@ -555,11 +568,6 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce testTransitionToGreen(rs, customValue, null); } - private static String getFlinkConfigurationValue( - FlinkDeploymentSpec flinkDeploymentSpec, String propertyName) { - return flinkDeploymentSpec.getFlinkConfiguration().get(propertyName).asText(); - } - @ParameterizedTest @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") public void verifySpecChangeDuringTransition(FlinkVersion flinkVersion) throws Exception { @@ -1004,9 +1012,12 @@ public void verifySpecificBehavior( assertEquals(1, deployments.size()); assertEquals( "100 SECONDS", - getFlinkConfigurationValue( - deployments.get(0).getSpec(), - "kubernetes.operator.reconcile.interval")); + deployments + .get(0) + .getSpec() + .getFlinkConfiguration() + .get("kubernetes.operator.reconcile.interval") + .asText()); } } @@ -1063,9 +1074,12 @@ public void verifySpecificBehavior( // Child spec change should be applied to FlinkDeployment assertEquals( "100 SECONDS", - getFlinkConfigurationValue( - deployments.get(0).getSpec(), - "kubernetes.operator.reconcile.interval")); + deployments + .get(0) + .getSpec() + .getFlinkConfiguration() + .get("kubernetes.operator.reconcile.interval") + .asText()); // Top-level changes should be preserved in reconciled spec assertNotNull(result.rs.reconciledStatus.getLastReconciledSpec()); @@ -1344,8 +1358,13 @@ private void testTransitionToGreen( assertEquals(0, instantStrToMillis(rs.reconciledStatus.getDeploymentReadyTimestamp())); assertEquals( customValue, - getFlinkConfigurationValue( - rs.deployment.getSpec().getTemplate().getSpec(), CUSTOM_CONFIG_FIELD)); + rs.deployment + .getSpec() + .getTemplate() + .getSpec() + .getFlinkConfiguration() + .get(CUSTOM_CONFIG_FIELD) + .asText()); // Initiate and mark the Green deployment ready simulateSuccessfulJobStart(getFlinkDeployments().get(1)); @@ -1500,6 +1519,7 @@ private static FlinkBlueGreenDeployment buildSessionCluster( .initialSavepointPath(initialSavepointPath) .build()); + bgDeploymentSpec.setTransitionMode(TransitionMode.BASIC); deployment.setSpec(bgDeploymentSpec); return deployment; } @@ -1537,7 +1557,8 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers var flinkDeploymentTemplateSpec = FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build(); - return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec); + return new FlinkBlueGreenDeploymentSpec( + configuration, null, TransitionMode.BASIC, flinkDeploymentTemplateSpec); } // ==================== Ingress Helper Methods ==================== @@ -1678,4 +1699,221 @@ public void verifyIngressCreatedOnlyWhenConfigured(FlinkVersion flinkVersion) th // 4. Verify ingress created and points to Green after transition completes assertIngressPointsToService(GREEN_CLUSTER_ID + REST_SVC_NAME_SUFFIX); } + + // ==================== ADVANCED Mode Infrastructure Tests ==================== + + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void verifyAdvancedModeConfigMapCreation(FlinkVersion flinkVersion) throws Exception { + var deployment = + buildAdvancedSessionCluster( + TEST_DEPLOYMENT_NAME, + TEST_NAMESPACE, + flinkVersion, + null, + UpgradeMode.STATELESS); + var rs = executeAdvancedDeployment(deployment); + + // Verify ConfigMap is created with correct schema + ConfigMap configMap = + BlueGreenTestUtils.getConfigMapFromSecondaryResources( + context, TEST_DEPLOYMENT_NAME); + assertNotNull(configMap); + + Map data = configMap.getData(); + assertEquals("true", data.get(IS_FIRST_DEPLOYMENT.getLabel())); + assertEquals("BLUE", data.get(ACTIVE_DEPLOYMENT_TYPE.getLabel())); + assertEquals( + String.valueOf(DEFAULT_DELETION_DELAY_VALUE), + data.get(GateContextOptions.DEPLOYMENT_DELETION_DELAY.getLabel())); + + // Verify ConfigMap is available as secondary resource + var configMaps = context.getSecondaryResources(ConfigMap.class); + assertEquals(1, configMaps.size()); + assertEquals( + configMap.getMetadata().getName(), + configMaps.iterator().next().getMetadata().getName()); + } + + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void verifyAdvancedModeStageProgression(FlinkVersion flinkVersion) throws Exception { + var deployment = + buildAdvancedSessionCluster( + TEST_DEPLOYMENT_NAME, + TEST_NAMESPACE, + flinkVersion, + null, + UpgradeMode.STATELESS); + var rs = executeAdvancedDeployment(deployment); + + // After executeAdvancedDeployment, should be in RUNNING state + assertEquals( + TransitionStage.RUNNING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + assertEquals( + FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState()); + } + + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void verifyAdvancedModeBlocking(FlinkVersion flinkVersion) throws Exception { + var deployment = + buildAdvancedSessionCluster( + TEST_DEPLOYMENT_NAME, + TEST_NAMESPACE, + flinkVersion, + null, + UpgradeMode.STATELESS); + + // Complete first deployment + var rs = executeAdvancedDeployment(deployment); + assertEquals( + FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState()); + + // Trigger Blue->Green transition + simulateChangeInSpec(rs.deployment, "green-config", ALT_DELETION_DELAY_VALUE, null); + rs = reconcile(rs.deployment); + + // Initial stage should be INITIALIZING + assertEquals( + TransitionStage.INITIALIZING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + + simulateSuccessfulJobStart(getFlinkDeployments().get(1)); + rs = reconcile(rs.deployment); + + // Should be in TRANSITIONING stage and blocked + assertEquals( + TransitionStage.TRANSITIONING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + assertEquals( + FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN, + rs.reconciledStatus.getBlueGreenState()); + + // Multiple reconciles should remain blocked (no-op) + rs = reconcile(rs.deployment); + assertTrue(rs.updateControl.isNoUpdate()); + + rs = reconcile(rs.deployment); + assertTrue(rs.updateControl.isNoUpdate()); + + // Verify GREEN deployment is created but transition is blocked + assertEquals(2, getFlinkDeployments().size()); + + // Simulate GREEN deployment becoming ready + simulateSuccessfulJobStart(getFlinkDeployments().get(1)); // Green deployment + rs = reconcile(rs.deployment); + + // Should still be blocked waiting for CLEAR_TO_TEARDOWN + assertEquals( + TransitionStage.TRANSITIONING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + assertTrue(rs.updateControl.isNoUpdate()); + + // Simulate external system (gate) updating ConfigMap to CLEAR_TO_TEARDOWN + simulateExternalConfigMapUpdate( + TEST_DEPLOYMENT_NAME, + TRANSITION_STAGE.getLabel(), + TransitionStage.CLEAR_TO_TEARDOWN.toString()); + + // Now reconciliation should proceed with the transition + rs = reconcile(rs.deployment); + assertEquals( + TransitionStage.CLEAR_TO_TEARDOWN, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + + // Verify transition can continue - should schedule for deletion delay + assertTrue(rs.updateControl.isPatchStatus()); + assertTrue(rs.updateControl.getScheduleDelay().isPresent()); + assertEquals(ALT_DELETION_DELAY_VALUE, rs.updateControl.getScheduleDelay().get()); + + // Complete the transition after deletion delay + Thread.sleep(ALT_DELETION_DELAY_VALUE); + rs = reconcile(rs.deployment); + + // Verify successful transition completion + assertEquals(1, getFlinkDeployments().size()); // Blue deployment deleted + rs = reconcile(rs.deployment); // Final reconciliation + assertEquals( + FlinkBlueGreenDeploymentState.ACTIVE_GREEN, + rs.reconciledStatus.getBlueGreenState()); + assertEquals( + TransitionStage.RUNNING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + } + + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void verifyAdvancedModeJobStatusMapping(FlinkVersion flinkVersion) throws Exception { + var deployment = + buildAdvancedSessionCluster( + TEST_DEPLOYMENT_NAME, + TEST_NAMESPACE, + flinkVersion, + null, + UpgradeMode.STATELESS); + var rs = executeAdvancedDeployment(deployment); + + // Test JobStatus.RUNNING -> TransitionStage.RUNNING + assertEquals( + TransitionStage.RUNNING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + + // Test JobStatus.FAILING -> TransitionStage.FAILING + simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null); + simulateJobFailure(getFlinkDeployments().get(0)); + rs = reconcile(rs.deployment); + assertEquals( + TransitionStage.FAILING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + + // Test JobStatus.RECONCILING -> TransitionStage.INITIALIZING + simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null); + simulateSuccessfulJobStart(getFlinkDeployments().get(0)); + rs = reconcile(rs.deployment); + assertEquals( + TransitionStage.INITIALIZING, + BlueGreenTestUtils.getCurrentConfigMapStage(context, TEST_DEPLOYMENT_NAME)); + } + + // ==================== ADVANCED Mode Helper Methods ==================== + + private FlinkBlueGreenDeployment buildAdvancedSessionCluster( + String name, + String namespace, + FlinkVersion version, + String initialSavepointPath, + UpgradeMode upgradeMode) { + var deployment = + buildSessionCluster(name, namespace, version, initialSavepointPath, upgradeMode); + deployment.getSpec().setTransitionMode(TransitionMode.ADVANCED); + return deployment; + } + + private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult + executeAdvancedDeployment(FlinkBlueGreenDeployment deployment) throws Exception { + // Create the resource in the mock server before reconciling + kubernetesClient.resource(deployment).createOrReplace(); + var rs = reconcile(deployment); // Initialize + rs = reconcile(rs.deployment); // Transition to Blue + simulateSuccessfulJobStart(getFlinkDeployments().get(0)); + rs = reconcile(rs.deployment); // Complete the deployment and reach ACTIVE_BLUE + return rs; + } + + private void simulateExternalConfigMapUpdate(String deploymentName, String key, String value) { + String configMapName = + deploymentName + + "-configmap"; // Using actual naming convention from BlueGreenContext + ConfigMap configMap = + kubernetesClient + .configMaps() + .inNamespace(TEST_NAMESPACE) + .withName(configMapName) + .get(); + + configMap.getData().put(key, value); + kubernetesClient.configMaps().inNamespace(TEST_NAMESPACE).resource(configMap).update(); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java index da7bedea26..00e33dee63 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java @@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import lombok.AllArgsConstructor; @@ -69,14 +68,6 @@ public UpdateControl reconcile( return flinkBlueGreenDeploymentController.reconcile(cloned, context); } - @Override - public ErrorStatusUpdateControl updateErrorStatus( - FlinkBlueGreenDeployment flinkBlueGreenDeployment, - Context context, - Exception e) { - return null; - } - /** A simple DTO to handle common reconciliation results for tests. */ @AllArgsConstructor public static class BlueGreenReconciliationResult { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java index 76ff973aa8..9d7b0ca44e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkBlueGreenDeploymentMetricsTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; @@ -467,6 +468,7 @@ private FlinkBlueGreenDeployment buildBlueGreenDeployment(String name, String na new FlinkBlueGreenDeploymentSpec( new HashMap<>(), null, + TransitionMode.BASIC, FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build()); deployment.setSpec(bgDeploymentSpec); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java index 79add57f99..3e63636a26 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/BlueGreenLifecycleMetricsTest.java @@ -19,6 +19,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; @@ -180,6 +181,7 @@ private FlinkBlueGreenDeployment buildBlueGreenDeployment(String name, String na new FlinkBlueGreenDeploymentSpec( new HashMap<>(), null, + TransitionMode.BASIC, FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build()); deployment.setSpec(bgDeploymentSpec); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenTransitionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenTransitionUtilsTest.java new file mode 100644 index 0000000000..e06af4d804 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenTransitionUtilsTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils.bluegreen; + +import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; +import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; +import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec; +import org.apache.flink.kubernetes.operator.api.spec.JobSpec; +import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus; +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link BlueGreenTransitionUtils} config validation. */ +public class BlueGreenTransitionUtilsTest { + + @Test + public void testValidate_basicMode_skipsValidation() { + BlueGreenContext context = buildContext(TransitionMode.BASIC, new HashMap<>()); + Optional result = BlueGreenTransitionUtils.validateAdvancedModeConfig(context); + assertFalse(result.isPresent()); + } + + @Test + public void testValidate_advancedMode_missingStrategy_returnsError() { + BlueGreenContext context = buildContext(TransitionMode.ADVANCED, new HashMap<>()); + Optional result = BlueGreenTransitionUtils.validateAdvancedModeConfig(context); + assertTrue(result.isPresent()); + assertTrue(result.get().contains("bluegreen.gate.strategy")); + } + + @Test + public void testValidate_advancedMode_unknownStrategy_returnsError() { + BlueGreenContext context = + buildContext( + TransitionMode.ADVANCED, + Map.of("bluegreen.gate.strategy", "UNKNOWN_STRATEGY")); + Optional result = BlueGreenTransitionUtils.validateAdvancedModeConfig(context); + assertTrue(result.isPresent()); + assertTrue(result.get().contains("UNKNOWN_STRATEGY")); + } + + @Test + public void testValidate_watermarkStrategy_missingExtractorClass_returnsError() { + BlueGreenContext context = + buildContext( + TransitionMode.ADVANCED, Map.of("bluegreen.gate.strategy", "WATERMARK")); + Optional result = BlueGreenTransitionUtils.validateAdvancedModeConfig(context); + assertTrue(result.isPresent()); + assertTrue(result.get().contains("bluegreen.gate.watermark.extractor-class")); + } + + @Test + public void testValidate_watermarkStrategy_validConfig_returnsEmpty() { + BlueGreenContext context = + buildContext( + TransitionMode.ADVANCED, + Map.of( + "bluegreen.gate.strategy", "WATERMARK", + "bluegreen.gate.watermark.extractor-class", + "com.example.MyExtractor")); + Optional result = BlueGreenTransitionUtils.validateAdvancedModeConfig(context); + assertFalse(result.isPresent()); + } + + // ==================== Helpers ==================== + + private static BlueGreenContext buildContext( + TransitionMode transitionMode, Map flinkConfig) { + var deployment = new FlinkBlueGreenDeployment(); + deployment.setMetadata( + new ObjectMetaBuilder() + .withName("test-app") + .withNamespace("test-ns") + .withUid(UUID.randomUUID().toString()) + .build()); + + var flinkDeploymentSpec = + FlinkDeploymentSpec.builder() + .flinkConfiguration(new ConfigObjectNode()) + .job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build()) + .build(); + flinkDeploymentSpec.setFlinkConfiguration(new HashMap<>(flinkConfig)); + + var bgDeploymentSpec = + new FlinkBlueGreenDeploymentSpec( + new HashMap<>(), + null, + transitionMode, + FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build()); + + deployment.setSpec(bgDeploymentSpec); + deployment.setStatus(new FlinkBlueGreenDeploymentStatus()); + return new BlueGreenContext(deployment, deployment.getStatus(), null, null, null); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java index d7e735103c..6530af8b13 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java @@ -20,6 +20,7 @@ import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; +import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode; import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; @@ -244,6 +245,7 @@ private static FlinkBlueGreenDeployment buildBlueGreenDeployment( new FlinkBlueGreenDeploymentSpec( new HashMap<>(), null, + TransitionMode.BASIC, FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build()); deployment.setSpec(bgDeploymentSpec); diff --git a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml index 0f8ef636fb..e16f618b5f 100644 --- a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml @@ -24,35 +24,35 @@ spec: properties: spec: properties: + configuration: + additionalProperties: + type: "string" + type: "object" ingress: properties: annotations: additionalProperties: - type: string - type: object + type: "string" + type: "object" className: - type: string + type: "string" labels: additionalProperties: - type: string - type: object + type: "string" + type: "object" template: - type: string + type: "string" tls: items: properties: hosts: items: - type: string - type: array + type: "string" + type: "array" secretName: - type: string - type: object - type: array - type: object - configuration: - additionalProperties: - type: "string" + type: "string" + type: "object" + type: "array" type: "object" template: properties: @@ -144,6 +144,8 @@ spec: - "v2_0" - "v2_1" - "v2_2" + - "v2_3" + - "v2_4" type: "string" image: type: "string" @@ -10777,6 +10779,11 @@ spec: type: "object" type: "object" type: "object" + transitionMode: + enum: + - "ADVANCED" + - "BASIC" + type: "string" type: "object" type: "object" status: diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml index 668a84b6c7..758118c108 100644 --- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml @@ -1,253 +1,250 @@ # Generated by Fabric8 CRDGenerator, manual edits might get overwritten! -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition +apiVersion: "apiextensions.k8s.io/v1" +kind: "CustomResourceDefinition" metadata: - name: flinksessionjobs.flink.apache.org + name: "flinksessionjobs.flink.apache.org" spec: - group: flink.apache.org + group: "flink.apache.org" names: - kind: FlinkSessionJob - plural: flinksessionjobs + kind: "FlinkSessionJob" + plural: "flinksessionjobs" shortNames: - - sessionjob - singular: flinksessionjob - scope: Namespaced + - "sessionjob" + singular: "flinksessionjob" + scope: "Namespaced" versions: - additionalPrinterColumns: - - description: Last observed state of the job. - jsonPath: .status.jobStatus.state - name: Job Status + - jsonPath: ".status.jobStatus.state" + name: "Job Status" priority: 0 - type: string - - description: "Lifecycle state of the Flink resource (including being rolled\ - \ back, failed etc.)." - jsonPath: .status.lifecycleState - name: Lifecycle State + type: "string" + - jsonPath: ".status.lifecycleState" + name: "Lifecycle State" priority: 0 - type: string - name: v1beta1 + type: "string" + name: "v1beta1" schema: openAPIV3Schema: properties: spec: properties: deploymentName: - type: string + type: "string" flinkConfiguration: x-kubernetes-preserve-unknown-fields: true job: properties: allowNonRestoredState: - type: boolean + type: "boolean" args: items: - type: string - type: array + type: "string" + type: "array" autoscalerResetNonce: - type: integer + type: "integer" checkpointTriggerNonce: - type: integer + type: "integer" entryClass: - type: string + type: "string" initialSavepointPath: - type: string + type: "string" jarURI: - type: string + type: "string" parallelism: - type: integer + type: "integer" savepointRedeployNonce: - type: integer + type: "integer" savepointTriggerNonce: - type: integer + type: "integer" state: enum: - - running - - suspended - type: string + - "running" + - "suspended" + type: "string" upgradeMode: enum: - - last-state - - savepoint - - stateless - type: string - type: object + - "last-state" + - "savepoint" + - "stateless" + type: "string" + type: "object" restartNonce: - type: integer - type: object + type: "integer" + type: "object" status: properties: error: - type: string + type: "string" jobStatus: properties: checkpointInfo: properties: formatType: enum: - - FULL - - INCREMENTAL - - UNKNOWN - type: string + - "FULL" + - "INCREMENTAL" + - "UNKNOWN" + type: "string" lastCheckpoint: properties: formatType: enum: - - FULL - - INCREMENTAL - - UNKNOWN - type: string + - "FULL" + - "INCREMENTAL" + - "UNKNOWN" + type: "string" timeStamp: - type: integer + type: "integer" triggerNonce: - type: integer + type: "integer" triggerType: enum: - - MANUAL - - PERIODIC - - UNKNOWN - - UPGRADE - type: string - type: object + - "MANUAL" + - "PERIODIC" + - "UNKNOWN" + - "UPGRADE" + type: "string" + type: "object" lastPeriodicCheckpointTimestamp: - type: integer + type: "integer" triggerId: - type: string + type: "string" triggerTimestamp: - type: integer + type: "integer" triggerType: enum: - - MANUAL - - PERIODIC - - UNKNOWN - - UPGRADE - type: string - type: object + - "MANUAL" + - "PERIODIC" + - "UNKNOWN" + - "UPGRADE" + type: "string" + type: "object" jobId: - type: string + type: "string" jobName: - type: string + type: "string" savepointInfo: properties: formatType: enum: - - CANONICAL - - NATIVE - - UNKNOWN - type: string + - "CANONICAL" + - "NATIVE" + - "UNKNOWN" + type: "string" lastPeriodicSavepointTimestamp: - type: integer + type: "integer" lastSavepoint: properties: formatType: enum: - - CANONICAL - - NATIVE - - UNKNOWN - type: string + - "CANONICAL" + - "NATIVE" + - "UNKNOWN" + type: "string" location: - type: string + type: "string" timeStamp: - type: integer + type: "integer" triggerNonce: - type: integer + type: "integer" triggerType: enum: - - MANUAL - - PERIODIC - - UNKNOWN - - UPGRADE - type: string - type: object + - "MANUAL" + - "PERIODIC" + - "UNKNOWN" + - "UPGRADE" + type: "string" + type: "object" savepointHistory: items: properties: formatType: enum: - - CANONICAL - - NATIVE - - UNKNOWN - type: string + - "CANONICAL" + - "NATIVE" + - "UNKNOWN" + type: "string" location: - type: string + type: "string" timeStamp: - type: integer + type: "integer" triggerNonce: - type: integer + type: "integer" triggerType: enum: - - MANUAL - - PERIODIC - - UNKNOWN - - UPGRADE - type: string - type: object - type: array + - "MANUAL" + - "PERIODIC" + - "UNKNOWN" + - "UPGRADE" + type: "string" + type: "object" + type: "array" triggerId: - type: string + type: "string" triggerTimestamp: - type: integer + type: "integer" triggerType: enum: - - MANUAL - - PERIODIC - - UNKNOWN - - UPGRADE - type: string - type: object + - "MANUAL" + - "PERIODIC" + - "UNKNOWN" + - "UPGRADE" + type: "string" + type: "object" startTime: - type: string + type: "string" state: enum: - - CANCELED - - CANCELLING - - CREATED - - FAILED - - FAILING - - FINISHED - - INITIALIZING - - RECONCILING - - RESTARTING - - RUNNING - - SUSPENDED - type: string + - "CANCELED" + - "CANCELLING" + - "CREATED" + - "FAILED" + - "FAILING" + - "FINISHED" + - "INITIALIZING" + - "RECONCILING" + - "RESTARTING" + - "RUNNING" + - "SUSPENDED" + type: "string" updateTime: - type: string + type: "string" upgradeSavepointPath: - type: string - type: object + type: "string" + type: "object" lifecycleState: enum: - - CREATED - - DELETED - - DELETING - - DEPLOYED - - FAILED - - ROLLED_BACK - - ROLLING_BACK - - STABLE - - SUSPENDED - - UPGRADING - type: string + - "CREATED" + - "DELETED" + - "DELETING" + - "DEPLOYED" + - "FAILED" + - "ROLLED_BACK" + - "ROLLING_BACK" + - "STABLE" + - "SUSPENDED" + - "UPGRADING" + type: "string" observedGeneration: - type: integer + type: "integer" reconciliationStatus: properties: lastReconciledSpec: - type: string + type: "string" lastStableSpec: - type: string + type: "string" reconciliationTimestamp: - type: integer + type: "integer" state: enum: - - DEPLOYED - - ROLLED_BACK - - ROLLING_BACK - - UPGRADING - type: string - type: object - type: object - type: object + - "DEPLOYED" + - "ROLLED_BACK" + - "ROLLING_BACK" + - "UPGRADING" + type: "string" + type: "object" + type: "object" + type: "object" served: true storage: true subresources: diff --git a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml index 3f2f3475de..06a05f7e2a 100644 --- a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml @@ -1,94 +1,91 @@ # Generated by Fabric8 CRDGenerator, manual edits might get overwritten! -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition +apiVersion: "apiextensions.k8s.io/v1" +kind: "CustomResourceDefinition" metadata: - name: flinkstatesnapshots.flink.apache.org + name: "flinkstatesnapshots.flink.apache.org" spec: - group: flink.apache.org + group: "flink.apache.org" names: - kind: FlinkStateSnapshot - plural: flinkstatesnapshots + kind: "FlinkStateSnapshot" + plural: "flinkstatesnapshots" shortNames: - - flinksnp - singular: flinkstatesnapshot - scope: Namespaced + - "flinksnp" + singular: "flinkstatesnapshot" + scope: "Namespaced" versions: - additionalPrinterColumns: - - description: Final path of the snapshot. - jsonPath: .status.path - name: Path + - jsonPath: ".status.path" + name: "Path" priority: 0 - type: string - - description: Timestamp when the snapshot was last created/failed. - jsonPath: .status.resultTimestamp - name: Result Timestamp + type: "string" + - jsonPath: ".status.resultTimestamp" + name: "Result Timestamp" priority: 0 - type: string - - description: Current state of the snapshot. - jsonPath: .status.state - name: Snapshot State + type: "string" + - jsonPath: ".status.state" + name: "Snapshot State" priority: 0 - type: string - name: v1beta1 + type: "string" + name: "v1beta1" schema: openAPIV3Schema: properties: spec: properties: backoffLimit: - type: integer + type: "integer" checkpoint: - type: object + type: "object" jobReference: properties: kind: enum: - - FlinkDeployment - - FlinkSessionJob - type: string + - "FlinkDeployment" + - "FlinkSessionJob" + type: "string" name: - type: string - type: object + type: "string" + type: "object" savepoint: properties: alreadyExists: - type: boolean + type: "boolean" disposeOnDelete: - type: boolean + type: "boolean" formatType: enum: - - CANONICAL - - NATIVE - - UNKNOWN - type: string + - "CANONICAL" + - "NATIVE" + - "UNKNOWN" + type: "string" path: - type: string - type: object - type: object + type: "string" + type: "object" + type: "object" status: properties: error: - type: string + type: "string" failures: - type: integer + type: "integer" path: - type: string + type: "string" resultTimestamp: - type: string + type: "string" state: enum: - - ABANDONED - - COMPLETED - - FAILED - - IN_PROGRESS - - TRIGGER_PENDING - type: string + - "ABANDONED" + - "COMPLETED" + - "FAILED" + - "IN_PROGRESS" + - "TRIGGER_PENDING" + type: "string" triggerId: - type: string + type: "string" triggerTimestamp: - type: string - type: object - type: object + type: "string" + type: "object" + type: "object" served: true storage: true subresources: diff --git a/helm/flink-kubernetes-operator/templates/controller/deployment.yaml b/helm/flink-kubernetes-operator/templates/controller/deployment.yaml index 85e6a465f1..7f5e19ac38 100644 --- a/helm/flink-kubernetes-operator/templates/controller/deployment.yaml +++ b/helm/flink-kubernetes-operator/templates/controller/deployment.yaml @@ -112,6 +112,8 @@ spec: fieldPath: metadata.name - name: OPERATOR_NAME value: {{ include "flink-operator.name" . }} + - name: OPERATOR_IMAGE + value: {{ include "flink-operator.imagePath" . }} - name: FLINK_CONF_DIR value: /opt/flink/conf - name: FLINK_PLUGINS_DIR diff --git a/pom.xml b/pom.xml index f3a0a8bc30..5adb8f8d00 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,8 @@ under the License. + flink-kubernetes-operator-bluegreen-client + flink-kubernetes-operator-bluegreen-agent flink-kubernetes-standalone flink-kubernetes-operator flink-kubernetes-operator-api @@ -86,6 +88,8 @@ under the License. 1.20.1 33.4.0-jre + 5.8.2 + 1.7.36 2.23.1