Skip to content

Commit f87ce9c

Browse files
authored
feat(runtime): add virtual cluster lifecycle state tracking (kroxylicious#3640)
* Add virtual cluster lifecycle state model Introduce VirtualClusterLifecycleState as a sealed interface with records for each state (Initializing, Serving, Draining, Failed, Stopped). Each state exposes typed transition methods enforcing the state machine from proposal 016. Failed carries its cause, and Stopped retains the prior failure cause for diagnostics. VirtualClusterLifecycleManager wraps the state in an AtomicReference for thread-safe transitions, with structured logging on each transition. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Wire lifecycle manager into KafkaProxy startup and shutdown Create a VirtualClusterLifecycleManager per virtual cluster during startup, transitioning each to Serving on success. On shutdown, beginShutdown() transitions serving clusters to Draining before the Netty graceful shutdown, then completeShutdown() transitions them to Stopped afterwards. Existing fail-fast behaviour is preserved. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Address review feedback on lifecycle manager - Remove beginShutdown/completeShutdown composite methods from VirtualClusterLifecycleManager; KafkaProxy now calls startDraining, drainComplete, and stop directly - Call initializationFailed(cause) from KafkaProxy startup catch block so Failed state carries the real failure cause - Fix race in transition(): use transitionFn.apply(previous) instead of state.get() to avoid observing another thread's update - Log at DEBUG for most transitions, INFO only for initial and terminal states Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Remove unused toInitializing() transitions from Draining and Failed These transitions are valid in the proposal's state machine but have no callers today — reload/recovery is deferred. Removing to avoid premature API commitment; they can be re-added when reload is implemented. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Remove unnecessary throws declaration from test method Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Address review feedback: fix double-apply, log levels, and Initializing→Stopped - Fix transition() to use get()+updateAndGet() instead of re-applying the transition function to derive the new state for logging - Log all transitions at INFO, Failed transitions at WARN - Support Initializing→Stopped transition so VCs are not orphaned if shutdown occurs during startup - Assert failure cause on Stopped state in startup failure test Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Replace AtomicReference with synchronized in VirtualClusterLifecycleManager The AtomicReference approach had a race between state.get() and state.updateAndGet() where another thread could advance the state, causing incorrect log messages. Using synchronized makes the read-transition-log sequence atomic. This is sufficient for the single-shot proxy lifecycle where contention is not a concern. Also adjusts log levels: INFO for significant states (Serving, Failed, Stopped), DEBUG for intermediate transitions (Draining). Closes kroxylicious#3696 Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * refactor(runtime): extract VirtualClusterManager from KafkaProxy (#87) * Add VirtualClusterManager with construction, accessors, and tests Introduces VirtualClusterManager as the single source of truth for which virtual clusters exist and their current lifecycle state. VCM owns the cluster config tree and lifecycle managers but does not manage networking, endpoint registration, or metrics — those remain with KafkaProxy. The onVirtualClusterStopped callback notifies the owner when a VC reaches terminal Stopped state, allowing proxy-level policy decisions. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Add per-VC lifecycle transitions with callback Adds initializationSucceeded(clusterName) and initializationFailed(clusterName, cause) to VirtualClusterManager. On failure, the VC transitions immediately from Failed to Stopped (no recovery path today) and fires the onVirtualClusterStopped callback with the failure cause. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Add bulk shutdown transitions and return value Adds transitionAllToDraining() and transitionAllToStopped() to VirtualClusterManager. transitionAllToStopped() returns boolean indicating whether all clusters are now stopped, so KafkaProxy doesn't need to poll individual lifecycle managers. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Wire VirtualClusterManager into KafkaProxy KafkaProxy now delegates virtual cluster lifecycle management to VirtualClusterManager instead of managing lifecycle managers directly. Removes private transitionAllToDraining/transitionAllToStopped methods and the lifecycleManagers map in favour of VCM. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Make VirtualClusterManager non-nullable in KafkaProxy Move VCM construction from startup() to the constructor so it can be final and non-nullable, eliminating null guards throughout. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Improve onVirtualClusterStopped callback logging Log at WARN with a message that reflects the serve:none policy intent. Add TODO noting the callback should eventually drive proxy shutdown rather than relying on the caller. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> --------- Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Address review feedback: naming conventions, javadoc, and logging - Rename getState()/getClusterName() to state()/clusterName() on VirtualClusterLifecycleManager per project accessor conventions - Rename getVirtualClusterModels() to virtualClusterModels() on VirtualClusterManager - Rename terse field 'vcm' to 'virtualClusterManager' in KafkaProxy - Fix transitionAllToDraining() javadoc: clarify that Failed/Stopped clusters are skipped, not transitioned by this method - Fix logging key from 'cause' to 'error' with Throwable::getMessage per logging conventions - Add comment documenting the all-or-nothing startup assumption at the initializationFailed call site Assisted-by: Claude claude-opus-4-6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Address review feedback: lazy logging, argumentSet, and javadoc fixes Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Rename transitionAllToStopped to completeDraining The previous name implied all clusters would be transitioned, but the method only completes the draining→stopped transition. The new name better describes the actual behaviour. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> --------- Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 58c1cc2 commit f87ce9c

7 files changed

Lines changed: 1033 additions & 0 deletions

File tree

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import io.kroxylicious.proxy.internal.KafkaProxyInitializer;
5858
import io.kroxylicious.proxy.internal.MeterRegistries;
5959
import io.kroxylicious.proxy.internal.PortConflictDetector;
60+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleManager;
61+
import io.kroxylicious.proxy.internal.VirtualClusterManager;
6062
import io.kroxylicious.proxy.internal.admin.ManagementInitializer;
6163
import io.kroxylicious.proxy.internal.config.Features;
6264
import io.kroxylicious.proxy.internal.net.DefaultNetworkBindingOperationProcessor;
@@ -149,6 +151,7 @@ private static int resolveThreadCount(Configuration configuration, Function<Netw
149151
private final NetworkBindingOperationProcessor bindingOperationProcessor = new DefaultNetworkBindingOperationProcessor();
150152
private final EndpointRegistry endpointRegistry = new EndpointRegistry(bindingOperationProcessor);
151153
private final PluginFactoryRegistry pfr;
154+
private final VirtualClusterManager virtualClusterManager;
152155
private @Nullable MeterRegistries meterRegistries;
153156
private @Nullable FilterChainFactory filterChainFactory;
154157
private @Nullable EventGroupConfig managementEventGroup;
@@ -160,6 +163,10 @@ public KafkaProxy(PluginFactoryRegistry pfr, Configuration config, Features feat
160163
this.virtualClusterModels = config.virtualClusterModel();
161164
this.managementConfiguration = config.management();
162165
this.micrometerConfig = config.getMicrometer();
166+
this.virtualClusterManager = new VirtualClusterManager(virtualClusterModels, (clusterName, cause) -> STARTUP_SHUTDOWN_LOGGER.atWarn()
167+
.addKeyValue("virtualCluster", clusterName)
168+
.addKeyValue("error", cause.map(Throwable::getMessage).orElse(null))
169+
.log("Virtual cluster reached terminal stopped state, proxy shutdown required"));
163170
}
164171

165172
@VisibleForTesting
@@ -216,6 +223,7 @@ public KafkaProxy startup() {
216223

217224
STARTUP_SHUTDOWN_LOGGER.atInfo()
218225
.log("Kroxylicious is starting");
226+
219227
meterRegistries = new MeterRegistries(pfr, micrometerConfig);
220228
initVersionInfoMetric();
221229

@@ -252,6 +260,8 @@ public KafkaProxy startup() {
252260
.toArray(CompletableFuture[]::new))
253261
.join();
254262

263+
virtualClusterModels.forEach(model -> virtualClusterManager.initializationSucceeded(model.getClusterName()));
264+
255265
STARTUP_SHUTDOWN_LOGGER.atInfo()
256266
.log("Kroxylicious is started");
257267
return this;
@@ -260,6 +270,11 @@ public KafkaProxy startup() {
260270
STARTUP_SHUTDOWN_LOGGER.atError()
261271
.setCause(e)
262272
.log("Exception during startup, shutting down");
273+
// TODO: the onVirtualClusterStopped callback should drive the serve:none policy (triggering proxy shutdown)
274+
// rather than relying on the caller to call shutdown() separately. Currently the callback only logs.
275+
// All VCs are failed with the same exception because startup is all-or-nothing:
276+
// initializationSucceeded is only called after all VCs register successfully (line 263).
277+
virtualClusterModels.forEach(model -> virtualClusterManager.initializationFailed(model.getClusterName(), e));
263278
shutdown();
264279
throw new LifecycleException("Startup completed exceptionally", e);
265280
}
@@ -357,6 +372,7 @@ public void shutdown() {
357372
try {
358373
STARTUP_SHUTDOWN_LOGGER.atInfo()
359374
.log("Shutting down");
375+
virtualClusterManager.transitionAllToDraining();
360376
endpointRegistry.shutdown().handle((u, t) -> {
361377
bindingOperationProcessor.close();
362378
var closeFutures = new ArrayList<Future<?>>();
@@ -378,6 +394,7 @@ public void shutdown() {
378394
}
379395
return null;
380396
}).toCompletableFuture().join();
397+
virtualClusterManager.completeDraining();
381398
if (meterRegistries != null) {
382399
meterRegistries.close();
383400
}
@@ -394,6 +411,17 @@ public void shutdown() {
394411
}
395412
}
396413

414+
/**
415+
* Returns the lifecycle manager for the given virtual cluster name.
416+
* @param clusterName the virtual cluster name
417+
* @return the lifecycle manager, or null if no cluster with that name exists
418+
*/
419+
@VisibleForTesting
420+
@Nullable
421+
VirtualClusterLifecycleManager lifecycleManagerFor(String clusterName) {
422+
return virtualClusterManager.lifecycleManagerFor(clusterName);
423+
}
424+
397425
@Override
398426
public void close() throws Exception {
399427
if (running.get()) {
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.internal;
8+
9+
import java.util.Objects;
10+
import java.util.function.UnaryOperator;
11+
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Draining;
16+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Failed;
17+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Initializing;
18+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Serving;
19+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Stopped;
20+
21+
/**
22+
* Manages the lifecycle state of a single virtual cluster.
23+
* <p>
24+
* Thread-safe: state transitions are performed under synchronization so that the
25+
* read-transition-log sequence is atomic. This is sufficient for the single-shot
26+
* proxy lifecycle where contention is not a concern.
27+
* </p>
28+
*/
29+
public class VirtualClusterLifecycleManager {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(VirtualClusterLifecycleManager.class);
32+
33+
private final String clusterName;
34+
private VirtualClusterLifecycleState state = new Initializing();
35+
36+
public VirtualClusterLifecycleManager(String clusterName) {
37+
this.clusterName = Objects.requireNonNull(clusterName);
38+
}
39+
40+
/**
41+
* Transitions from {@link Initializing} to {@link Serving}.
42+
*/
43+
public void initializationSucceeded() {
44+
transition(current -> {
45+
if (current instanceof Initializing s) {
46+
return s.toServing();
47+
}
48+
throw unexpectedState(current, "initializationSucceeded");
49+
});
50+
}
51+
52+
/**
53+
* Transitions from {@link Initializing} to {@link Failed}, recording the cause.
54+
*/
55+
public void initializationFailed(Throwable cause) {
56+
Objects.requireNonNull(cause);
57+
transition(current -> {
58+
if (current instanceof Initializing s) {
59+
return s.toFailed(cause);
60+
}
61+
throw unexpectedState(current, "initializationFailed");
62+
});
63+
}
64+
65+
/**
66+
* Transitions from {@link Serving} to {@link Draining}.
67+
*/
68+
public void startDraining() {
69+
transition(current -> {
70+
if (current instanceof Serving s) {
71+
return s.toDraining();
72+
}
73+
throw unexpectedState(current, "startDraining");
74+
});
75+
}
76+
77+
/**
78+
* Transitions from {@link Draining} to {@link Stopped}.
79+
*/
80+
public void drainComplete() {
81+
transition(current -> {
82+
if (current instanceof Draining s) {
83+
return s.toStopped();
84+
}
85+
throw unexpectedState(current, "drainComplete");
86+
});
87+
}
88+
89+
/**
90+
* Transitions to {@link Stopped} from {@link Failed} or {@link Initializing}.
91+
*/
92+
public void stop() {
93+
transition(current -> {
94+
if (current instanceof Failed s) {
95+
return s.toStopped();
96+
}
97+
if (current instanceof Initializing s) {
98+
return s.toStopped();
99+
}
100+
throw unexpectedState(current, "stop");
101+
});
102+
}
103+
104+
public synchronized VirtualClusterLifecycleState state() {
105+
return state;
106+
}
107+
108+
public String clusterName() {
109+
return clusterName;
110+
}
111+
112+
private synchronized void transition(UnaryOperator<VirtualClusterLifecycleState> transitionFn) {
113+
VirtualClusterLifecycleState previous = state;
114+
state = transitionFn.apply(state);
115+
var logBuilder = (state instanceof Serving || state instanceof Failed || state instanceof Stopped) ? LOGGER.atInfo() : LOGGER.atDebug();
116+
logBuilder
117+
.addKeyValue("virtualCluster", clusterName)
118+
.addKeyValue("from", () -> previous.getClass().getSimpleName())
119+
.addKeyValue("to", () -> state.getClass().getSimpleName())
120+
.log("Virtual cluster lifecycle transition");
121+
}
122+
123+
private IllegalStateException unexpectedState(VirtualClusterLifecycleState current, String operation) {
124+
return new IllegalStateException(
125+
"Cannot " + operation + " for virtual cluster '" + clusterName + "' in state " + current.getClass().getSimpleName());
126+
}
127+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.internal;
8+
9+
import java.util.Objects;
10+
11+
import edu.umd.cs.findbugs.annotations.Nullable;
12+
13+
/**
14+
* Lifecycle states for a virtual cluster.
15+
* <p>
16+
* Each virtual cluster has exactly one state at any time. States are immutable
17+
* records; transitions produce a new state instance.
18+
* </p>
19+
*
20+
* @see <a href="https://github.com/kroxylicious/design/blob/main/proposals/016-virtual-cluster-lifecycle.md">Proposal 016</a>
21+
*/
22+
public sealed interface VirtualClusterLifecycleState {
23+
24+
/** The cluster is being set up. Not yet serving traffic. */
25+
record Initializing() implements VirtualClusterLifecycleState {
26+
27+
/**
28+
* Configuration applied successfully.
29+
* @return the new serving state
30+
*/
31+
public Serving toServing() {
32+
return new Serving();
33+
}
34+
35+
/**
36+
* Configuration could not be applied.
37+
* @param cause the failure reason
38+
* @return the new failed state
39+
*/
40+
public Failed toFailed(Throwable cause) {
41+
return new Failed(cause);
42+
}
43+
44+
/**
45+
* Cluster removed before initialization completed (e.g. proxy shutdown during startup).
46+
* @return the new stopped state
47+
*/
48+
public Stopped toStopped() {
49+
return new Stopped(null);
50+
}
51+
}
52+
53+
/** The proxy has completed setup and is serving traffic for this cluster. */
54+
record Serving() implements VirtualClusterLifecycleState {
55+
56+
/**
57+
* The cluster is being shut down or reconfigured.
58+
* @return the new draining state
59+
*/
60+
public Draining toDraining() {
61+
return new Draining();
62+
}
63+
}
64+
65+
/** New connections are rejected. Existing in-flight requests are completing. */
66+
record Draining() implements VirtualClusterLifecycleState {
67+
68+
/**
69+
* Drain complete, cluster permanently removed.
70+
* @return the new stopped state
71+
*/
72+
public Stopped toStopped() {
73+
return new Stopped(null);
74+
}
75+
76+
}
77+
78+
/** Configuration was not viable. All resources have been released. */
79+
record Failed(Throwable cause) implements VirtualClusterLifecycleState {
80+
81+
public Failed {
82+
Objects.requireNonNull(cause);
83+
}
84+
85+
/**
86+
* The cluster is being permanently removed.
87+
* @return the new stopped state, retaining the failure cause for diagnostics
88+
*/
89+
public Stopped toStopped() {
90+
return new Stopped(cause);
91+
}
92+
}
93+
94+
/** Terminal state. The cluster has been permanently removed. */
95+
record Stopped(@Nullable Throwable priorFailureCause) implements VirtualClusterLifecycleState {}
96+
}

0 commit comments

Comments
 (0)