Skip to content

Commit afc6ab7

Browse files
committed
Add virtual cluster lifecycle state model
Introduce VirtualClusterLifecycleState as a sealed interface with records for each state (Initializing, Serving, Draining, Failed, Stopped). Each state exposes typed transition methods enforcing the state machine from proposal 016. Failed carries its cause, and Stopped retains the prior failure cause for diagnostics. VirtualClusterLifecycleManager wraps the state in an AtomicReference for thread-safe transitions, with structured logging on each transition. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 8ad4590 commit afc6ab7

3 files changed

Lines changed: 393 additions & 0 deletions

File tree

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.internal;
8+
9+
import java.util.Objects;
10+
import java.util.concurrent.atomic.AtomicReference;
11+
import java.util.function.UnaryOperator;
12+
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Draining;
17+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Failed;
18+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Initializing;
19+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Serving;
20+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Stopped;
21+
22+
/**
23+
* Manages the lifecycle state of a single virtual cluster.
24+
* <p>
25+
* Thread-safe: state transitions are performed atomically via {@link AtomicReference}.
26+
* </p>
27+
*/
28+
public class VirtualClusterLifecycleManager {
29+
30+
private static final Logger LOGGER = LoggerFactory.getLogger(VirtualClusterLifecycleManager.class);
31+
32+
private final String clusterName;
33+
private final AtomicReference<VirtualClusterLifecycleState> state = new AtomicReference<>(new Initializing());
34+
35+
public VirtualClusterLifecycleManager(String clusterName) {
36+
this.clusterName = Objects.requireNonNull(clusterName);
37+
}
38+
39+
/**
40+
* Transitions from {@link Initializing} to {@link Serving}.
41+
*/
42+
public void initializationSucceeded() {
43+
transition(current -> {
44+
if (current instanceof Initializing s) {
45+
return s.toServing();
46+
}
47+
throw unexpectedState(current, "initializationSucceeded");
48+
});
49+
}
50+
51+
/**
52+
* Transitions from {@link Initializing} to {@link Failed}, recording the cause.
53+
*/
54+
public void initializationFailed(Throwable cause) {
55+
Objects.requireNonNull(cause);
56+
transition(current -> {
57+
if (current instanceof Initializing s) {
58+
return s.toFailed(cause);
59+
}
60+
throw unexpectedState(current, "initializationFailed");
61+
});
62+
}
63+
64+
/**
65+
* Transitions from {@link Serving} to {@link Draining}.
66+
*/
67+
public void startDraining() {
68+
transition(current -> {
69+
if (current instanceof Serving s) {
70+
return s.toDraining();
71+
}
72+
throw unexpectedState(current, "startDraining");
73+
});
74+
}
75+
76+
/**
77+
* Transitions from {@link Draining} to {@link Stopped}.
78+
*/
79+
public void drainComplete() {
80+
transition(current -> {
81+
if (current instanceof Draining s) {
82+
return s.toStopped();
83+
}
84+
throw unexpectedState(current, "drainComplete");
85+
});
86+
}
87+
88+
/**
89+
* Transitions from {@link Failed} to {@link Stopped}.
90+
*/
91+
public void stop() {
92+
transition(current -> {
93+
if (current instanceof Failed s) {
94+
return s.toStopped();
95+
}
96+
throw unexpectedState(current, "stop");
97+
});
98+
}
99+
100+
public VirtualClusterLifecycleState getState() {
101+
return state.get();
102+
}
103+
104+
public String getClusterName() {
105+
return clusterName;
106+
}
107+
108+
private void transition(UnaryOperator<VirtualClusterLifecycleState> transitionFn) {
109+
VirtualClusterLifecycleState previous = state.getAndUpdate(transitionFn);
110+
VirtualClusterLifecycleState current = state.get();
111+
LOGGER.atInfo()
112+
.addKeyValue("virtualCluster", clusterName)
113+
.addKeyValue("from", previous.getClass().getSimpleName())
114+
.addKeyValue("to", current.getClass().getSimpleName())
115+
.log("Virtual cluster lifecycle transition");
116+
}
117+
118+
private IllegalStateException unexpectedState(VirtualClusterLifecycleState current, String operation) {
119+
return new IllegalStateException(
120+
"Cannot " + operation + " for virtual cluster '" + clusterName + "' in state " + current.getClass().getSimpleName());
121+
}
122+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.internal;
8+
9+
import java.util.Objects;
10+
11+
import edu.umd.cs.findbugs.annotations.Nullable;
12+
13+
/**
14+
* Lifecycle states for a virtual cluster.
15+
* <p>
16+
* Each virtual cluster has exactly one state at any time. States are immutable
17+
* records; transitions produce a new state instance.
18+
* </p>
19+
*
20+
* @see <a href="https://github.com/kroxylicious/design/blob/main/proposals/016-virtual-cluster-lifecycle.md">Proposal 016</a>
21+
*/
22+
public sealed interface VirtualClusterLifecycleState {
23+
24+
/** The cluster is being set up. Not yet serving traffic. */
25+
record Initializing() implements VirtualClusterLifecycleState {
26+
27+
/**
28+
* Configuration applied successfully.
29+
* @return the new serving state
30+
*/
31+
public Serving toServing() {
32+
return new Serving();
33+
}
34+
35+
/**
36+
* Configuration could not be applied.
37+
* @param cause the failure reason
38+
* @return the new failed state
39+
*/
40+
public Failed toFailed(Throwable cause) {
41+
return new Failed(cause);
42+
}
43+
}
44+
45+
/** The proxy has completed setup and is serving traffic for this cluster. */
46+
record Serving() implements VirtualClusterLifecycleState {
47+
48+
/**
49+
* The cluster is being shut down or reconfigured.
50+
* @return the new draining state
51+
*/
52+
public Draining toDraining() {
53+
return new Draining();
54+
}
55+
}
56+
57+
/** New connections are rejected. Existing in-flight requests are completing. */
58+
record Draining() implements VirtualClusterLifecycleState {
59+
60+
/**
61+
* Drain complete, cluster permanently removed.
62+
* @return the new stopped state
63+
*/
64+
public Stopped toStopped() {
65+
return new Stopped(null);
66+
}
67+
68+
/**
69+
* Drain complete, cluster reinitialising with new configuration.
70+
* @return the new initializing state
71+
*/
72+
public Initializing toInitializing() {
73+
return new Initializing();
74+
}
75+
}
76+
77+
/** Configuration was not viable. All resources have been released. */
78+
record Failed(Throwable cause) implements VirtualClusterLifecycleState {
79+
80+
public Failed {
81+
Objects.requireNonNull(cause);
82+
}
83+
84+
/**
85+
* The cluster is being permanently removed.
86+
* @return the new stopped state, retaining the failure cause for diagnostics
87+
*/
88+
public Stopped toStopped() {
89+
return new Stopped(cause);
90+
}
91+
92+
/**
93+
* A retry is requested with corrected configuration.
94+
* @return the new initializing state
95+
*/
96+
public Initializing toInitializing() {
97+
return new Initializing();
98+
}
99+
}
100+
101+
/** Terminal state. The cluster has been permanently removed. */
102+
record Stopped(@Nullable Throwable priorFailureCause) implements VirtualClusterLifecycleState {}
103+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.internal;
8+
9+
import java.util.stream.Stream;
10+
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.junit.jupiter.params.ParameterizedTest;
14+
import org.junit.jupiter.params.provider.Arguments;
15+
import org.junit.jupiter.params.provider.MethodSource;
16+
17+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Draining;
18+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Failed;
19+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Initializing;
20+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Serving;
21+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Stopped;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
25+
26+
class VirtualClusterLifecycleManagerTest {
27+
28+
private static final String CLUSTER_NAME = "test-cluster";
29+
private VirtualClusterLifecycleManager manager;
30+
31+
@BeforeEach
32+
void setUp() {
33+
manager = new VirtualClusterLifecycleManager(CLUSTER_NAME);
34+
}
35+
36+
@Test
37+
void shouldStartInInitializingState() {
38+
assertThat(manager.getState()).isInstanceOf(Initializing.class);
39+
}
40+
41+
@Test
42+
void shouldExposeClusterName() {
43+
assertThat(manager.getClusterName()).isEqualTo(CLUSTER_NAME);
44+
}
45+
46+
@Test
47+
void shouldTransitionToServingOnSuccess() {
48+
// when
49+
manager.initializationSucceeded();
50+
51+
// then
52+
assertThat(manager.getState()).isInstanceOf(Serving.class);
53+
}
54+
55+
@Test
56+
void shouldTransitionToFailedOnError() {
57+
// given
58+
var cause = new RuntimeException("filter init failed");
59+
60+
// when
61+
manager.initializationFailed(cause);
62+
63+
// then
64+
assertThat(manager.getState())
65+
.isInstanceOfSatisfying(Failed.class, failed -> assertThat(failed.cause()).isSameAs(cause));
66+
}
67+
68+
@Test
69+
void shouldTransitionFromServingToDraining() {
70+
// given
71+
manager.initializationSucceeded();
72+
73+
// when
74+
manager.startDraining();
75+
76+
// then
77+
assertThat(manager.getState()).isInstanceOf(Draining.class);
78+
}
79+
80+
@Test
81+
void shouldTransitionFromDrainingToStopped() {
82+
// given
83+
manager.initializationSucceeded();
84+
manager.startDraining();
85+
86+
// when
87+
manager.drainComplete();
88+
89+
// then
90+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
91+
}
92+
93+
@Test
94+
void shouldTransitionFromFailedToStopped() {
95+
// given
96+
manager.initializationFailed(new RuntimeException("boom"));
97+
98+
// when
99+
manager.stop();
100+
101+
// then
102+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
103+
}
104+
105+
@Test
106+
void shouldRetainFailureCauseAfterStop() {
107+
// given
108+
var cause = new RuntimeException("boom");
109+
manager.initializationFailed(cause);
110+
111+
// when
112+
manager.stop();
113+
114+
// then
115+
assertThat(manager.getState())
116+
.isInstanceOfSatisfying(Stopped.class, stopped -> assertThat(stopped.priorFailureCause()).isSameAs(cause));
117+
}
118+
119+
@Test
120+
void shouldHaveNoPriorFailureCauseWhenStoppedFromDraining() {
121+
// given
122+
manager.initializationSucceeded();
123+
manager.startDraining();
124+
125+
// when
126+
manager.drainComplete();
127+
128+
// then
129+
assertThat(manager.getState())
130+
.isInstanceOfSatisfying(Stopped.class, stopped -> assertThat(stopped.priorFailureCause()).isNull());
131+
}
132+
133+
static Stream<Arguments> invalidTransitions() {
134+
return Stream.of(
135+
Arguments.of("initializationSucceeded from SERVING", (Runnable) () -> {
136+
var m = new VirtualClusterLifecycleManager("c");
137+
m.initializationSucceeded();
138+
m.initializationSucceeded();
139+
}),
140+
Arguments.of("startDraining from INITIALIZING", (Runnable) () -> {
141+
var m = new VirtualClusterLifecycleManager("c");
142+
m.startDraining();
143+
}),
144+
Arguments.of("drainComplete from SERVING", (Runnable) () -> {
145+
var m = new VirtualClusterLifecycleManager("c");
146+
m.initializationSucceeded();
147+
m.drainComplete();
148+
}),
149+
Arguments.of("stop from SERVING", (Runnable) () -> {
150+
var m = new VirtualClusterLifecycleManager("c");
151+
m.initializationSucceeded();
152+
m.stop();
153+
}),
154+
Arguments.of("initializationSucceeded from STOPPED", (Runnable) () -> {
155+
var m = new VirtualClusterLifecycleManager("c");
156+
m.initializationFailed(new RuntimeException("x"));
157+
m.stop();
158+
m.initializationSucceeded();
159+
}));
160+
}
161+
162+
@ParameterizedTest(name = "{0}")
163+
@MethodSource("invalidTransitions")
164+
void shouldRejectInvalidTransition(String description, Runnable action) {
165+
assertThatThrownBy(action::run)
166+
.isInstanceOf(IllegalStateException.class);
167+
}
168+
}

0 commit comments

Comments
 (0)