Skip to content

Commit c30d632

Browse files
author
Vineeth
committed
Add container lifecycle metrics for start/stop counts
Introduce protected recordContainerStarted() and recordContainerStopped() lifecycle hooks in AbstractMessageListenerContainer. Add ContainerLifecycleMicrometerHolder to handle counter registration. Override lifecycle methods in KafkaMessageListenerContainer following the obtainMicrometerHolder() pattern to avoid tight Micrometer coupling in the abstract base class. Closes #4277 Signed-off-by: Vineeth <vineeth>
1 parent 813b9af commit c30d632

3 files changed

Lines changed: 114 additions & 0 deletions

File tree

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ public final void start() {
530530
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
531531
Assert.state(!this.fenced, "Container Fenced. It is not allowed to start.");
532532
doStart();
533+
recordContainerStarted();
533534
}
534535
}
535536
finally {
@@ -609,6 +610,18 @@ public void checkGroupId() {
609610

610611
protected abstract void doStart();
611612

613+
/**
614+
* Called when the container is started. Subclasses may override to record metrics.
615+
*/
616+
protected void recordContainerStarted() {
617+
}
618+
619+
/**
620+
* Called when the container is stopped. Subclasses may override to record metrics.
621+
*/
622+
protected void recordContainerStopped() {
623+
}
624+
612625
@Override
613626
public final void stop() {
614627
stop(true);
@@ -689,6 +702,7 @@ public void stopAbnormally(Runnable callback) {
689702
}
690703

691704
protected void doStop(Runnable callback) {
705+
recordContainerStopped();
692706
doStop(callback, true);
693707
publishContainerStoppedEvent();
694708
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
123123
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
124124
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
125+
import org.springframework.kafka.support.micrometer.ContainerLifecycleMicrometerHolder;
125126
import org.springframework.kafka.support.micrometer.MicrometerHolder;
126127
import org.springframework.kafka.support.serializer.DeserializationException;
127128
import org.springframework.kafka.support.serializer.SerializationUtils;
@@ -195,6 +196,9 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
195196

196197
private @Nullable String clientIdSuffix;
197198

199+
@Nullable
200+
private ContainerLifecycleMicrometerHolder lifecycleMicrometerHolder;
201+
198202
private Runnable emergencyStop = () -> stopAbnormally(() -> {
199203
});
200204

@@ -365,6 +369,43 @@ private void consumerWakeIfNecessary() {
365369
return Collections.emptyMap();
366370
}
367371

372+
@Override
373+
protected void recordContainerStarted() {
374+
if (this.lifecycleMicrometerHolder == null) {
375+
this.lifecycleMicrometerHolder = obtainLifecycleMicrometerHolder();
376+
}
377+
if (this.lifecycleMicrometerHolder != null) {
378+
this.lifecycleMicrometerHolder.recordStart();
379+
}
380+
}
381+
382+
@Override
383+
protected void recordContainerStopped() {
384+
if (this.lifecycleMicrometerHolder != null) {
385+
this.lifecycleMicrometerHolder.recordStop();
386+
}
387+
}
388+
389+
@Nullable
390+
private ContainerLifecycleMicrometerHolder obtainLifecycleMicrometerHolder() {
391+
try {
392+
if (KafkaUtils.MICROMETER_PRESENT) {
393+
org.springframework.context.ApplicationContext ctx = getApplicationContext();
394+
if (ctx != null) {
395+
io.micrometer.core.instrument.MeterRegistry registry =
396+
ctx.getBeanProvider(io.micrometer.core.instrument.MeterRegistry.class).getIfUnique();
397+
if (registry != null) {
398+
return new ContainerLifecycleMicrometerHolder(registry, getBeanName());
399+
}
400+
}
401+
}
402+
}
403+
catch (Exception ex) {
404+
// no micrometer
405+
}
406+
return null;
407+
}
408+
368409
@Override
369410
protected void doStart() {
370411
if (isRunning()) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2016-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.micrometer;
18+
19+
import io.micrometer.core.instrument.Counter;
20+
import io.micrometer.core.instrument.MeterRegistry;
21+
22+
/**
23+
* Micrometer holder for container lifecycle metrics (start/stop counts).
24+
*
25+
* @author Vineeth Yelagandula
26+
* @since 3.4
27+
*/
28+
public class ContainerLifecycleMicrometerHolder {
29+
30+
private final Counter startCounter;
31+
32+
private final Counter stopCounter;
33+
34+
public ContainerLifecycleMicrometerHolder(MeterRegistry registry, String containerName) {
35+
this.startCounter = Counter.builder("spring.kafka.container.start.count")
36+
.description("Number of times this listener container has been started")
37+
.tag("name", containerName)
38+
.register(registry);
39+
this.stopCounter = Counter.builder("spring.kafka.container.stop.count")
40+
.description("Number of times this listener container has been stopped")
41+
.tag("name", containerName)
42+
.register(registry);
43+
}
44+
45+
/**
46+
* Increment the start counter.
47+
*/
48+
public void recordStart() {
49+
this.startCounter.increment();
50+
}
51+
52+
/**
53+
* Increment the stop counter.
54+
*/
55+
public void recordStop() {
56+
this.stopCounter.increment();
57+
}
58+
59+
}

0 commit comments

Comments
 (0)