Skip to content

Commit 1f4e984

Browse files
committed
refactor(runtime): extract VirtualClusterManager from KafkaProxy (#87)
* Add VirtualClusterManager with construction, accessors, and tests Introduces VirtualClusterManager as the single source of truth for which virtual clusters exist and their current lifecycle state. VCM owns the cluster config tree and lifecycle managers but does not manage networking, endpoint registration, or metrics — those remain with KafkaProxy. The onVirtualClusterStopped callback notifies the owner when a VC reaches terminal Stopped state, allowing proxy-level policy decisions. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Add per-VC lifecycle transitions with callback Adds initializationSucceeded(clusterName) and initializationFailed(clusterName, cause) to VirtualClusterManager. On failure, the VC transitions immediately from Failed to Stopped (no recovery path today) and fires the onVirtualClusterStopped callback with the failure cause. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Add bulk shutdown transitions and return value Adds transitionAllToDraining() and transitionAllToStopped() to VirtualClusterManager. transitionAllToStopped() returns boolean indicating whether all clusters are now stopped, so KafkaProxy doesn't need to poll individual lifecycle managers. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Wire VirtualClusterManager into KafkaProxy KafkaProxy now delegates virtual cluster lifecycle management to VirtualClusterManager instead of managing lifecycle managers directly. Removes private transitionAllToDraining/transitionAllToStopped methods and the lifecycleManagers map in favour of VCM. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Make VirtualClusterManager non-nullable in KafkaProxy Move VCM construction from startup() to the constructor so it can be final and non-nullable, eliminating null guards throughout. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Improve onVirtualClusterStopped callback logging Log at WARN with a message that reflects the serve:none policy intent. Add TODO noting the callback should eventually drive proxy shutdown rather than relying on the caller. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> --------- Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 5beef1a commit 1f4e984

3 files changed

Lines changed: 463 additions & 34 deletions

File tree

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.time.Duration;
99
import java.util.ArrayList;
10-
import java.util.LinkedHashMap;
1110
import java.util.List;
1211
import java.util.Map;
1312
import java.util.Optional;
@@ -59,7 +58,7 @@
5958
import io.kroxylicious.proxy.internal.MeterRegistries;
6059
import io.kroxylicious.proxy.internal.PortConflictDetector;
6160
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleManager;
62-
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState;
61+
import io.kroxylicious.proxy.internal.VirtualClusterManager;
6362
import io.kroxylicious.proxy.internal.admin.ManagementInitializer;
6463
import io.kroxylicious.proxy.internal.config.Features;
6564
import io.kroxylicious.proxy.internal.net.DefaultNetworkBindingOperationProcessor;
@@ -152,7 +151,7 @@ private static int resolveThreadCount(Configuration configuration, Function<Netw
152151
private final NetworkBindingOperationProcessor bindingOperationProcessor = new DefaultNetworkBindingOperationProcessor();
153152
private final EndpointRegistry endpointRegistry = new EndpointRegistry(bindingOperationProcessor);
154153
private final PluginFactoryRegistry pfr;
155-
private final Map<String, VirtualClusterLifecycleManager> lifecycleManagers = new LinkedHashMap<>();
154+
private final VirtualClusterManager vcm;
156155
private @Nullable MeterRegistries meterRegistries;
157156
private @Nullable FilterChainFactory filterChainFactory;
158157
private @Nullable EventGroupConfig managementEventGroup;
@@ -164,6 +163,10 @@ public KafkaProxy(PluginFactoryRegistry pfr, Configuration config, Features feat
164163
this.virtualClusterModels = config.virtualClusterModel();
165164
this.managementConfiguration = config.management();
166165
this.micrometerConfig = config.getMicrometer();
166+
this.vcm = new VirtualClusterManager(virtualClusterModels, (clusterName, cause) -> STARTUP_SHUTDOWN_LOGGER.atWarn()
167+
.addKeyValue("virtualCluster", clusterName)
168+
.addKeyValue("cause", cause.orElse(null))
169+
.log("Virtual cluster reached terminal stopped state, proxy shutdown required"));
167170
}
168171

169172
@VisibleForTesting
@@ -221,10 +224,6 @@ public KafkaProxy startup() {
221224
STARTUP_SHUTDOWN_LOGGER.atInfo()
222225
.log("Kroxylicious is starting");
223226

224-
for (var vcm : virtualClusterModels) {
225-
lifecycleManagers.put(vcm.getClusterName(), new VirtualClusterLifecycleManager(vcm.getClusterName()));
226-
}
227-
228227
meterRegistries = new MeterRegistries(pfr, micrometerConfig);
229228
initVersionInfoMetric();
230229

@@ -261,7 +260,7 @@ public KafkaProxy startup() {
261260
.toArray(CompletableFuture[]::new))
262261
.join();
263262

264-
lifecycleManagers.values().forEach(VirtualClusterLifecycleManager::initializationSucceeded);
263+
virtualClusterModels.forEach(model -> vcm.initializationSucceeded(model.getClusterName()));
265264

266265
STARTUP_SHUTDOWN_LOGGER.atInfo()
267266
.log("Kroxylicious is started");
@@ -271,7 +270,9 @@ public KafkaProxy startup() {
271270
STARTUP_SHUTDOWN_LOGGER.atError()
272271
.setCause(e)
273272
.log("Exception during startup, shutting down");
274-
lifecycleManagers.values().forEach(m -> m.initializationFailed(e));
273+
// TODO: the onVirtualClusterStopped callback should drive the serve:none policy (triggering proxy shutdown)
274+
// rather than relying on the caller to call shutdown() separately. Currently the callback only logs.
275+
virtualClusterModels.forEach(model -> vcm.initializationFailed(model.getClusterName(), e));
275276
shutdown();
276277
throw new LifecycleException("Startup completed exceptionally", e);
277278
}
@@ -369,7 +370,7 @@ public void shutdown() {
369370
try {
370371
STARTUP_SHUTDOWN_LOGGER.atInfo()
371372
.log("Shutting down");
372-
transitionAllToDraining();
373+
vcm.transitionAllToDraining();
373374
endpointRegistry.shutdown().handle((u, t) -> {
374375
bindingOperationProcessor.close();
375376
var closeFutures = new ArrayList<Future<?>>();
@@ -391,7 +392,7 @@ public void shutdown() {
391392
}
392393
return null;
393394
}).toCompletableFuture().join();
394-
transitionAllToStopped();
395+
vcm.transitionAllToStopped();
395396
if (meterRegistries != null) {
396397
meterRegistries.close();
397398
}
@@ -416,29 +417,7 @@ public void shutdown() {
416417
@VisibleForTesting
417418
@Nullable
418419
VirtualClusterLifecycleManager lifecycleManagerFor(String clusterName) {
419-
return lifecycleManagers.get(clusterName);
420-
}
421-
422-
private void transitionAllToDraining() {
423-
lifecycleManagers.values().forEach(m -> {
424-
if (m.getState() instanceof VirtualClusterLifecycleState.Serving) {
425-
m.startDraining();
426-
}
427-
else if (m.getState() instanceof VirtualClusterLifecycleState.Failed) {
428-
m.stop();
429-
}
430-
else if (m.getState() instanceof VirtualClusterLifecycleState.Initializing) {
431-
m.stop();
432-
}
433-
});
434-
}
435-
436-
private void transitionAllToStopped() {
437-
lifecycleManagers.values().forEach(m -> {
438-
if (m.getState() instanceof VirtualClusterLifecycleState.Draining) {
439-
m.drainComplete();
440-
}
441-
});
420+
return vcm.lifecycleManagerFor(clusterName);
442421
}
443422

444423
@Override
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.internal;
8+
9+
import java.util.LinkedHashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Objects;
13+
import java.util.Optional;
14+
import java.util.function.BiConsumer;
15+
16+
import io.kroxylicious.proxy.model.VirtualClusterModel;
17+
18+
import edu.umd.cs.findbugs.annotations.Nullable;
19+
20+
/**
21+
* Owns the virtual cluster configuration tree and lifecycle state.
22+
* <p>
23+
* This is the single source of truth for which virtual clusters exist and their
24+
* current lifecycle state. It does not manage networking, endpoint registration,
25+
* metrics, or any Netty infrastructure — those remain with {@link io.kroxylicious.proxy.KafkaProxy}.
26+
* </p>
27+
* <p>
28+
* The {@code onVirtualClusterStopped} callback notifies the owner (typically KafkaProxy)
29+
* when a virtual cluster reaches the terminal {@link VirtualClusterLifecycleState.Stopped}
30+
* state, allowing the owner to apply proxy-level policy (e.g. {@code serve: none}).
31+
* During reload, the Draining → Initializing → Serving cycle is managed internally
32+
* without involving the callback — reload never reaches Stopped.
33+
* </p>
34+
*/
35+
public class VirtualClusterManager {
36+
37+
private final List<VirtualClusterModel> virtualClusterModels;
38+
private final Map<String, VirtualClusterLifecycleManager> lifecycleManagers;
39+
private final BiConsumer<String, Optional<Throwable>> onVirtualClusterStopped;
40+
41+
/**
42+
* Creates a new VirtualClusterManager for the given set of virtual clusters.
43+
*
44+
* @param virtualClusterModels the complete set of virtual cluster configurations
45+
* @param onVirtualClusterStopped callback invoked with {@code (clusterName, priorFailureCause)}
46+
* whenever a virtual cluster reaches the terminal Stopped state. The cause is empty
47+
* for clean stops (e.g. drain completed during shutdown) and present for failure-driven stops.
48+
* @throws NullPointerException if either argument is null
49+
* @throws IllegalArgumentException if the list contains duplicate cluster names
50+
*/
51+
public VirtualClusterManager(List<VirtualClusterModel> virtualClusterModels,
52+
BiConsumer<String, Optional<Throwable>> onVirtualClusterStopped) {
53+
Objects.requireNonNull(virtualClusterModels, "virtualClusterModels must not be null");
54+
this.onVirtualClusterStopped = Objects.requireNonNull(onVirtualClusterStopped, "onVirtualClusterStopped must not be null");
55+
this.virtualClusterModels = List.copyOf(virtualClusterModels);
56+
this.lifecycleManagers = new LinkedHashMap<>();
57+
for (var vcm : this.virtualClusterModels) {
58+
var name = vcm.getClusterName();
59+
if (lifecycleManagers.containsKey(name)) {
60+
throw new IllegalArgumentException("Duplicate cluster name: " + name);
61+
}
62+
lifecycleManagers.put(name, new VirtualClusterLifecycleManager(name));
63+
}
64+
}
65+
66+
/**
67+
* Returns the virtual cluster models this manager was constructed with.
68+
* @return unmodifiable list of virtual cluster models
69+
*/
70+
public List<VirtualClusterModel> getVirtualClusterModels() {
71+
return virtualClusterModels;
72+
}
73+
74+
/**
75+
* Signals that the named virtual cluster initialized successfully.
76+
* Transitions the cluster from Initializing to Serving.
77+
*
78+
* @param clusterName the virtual cluster name
79+
* @throws IllegalArgumentException if no cluster with that name exists
80+
*/
81+
public void initializationSucceeded(String clusterName) {
82+
requireKnownCluster(clusterName).initializationSucceeded();
83+
}
84+
85+
/**
86+
* Signals that the named virtual cluster failed to initialize.
87+
* Transitions the cluster from Initializing to Failed, then immediately to Stopped
88+
* (no recovery path exists today), and fires the {@code onVirtualClusterStopped} callback.
89+
*
90+
* @param clusterName the virtual cluster name
91+
* @param cause the failure cause
92+
* @throws IllegalArgumentException if no cluster with that name exists
93+
*/
94+
public void initializationFailed(String clusterName, Throwable cause) {
95+
var manager = requireKnownCluster(clusterName);
96+
manager.initializationFailed(cause);
97+
manager.stop();
98+
onVirtualClusterStopped.accept(clusterName, Optional.of(cause));
99+
}
100+
101+
/**
102+
* Transitions all virtual clusters toward draining/stopped as appropriate for shutdown.
103+
* <ul>
104+
* <li>Serving → Draining</li>
105+
* <li>Initializing → Stopped (fires callback with empty cause)</li>
106+
* <li>Failed → Stopped (already handled by initializationFailed)</li>
107+
* </ul>
108+
*/
109+
public void transitionAllToDraining() {
110+
lifecycleManagers.forEach((name, manager) -> {
111+
var state = manager.getState();
112+
if (state instanceof VirtualClusterLifecycleState.Serving) {
113+
manager.startDraining();
114+
}
115+
else if (state instanceof VirtualClusterLifecycleState.Initializing) {
116+
manager.stop();
117+
onVirtualClusterStopped.accept(name, Optional.empty());
118+
}
119+
});
120+
}
121+
122+
/**
123+
* Transitions all draining virtual clusters to stopped, firing the callback for each.
124+
*
125+
* @return true if all virtual clusters are now in the Stopped state
126+
*/
127+
public boolean transitionAllToStopped() {
128+
lifecycleManagers.forEach((name, manager) -> {
129+
var state = manager.getState();
130+
if (state instanceof VirtualClusterLifecycleState.Draining) {
131+
manager.drainComplete();
132+
onVirtualClusterStopped.accept(name, Optional.empty());
133+
}
134+
});
135+
return lifecycleManagers.values().stream()
136+
.allMatch(m -> m.getState() instanceof VirtualClusterLifecycleState.Stopped);
137+
}
138+
139+
/**
140+
* Returns the lifecycle manager for the given virtual cluster name.
141+
* @param clusterName the virtual cluster name
142+
* @return the lifecycle manager, or null if no cluster with that name exists
143+
*/
144+
@Nullable
145+
public VirtualClusterLifecycleManager lifecycleManagerFor(String clusterName) {
146+
return lifecycleManagers.get(clusterName);
147+
}
148+
149+
private VirtualClusterLifecycleManager requireKnownCluster(String clusterName) {
150+
var manager = lifecycleManagers.get(clusterName);
151+
if (manager == null) {
152+
throw new IllegalArgumentException("Unknown cluster: " + clusterName);
153+
}
154+
return manager;
155+
}
156+
}

0 commit comments

Comments
 (0)