Skip to content

Commit e558cfe

Browse files
authored
[feat][broker] PIP-264: Add OpenTelemetry consumer metrics (#22693)
1 parent d77c5de commit e558cfe

7 files changed

Lines changed: 408 additions & 1 deletion

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
110110
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
111111
import org.apache.pulsar.broker.stats.MetricsGenerator;
112+
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
112113
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
113114
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
114115
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
@@ -254,6 +255,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
254255
private MetricsGenerator metricsGenerator;
255256
private final PulsarBrokerOpenTelemetry openTelemetry;
256257
private OpenTelemetryTopicStats openTelemetryTopicStats;
258+
private OpenTelemetryConsumerStats openTelemetryConsumerStats;
257259

258260
private TransactionMetadataStoreService transactionMetadataStoreService;
259261
private TransactionBufferProvider transactionBufferProvider;
@@ -630,8 +632,13 @@ public CompletableFuture<Void> closeAsync() {
630632
brokerClientSharedTimer.stop();
631633
monotonicSnapshotClock.close();
632634

635+
if (openTelemetryConsumerStats != null) {
636+
openTelemetryConsumerStats.close();
637+
openTelemetryConsumerStats = null;
638+
}
633639
if (openTelemetryTopicStats != null) {
634640
openTelemetryTopicStats.close();
641+
openTelemetryTopicStats = null;
635642
}
636643

637644
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
@@ -775,6 +782,7 @@ public void start() throws PulsarServerException {
775782
}
776783

777784
openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
785+
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
778786

779787
localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
780788
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.util.concurrent.AtomicDouble;
2626
import io.netty.util.concurrent.Future;
2727
import io.netty.util.concurrent.Promise;
28+
import java.time.Instant;
2829
import java.util.ArrayList;
2930
import java.util.BitSet;
3031
import java.util.Collections;
@@ -90,7 +91,9 @@ public class Consumer {
9091
private final Rate msgOut;
9192
private final Rate msgRedeliver;
9293
private final LongAdder msgOutCounter;
94+
private final LongAdder msgRedeliverCounter;
9395
private final LongAdder bytesOutCounter;
96+
private final LongAdder messageAckCounter;
9497
private final Rate messageAckRate;
9598

9699
private volatile long lastConsumedTimestamp;
@@ -152,6 +155,9 @@ public class Consumer {
152155
@Getter
153156
private final SchemaType schemaType;
154157

158+
@Getter
159+
private final Instant connectedSince = Instant.now();
160+
155161
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
156162
int priorityLevel, String consumerName,
157163
boolean isDurable, TransportCnx cnx, String appId,
@@ -182,8 +188,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
182188
this.msgOut = new Rate();
183189
this.chunkedMessageRate = new Rate();
184190
this.msgRedeliver = new Rate();
191+
this.msgRedeliverCounter = new LongAdder();
185192
this.bytesOutCounter = new LongAdder();
186193
this.msgOutCounter = new LongAdder();
194+
this.messageAckCounter = new LongAdder();
187195
this.messageAckRate = new Rate();
188196
this.appId = appId;
189197

@@ -200,7 +208,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
200208
stats = new ConsumerStatsImpl();
201209
stats.setAddress(cnx.clientSourceAddressAndPort());
202210
stats.consumerName = consumerName;
203-
stats.setConnectedSince(DateFormatter.now());
211+
stats.setConnectedSince(DateFormatter.format(connectedSince));
204212
stats.setClientVersion(cnx.getClientVersion());
205213
stats.metadata = this.metadata;
206214

@@ -238,8 +246,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
238246
this.consumerName = consumerName;
239247
this.msgOut = null;
240248
this.msgRedeliver = null;
249+
this.msgRedeliverCounter = null;
241250
this.msgOutCounter = null;
242251
this.bytesOutCounter = null;
252+
this.messageAckCounter = null;
243253
this.messageAckRate = null;
244254
this.pendingAcks = null;
245255
this.stats = null;
@@ -502,6 +512,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
502512
return future
503513
.thenApply(v -> {
504514
this.messageAckRate.recordEvent(v);
515+
this.messageAckCounter.add(v);
505516
return null;
506517
});
507518
}
@@ -922,6 +933,14 @@ public long getBytesOutCounter() {
922933
return bytesOutCounter.longValue();
923934
}
924935

936+
public long getMessageAckCounter() {
937+
return messageAckCounter.sum();
938+
}
939+
940+
public long getMessageRedeliverCounter() {
941+
return msgRedeliverCounter.sum();
942+
}
943+
925944
/**
 * Returns the current value of the {@code unackedMessages} field.
 *
 * @return the value of {@code unackedMessages} at the time of the call
 */
public int getUnackedMessages() {
926945
return unackedMessages;
927946
}
@@ -1059,6 +1078,8 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) {
10591078
}
10601079

10611080
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue());
1081+
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
1082+
10621083
subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
10631084
} else {
10641085
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
@@ -1091,6 +1112,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
10911112

10921113
subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
10931114
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
1115+
msgRedeliverCounter.add(totalRedeliveryMessages);
10941116

10951117
int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
10961118

@@ -1153,6 +1175,14 @@ public String getClientAddress() {
11531175
return clientAddress;
11541176
}
11551177

1178+
public String getClientAddressAndPort() {
1179+
return cnx.clientSourceAddressAndPort();
1180+
}
1181+
1182+
public String getClientVersion() {
1183+
return cnx.getClientVersion();
1184+
}
1185+
11561186
/**
 * Returns the value of the {@code startMessageId} field.
 *
 * @return the start message id held by this consumer
 */
public MessageId getStartMessageId() {
11571187
return startMessageId;
11581188
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.stats;
20+
21+
import io.opentelemetry.api.common.Attributes;
22+
import io.opentelemetry.api.metrics.BatchCallback;
23+
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
24+
import java.util.Collection;
25+
import java.util.Optional;
26+
import org.apache.pulsar.broker.PulsarService;
27+
import org.apache.pulsar.broker.service.Consumer;
28+
import org.apache.pulsar.broker.service.Subscription;
29+
import org.apache.pulsar.broker.service.Topic;
30+
import org.apache.pulsar.common.naming.TopicName;
31+
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
32+
33+
public class OpenTelemetryConsumerStats implements AutoCloseable {
34+
35+
// Replaces pulsar_consumer_msg_rate_out
36+
public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.count";
37+
private final ObservableLongMeasurement messageOutCounter;
38+
39+
// Replaces pulsar_consumer_msg_throughput_out
40+
public static final String BYTES_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.size";
41+
private final ObservableLongMeasurement bytesOutCounter;
42+
43+
// Replaces pulsar_consumer_msg_ack_rate
44+
public static final String MESSAGE_ACK_COUNTER = "pulsar.broker.consumer.message.ack.count";
45+
private final ObservableLongMeasurement messageAckCounter;
46+
47+
// Replaces pulsar_consumer_msg_rate_redeliver
48+
public static final String MESSAGE_REDELIVER_COUNTER = "pulsar.broker.consumer.message.redeliver.count";
49+
private final ObservableLongMeasurement messageRedeliverCounter;
50+
51+
// Replaces pulsar_consumer_unacked_messages
52+
public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count";
53+
private final ObservableLongMeasurement messageUnacknowledgedCounter;
54+
55+
// Replaces pulsar_consumer_available_permits
56+
public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count";
57+
private final ObservableLongMeasurement messagePermitsCounter;
58+
59+
private final BatchCallback batchCallback;
60+
61+
public OpenTelemetryConsumerStats(PulsarService pulsar) {
62+
var meter = pulsar.getOpenTelemetry().getMeter();
63+
64+
messageOutCounter = meter
65+
.counterBuilder(MESSAGE_OUT_COUNTER)
66+
.setUnit("{message}")
67+
.setDescription("The total number of messages dispatched to this consumer.")
68+
.buildObserver();
69+
70+
bytesOutCounter = meter
71+
.counterBuilder(BYTES_OUT_COUNTER)
72+
.setUnit("By")
73+
.setDescription("The total number of messages bytes dispatched to this consumer.")
74+
.buildObserver();
75+
76+
messageAckCounter = meter
77+
.counterBuilder(MESSAGE_ACK_COUNTER)
78+
.setUnit("{ack}")
79+
.setDescription("The total number of message acknowledgments received from this consumer.")
80+
.buildObserver();
81+
82+
messageRedeliverCounter = meter
83+
.counterBuilder(MESSAGE_REDELIVER_COUNTER)
84+
.setUnit("{message}")
85+
.setDescription("The total number of messages that have been redelivered to this consumer.")
86+
.buildObserver();
87+
88+
messageUnacknowledgedCounter = meter
89+
.upDownCounterBuilder(MESSAGE_UNACKNOWLEDGED_COUNTER)
90+
.setUnit("{message}")
91+
.setDescription("The total number of messages unacknowledged by this consumer.")
92+
.buildObserver();
93+
94+
messagePermitsCounter = meter
95+
.upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
96+
.setUnit("{permit}")
97+
.setDescription("The number of permits currently available for this consumer.")
98+
.buildObserver();
99+
100+
batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
101+
.getTopics()
102+
.values()
103+
.stream()
104+
.map(topicFuture -> topicFuture.getNow(Optional.empty()))
105+
.filter(Optional::isPresent)
106+
.map(Optional::get)
107+
.map(Topic::getSubscriptions)
108+
.flatMap(s -> s.values().stream())
109+
.map(Subscription::getConsumers)
110+
.flatMap(Collection::stream)
111+
.forEach(this::recordMetricsForConsumer),
112+
messageOutCounter,
113+
bytesOutCounter,
114+
messageAckCounter,
115+
messageRedeliverCounter,
116+
messageUnacknowledgedCounter,
117+
messagePermitsCounter);
118+
}
119+
120+
@Override
121+
public void close() {
122+
batchCallback.close();
123+
}
124+
125+
private void recordMetricsForConsumer(Consumer consumer) {
126+
var subscription = consumer.getSubscription();
127+
var topicName = TopicName.get(subscription.getTopic().getName());
128+
129+
var builder = Attributes.builder()
130+
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.consumerName())
131+
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumer.consumerId())
132+
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
133+
consumer.getConnectedSince().getEpochSecond())
134+
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName())
135+
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, consumer.subType().toString())
136+
.put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString())
137+
.put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant())
138+
.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace())
139+
.put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName());
140+
if (topicName.isPartitioned()) {
141+
builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
142+
}
143+
var clientAddress = consumer.getClientAddressAndPort();
144+
if (clientAddress != null) {
145+
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, clientAddress);
146+
}
147+
var clientVersion = consumer.getClientVersion();
148+
if (clientVersion != null) {
149+
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, clientVersion);
150+
}
151+
var metadataList = consumer.getMetadata()
152+
.entrySet()
153+
.stream()
154+
.map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
155+
.toList();
156+
builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, metadataList);
157+
var attributes = builder.build();
158+
159+
messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
160+
bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
161+
messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
162+
messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), attributes);
163+
messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
164+
Attributes.builder()
165+
.putAll(attributes)
166+
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, consumer.isBlocked())
167+
.build());
168+
messagePermitsCounter.record(consumer.getAvailablePermits(), attributes);
169+
}
170+
}

0 commit comments

Comments
 (0)