Skip to content

Commit a2d7a0a

Browse files
committed
Wire lifecycle manager into KafkaProxy startup and shutdown
Create a VirtualClusterLifecycleManager per virtual cluster during startup, transitioning each to Serving on success. On shutdown, beginShutdown() transitions serving clusters to Draining before the Netty graceful shutdown, then completeShutdown() transitions them to Stopped afterwards. Existing fail-fast behaviour is preserved.

Assisted-by: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent afc6ab7 commit a2d7a0a

4 files changed

Lines changed: 284 additions & 0 deletions

File tree

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.time.Duration;
99
import java.util.ArrayList;
10+
import java.util.LinkedHashMap;
1011
import java.util.List;
1112
import java.util.Map;
1213
import java.util.Optional;
@@ -57,6 +58,7 @@
5758
import io.kroxylicious.proxy.internal.KafkaProxyInitializer;
5859
import io.kroxylicious.proxy.internal.MeterRegistries;
5960
import io.kroxylicious.proxy.internal.PortConflictDetector;
61+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleManager;
6062
import io.kroxylicious.proxy.internal.admin.ManagementInitializer;
6163
import io.kroxylicious.proxy.internal.config.Features;
6264
import io.kroxylicious.proxy.internal.net.DefaultNetworkBindingOperationProcessor;
@@ -149,6 +151,7 @@ private static int resolveThreadCount(Configuration configuration, Function<Netw
149151
private final NetworkBindingOperationProcessor bindingOperationProcessor = new DefaultNetworkBindingOperationProcessor();
150152
private final EndpointRegistry endpointRegistry = new EndpointRegistry(bindingOperationProcessor);
151153
private final PluginFactoryRegistry pfr;
154+
private final Map<String, VirtualClusterLifecycleManager> lifecycleManagers = new LinkedHashMap<>();
152155
private @Nullable MeterRegistries meterRegistries;
153156
private @Nullable FilterChainFactory filterChainFactory;
154157
private @Nullable EventGroupConfig managementEventGroup;
@@ -216,6 +219,11 @@ public KafkaProxy startup() {
216219

217220
STARTUP_SHUTDOWN_LOGGER.atInfo()
218221
.log("Kroxylicious is starting");
222+
223+
for (var vcm : virtualClusterModels) {
224+
lifecycleManagers.put(vcm.getClusterName(), new VirtualClusterLifecycleManager(vcm.getClusterName()));
225+
}
226+
219227
meterRegistries = new MeterRegistries(pfr, micrometerConfig);
220228
initVersionInfoMetric();
221229

@@ -252,6 +260,8 @@ public KafkaProxy startup() {
252260
.toArray(CompletableFuture[]::new))
253261
.join();
254262

263+
lifecycleManagers.values().forEach(VirtualClusterLifecycleManager::initializationSucceeded);
264+
255265
STARTUP_SHUTDOWN_LOGGER.atInfo()
256266
.log("Kroxylicious is started");
257267
return this;
@@ -357,6 +367,7 @@ public void shutdown() {
357367
try {
358368
STARTUP_SHUTDOWN_LOGGER.atInfo()
359369
.log("Shutting down");
370+
transitionAllToDraining();
360371
endpointRegistry.shutdown().handle((u, t) -> {
361372
bindingOperationProcessor.close();
362373
var closeFutures = new ArrayList<Future<?>>();
@@ -378,6 +389,7 @@ public void shutdown() {
378389
}
379390
return null;
380391
}).toCompletableFuture().join();
392+
transitionAllToStopped();
381393
if (meterRegistries != null) {
382394
meterRegistries.close();
383395
}
@@ -394,6 +406,25 @@ public void shutdown() {
394406
}
395407
}
396408

409+
/**
410+
* Returns the lifecycle manager for the given virtual cluster name.
411+
* @param clusterName the virtual cluster name
412+
* @return the lifecycle manager, or null if no cluster with that name exists
413+
*/
414+
@VisibleForTesting
415+
@Nullable
416+
VirtualClusterLifecycleManager lifecycleManagerFor(String clusterName) {
417+
return lifecycleManagers.get(clusterName);
418+
}
419+
420+
private void transitionAllToDraining() {
421+
lifecycleManagers.values().forEach(VirtualClusterLifecycleManager::beginShutdown);
422+
}
423+
424+
private void transitionAllToStopped() {
425+
lifecycleManagers.values().forEach(VirtualClusterLifecycleManager::completeShutdown);
426+
}
427+
397428
@Override
398429
public void close() throws Exception {
399430
if (running.get()) {

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/VirtualClusterLifecycleManager.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,39 @@ public void stop() {
9797
});
9898
}
9999

100+
/**
101+
* Begins shutdown: transitions serving clusters to draining and
102+
* failed/initializing clusters directly to stopped.
103+
*/
104+
public void beginShutdown() {
105+
transition(current -> {
106+
if (current instanceof Serving s) {
107+
return s.toDraining();
108+
}
109+
if (current instanceof Failed s) {
110+
return s.toStopped();
111+
}
112+
if (current instanceof Initializing s) {
113+
return s.toFailed(new IllegalStateException("Shutdown before initialization completed")).toStopped();
114+
}
115+
// Draining or Stopped — no-op
116+
return current;
117+
});
118+
}
119+
120+
/**
121+
* Completes shutdown: transitions draining clusters to stopped.
122+
*/
123+
public void completeShutdown() {
124+
transition(current -> {
125+
if (current instanceof Draining s) {
126+
return s.toStopped();
127+
}
128+
// already Stopped — no-op
129+
return current;
130+
});
131+
}
132+
100133
/**
 * Returns the lifecycle state this manager currently holds.
 *
 * @return the value read from {@code state} at the time of the call
 */
public VirtualClusterLifecycleState getState() {
101134
return state.get();
102135
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy;
8+
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
12+
import io.kroxylicious.proxy.config.ConfigParser;
13+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Serving;
14+
import io.kroxylicious.proxy.internal.VirtualClusterLifecycleState.Stopped;
15+
import io.kroxylicious.proxy.internal.config.Features;
16+
import io.kroxylicious.proxy.plugin.PluginConfigurationException;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
class KafkaProxyLifecycleTest {
22+
23+
private ConfigParser configParser;
24+
25+
@BeforeEach
26+
void setUp() {
27+
configParser = new ConfigParser();
28+
}
29+
30+
@Test
31+
void shouldTrackVirtualClusterAsServingAfterStartup() throws Exception {
32+
// given
33+
var config = """
34+
virtualClusters:
35+
- name: demo1
36+
targetCluster:
37+
bootstrapServers: kafka.example:1234
38+
gateways:
39+
- name: default
40+
portIdentifiesNode:
41+
bootstrapAddress: localhost:9192
42+
""";
43+
44+
try (var proxy = new KafkaProxy(configParser, configParser.parseConfiguration(config), Features.defaultFeatures())) {
45+
// when
46+
proxy.startup();
47+
48+
// then
49+
assertThat(proxy.lifecycleManagerFor("demo1"))
50+
.isNotNull()
51+
.satisfies(m -> assertThat(m.getState()).isInstanceOf(Serving.class));
52+
}
53+
}
54+
55+
@Test
56+
void shouldTrackMultipleVirtualClustersAsServing() throws Exception {
57+
// given
58+
var config = """
59+
virtualClusters:
60+
- name: cluster-a
61+
targetCluster:
62+
bootstrapServers: kafka.example:1234
63+
gateways:
64+
- name: default
65+
portIdentifiesNode:
66+
bootstrapAddress: localhost:9192
67+
- name: cluster-b
68+
targetCluster:
69+
bootstrapServers: kafka.example:5678
70+
gateways:
71+
- name: default
72+
portIdentifiesNode:
73+
bootstrapAddress: localhost:9292
74+
""";
75+
76+
try (var proxy = new KafkaProxy(configParser, configParser.parseConfiguration(config), Features.defaultFeatures())) {
77+
// when
78+
proxy.startup();
79+
80+
// then
81+
assertThat(proxy.lifecycleManagerFor("cluster-a"))
82+
.isNotNull()
83+
.satisfies(m -> assertThat(m.getState()).isInstanceOf(Serving.class));
84+
assertThat(proxy.lifecycleManagerFor("cluster-b"))
85+
.isNotNull()
86+
.satisfies(m -> assertThat(m.getState()).isInstanceOf(Serving.class));
87+
}
88+
}
89+
90+
@Test
91+
void shouldTransitionToStoppedAfterShutdown() throws Exception {
92+
// given
93+
var config = """
94+
virtualClusters:
95+
- name: demo1
96+
targetCluster:
97+
bootstrapServers: kafka.example:1234
98+
gateways:
99+
- name: default
100+
portIdentifiesNode:
101+
bootstrapAddress: localhost:9192
102+
""";
103+
104+
var proxy = new KafkaProxy(configParser, configParser.parseConfiguration(config), Features.defaultFeatures());
105+
proxy.startup();
106+
var manager = proxy.lifecycleManagerFor("demo1");
107+
108+
// when
109+
proxy.shutdown();
110+
111+
// then
112+
assertThat(manager).isNotNull();
113+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
114+
}
115+
116+
@Test
117+
void shouldTransitionToStoppedOnStartupFailure() throws Exception {
118+
// given
119+
var config = """
120+
virtualClusters:
121+
- name: demo1
122+
targetCluster:
123+
bootstrapServers: kafka.example:1234
124+
gateways:
125+
- name: default
126+
portIdentifiesNode:
127+
bootstrapAddress: localhost:9192
128+
filterDefinitions:
129+
- name: filter1
130+
type: RequiresConfigFactory
131+
defaultFilters:
132+
- filter1
133+
""";
134+
135+
try (var proxy = new KafkaProxy(configParser, configParser.parseConfiguration(config), Features.defaultFeatures())) {
136+
// when
137+
assertThatThrownBy(proxy::startup)
138+
.isInstanceOf(LifecycleException.class)
139+
.cause()
140+
.isInstanceOf(PluginConfigurationException.class);
141+
142+
// then
143+
var manager = proxy.lifecycleManagerFor("demo1");
144+
assertThat(manager).isNotNull();
145+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
146+
}
147+
}
148+
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/VirtualClusterLifecycleManagerTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,78 @@ void shouldHaveNoPriorFailureCauseWhenStoppedFromDraining() {
130130
.isInstanceOfSatisfying(Stopped.class, stopped -> assertThat(stopped.priorFailureCause()).isNull());
131131
}
132132

133+
@Test
134+
void shouldTransitionServingToDrainingOnBeginShutdown() {
135+
// given
136+
manager.initializationSucceeded();
137+
138+
// when
139+
manager.beginShutdown();
140+
141+
// then
142+
assertThat(manager.getState()).isInstanceOf(Draining.class);
143+
}
144+
145+
@Test
146+
void shouldTransitionDrainingToStoppedOnCompleteShutdown() {
147+
// given
148+
manager.initializationSucceeded();
149+
manager.beginShutdown();
150+
151+
// when
152+
manager.completeShutdown();
153+
154+
// then
155+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
156+
}
157+
158+
@Test
159+
void shouldTransitionFailedToStoppedOnBeginShutdown() {
160+
// given
161+
manager.initializationFailed(new RuntimeException("boom"));
162+
163+
// when
164+
manager.beginShutdown();
165+
166+
// then
167+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
168+
}
169+
170+
@Test
171+
void shouldTransitionInitializingToStoppedOnBeginShutdown() {
172+
// when
173+
manager.beginShutdown();
174+
175+
// then
176+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
177+
}
178+
179+
@Test
180+
void shouldBeNoOpWhenBeginShutdownFromStopped() {
181+
// given
182+
manager.initializationFailed(new RuntimeException("boom"));
183+
manager.stop();
184+
185+
// when
186+
manager.beginShutdown();
187+
188+
// then
189+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
190+
}
191+
192+
@Test
193+
void shouldBeNoOpWhenCompleteShutdownFromStopped() {
194+
// given
195+
manager.initializationFailed(new RuntimeException("boom"));
196+
manager.stop();
197+
198+
// when
199+
manager.completeShutdown();
200+
201+
// then
202+
assertThat(manager.getState()).isInstanceOf(Stopped.class);
203+
}
204+
133205
static Stream<Arguments> invalidTransitions() {
134206
return Stream.of(
135207
Arguments.of("initializationSucceeded from SERVING", (Runnable) () -> {

0 commit comments

Comments
 (0)