diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc index 806af1b3ca..7e0251a2dc 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc @@ -93,6 +93,26 @@ This class is a super for the mentioned above `MicrometerConsumerListener`, `Mic However, it can be used for any Kafka client use-cases. The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector. + +[[monitoring-container-lifecycle]] +== Monitoring Listener Container Lifecycle + +Starting with version 4.1.0, the listener container will automatically create and update Micrometer ``Counter``s for container start and stop events, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context. +The counters can be disabled by setting the ``ContainerProperty``'s `micrometerEnabled` to `false`. + +Two counters are maintained - one for container start events and one for container stop events. + +The counters are named as follows: + +* `spring.kafka.container.start.count` - incremented each time the container is started +* `spring.kafka.container.stop.count` - incremented each time the container is stopped (including abnormal stops) + +Both counters have the following tag: + +* `name` : (container bean name) + +This feature is useful for monitoring container restarts, detecting unexpected stops, and tracking the lifecycle of listener containers in production environments. + [[observation]] == Micrometer Observation diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 5a8054262b..6614b1d298 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -7,6 +7,12 @@ This section covers the changes made from version 4.0 to version 4.1. For changes in earlier versions, see xref:appendix/change-history.adoc[Change History]. +[[x41-container-lifecycle-metrics]] +=== Container Lifecycle Metrics + +The listener container now automatically tracks start and stop counts using Micrometer counters (`spring.kafka.container.start.count` and `spring.kafka.container.stop.count`), tagged with the container bean name. +See xref:kafka/micrometer.adoc#monitoring-container-lifecycle[Monitoring Listener Container Lifecycle] for more information. + [[x41-kafka-listener-ack-mode]] === `@KafkaListener` Changes diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 45b68c3140..909ba818f7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -73,6 +73,7 @@ * @author Sanghyeok An * @author Lokesh Alamuri * @author Christian Fredriksson + * @author Vineeth Yelagandula */ public abstract class AbstractMessageListenerContainer implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, @@ -530,6 +531,7 @@ public final void start() { () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided"); Assert.state(!this.fenced, "Container Fenced. It is not allowed to start."); doStart(); + recordContainerStarted(); } } finally { @@ -609,6 +611,20 @@ public void checkGroupId() { protected abstract void doStart(); + /** + * Called when the container is started. Subclasses may override to record metrics. + * @since 4.1.0 + */ + protected void recordContainerStarted() { + } + + /** + * Called when the container is stopped. Subclasses may override to record metrics. + * @since 4.1.0 + */ + protected void recordContainerStopped() { + } + @Override public final void stop() { stop(true); @@ -680,6 +696,7 @@ public void stop(Runnable callback) { public void stopAbnormally(Runnable callback) { this.lifecycleLock.lock(); try { + recordContainerStopped(); doStop(callback, false); publishContainerStoppedEvent(); } @@ -689,6 +706,7 @@ public void stopAbnormally(Runnable callback) { } protected void doStop(Runnable callback) { + recordContainerStopped(); doStop(callback, true); publishContainerStoppedEvent(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index c80dc6127e..13a0e0111d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -51,6 +51,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import org.apache.kafka.clients.admin.AdminClientConfig; @@ -122,6 +123,7 @@ import org.springframework.kafka.support.micrometer.KafkaListenerObservation; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext; +import org.springframework.kafka.support.micrometer.ContainerLifecycleMicrometerHolder; import org.springframework.kafka.support.micrometer.MicrometerHolder; import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.support.serializer.SerializationUtils; @@ -177,6 +179,7 @@ * @author Jinhui Kim * @author Minchul Son * @author Youngjoo Kim + * @author Vineeth Yelagandula */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -195,6 +198,9 @@ public class KafkaMessageListenerContainer // NOSONAR line count private @Nullable String clientIdSuffix; + @Nullable + private ContainerLifecycleMicrometerHolder lifecycleMicrometerHolder; + private Runnable emergencyStop = () -> stopAbnormally(() -> { }); @@ -365,6 +371,43 @@ private void consumerWakeIfNecessary() { return Collections.emptyMap(); } + @Override + protected void recordContainerStarted() { + if (this.lifecycleMicrometerHolder == null) { + this.lifecycleMicrometerHolder = obtainLifecycleMicrometerHolder(); + } + if (this.lifecycleMicrometerHolder != null) { + this.lifecycleMicrometerHolder.recordStart(); + } + } + + @Override + protected void recordContainerStopped() { + if (this.lifecycleMicrometerHolder != null) { + this.lifecycleMicrometerHolder.recordStop(); + } + } + + @Nullable + private ContainerLifecycleMicrometerHolder obtainLifecycleMicrometerHolder() { + try { + if (KafkaUtils.MICROMETER_PRESENT && getContainerProperties().isMicrometerEnabled()) { + ApplicationContext ctx = getApplicationContext(); + if (ctx != null) { + MeterRegistry registry = + ctx.getBeanProvider(MeterRegistry.class).getIfUnique(); + if (registry != null) { + return new ContainerLifecycleMicrometerHolder(registry, getBeanName()); + } + } + } + } + catch (IllegalStateException ex) { + this.logger.debug("Could not obtain MeterRegistry for lifecycle metrics", ex); + } + return null; + } + @Override protected void doStart() { if (isRunning()) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/ContainerLifecycleMicrometerHolder.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/ContainerLifecycleMicrometerHolder.java new file mode 100644 index 0000000000..3451bec67c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/ContainerLifecycleMicrometerHolder.java @@ -0,0 +1,64 @@ +/* + * Copyright 2016-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; + +/** + * Micrometer holder for container lifecycle metrics (start/stop counts). + * + * @author Vineeth Yelagandula + * @since 4.1.0 + */ +public final class ContainerLifecycleMicrometerHolder { + + private final Counter startCounter; + + private final Counter stopCounter; + + /** + * Create an instance with the provided {@link MeterRegistry} and container name. + * @param registry the meter registry. + * @param containerName the container bean name. + */ + public ContainerLifecycleMicrometerHolder(MeterRegistry registry, String containerName) { + this.startCounter = Counter.builder("spring.kafka.container.start.count") + .description("Number of times this listener container has been started") + .tag("name", containerName) + .register(registry); + this.stopCounter = Counter.builder("spring.kafka.container.stop.count") + .description("Number of times this listener container has been stopped") + .tag("name", containerName) + .register(registry); + } + + /** + * Increment the start counter. + */ + public void recordStart() { + this.startCounter.increment(); + } + + /** + * Increment the stop counter. + */ + public void recordStop() { + this.stopCounter.increment(); + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ContainerLifecycleMicrometerHolderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ContainerLifecycleMicrometerHolderTests.java new file mode 100644 index 0000000000..ef04cf07fc --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ContainerLifecycleMicrometerHolderTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.micrometer; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link ContainerLifecycleMicrometerHolder}. + * + * @author Vineeth Yelagandula + * @since 4.1.0 + */ +public class ContainerLifecycleMicrometerHolderTests { + + @Test + void startCounterIncrementsOnRecordStart() { + MeterRegistry registry = new SimpleMeterRegistry(); + ContainerLifecycleMicrometerHolder holder = + new ContainerLifecycleMicrometerHolder(registry, "testContainer"); + + holder.recordStart(); + holder.recordStart(); + + Counter startCounter = registry.find("spring.kafka.container.start.count") + .tag("name", "testContainer") + .counter(); + assertThat(startCounter).isNotNull(); + assertThat(startCounter.count()).isEqualTo(2.0); + } + + @Test + void stopCounterIncrementsOnRecordStop() { + MeterRegistry registry = new SimpleMeterRegistry(); + ContainerLifecycleMicrometerHolder holder = + new ContainerLifecycleMicrometerHolder(registry, "testContainer"); + + holder.recordStop(); + holder.recordStop(); + holder.recordStop(); + + Counter stopCounter = registry.find("spring.kafka.container.stop.count") + .tag("name", "testContainer") + .counter(); + assertThat(stopCounter).isNotNull(); + assertThat(stopCounter.count()).isEqualTo(3.0); + } + + @Test + void countersAreTaggedWithContainerName() { + MeterRegistry registry = new SimpleMeterRegistry(); + ContainerLifecycleMicrometerHolder holder = + new ContainerLifecycleMicrometerHolder(registry, "myContainer"); + + holder.recordStart(); + holder.recordStop(); + + assertThat(registry.find("spring.kafka.container.start.count") + .tag("name", "myContainer").counter()).isNotNull(); + assertThat(registry.find("spring.kafka.container.stop.count") + .tag("name", "myContainer").counter()).isNotNull(); + } + + @Test + void startAndStopCountersAreIndependent() { + MeterRegistry registry = new SimpleMeterRegistry(); + ContainerLifecycleMicrometerHolder holder = + new ContainerLifecycleMicrometerHolder(registry, "testContainer"); + + holder.recordStart(); + holder.recordStop(); + holder.recordStop(); + + Counter startCounter = registry.find("spring.kafka.container.start.count") + .tag("name", "testContainer").counter(); + Counter stopCounter = registry.find("spring.kafka.container.stop.count") + .tag("name", "testContainer").counter(); + + assertThat(startCounter).isNotNull(); + assertThat(stopCounter).isNotNull(); + assertThat(startCounter.count()).isEqualTo(1.0); + assertThat(stopCounter.count()).isEqualTo(2.0); + } + +}