Skip to content

Commit e7eb7b3

Browse files
authored
[revert] Revert "[improve][broker] Support showing client ip address in client stats while using reverse proxy (#23974)" (#24177)
1 parent 033fec4 commit e7eb7b3

8 files changed

Lines changed: 11 additions & 55 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.bookkeeper.mledger.Position;
4949
import org.apache.bookkeeper.mledger.PositionFactory;
5050
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
51-
import org.apache.commons.lang3.StringUtils;
5251
import org.apache.commons.lang3.mutable.MutableInt;
5352
import org.apache.commons.lang3.tuple.MutablePair;
5453
import org.apache.commons.lang3.tuple.Pair;
@@ -65,7 +64,6 @@
6564
import org.apache.pulsar.common.api.proto.KeyLongValue;
6665
import org.apache.pulsar.common.api.proto.KeySharedMeta;
6766
import org.apache.pulsar.common.api.proto.MessageIdData;
68-
import org.apache.pulsar.common.naming.Metadata;
6967
import org.apache.pulsar.common.naming.TopicName;
7068
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
7169
import org.apache.pulsar.common.policies.data.TopicOperation;
@@ -228,8 +226,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
228226
this.metadata = metadata != null ? metadata : Collections.emptyMap();
229227

230228
stats = new ConsumerStatsImpl();
231-
String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null;
232-
stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort());
229+
stats.setAddress(cnx.clientSourceAddressAndPort());
233230
stats.consumerName = consumerName;
234231
stats.appId = appId;
235232
stats.setConnectedSince(DateFormatter.format(connectedSince));

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
4343
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4444
import org.apache.bookkeeper.mledger.Position;
45-
import org.apache.commons.lang3.StringUtils;
4645
import org.apache.pulsar.broker.ServiceConfiguration;
4746
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
4847
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -56,7 +55,6 @@
5655
import org.apache.pulsar.common.api.proto.MessageMetadata;
5756
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
5857
import org.apache.pulsar.common.api.proto.ServerError;
59-
import org.apache.pulsar.common.naming.Metadata;
6058
import org.apache.pulsar.common.naming.TopicName;
6159
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
6260
import org.apache.pulsar.common.policies.data.TopicOperation;
@@ -133,8 +131,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
133131
this.metadata = metadata != null ? metadata : Collections.emptyMap();
134132

135133
this.stats = isNonPersistentTopic ? new NonPersistentPublisherStatsImpl() : new PublisherStatsImpl();
136-
String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null;
137-
stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort());
134+
stats.setAddress(cnx.clientSourceAddressAndPort());
138135
stats.setConnectedSince(DateFormatter.now());
139136
stats.setClientVersion(cnx.getClientVersion());
140137
stats.setProducerName(producerName);

pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,16 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21-
import static org.testng.Assert.assertNotNull;
2221
import java.net.InetAddress;
2322
import java.net.URI;
2423
import java.util.HashMap;
2524
import java.util.Map;
2625
import java.util.concurrent.TimeUnit;
26+
2727
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
2828
import org.testng.annotations.Test;
29+
2930
import lombok.Cleanup;
30-
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
31-
import org.apache.pulsar.broker.service.Consumer;
32-
import org.apache.pulsar.broker.service.Producer;
33-
import org.apache.pulsar.common.naming.Metadata;
3431

3532
@Test(groups = "broker-api")
3633
public class TlsSniTest extends TlsProducerConsumerBase {
@@ -54,7 +51,6 @@ public void testIpAddressInBrokerServiceUrl() throws Exception {
5451

5552
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
5653
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH).allowTlsInsecureConnection(false)
57-
.proxyServiceUrl(brokerServiceIpAddressUrl, ProxyProtocol.SNI)
5854
.enableTlsHostnameVerification(false)
5955
.operationTimeout(1000, TimeUnit.MILLISECONDS);
6056
Map<String, String> authParams = new HashMap<>();
@@ -66,12 +62,6 @@ public void testIpAddressInBrokerServiceUrl() throws Exception {
6662
PulsarClient pulsarClient = clientBuilder.build();
6763
// should be able to create producer successfully
6864
pulsarClient.newProducer().topic(topicName).create();
69-
pulsarClient.newConsumer().topic(topicName).subscriptionName("test").subscribe();
70-
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
71-
Producer producer = topic.getProducers().values().iterator().next();
72-
assertNotNull(producer.getMetadata().get(Metadata.CLIENT_IP));
73-
Consumer consumer = topic.getSubscription("test").getDispatcher().getConsumers().iterator().next();
74-
assertNotNull(consumer.getMetadata().get(Metadata.CLIENT_IP));
7565
}
7666
}
7767

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@
119119
@SuppressWarnings("unchecked")
120120
public class ClientCnx extends PulsarHandler {
121121

122-
private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
123122
protected final Authentication authentication;
124123
protected State state;
125124

@@ -1439,13 +1438,7 @@ private void checkRequestTimeout() {
14391438
}
14401439
}
14411440

1442-
public boolean isProxy() {
1443-
return proxyToTargetBrokerAddress != null;
1444-
}
1445-
1446-
public SocketAddress getLocalAddress() {
1447-
return this.localAddress;
1448-
}
1441+
private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
14491442

14501443
/**
14511444
* Check client connection is now free. This method will not change the state to idle.

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@
124124
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
125125
import org.apache.pulsar.common.compression.CompressionCodec;
126126
import org.apache.pulsar.common.compression.CompressionCodecProvider;
127-
import org.apache.pulsar.common.naming.Metadata;
128127
import org.apache.pulsar.common.naming.TopicName;
129128
import org.apache.pulsar.common.protocol.Commands;
130129
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -191,7 +190,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
191190

192191
private final MessageCrypto msgCrypto;
193192

194-
private Map<String, String> metadata;
193+
private final Map<String, String> metadata;
195194

196195
private final boolean readCompacted;
197196
private final boolean resetIncludeHead;
@@ -365,7 +364,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
365364
if (conf.getProperties().isEmpty()) {
366365
metadata = Collections.emptyMap();
367366
} else {
368-
metadata = new HashMap<>(conf.getProperties());
367+
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
369368
}
370369

371370
this.connectionHandler = new ConnectionHandler(this,
@@ -919,7 +918,6 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
919918
// synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
920919
final CompletableFuture<Void> future = new CompletableFuture<>();
921920
synchronized (this) {
922-
updateProxyMetadataIfNeeded(cnx);
923921
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
924922
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
925923
conf.getReplicateSubscriptionState(),
@@ -3159,14 +3157,6 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
31593157
return cmd;
31603158
}
31613159

3162-
private void updateProxyMetadataIfNeeded(ClientCnx cnx) {
3163-
boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null;
3164-
if (isProxy && cnx.getLocalAddress() != null) {
3165-
metadata = metadata.isEmpty() ? new HashMap<>() : metadata;
3166-
metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString());
3167-
}
3168-
}
3169-
31703160
private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
31713161
Map<String, Long> properties, TxnID txnID) {
31723162
long requestId = client.newRequestId();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
import org.apache.pulsar.common.api.proto.ProtocolVersion;
9393
import org.apache.pulsar.common.compression.CompressionCodec;
9494
import org.apache.pulsar.common.compression.CompressionCodecProvider;
95-
import org.apache.pulsar.common.naming.Metadata;
9695
import org.apache.pulsar.common.naming.TopicName;
9796
import org.apache.pulsar.common.protocol.ByteBufPair;
9897
import org.apache.pulsar.common.protocol.Commands;
@@ -157,7 +156,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
157156

158157
private ScheduledFuture<?> keyGeneratorTask = null;
159158

160-
private Map<String, String> metadata;
159+
private final Map<String, String> metadata;
161160

162161
private Optional<byte[]> schemaVersion = Optional.empty();
163162

@@ -281,7 +280,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
281280
if (conf.getProperties().isEmpty()) {
282281
metadata = Collections.emptyMap();
283282
} else {
284-
metadata = new HashMap<>(conf.getProperties());
283+
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
285284
}
286285

287286
InstrumentProvider ip = client.instrumentProvider();
@@ -1855,7 +1854,6 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
18551854
}
18561855

18571856
final CompletableFuture<Void> future = new CompletableFuture<>();
1858-
updateProxyMetadataIfNeeded(cnx);
18591857
cnx.sendRequestWithId(
18601858
Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
18611859
schemaInfo, epoch, userProvidedProducerName,
@@ -2028,14 +2026,6 @@ public void connectionFailed(PulsarClientException exception) {
20282026
}
20292027
}
20302028

2031-
private void updateProxyMetadataIfNeeded(ClientCnx cnx) {
2032-
boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null;
2033-
if (isProxy && cnx.getLocalAddress() != null) {
2034-
metadata = metadata.isEmpty() ? new HashMap<>() : metadata;
2035-
metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString());
2036-
}
2037-
}
2038-
20392029
private void closeProducerTasks() {
20402030
Timeout timeout = sendTimeout;
20412031
if (timeout != null) {

pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
*/
2626
public class Metadata {
2727

28-
public static final String CLIENT_IP = "X-Pulsar-Client-IP";
2928
private Metadata() {}
3029

3130
public static void validateMetadata(Map<String, String> metadata,

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,11 @@ public void testSimpleProduceAndConsume() throws PulsarClientException, PulsarAd
122122
SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subName);
123123
Assert.assertEquals(subscriptionStats.getConsumers().size(), 1);
124124
Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(),
125-
((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString());
125+
((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
126126

127127
topicStats = admin.topics().getStats(topicName);
128128
Assert.assertEquals(topicStats.getPublishers().size(), 1);
129129
Assert.assertEquals(topicStats.getPublishers().get(0).getAddress(),
130-
((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString());
130+
((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
131131
}
132132
}

0 commit comments

Comments
 (0)