Skip to content

Commit aefb381

Browse files
committed
Address review feedback on lifecycle manager
- Remove beginShutdown/completeShutdown composite methods from VirtualClusterLifecycleManager; KafkaProxy now calls startDraining, drainComplete, and stop directly - Call initializationFailed(cause) from KafkaProxy startup catch block so Failed state carries the real failure cause - Fix race in transition(): use transitionFn.apply(previous) instead of state.get() to avoid observing another thread's update - Log at DEBUG for most transitions, INFO only for initial and terminal states Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent a2d7a0a commit aefb381

3 files changed

Lines changed: 18 additions & 109 deletions

File tree

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import io.kroxylicious.proxy.internal.MeterRegistries;
6060
import io.kroxylicious.proxy.internal.PortConflictDetector;
6161
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleManager;
62+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState;
6263
import io.kroxylicious.proxy.internal.admin.ManagementInitializer;
6364
import io.kroxylicious.proxy.internal.config.Features;
6465
import io.kroxylicious.proxy.internal.net.DefaultNetworkBindingOperationProcessor;
@@ -270,6 +271,7 @@ public KafkaProxy startup() {
270271
STARTUP_SHUTDOWN_LOGGER.atError()
271272
.setCause(e)
272273
.log("Exception during startup, shutting down");
274+
lifecycleManagers.values().forEach(m -> m.initializationFailed(e));
273275
shutdown();
274276
throw new LifecycleException("Startup completed exceptionally", e);
275277
}
@@ -418,11 +420,22 @@ VirtualClusterLifecycleManager lifecycleManagerFor(String clusterName) {
418420
}
419421

420422
private void transitionAllToDraining() {
421-
lifecycleManagers.values().forEach(VirtualClusterLifecycleManager::beginShutdown);
423+
lifecycleManagers.values().forEach(m -> {
424+
if (m.getState() instanceof VirtualClusterLifecycleState.Serving) {
425+
m.startDraining();
426+
}
427+
else if (m.getState() instanceof VirtualClusterLifecycleState.Failed) {
428+
m.stop();
429+
}
430+
});
422431
}
423432

424433
private void transitionAllToStopped() {
425-
lifecycleManagers.values().forEach(VirtualClusterLifecycleManager::completeShutdown);
434+
lifecycleManagers.values().forEach(m -> {
435+
if (m.getState() instanceof VirtualClusterLifecycleState.Draining) {
436+
m.drainComplete();
437+
}
438+
});
426439
}
427440

428441
@Override

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/VirtualClusterLifecycleManager.java

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -97,39 +97,6 @@ public void stop() {
9797
});
9898
}
9999

100-
/**
101-
* Begins shutdown: transitions serving clusters to draining and
102-
* failed/initializing clusters directly to stopped.
103-
*/
104-
public void beginShutdown() {
105-
transition(current -> {
106-
if (current instanceof Serving s) {
107-
return s.toDraining();
108-
}
109-
if (current instanceof Failed s) {
110-
return s.toStopped();
111-
}
112-
if (current instanceof Initializing s) {
113-
return s.toFailed(new IllegalStateException("Shutdown before initialization completed")).toStopped();
114-
}
115-
// Draining or Stopped — no-op
116-
return current;
117-
});
118-
}
119-
120-
/**
121-
* Completes shutdown: transitions draining clusters to stopped.
122-
*/
123-
public void completeShutdown() {
124-
transition(current -> {
125-
if (current instanceof Draining s) {
126-
return s.toStopped();
127-
}
128-
// already Stopped — no-op
129-
return current;
130-
});
131-
}
132-
133100
public VirtualClusterLifecycleState getState() {
134101
return state.get();
135102
}
@@ -140,8 +107,9 @@ public String getClusterName() {
140107

141108
private void transition(UnaryOperator<VirtualClusterLifecycleState> transitionFn) {
142109
VirtualClusterLifecycleState previous = state.getAndUpdate(transitionFn);
143-
VirtualClusterLifecycleState current = state.get();
144-
LOGGER.atInfo()
110+
VirtualClusterLifecycleState current = transitionFn.apply(previous);
111+
var logBuilder = (current instanceof Initializing || current instanceof Stopped) ? LOGGER.atInfo() : LOGGER.atDebug();
112+
logBuilder
145113
.addKeyValue("virtualCluster", clusterName)
146114
.addKeyValue("from", previous.getClass().getSimpleName())
147115
.addKeyValue("to", current.getClass().getSimpleName())

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/VirtualClusterLifecycleManagerTest.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -130,78 +130,6 @@ void shouldHaveNoPriorFailureCauseWhenStoppedFromDraining() {
130130
.isInstanceOfSatisfying(Stopped.class, stopped -> assertThat(stopped.priorFailureCause()).isNull());
131131
}
132132

133-
@Test
134-
void shouldTransitionServingToDrainingOnBeginShutdown() {
135-
// given
136-
manager.initializationSucceeded();
137-
138-
// when
139-
manager.beginShutdown();
140-
141-
// then
142-
assertThat(manager.getState()).isInstanceOf(Draining.class);
143-
}
144-
145-
@Test
146-
void shouldTransitionDrainingToStoppedOnCompleteShutdown() {
147-
// given
148-
manager.initializationSucceeded();
149-
manager.beginShutdown();
150-
151-
// when
152-
manager.completeShutdown();
153-
154-
// then
155-
assertThat(manager.getState()).isInstanceOf(Stopped.class);
156-
}
157-
158-
@Test
159-
void shouldTransitionFailedToStoppedOnBeginShutdown() {
160-
// given
161-
manager.initializationFailed(new RuntimeException("boom"));
162-
163-
// when
164-
manager.beginShutdown();
165-
166-
// then
167-
assertThat(manager.getState()).isInstanceOf(Stopped.class);
168-
}
169-
170-
@Test
171-
void shouldTransitionInitializingToStoppedOnBeginShutdown() {
172-
// when
173-
manager.beginShutdown();
174-
175-
// then
176-
assertThat(manager.getState()).isInstanceOf(Stopped.class);
177-
}
178-
179-
@Test
180-
void shouldBeNoOpWhenBeginShutdownFromStopped() {
181-
// given
182-
manager.initializationFailed(new RuntimeException("boom"));
183-
manager.stop();
184-
185-
// when
186-
manager.beginShutdown();
187-
188-
// then
189-
assertThat(manager.getState()).isInstanceOf(Stopped.class);
190-
}
191-
192-
@Test
193-
void shouldBeNoOpWhenCompleteShutdownFromStopped() {
194-
// given
195-
manager.initializationFailed(new RuntimeException("boom"));
196-
manager.stop();
197-
198-
// when
199-
manager.completeShutdown();
200-
201-
// then
202-
assertThat(manager.getState()).isInstanceOf(Stopped.class);
203-
}
204-
205133
static Stream<Arguments> invalidTransitions() {
206134
return Stream.of(
207135
Arguments.of("initializationSucceeded from SERVING", (Runnable) () -> {

0 commit comments

Comments
 (0)