Skip to content

Commit 6a1bbe6

Browse files
authored
[feat][broker] PIP-264: Add OpenTelemetry broker connection metrics (#22931)
1 parent 5ed07c1 commit 6a1bbe6

4 files changed

Lines changed: 170 additions & 10 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -78,8 +78,7 @@ public PulsarStats(PulsarService pulsar) {
7878
this.bundleStats = new ConcurrentHashMap<>();
7979
this.tempMetricsCollection = new ArrayList<>();
8080
this.metricsCollection = new ArrayList<>();
81-
this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
82-
pulsar.getAdvertisedAddress());
81+
this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar);
8382
this.tempNonPersistentTopics = new ArrayList<>();
8483

8584
this.exposePublisherStats = pulsar.getConfiguration().isExposePublisherStats();

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java

Lines changed: 49 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -18,39 +18,80 @@
1818
*/
1919
package org.apache.pulsar.broker.stats;
2020

21+
import io.opentelemetry.api.metrics.ObservableLongCounter;
2122
import io.prometheus.client.Counter;
2223
import java.util.ArrayList;
2324
import java.util.HashMap;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.LongAdder;
29+
import org.apache.pulsar.broker.PulsarService;
2830
import org.apache.pulsar.common.stats.Metrics;
31+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
32+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
2933

3034
/**
3135
*/
32-
public class BrokerOperabilityMetrics {
36+
public class BrokerOperabilityMetrics implements AutoCloseable {
3337
private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
3438
private final List<Metrics> metricsList;
3539
private final String localCluster;
3640
private final DimensionStats topicLoadStats;
3741
private final String brokerName;
3842
private final LongAdder connectionTotalCreatedCount;
39-
private final LongAdder connectionCreateSuccessCount;
40-
private final LongAdder connectionCreateFailCount;
4143
private final LongAdder connectionTotalClosedCount;
4244
private final LongAdder connectionActive;
4345

44-
public BrokerOperabilityMetrics(String localCluster, String brokerName) {
46+
private final LongAdder connectionCreateSuccessCount;
47+
private final LongAdder connectionCreateFailCount;
48+
49+
public static final String CONNECTION_COUNTER_METRIC_NAME = "pulsar.broker.connection.count";
50+
private final ObservableLongCounter connectionCounter;
51+
52+
public static final String CONNECTION_CREATE_COUNTER_METRIC_NAME =
53+
"pulsar.broker.connection.create.operation.count";
54+
private final ObservableLongCounter connectionCreateCounter;
55+
56+
/**
 * Creates the broker operability metrics holder from the running broker service.
 *
 * <p>Reads the cluster name and advertised address from {@code pulsar}, initializes the
 * connection {@link LongAdder}s, and registers two callback-based OpenTelemetry counters
 * ({@link #CONNECTION_COUNTER_METRIC_NAME} and {@link #CONNECTION_CREATE_COUNTER_METRIC_NAME})
 * on the meter returned by {@code pulsar.getOpenTelemetry().getMeter()}.
 *
 * @param pulsar the broker service supplying configuration, advertised address and the
 *               OpenTelemetry meter
 */
public BrokerOperabilityMetrics(PulsarService pulsar) {
4557
this.metricsList = new ArrayList<>();
46-
this.localCluster = localCluster;
58+
this.localCluster = pulsar.getConfiguration().getClusterName();
4759
this.topicLoadStats = new DimensionStats("pulsar_topic_load_times", 60);
48-
this.brokerName = brokerName;
60+
this.brokerName = pulsar.getAdvertisedAddress();
4961
this.connectionTotalCreatedCount = new LongAdder();
50-
this.connectionCreateSuccessCount = new LongAdder();
51-
this.connectionCreateFailCount = new LongAdder();
5262
this.connectionTotalClosedCount = new LongAdder();
5363
this.connectionActive = new LongAdder();
64+
65+
this.connectionCreateSuccessCount = new LongAdder();
66+
this.connectionCreateFailCount = new LongAdder();
67+
68+
// One instrument, three data points: the status attribute distinguishes
// active / open / close values of the same metric.
connectionCounter = pulsar.getOpenTelemetry().getMeter()
69+
.counterBuilder(CONNECTION_COUNTER_METRIC_NAME)
70+
.setDescription("The number of connections.")
71+
.setUnit("{connection}")
72+
.buildWithCallback(measurement -> {
73+
var closedConnections = connectionTotalClosedCount.sum();
74+
var openedConnections = connectionTotalCreatedCount.sum();
75+
// Active is derived from the two totals; the connectionActive adder is not read here.
// NOTE(review): this value can decrease when connections close, yet it is recorded
// through an instrument built with counterBuilder(...) - confirm that a monotonic
// counter (rather than an up-down counter or gauge) is intended for ACTIVE.
var activeConnections = openedConnections - closedConnections;
76+
measurement.record(activeConnections, ConnectionStatus.ACTIVE.attributes);
77+
measurement.record(openedConnections, ConnectionStatus.OPEN.attributes);
78+
measurement.record(closedConnections, ConnectionStatus.CLOSE.attributes);
79+
});
80+
81+
// Connection-create outcomes, split by the success / failure status attribute.
connectionCreateCounter = pulsar.getOpenTelemetry().getMeter()
82+
.counterBuilder(CONNECTION_CREATE_COUNTER_METRIC_NAME)
83+
.setDescription("The number of connection create operations.")
84+
.setUnit("{operation}")
85+
.buildWithCallback(measurement -> {
86+
measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes);
87+
measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes);
88+
});
89+
}
90+
91+
/**
 * Closes the two observable counters created by the constructor
 * ({@code connectionCounter}, then {@code connectionCreateCounter}).
 *
 * <p>NOTE(review): the calls are sequential, so if the first {@code close()} throws,
 * the second counter is not closed - confirm whether that matters for callers.
 *
 * @throws Exception declared by {@link AutoCloseable#close()}
 */
@Override
92+
public void close() throws Exception {
93+
connectionCounter.close();
94+
connectionCreateCounter.close();
5495
}
5596

5697
public List<Metrics> getMetrics() {
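pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java

Lines changed: 104 additions & 0 deletions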
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.stats;
20+
21+
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
import java.util.concurrent.TimeUnit;
24+
import lombok.Cleanup;
25+
import org.apache.pulsar.broker.BrokerTestUtil;
26+
import org.apache.pulsar.broker.service.BrokerTestBase;
27+
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
28+
import org.apache.pulsar.client.api.PulsarClient;
29+
import org.apache.pulsar.client.api.PulsarClientException;
30+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
31+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
32+
import org.testng.annotations.AfterMethod;
33+
import org.testng.annotations.BeforeMethod;
34+
import org.testng.annotations.Test;
35+
36+
/**
 * Checks the OpenTelemetry connection metrics named by
 * {@link BrokerOperabilityMetrics#CONNECTION_COUNTER_METRIC_NAME} and
 * {@link BrokerOperabilityMetrics#CONNECTION_CREATE_COUNTER_METRIC_NAME} against a test broker
 * started with OpenTelemetry enabled.
 */
public class OpenTelemetryBrokerOperabilityStatsTest extends BrokerTestBase {
37+
38+
/** Runs the base-class setup before every test method. */
@BeforeMethod(alwaysRun = true)
39+
@Override
40+
protected void setup() throws Exception {
41+
super.baseSetup();
42+
}
43+
44+
/** Applies the base-class customization, then turns OpenTelemetry on for the test context. */
@Override
45+
protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) {
46+
super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
47+
pulsarTestContextBuilder.enableOpenTelemetry(true);
48+
}
49+
50+
/** Runs the base-class cleanup after every test method. */
@AfterMethod(alwaysRun = true)
51+
@Override
52+
protected void cleanup() throws Exception {
53+
super.internalCleanup();
54+
}
55+
56+
/**
 * Walks through three phases and asserts the metric values after each one: a producer is
 * created, the client is closed, and a second client fails to create a producer once
 * authentication is enabled.
 */
@Test
57+
public void testBrokerConnection() throws Exception {
58+
var topicName = BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testBrokerConnection");
59+
60+
@Cleanup
61+
var producer = pulsarClient.newProducer().topic(topicName).create();
62+
63+
// Phase 1: after the producer is created, expect open=1, close=0, active=1
// and create success=1, failure=0.
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
64+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
65+
OpenTelemetryAttributes.ConnectionStatus.OPEN.attributes, 1);
66+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
67+
OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 0);
68+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
69+
OpenTelemetryAttributes.ConnectionStatus.ACTIVE.attributes, 1);
70+
71+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
72+
ConnectionCreateStatus.SUCCESS.attributes, 1);
73+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
74+
ConnectionCreateStatus.FAILURE.attributes, 0);
75+
76+
// Phase 2: closing the client is expected to raise the close count to 1.
pulsarClient.close();
77+
78+
metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
79+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
80+
OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 1);
81+
82+
// Phase 3: enable authentication on the broker configuration, then connect with a fresh
// client (1 ms operation timeout) and expect producer creation to be rejected.
// NOTE(review): presumably rejected because this client supplies no credentials - confirm.
pulsar.getConfiguration().setAuthenticationEnabled(true);
83+
84+
replacePulsarClient(PulsarClient.builder()
85+
.serviceUrl(lookupUrl.toString())
86+
.operationTimeout(1, TimeUnit.MILLISECONDS));
87+
assertThatThrownBy(() -> pulsarClient.newProducer().topic(topicName).create())
88+
.isInstanceOf(PulsarClientException.AuthenticationException.class);
89+
pulsarClient.close();
90+
91+
// Final state: two connections opened and closed, none active;
// one successful and one failed connection create.
metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
92+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
93+
OpenTelemetryAttributes.ConnectionStatus.OPEN.attributes, 2);
94+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
95+
OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 2);
96+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
97+
OpenTelemetryAttributes.ConnectionStatus.ACTIVE.attributes, 0);
98+
99+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
100+
ConnectionCreateStatus.SUCCESS.attributes, 1);
101+
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
102+
ConnectionCreateStatus.FAILURE.attributes, 1);
103+
}
104+
}

pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -142,4 +142,20 @@ enum BacklogQuotaType {
142142
TIME;
143143
public final Attributes attributes = Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase());
144144
}
145+
146+
AttributeKey<String> PULSAR_CONNECTION_STATUS = AttributeKey.stringKey("pulsar.connection.status");
147+
/**
 * Values for the {@code PULSAR_CONNECTION_STATUS} attribute. Each constant carries a
 * pre-built {@code Attributes} instance whose value is the constant name in lower case.
 */
enum ConnectionStatus {
148+
ACTIVE,
149+
OPEN,
150+
CLOSE;
151+
// NOTE(review): toLowerCase() without a Locale uses the JVM default locale; consider
// whether Locale.ROOT is needed so the attribute value is stable on every host.
public final Attributes attributes = Attributes.of(PULSAR_CONNECTION_STATUS, name().toLowerCase());
152+
}
153+
154+
AttributeKey<String> PULSAR_CONNECTION_CREATE_STATUS =
155+
AttributeKey.stringKey("pulsar.connection.create.operation.status");
156+
/**
 * Values for the {@code PULSAR_CONNECTION_CREATE_STATUS} attribute. Each constant carries a
 * pre-built {@code Attributes} instance whose value is the constant name in lower case.
 */
enum ConnectionCreateStatus {
157+
SUCCESS,
158+
FAILURE;
159+
// NOTE(review): toLowerCase() without a Locale uses the JVM default locale; consider
// whether Locale.ROOT is needed so the attribute value is stable on every host.
public final Attributes attributes = Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
160+
}
145161
}

0 commit comments

Comments
 (0)