Skip to content

Commit 6812db7

Browse files
author
Vineeth
committed
Add container lifecycle metrics for start/stop counts
- Add protected recordContainerStarted() and recordContainerStopped() lifecycle hooks in AbstractMessageListenerContainer - Create ContainerLifecycleMicrometerHolder following obtainMicrometerHolder() pattern - Override lifecycle methods in KafkaMessageListenerContainer - Add isMicrometerEnabled() check, proper imports, IllegalStateException logging - Add tests for ContainerLifecycleMicrometerHolder - Add reference docs in micrometer.adoc - Add whats-new entry - Add @author Vineeth Yelagandula to modified classes Closes #4277 Signed-off-by: Vineeth Yelagandula <111960524+Yelagandula@users.noreply.github.com>
1 parent 813b9af commit 6812db7

6 files changed

Lines changed: 254 additions & 0 deletions

File tree

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,26 @@ This class is a super for the mentioned above `MicrometerConsumerListener`, `Mic
9393
However, it can be used for any Kafka client use-cases.
9494
The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector.
9595

96+
97+
[[monitoring-container-lifecycle]]
98+
== Monitoring Listener Container Lifecycle
99+
100+
Starting with version 4.1.0, the listener container will automatically create and update Micrometer ``Counter``s for container start and stop events, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
101+
The counters can be disabled by setting the ``ContainerProperty``'s `micrometerEnabled` to `false`.
102+
103+
Two counters are maintained - one for container start events and one for container stop events.
104+
105+
The counters are named as follows:
106+
107+
* `spring.kafka.container.start.count` - incremented each time the container is started
108+
* `spring.kafka.container.stop.count` - incremented each time the container is stopped (including abnormal stops)
109+
110+
Both counters have the following tag:
111+
112+
* `name` : (container bean name)
113+
114+
This feature is useful for monitoring container restarts, detecting unexpected stops, and tracking the lifecycle of listener containers in production environments.
115+
96116
[[observation]]
97117
== Micrometer Observation
98118

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
This section covers the changes made from version 4.0 to version 4.1.
88
For changes in earlier versions, see xref:appendix/change-history.adoc[Change History].
99

10+
[[x41-container-lifecycle-metrics]]
11+
=== Container Lifecycle Metrics
12+
13+
The listener container now automatically tracks start and stop counts using Micrometer counters (`spring.kafka.container.start.count` and `spring.kafka.container.stop.count`), tagged with the container bean name.
14+
See xref:kafka/micrometer.adoc#monitoring-container-lifecycle[Monitoring Listener Container Lifecycle] for more information.
15+
1016
[[x41-kafka-listener-ack-mode]]
1117
=== `@KafkaListener` Changes
1218

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
* @author Sanghyeok An
7474
* @author Lokesh Alamuri
7575
* @author Christian Fredriksson
76+
* @author Vineeth Yelagandula
7677
*/
7778
public abstract class AbstractMessageListenerContainer<K, V>
7879
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
@@ -530,6 +531,7 @@ public final void start() {
530531
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
531532
Assert.state(!this.fenced, "Container Fenced. It is not allowed to start.");
532533
doStart();
534+
recordContainerStarted();
533535
}
534536
}
535537
finally {
@@ -609,6 +611,20 @@ public void checkGroupId() {
609611

610612
protected abstract void doStart();
611613

614+
/**
615+
* Called when the container is started. Subclasses may override to record metrics.
616+
* @since 4.1.0
617+
*/
618+
protected void recordContainerStarted() {
619+
}
620+
621+
/**
622+
* Called when the container is stopped. Subclasses may override to record metrics.
623+
* @since 4.1.0
624+
*/
625+
protected void recordContainerStopped() {
626+
}
627+
612628
@Override
613629
public final void stop() {
614630
stop(true);
@@ -680,6 +696,7 @@ public void stop(Runnable callback) {
680696
public void stopAbnormally(Runnable callback) {
681697
this.lifecycleLock.lock();
682698
try {
699+
recordContainerStopped();
683700
doStop(callback, false);
684701
publishContainerStoppedEvent();
685702
}
@@ -689,6 +706,7 @@ public void stopAbnormally(Runnable callback) {
689706
}
690707

691708
protected void doStop(Runnable callback) {
709+
recordContainerStopped();
692710
doStop(callback, true);
693711
publishContainerStoppedEvent();
694712
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.regex.Pattern;
5252
import java.util.stream.Collectors;
5353

54+
import io.micrometer.core.instrument.MeterRegistry;
5455
import io.micrometer.observation.Observation;
5556
import io.micrometer.observation.ObservationRegistry;
5657
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -122,6 +123,7 @@
122123
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
123124
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
124125
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
126+
import org.springframework.kafka.support.micrometer.ContainerLifecycleMicrometerHolder;
125127
import org.springframework.kafka.support.micrometer.MicrometerHolder;
126128
import org.springframework.kafka.support.serializer.DeserializationException;
127129
import org.springframework.kafka.support.serializer.SerializationUtils;
@@ -177,6 +179,7 @@
177179
* @author Jinhui Kim
178180
* @author Minchul Son
179181
* @author Youngjoo Kim
182+
* @author Vineeth Yelagandula
180183
*/
181184
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
182185
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -195,6 +198,9 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
195198

196199
private @Nullable String clientIdSuffix;
197200

201+
@Nullable
202+
private ContainerLifecycleMicrometerHolder lifecycleMicrometerHolder;
203+
198204
private Runnable emergencyStop = () -> stopAbnormally(() -> {
199205
});
200206

@@ -365,6 +371,43 @@ private void consumerWakeIfNecessary() {
365371
return Collections.emptyMap();
366372
}
367373

374+
@Override
375+
protected void recordContainerStarted() {
376+
if (this.lifecycleMicrometerHolder == null) {
377+
this.lifecycleMicrometerHolder = obtainLifecycleMicrometerHolder();
378+
}
379+
if (this.lifecycleMicrometerHolder != null) {
380+
this.lifecycleMicrometerHolder.recordStart();
381+
}
382+
}
383+
384+
@Override
385+
protected void recordContainerStopped() {
386+
if (this.lifecycleMicrometerHolder != null) {
387+
this.lifecycleMicrometerHolder.recordStop();
388+
}
389+
}
390+
391+
@Nullable
392+
private ContainerLifecycleMicrometerHolder obtainLifecycleMicrometerHolder() {
393+
try {
394+
if (KafkaUtils.MICROMETER_PRESENT && getContainerProperties().isMicrometerEnabled()) {
395+
ApplicationContext ctx = getApplicationContext();
396+
if (ctx != null) {
397+
MeterRegistry registry =
398+
ctx.getBeanProvider(MeterRegistry.class).getIfUnique();
399+
if (registry != null) {
400+
return new ContainerLifecycleMicrometerHolder(registry, getBeanName());
401+
}
402+
}
403+
}
404+
}
405+
catch (IllegalStateException ex) {
406+
this.logger.debug("Could not obtain MeterRegistry for lifecycle metrics", ex);
407+
}
408+
return null;
409+
}
410+
368411
@Override
369412
protected void doStart() {
370413
if (isRunning()) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2016-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.micrometer;
18+
19+
import io.micrometer.core.instrument.Counter;
20+
import io.micrometer.core.instrument.MeterRegistry;
21+
22+
/**
23+
* Micrometer holder for container lifecycle metrics (start/stop counts).
24+
*
25+
* @author Vineeth Yelagandula
26+
* @since 4.1.0
27+
*/
28+
public final class ContainerLifecycleMicrometerHolder {
29+
30+
private final Counter startCounter;
31+
32+
private final Counter stopCounter;
33+
34+
/**
35+
* Create an instance with the provided {@link MeterRegistry} and container name.
36+
* @param registry the meter registry.
37+
* @param containerName the container bean name.
38+
*/
39+
public ContainerLifecycleMicrometerHolder(MeterRegistry registry, String containerName) {
40+
this.startCounter = Counter.builder("spring.kafka.container.start.count")
41+
.description("Number of times this listener container has been started")
42+
.tag("name", containerName)
43+
.register(registry);
44+
this.stopCounter = Counter.builder("spring.kafka.container.stop.count")
45+
.description("Number of times this listener container has been stopped")
46+
.tag("name", containerName)
47+
.register(registry);
48+
}
49+
50+
/**
51+
* Increment the start counter.
52+
*/
53+
public void recordStart() {
54+
this.startCounter.increment();
55+
}
56+
57+
/**
58+
* Increment the stop counter.
59+
*/
60+
public void recordStop() {
61+
this.stopCounter.increment();
62+
}
63+
64+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2024-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.micrometer;
18+
19+
import io.micrometer.core.instrument.Counter;
20+
import io.micrometer.core.instrument.MeterRegistry;
21+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
22+
import org.junit.jupiter.api.Test;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
/**
27+
* Tests for {@link ContainerLifecycleMicrometerHolder}.
28+
*
29+
* @author Vineeth Yelagandula
30+
* @since 4.1.0
31+
*/
32+
public class ContainerLifecycleMicrometerHolderTests {
33+
34+
@Test
35+
void startCounterIncrementsOnRecordStart() {
36+
MeterRegistry registry = new SimpleMeterRegistry();
37+
ContainerLifecycleMicrometerHolder holder =
38+
new ContainerLifecycleMicrometerHolder(registry, "testContainer");
39+
40+
holder.recordStart();
41+
holder.recordStart();
42+
43+
Counter startCounter = registry.find("spring.kafka.container.start.count")
44+
.tag("name", "testContainer")
45+
.counter();
46+
assertThat(startCounter).isNotNull();
47+
assertThat(startCounter.count()).isEqualTo(2.0);
48+
}
49+
50+
@Test
51+
void stopCounterIncrementsOnRecordStop() {
52+
MeterRegistry registry = new SimpleMeterRegistry();
53+
ContainerLifecycleMicrometerHolder holder =
54+
new ContainerLifecycleMicrometerHolder(registry, "testContainer");
55+
56+
holder.recordStop();
57+
holder.recordStop();
58+
holder.recordStop();
59+
60+
Counter stopCounter = registry.find("spring.kafka.container.stop.count")
61+
.tag("name", "testContainer")
62+
.counter();
63+
assertThat(stopCounter).isNotNull();
64+
assertThat(stopCounter.count()).isEqualTo(3.0);
65+
}
66+
67+
@Test
68+
void countersAreTaggedWithContainerName() {
69+
MeterRegistry registry = new SimpleMeterRegistry();
70+
ContainerLifecycleMicrometerHolder holder =
71+
new ContainerLifecycleMicrometerHolder(registry, "myContainer");
72+
73+
holder.recordStart();
74+
holder.recordStop();
75+
76+
assertThat(registry.find("spring.kafka.container.start.count")
77+
.tag("name", "myContainer").counter()).isNotNull();
78+
assertThat(registry.find("spring.kafka.container.stop.count")
79+
.tag("name", "myContainer").counter()).isNotNull();
80+
}
81+
82+
@Test
83+
void startAndStopCountersAreIndependent() {
84+
MeterRegistry registry = new SimpleMeterRegistry();
85+
ContainerLifecycleMicrometerHolder holder =
86+
new ContainerLifecycleMicrometerHolder(registry, "testContainer");
87+
88+
holder.recordStart();
89+
holder.recordStop();
90+
holder.recordStop();
91+
92+
Counter startCounter = registry.find("spring.kafka.container.start.count")
93+
.tag("name", "testContainer").counter();
94+
Counter stopCounter = registry.find("spring.kafka.container.stop.count")
95+
.tag("name", "testContainer").counter();
96+
97+
assertThat(startCounter).isNotNull();
98+
assertThat(stopCounter).isNotNull();
99+
assertThat(startCounter.count()).isEqualTo(1.0);
100+
assertThat(stopCounter.count()).isEqualTo(2.0);
101+
}
102+
103+
}

0 commit comments

Comments
 (0)