Skip to content

Commit 3c87fd7

Browse files
committed
Updating client side ProcessFunction and unit tests.
1 parent 25d70da commit 3c87fd7

5 files changed

Lines changed: 254 additions & 762 deletions

File tree

flink-kubernetes-operator-bluegreen-client/pom.xml

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,25 @@ under the License.
2424
<parent>
2525
<groupId>org.apache.flink</groupId>
2626
<artifactId>flink-kubernetes-operator-parent</artifactId>
27-
<version>1.14-SNAPSHOT</version>
27+
<version>1.13-apple-SNAPSHOT</version>
2828
<relativePath>..</relativePath>
2929
</parent>
3030

3131
<artifactId>flink-kubernetes-operator-bluegreen-client</artifactId>
3232
<name>Flink Kubernetes BlueGreen Deployment Client</name>
3333

34+
<properties>
35+
<surefire.module.config>
36+
<!-- required by Kryo/chill ArraysAsListSerializer in test harness -->
37+
--add-opens=java.base/java.util=ALL-UNNAMED
38+
</surefire.module.config>
39+
</properties>
40+
3441
<dependencies>
3542
<dependency>
3643
<groupId>org.apache.flink</groupId>
3744
<artifactId>flink-kubernetes-operator-api</artifactId>
38-
<version>1.14-SNAPSHOT</version>
45+
<version>1.13-apple-SNAPSHOT</version>
3946
<scope>compile</scope>
4047
</dependency>
4148

@@ -70,23 +77,23 @@ under the License.
7077

7178
<dependency>
7279
<groupId>org.apache.flink</groupId>
73-
<artifactId>flink-test-utils</artifactId>
80+
<artifactId>flink-runtime</artifactId>
7481
<version>${flink.version}</version>
7582
<scope>test</scope>
7683
</dependency>
7784

7885
<dependency>
7986
<groupId>org.apache.flink</groupId>
80-
<artifactId>flink-streaming-java</artifactId>
87+
<artifactId>flink-test-utils</artifactId>
8188
<version>${flink.version}</version>
82-
<type>test-jar</type>
8389
<scope>test</scope>
8490
</dependency>
8591

8692
<dependency>
87-
<groupId>io.fabric8</groupId>
88-
<artifactId>kubernetes-httpclient-okhttp</artifactId>
89-
<version>${fabric8.version}</version>
93+
<groupId>org.apache.flink</groupId>
94+
<artifactId>flink-streaming-java</artifactId>
95+
<version>${flink.version}</version>
96+
<type>test-jar</type>
9097
<scope>test</scope>
9198
</dependency>
9299
</dependencies>

flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/GateProcessFunction.java

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.kubernetes.operator.bluegreen.client;
1919

20+
import io.fabric8.kubernetes.api.model.ConfigMap;
21+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
2022
import org.apache.flink.configuration.Configuration;
2123
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
2224
import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext;
@@ -25,9 +27,6 @@
2527
import org.apache.flink.streaming.api.functions.ProcessFunction;
2628
import org.apache.flink.util.Collector;
2729
import org.apache.flink.util.Preconditions;
28-
29-
import io.fabric8.kubernetes.api.model.ConfigMap;
30-
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
3332

@@ -49,8 +48,8 @@ abstract class GateProcessFunction<I> extends ProcessFunction<I, I> implements S
4948
// TODO: make this configurable? This cannot be a constant
5049
protected final int subtaskIndexGuide = 1;
5150

52-
protected GateKubernetesService gateKubernetesService;
53-
protected Boolean clearToTeardown = false;
51+
private GateKubernetesService gateKubernetesService;
52+
private Boolean clearToTeardown = false;
5453
protected GateContext baseContext;
5554
private String namespace;
5655
private String configMapName;
@@ -60,8 +59,7 @@ abstract class GateProcessFunction<I> extends ProcessFunction<I, I> implements S
6059
public GateProcessFunction(
6160
BlueGreenDeploymentType blueGreenDeploymentType,
6261
String namespace,
63-
String configMapName,
64-
GateKubernetesService gateKubernetesService) {
62+
String configMapName) {
6563
Preconditions.checkArgument(
6664
blueGreenDeploymentType == BlueGreenDeploymentType.BLUE
6765
|| blueGreenDeploymentType == BlueGreenDeploymentType.GREEN,
@@ -70,19 +68,12 @@ public GateProcessFunction(
7068
this.blueGreenDeploymentType = blueGreenDeploymentType;
7169
this.namespace = namespace;
7270
this.configMapName = configMapName;
73-
this.gateKubernetesService = gateKubernetesService;
7471
}
7572

7673
@Override
7774
public void open(Configuration parameters) throws Exception {
7875
super.open(parameters);
7976

80-
// Only create service if not injected
81-
if (gateKubernetesService == null) {
82-
this.gateKubernetesService = new GateKubernetesService(namespace, configMapName);
83-
}
84-
85-
// Always set up informers, whether service was injected or created
8677
setKubernetesEnvironment();
8778
processConfigMap(gateKubernetesService.parseConfigMap());
8879
}
@@ -113,6 +104,8 @@ public void processElement(I value, ProcessFunction<I, I>.Context ctx, Collector
113104
}
114105

115106
private void setKubernetesEnvironment() {
107+
this.gateKubernetesService = new GateKubernetesService(namespace, configMapName);
108+
116109
logInfo("Preparing Informers...");
117110
var resourceEventHandler =
118111
new ResourceEventHandler<ConfigMap>() {
@@ -167,26 +160,18 @@ private void processConfigMap(ConfigMap configMap) {
167160
onContextUpdate(baseContext, filteredData);
168161
}
169162

170-
protected void notifyClearToTeardown() {
163+
protected final void notifyClearToTeardown() {
171164
// Notify only once
172165
if (!clearToTeardown && getRuntimeContext().getIndexOfThisSubtask() == subtaskIndexGuide) {
173166
logInfo("Setting " + CLEAR_TO_TEARDOWN);
174-
performConfigMapUpdate(
167+
gateKubernetesService.updateConfigMapEntries(
175168
Map.of(TRANSITION_STAGE.getLabel(), CLEAR_TO_TEARDOWN.toString()));
176169
logInfo(CLEAR_TO_TEARDOWN + " set!");
177170
this.clearToTeardown = true;
178171
}
179172
}
180173

181-
/**
182-
* Template method for updating ConfigMap entries. Override in tests to avoid actual K8s
183-
* updates.
184-
*/
185-
protected void performConfigMapUpdate(Map<String, String> updates) {
186-
gateKubernetesService.updateConfigMapEntries(updates);
187-
}
188-
189-
protected void updateConfigMapCustomEntries(Map<String, String> customEntries)
174+
protected final void updateConfigMapCustomEntries(Map<String, String> customEntries)
190175
throws IllegalAccessException {
191176
// Validating only "custom" entries/keys can be updated
192177
var keysToUpdate = customEntries.keySet();
@@ -203,7 +188,7 @@ protected void updateConfigMapCustomEntries(Map<String, String> customEntries)
203188
throw new IllegalAccessException(error);
204189
}
205190
logInfo("Updating custom entries: " + customEntries);
206-
performConfigMapUpdate(customEntries);
191+
gateKubernetesService.updateConfigMapEntries(customEntries);
207192
}
208193

209194
// Temporary "utility" function for development

flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
package org.apache.flink.kubernetes.operator.bluegreen.client;
1919

20-
import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext;
21-
2220
import lombok.Getter;
2321
import lombok.Setter;
22+
import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext;
2423

2524
import java.io.Serializable;
2625
import java.util.Map;
26+
import java.util.Objects;
2727

2828
/** Watermark based functionality of the GateContext. */
2929
@Getter
@@ -52,7 +52,7 @@ public boolean equals(Object o) {
5252
WatermarkGateContext wm = (WatermarkGateContext) o;
5353
return this.baseContext.equals(wm.getBaseContext())
5454
&& this.watermarkGateStage.equals(wm.getWatermarkGateStage())
55-
&& this.watermarkToggleValue.equals(wm.getWatermarkToggleValue());
55+
&& Objects.equals(this.watermarkToggleValue, wm.getWatermarkToggleValue());
5656
}
5757
return false;
5858
}

flink-kubernetes-operator-bluegreen-client/src/main/java/org/apache/flink/kubernetes/operator/bluegreen/client/WatermarkGateProcessFunction.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
2121
import org.apache.flink.kubernetes.operator.api.bluegreen.GateContext;
22-
import org.apache.flink.kubernetes.operator.api.bluegreen.GateKubernetesService;
2322
import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionStage;
2423
import org.apache.flink.streaming.api.functions.ProcessFunction;
2524
import org.apache.flink.util.Collector;
@@ -35,25 +34,16 @@ public class WatermarkGateProcessFunction<I> extends GateProcessFunction<I>
3534

3635
private final Function<I, Long> watermarkExtractor;
3736

38-
protected WatermarkGateContext currentWatermarkGateContext;
37+
private WatermarkGateContext currentWatermarkGateContext;
3938

40-
protected Boolean waitingForWatermark = false;
39+
private boolean waitingForWatermark = false;
4140

4241
WatermarkGateProcessFunction(
4342
BlueGreenDeploymentType blueGreenDeploymentType,
4443
String namespace,
4544
String configMapName,
4645
Function<I, Long> watermarkExtractor) {
47-
this(blueGreenDeploymentType, namespace, configMapName, watermarkExtractor, null);
48-
}
49-
50-
WatermarkGateProcessFunction(
51-
BlueGreenDeploymentType blueGreenDeploymentType,
52-
String namespace,
53-
String configMapName,
54-
Function<I, Long> watermarkExtractor,
55-
GateKubernetesService gateKubernetesService) {
56-
super(blueGreenDeploymentType, namespace, configMapName, gateKubernetesService);
46+
super(blueGreenDeploymentType, namespace, configMapName);
5747

5848
Preconditions.checkNotNull(watermarkExtractor);
5949

@@ -147,7 +137,7 @@ private long getWatermarkBoundary(ProcessFunction<I, I>.Context ctx) {
147137
: ctx.timerService().currentProcessingTime();
148138
}
149139

150-
private void updateWatermarkInConfigMap(ProcessFunction<I, I>.Context ctx)
140+
protected void updateWatermarkInConfigMap(ProcessFunction<I, I>.Context ctx)
151141
throws IllegalAccessException {
152142
var curWmCtx = currentWatermarkGateContext;
153143

@@ -173,7 +163,7 @@ private void updateWatermarkInConfigMap(ProcessFunction<I, I>.Context ctx)
173163
}
174164
}
175165

176-
private void notifyWaitingForWatermark() throws IllegalAccessException {
166+
protected void notifyWaitingForWatermark() throws IllegalAccessException {
177167
if (!waitingForWatermark
178168
&& getRuntimeContext().getIndexOfThisSubtask() == subtaskIndexGuide) {
179169
logInfo("Setting " + WatermarkGateStage.WAITING_FOR_WATERMARK);

0 commit comments

Comments
 (0)