Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ This class is a super for the mentioned above `MicrometerConsumerListener`, `Mic
However, it can be used for any Kafka client use-cases.
The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector.


[[monitoring-container-lifecycle]]
== Monitoring Listener Container Lifecycle

Starting with version 4.1.0, the listener container will automatically create and update Micrometer ``Counter``s for container start and stop events, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The counters can be disabled by setting the ``ContainerProperty``'s `micrometerEnabled` to `false`.

Two counters are maintained - one for container start events and one for container stop events.

The counters are named as follows:

* `spring.kafka.container.start.count` - incremented each time the container is started
* `spring.kafka.container.stop.count` - incremented each time the container is stopped (including abnormal stops)

Both counters have the following tag:

* `name` : (container bean name)

This feature is useful for monitoring container restarts, detecting unexpected stops, and tracking the lifecycle of listener containers in production environments.

[[observation]]
== Micrometer Observation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
This section covers the changes made from version 4.0 to version 4.1.
For changes in earlier versions, see xref:appendix/change-history.adoc[Change History].

[[x41-container-lifecycle-metrics]]
=== Container Lifecycle Metrics

The listener container now automatically tracks start and stop counts using Micrometer counters (`spring.kafka.container.start.count` and `spring.kafka.container.stop.count`), tagged with the container bean name.
See xref:kafka/micrometer.adoc#monitoring-container-lifecycle[Monitoring Listener Container Lifecycle] for more information.

[[x41-kafka-listener-ack-mode]]
=== `@KafkaListener` Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* @author Sanghyeok An
* @author Lokesh Alamuri
* @author Christian Fredriksson
* @author Vineeth Yelagandula
*/
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
Expand Down Expand Up @@ -530,6 +531,7 @@ public final void start() {
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
Assert.state(!this.fenced, "Container Fenced. It is not allowed to start.");
doStart();
recordContainerStarted();
}
}
finally {
Expand Down Expand Up @@ -609,6 +611,20 @@ public void checkGroupId() {

protected abstract void doStart();

/**
* Called when the container is started. Subclasses may override to record metrics.
* @since 4.1.0
*/
protected void recordContainerStarted() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need since tag.

}

/**
* Called when the container is stopped. Subclasses may override to record metrics.
* @since 4.1.0
*/
protected void recordContainerStopped() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need since tag.

}

@Override
public final void stop() {
stop(true);
Expand Down Expand Up @@ -680,6 +696,7 @@ public void stop(Runnable callback) {
public void stopAbnormally(Runnable callback) {
this.lifecycleLock.lock();
try {
recordContainerStopped();
doStop(callback, false);
publishContainerStoppedEvent();
}
Expand All @@ -689,6 +706,7 @@ public void stopAbnormally(Runnable callback) {
}

protected void doStop(Runnable callback) {
recordContainerStopped();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call should also go into the stopAbnormally method above.

doStop(callback, true);
publishContainerStoppedEvent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand Down Expand Up @@ -122,6 +123,7 @@
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
import org.springframework.kafka.support.micrometer.ContainerLifecycleMicrometerHolder;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
Expand Down Expand Up @@ -177,6 +179,7 @@
* @author Jinhui Kim
* @author Minchul Son
* @author Youngjoo Kim
* @author Vineeth Yelagandula
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand All @@ -195,6 +198,9 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count

private @Nullable String clientIdSuffix;

@Nullable
private ContainerLifecycleMicrometerHolder lifecycleMicrometerHolder;

private Runnable emergencyStop = () -> stopAbnormally(() -> {
});

Expand Down Expand Up @@ -365,6 +371,43 @@ private void consumerWakeIfNecessary() {
return Collections.emptyMap();
}

@Override
protected void recordContainerStarted() {
if (this.lifecycleMicrometerHolder == null) {
this.lifecycleMicrometerHolder = obtainLifecycleMicrometerHolder();
}
if (this.lifecycleMicrometerHolder != null) {
this.lifecycleMicrometerHolder.recordStart();
}
}

@Override
protected void recordContainerStopped() {
if (this.lifecycleMicrometerHolder != null) {
this.lifecycleMicrometerHolder.recordStop();
}
}

@Nullable
private ContainerLifecycleMicrometerHolder obtainLifecycleMicrometerHolder() {
try {
if (KafkaUtils.MICROMETER_PRESENT && getContainerProperties().isMicrometerEnabled()) {
ApplicationContext ctx = getApplicationContext();
if (ctx != null) {
MeterRegistry registry =
ctx.getBeanProvider(MeterRegistry.class).getIfUnique();
if (registry != null) {
return new ContainerLifecycleMicrometerHolder(registry, getBeanName());
}
}
}
}
catch (IllegalStateException ex) {
this.logger.debug("Could not obtain MeterRegistry for lifecycle metrics", ex);
}
return null;
}

@Override
protected void doStart() {
if (isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.micrometer;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;

/**
* Micrometer holder for container lifecycle metrics (start/stop counts).
*
* @author Vineeth Yelagandula
* @since 4.1.0
*/
public final class ContainerLifecycleMicrometerHolder {

private final Counter startCounter;

private final Counter stopCounter;

/**
* Create an instance with the provided {@link MeterRegistry} and container name.
* @param registry the meter registry.
* @param containerName the container bean name.
*/
public ContainerLifecycleMicrometerHolder(MeterRegistry registry, String containerName) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need javadoc for the constructor.

this.startCounter = Counter.builder("spring.kafka.container.start.count")
.description("Number of times this listener container has been started")
.tag("name", containerName)
.register(registry);
this.stopCounter = Counter.builder("spring.kafka.container.stop.count")
.description("Number of times this listener container has been stopped")
.tag("name", containerName)
.register(registry);
}

/**
* Increment the start counter.
*/
public void recordStart() {
this.startCounter.increment();
}

/**
* Increment the stop counter.
*/
public void recordStop() {
this.stopCounter.increment();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2024-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.micrometer;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@link ContainerLifecycleMicrometerHolder}.
*
* @author Vineeth Yelagandula
* @since 4.1.0
*/
public class ContainerLifecycleMicrometerHolderTests {

@Test
void startCounterIncrementsOnRecordStart() {
MeterRegistry registry = new SimpleMeterRegistry();
ContainerLifecycleMicrometerHolder holder =
new ContainerLifecycleMicrometerHolder(registry, "testContainer");

holder.recordStart();
holder.recordStart();

Counter startCounter = registry.find("spring.kafka.container.start.count")
.tag("name", "testContainer")
.counter();
assertThat(startCounter).isNotNull();
assertThat(startCounter.count()).isEqualTo(2.0);
}

@Test
void stopCounterIncrementsOnRecordStop() {
MeterRegistry registry = new SimpleMeterRegistry();
ContainerLifecycleMicrometerHolder holder =
new ContainerLifecycleMicrometerHolder(registry, "testContainer");

holder.recordStop();
holder.recordStop();
holder.recordStop();

Counter stopCounter = registry.find("spring.kafka.container.stop.count")
.tag("name", "testContainer")
.counter();
assertThat(stopCounter).isNotNull();
assertThat(stopCounter.count()).isEqualTo(3.0);
}

@Test
void countersAreTaggedWithContainerName() {
MeterRegistry registry = new SimpleMeterRegistry();
ContainerLifecycleMicrometerHolder holder =
new ContainerLifecycleMicrometerHolder(registry, "myContainer");

holder.recordStart();
holder.recordStop();

assertThat(registry.find("spring.kafka.container.start.count")
.tag("name", "myContainer").counter()).isNotNull();
assertThat(registry.find("spring.kafka.container.stop.count")
.tag("name", "myContainer").counter()).isNotNull();
}

@Test
void startAndStopCountersAreIndependent() {
MeterRegistry registry = new SimpleMeterRegistry();
ContainerLifecycleMicrometerHolder holder =
new ContainerLifecycleMicrometerHolder(registry, "testContainer");

holder.recordStart();
holder.recordStop();
holder.recordStop();

Counter startCounter = registry.find("spring.kafka.container.start.count")
.tag("name", "testContainer").counter();
Counter stopCounter = registry.find("spring.kafka.container.stop.count")
.tag("name", "testContainer").counter();

assertThat(startCounter).isNotNull();
assertThat(stopCounter).isNotNull();
assertThat(startCounter.count()).isEqualTo(1.0);
assertThat(stopCounter.count()).isEqualTo(2.0);
}

}
Loading