From 5f3d4cbb0aa209d35aa79d7f2928608874b094f4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 8 Mar 2023 17:09:11 +0800 Subject: [PATCH 1/3] PIP-254: Support configuring client version ### Motivation https://github.com/apache/pulsar/issues/19705 ### Modifications - Add the `ClientBuilderImpl#description` method to add the description to the original client version string that is set in `CommandConnect` and `CommandAuthResponse`. - Add `testClientVersion` to cover these two cases. --- .../client/api/MutualAuthenticationTest.java | 48 ++++++++++++++++--- .../api/SimpleProducerConsumerTest.java | 34 ++++++++++++- .../pulsar/client/impl/ClientBuilderImpl.java | 23 +++++++++ .../apache/pulsar/client/impl/ClientCnx.java | 9 ++-- .../impl/conf/ClientConfigurationData.java | 6 +++ 5 files changed, 110 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java index 472af3e88cd36..2cebf34810a7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java @@ -26,20 +26,26 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; /** * Test Mutual Authentication. @@ -182,7 +188,7 @@ public AuthenticationState newAuthState(AuthData authData, } } - @BeforeMethod(alwaysRun = true) + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { mutualAuth = new MutualAuthentication(); @@ -205,7 +211,7 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { clientBuilder.authentication(mutualAuth); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { internalCleanup(); @@ -214,12 +220,13 @@ protected void cleanup() throws Exception { @Test public void testAuthentication() throws Exception { log.info("-- Starting {} test --", methodName); + String topic = "persistent://my-property/my-ns/test-authentication"; - Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1") + Consumer consumer = pulsarClient.newConsumer().topic(topic) .subscriptionName("my-subscriber-name") .subscribe(); Producer producer = pulsarClient.newProducer(Schema.BYTES) - .topic("persistent://my-property/my-ns/my-topic1") + .topic(topic) .create(); for (int i = 0; i < 10; i++) { @@ -239,4 +246,33 @@ public void testAuthentication() throws Exception { log.info("-- Exiting {} test --", methodName); } + + @Test + public void testClientVersion() throws Exception { + String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion(); + String topic = "persistent://my-property/my-ns/test-client-version"; + + Producer producer1 = pulsarClient.newProducer() + .topic(topic) + .create(); + TopicStats stats = admin.topics().getStats(topic); + assertEquals(stats.getPublishers().size(), 1); + assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion); + + PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder()) + .description("my-java-client") + .serviceUrl(lookupUrl.toString()) + .authentication(mutualAuth) + .build(); + Producer producer2 = client.newProducer().topic(topic).create(); + stats = admin.topics().getStats(topic); + assertEquals(stats.getPublishers().size(), 2); + + assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()), + Sets.newHashSet(defaultClientVersion, defaultClientVersion + "my-java-client")); + + producer1.close(); + producer2.close(); + client.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 293e298fc6671..1b33a91da5ca9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -83,10 +83,12 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -105,6 +107,8 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; @@ -4581,4 +4585,32 @@ public void testSendMsgGreaterThanBatchingMaxBytes() throws Exception { // sendAsync should complete in time assertNotNull(producer.sendAsync(msg).get(timeoutSec, TimeUnit.SECONDS)); } -} \ No newline at end of file + + @Test + public void testClientVersion() throws Exception { + String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion(); + String topic = "persistent://my-property/my-ns/test-client-version"; + + Producer producer1 = pulsarClient.newProducer() + .topic(topic) + .create(); + TopicStats stats = admin.topics().getStats(topic); + assertEquals(stats.getPublishers().size(), 1); + assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion); + + PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder()) + .description("my-java-client") + .serviceUrl(lookupUrl.toString()) + .build(); + Producer producer2 = client.newProducer().topic(topic).create(); + stats = admin.topics().getStats(topic); + assertEquals(stats.getPublishers().size(), 2); + + assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()), + Sets.newHashSet(defaultClientVersion, defaultClientVersion + "my-java-client")); + + producer1.close(); + producer2.close(); + client.close(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 523acdace3950..741df1cbb8b13 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -410,4 +410,27 @@ public ClientBuilder socks5ProxyPassword(String socks5ProxyPassword) { conf.setSocks5ProxyPassword(socks5ProxyPassword); return this; } + + /** + * Set the description. + * + *

By default, when the client connects to the broker, a version string like "Pulsar-Java-v" will be + * carried and saved by the broker. The client version string could be queried from the topic stats. + * + *

This method provides a way to add more description to a specific PulsarClient instance. If it's configured, + * the description will be appended to the original client version string, with '-' as the separator. + * + *

For example, if the client version is 3.0.0, and the description is "forked", the final client version string + * will be "Pulsar-Java-v3.0.0-forked". + * + * @param description the description of the current PulsarClient instance + * @throws IllegalArgumentException if the length of description exceeds 64 + */ + public ClientBuilder description(String description) { + if (description.length() > 64) { + throw new IllegalArgumentException("description should be at most 64 characters"); + } + conf.setDescription(description); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 7780856c6948e..145d5701575a9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -194,6 +194,8 @@ public class ClientCnx extends PulsarHandler { @Getter private long lastDisconnectedTimestamp; + private final String clientVersion; + protected enum State { None, SentConnectFrame, Ready, Failed, Connecting } @@ -252,6 +254,8 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in this.state = State.None; this.protocolVersion = protocolVersion; this.idleState = new ClientCnxIdleState(this); + this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion() + + (conf.getDescription() == null ? "" : conf.getDescription()); } @Override @@ -293,8 +297,7 @@ protected ByteBuf newConnectCommand() throws Exception { authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, - String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()), proxyToTargetBrokerAddress, null, null, - null); + clientVersion, proxyToTargetBrokerAddress, null, null, null); } @Override @@ -411,7 +414,7 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(), authData, this.protocolVersion, - String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + clientVersion); if (log.isDebugEnabled()) { log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 1e7bc6f8221cf..a732ef349af1f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -379,6 +379,12 @@ public class ClientConfigurationData implements Serializable, Cloneable { @Secret private String socks5ProxyPassword; + @ApiModelProperty( + name = "description", + value = "The extra description of the client version." + ) + private String description; + /** * Gets the authentication settings for the client. * From 9ea0d234e853f6a45e0220e70ae2c1dac4638663 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 4 Apr 2023 21:27:56 +0800 Subject: [PATCH 2/3] Add the dash as the delimiter --- .../org/apache/pulsar/client/api/MutualAuthenticationTest.java | 2 +- .../apache/pulsar/client/api/SimpleProducerConsumerTest.java | 2 +- .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java index 2cebf34810a7b..2fc8aebf64a4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java @@ -269,7 +269,7 @@ public void testClientVersion() throws Exception { assertEquals(stats.getPublishers().size(), 2); assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()), - Sets.newHashSet(defaultClientVersion, defaultClientVersion + "my-java-client")); + Sets.newHashSet(defaultClientVersion, defaultClientVersion + "-my-java-client")); producer1.close(); producer2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 1b33a91da5ca9..1bc437195d96d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4607,7 +4607,7 @@ public void testClientVersion() throws Exception { assertEquals(stats.getPublishers().size(), 2); assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()), - Sets.newHashSet(defaultClientVersion, defaultClientVersion + "my-java-client")); + Sets.newHashSet(defaultClientVersion, defaultClientVersion + "-my-java-client")); producer1.close(); producer2.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 145d5701575a9..115c71307c4f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -255,7 +255,7 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in this.protocolVersion = protocolVersion; this.idleState = new ClientCnxIdleState(this); this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion() - + (conf.getDescription() == null ? "" : conf.getDescription()); + + (conf.getDescription() == null ? "" : ("-" + conf.getDescription())); } @Override From 927064b2d083595b2568564f25d1d0350d89580e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 10 Apr 2023 18:46:25 +0800 Subject: [PATCH 3/3] Document the length limit and add null check --- .../java/org/apache/pulsar/client/impl/ClientBuilderImpl.java | 2 +- .../apache/pulsar/client/impl/conf/ClientConfigurationData.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 741df1cbb8b13..7677045f0899b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -427,7 +427,7 @@ public ClientBuilder socks5ProxyPassword(String socks5ProxyPassword) { * @throws IllegalArgumentException if the length of description exceeds 64 */ public ClientBuilder description(String description) { - if (description.length() > 64) { + if (description != null && description.length() > 64) { throw new IllegalArgumentException("description should be at most 64 characters"); } conf.setDescription(description); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index a732ef349af1f..7d94675ccba7d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -381,7 +381,7 @@ public class ClientConfigurationData implements Serializable, Cloneable { @ApiModelProperty( name = "description", - value = "The extra description of the client version." + value = "The extra description of the client version. The length cannot exceed 64." ) private String description;