-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add start/stop count metrics to listener container #4383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,7 @@ | |
| * @author Sanghyeok An | ||
| * @author Lokesh Alamuri | ||
| * @author Christian Fredriksson | ||
| * @author Vineeth Yelagandula | ||
| */ | ||
| public abstract class AbstractMessageListenerContainer<K, V> | ||
| implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware, | ||
|
|
@@ -530,6 +531,7 @@ public final void start() { | |
| () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided"); | ||
| Assert.state(!this.fenced, "Container Fenced. It is not allowed to start."); | ||
| doStart(); | ||
| recordContainerStarted(); | ||
| } | ||
| } | ||
| finally { | ||
|
|
@@ -609,6 +611,20 @@ public void checkGroupId() { | |
|
|
||
| protected abstract void doStart(); | ||
|
|
||
| /** | ||
| * Called when the container is started. Subclasses may override to record metrics. | ||
| * @since 4.1.0 | ||
| */ | ||
| protected void recordContainerStarted() { | ||
| } | ||
|
|
||
| /** | ||
| * Called when the container is stopped. Subclasses may override to record metrics. | ||
| * @since 4.1.0 | ||
| */ | ||
| protected void recordContainerStopped() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need |
||
| } | ||
|
|
||
| @Override | ||
| public final void stop() { | ||
| stop(true); | ||
|
|
@@ -680,6 +696,7 @@ public void stop(Runnable callback) { | |
| public void stopAbnormally(Runnable callback) { | ||
| this.lifecycleLock.lock(); | ||
| try { | ||
| recordContainerStopped(); | ||
| doStop(callback, false); | ||
| publishContainerStoppedEvent(); | ||
| } | ||
|
|
@@ -689,6 +706,7 @@ public void stopAbnormally(Runnable callback) { | |
| } | ||
|
|
||
| protected void doStop(Runnable callback) { | ||
| recordContainerStopped(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This call should also go into the |
||
| doStop(callback, true); | ||
| publishContainerStoppedEvent(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /* | ||
| * Copyright 2016-present the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.kafka.support.micrometer; | ||
|
|
||
| import io.micrometer.core.instrument.Counter; | ||
| import io.micrometer.core.instrument.MeterRegistry; | ||
|
|
||
| /** | ||
| * Micrometer holder for container lifecycle metrics (start/stop counts). | ||
| * | ||
| * @author Vineeth Yelagandula | ||
| * @since 4.1.0 | ||
| */ | ||
| public final class ContainerLifecycleMicrometerHolder { | ||
|
|
||
| private final Counter startCounter; | ||
|
|
||
| private final Counter stopCounter; | ||
|
|
||
| /** | ||
| * Create an instance with the provided {@link MeterRegistry} and container name. | ||
| * @param registry the meter registry. | ||
| * @param containerName the container bean name. | ||
| */ | ||
| public ContainerLifecycleMicrometerHolder(MeterRegistry registry, String containerName) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need javadoc for the constructor. |
||
| this.startCounter = Counter.builder("spring.kafka.container.start.count") | ||
| .description("Number of times this listener container has been started") | ||
| .tag("name", containerName) | ||
| .register(registry); | ||
| this.stopCounter = Counter.builder("spring.kafka.container.stop.count") | ||
| .description("Number of times this listener container has been stopped") | ||
| .tag("name", containerName) | ||
| .register(registry); | ||
| } | ||
|
|
||
| /** | ||
| * Increment the start counter. | ||
| */ | ||
| public void recordStart() { | ||
| this.startCounter.increment(); | ||
| } | ||
|
|
||
| /** | ||
| * Increment the stop counter. | ||
| */ | ||
| public void recordStop() { | ||
| this.stopCounter.increment(); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| /* | ||
| * Copyright 2024-present the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.kafka.support.micrometer; | ||
|
|
||
| import io.micrometer.core.instrument.Counter; | ||
| import io.micrometer.core.instrument.MeterRegistry; | ||
| import io.micrometer.core.instrument.simple.SimpleMeterRegistry; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| /** | ||
| * Tests for {@link ContainerLifecycleMicrometerHolder}. | ||
| * | ||
| * @author Vineeth Yelagandula | ||
| * @since 4.1.0 | ||
| */ | ||
| public class ContainerLifecycleMicrometerHolderTests { | ||
|
|
||
| @Test | ||
| void startCounterIncrementsOnRecordStart() { | ||
| MeterRegistry registry = new SimpleMeterRegistry(); | ||
| ContainerLifecycleMicrometerHolder holder = | ||
| new ContainerLifecycleMicrometerHolder(registry, "testContainer"); | ||
|
|
||
| holder.recordStart(); | ||
| holder.recordStart(); | ||
|
|
||
| Counter startCounter = registry.find("spring.kafka.container.start.count") | ||
| .tag("name", "testContainer") | ||
| .counter(); | ||
| assertThat(startCounter).isNotNull(); | ||
| assertThat(startCounter.count()).isEqualTo(2.0); | ||
| } | ||
|
|
||
| @Test | ||
| void stopCounterIncrementsOnRecordStop() { | ||
| MeterRegistry registry = new SimpleMeterRegistry(); | ||
| ContainerLifecycleMicrometerHolder holder = | ||
| new ContainerLifecycleMicrometerHolder(registry, "testContainer"); | ||
|
|
||
| holder.recordStop(); | ||
| holder.recordStop(); | ||
| holder.recordStop(); | ||
|
|
||
| Counter stopCounter = registry.find("spring.kafka.container.stop.count") | ||
| .tag("name", "testContainer") | ||
| .counter(); | ||
| assertThat(stopCounter).isNotNull(); | ||
| assertThat(stopCounter.count()).isEqualTo(3.0); | ||
| } | ||
|
|
||
| @Test | ||
| void countersAreTaggedWithContainerName() { | ||
| MeterRegistry registry = new SimpleMeterRegistry(); | ||
| ContainerLifecycleMicrometerHolder holder = | ||
| new ContainerLifecycleMicrometerHolder(registry, "myContainer"); | ||
|
|
||
| holder.recordStart(); | ||
| holder.recordStop(); | ||
|
|
||
| assertThat(registry.find("spring.kafka.container.start.count") | ||
| .tag("name", "myContainer").counter()).isNotNull(); | ||
| assertThat(registry.find("spring.kafka.container.stop.count") | ||
| .tag("name", "myContainer").counter()).isNotNull(); | ||
| } | ||
|
|
||
| @Test | ||
| void startAndStopCountersAreIndependent() { | ||
| MeterRegistry registry = new SimpleMeterRegistry(); | ||
| ContainerLifecycleMicrometerHolder holder = | ||
| new ContainerLifecycleMicrometerHolder(registry, "testContainer"); | ||
|
|
||
| holder.recordStart(); | ||
| holder.recordStop(); | ||
| holder.recordStop(); | ||
|
|
||
| Counter startCounter = registry.find("spring.kafka.container.start.count") | ||
| .tag("name", "testContainer").counter(); | ||
| Counter stopCounter = registry.find("spring.kafka.container.stop.count") | ||
| .tag("name", "testContainer").counter(); | ||
|
|
||
| assertThat(startCounter).isNotNull(); | ||
| assertThat(stopCounter).isNotNull(); | ||
| assertThat(startCounter.count()).isEqualTo(1.0); | ||
| assertThat(stopCounter.count()).isEqualTo(2.0); | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need
sincetag.