Skip to content

Commit 24b0d99

Browse files
[FLINK-39509][flink-kubernetes-webhook] Fix mutating webhook producing spurious object modifications on no-op patches
1 parent 27f66d9 commit 24b0d99

1 file changed

Lines changed: 24 additions & 4 deletions

File tree

flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ public HasMetadata mutate(HasMetadata resource, Operation operation)
6666
return resource;
6767
}
6868

69-
private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
69+
private HasMetadata mutateSessionJob(HasMetadata resource) {
7070
try {
7171
var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
72+
var originalSessionJob = mapper.valueToTree(sessionJob);
7273
var namespace = sessionJob.getMetadata().getNamespace();
7374
var deploymentName = sessionJob.getSpec().getDeploymentName();
7475
var key = Cache.namespaceKeyFunc(namespace, deploymentName);
@@ -78,31 +79,50 @@ private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
7879
for (FlinkResourceMutator mutator : mutators) {
7980
sessionJob = mutator.mutateSessionJob(sessionJob, Optional.ofNullable(deployment));
8081
}
81-
82+
// Return the original resource if no mutation was applied to avoid
83+
// the serialization round-trip producing a different object, which causes
84+
// kubectl to inconsistently omit the "(no change)" suffix on patch responses
85+
if (originalSessionJob.equals(mapper.valueToTree(sessionJob))) {
86+
return resource;
87+
}
8288
return sessionJob;
8389
} catch (Exception e) {
8490
throw new RuntimeException(e);
8591
}
8692
}
8793

88-
private FlinkDeployment mutateDeployment(HasMetadata resource) {
94+
private HasMetadata mutateDeployment(HasMetadata resource) {
8995
try {
9096
var flinkDeployment = mapper.convertValue(resource, FlinkDeployment.class);
97+
var originalFlinkDeployment = mapper.valueToTree(flinkDeployment);
9198
for (FlinkResourceMutator mutator : mutators) {
9299
flinkDeployment = mutator.mutateDeployment(flinkDeployment);
93100
}
101+
// Return the original resource if no mutation was applied to avoid
102+
// the serialization round-trip producing a different object, which causes
103+
// kubectl to inconsistently omit the "(no change)" suffix on patch responses
104+
if (originalFlinkDeployment.equals(mapper.valueToTree(flinkDeployment))) {
105+
return resource;
106+
}
94107
return flinkDeployment;
95108
} catch (Exception e) {
96109
throw new RuntimeException(e);
97110
}
98111
}
99112

100-
private FlinkStateSnapshot mutateStateSnapshot(HasMetadata resource) {
113+
private HasMetadata mutateStateSnapshot(HasMetadata resource) {
101114
try {
102115
var snapshot = mapper.convertValue(resource, FlinkStateSnapshot.class);
116+
var originalSnapshot = mapper.valueToTree(snapshot);
103117
for (var mutator : mutators) {
104118
snapshot = mutator.mutateStateSnapshot(snapshot);
105119
}
120+
// Return the original resource if no mutation was applied to avoid
121+
// the serialization round-trip producing a different object, which causes
122+
// kubectl to inconsistently omit the "(no change)" suffix on patch responses
123+
if (originalSnapshot.equals(mapper.valueToTree(snapshot))) {
124+
return resource;
125+
}
106126
return snapshot;
107127
} catch (Exception e) {
108128
throw new RuntimeException(e);

0 commit comments

Comments
 (0)