diff --git a/pulsar/client.go b/pulsar/client.go index 368b4f3767..ec3438b365 100644 --- a/pulsar/client.go +++ b/pulsar/client.go @@ -171,6 +171,15 @@ type ClientOptions struct { // The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://" // The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface LookupProperties map[string]string + + // Set the description. + // By default, when the client connects to the broker, a version string like "Pulsar Go " will be + // carried and saved by the broker. The client version string could be queried from the topic stats. + // This method provides a way to add more description to a specific PulsarClient instance. If it's configured, + // the description will be appended to the original client version string, with '-' as the separator. + // For example, if the client version is 3.0.0, and the description is "forked", the final client version string + // "Pulsar Go 3.0.0-forked". + Description string } // Client represents a pulsar client diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 08c5616870..0295d1ecf7 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -163,7 +163,7 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, - maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), + maxConnectionsPerHost, logger, metrics, options.Description, connectionMaxIdleTime), log: logger, metrics: metrics, memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 0fdf6be73f..0896ec4f97 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -29,6 +29,10 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/stretchr/testify/assert" @@ -208,6 +212,43 @@ func TestTokenAuth(t *testing.T) { client.Close() } +func TestTokenAuthWithClientVersion(t *testing.T) { + token, err := os.ReadFile(tokenFilePath) + assert.NoError(t, err) + + client, err := NewClient(ClientOptions{ + URL: serviceURL, + Authentication: NewAuthenticationToken(string(token)), + Description: "test-client", + }) + assert.NoError(t, err) + defer client.Close() + + topic := newAuthTopicName() + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + + readFile, err := os.ReadFile("../integration-tests/tokens/admin-token") + assert.NoError(t, err) + cfg := &config.Config{ + Token: string(readFile), + } + admin, err := admin.New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + topicName, err := utils.GetTopicName(topic) + assert.Nil(t, err) + topicState, err := admin.Topics().GetStats(*topicName) + assert.Nil(t, err) + publisher := topicState.Publishers[0] + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) +} func TestTokenAuthWithSupplier(t *testing.T) { client, err := NewClient(ClientOptions{ diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index eae8dd3c2e..3445f42543 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -26,11 +26,14 @@ import ( "os" "regexp" "strconv" + "strings" "sync" "sync/atomic" "testing" "time" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin" + "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -4985,3 +4988,53 @@ func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) { return true }, 30*time.Second, 1*time.Second) } + +func TestClientVersion(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + + cfg := &config.Config{} + admin, err := admin.New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + topicName, err := utils.GetTopicName(topic) + assert.Nil(t, err) + topicState, err := admin.Topics().GetStats(*topicName) + assert.Nil(t, err) + publisher := topicState.Publishers[0] + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + + topic = newTopicName() + client, err = NewClient(ClientOptions{ + URL: lookupURL, + Description: "test-client", + }) + assert.Nil(t, err) + producer, err = client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + topicName, err = utils.GetTopicName(topic) + assert.Nil(t, err) + topicState, err = admin.Topics().GetStats(*topicName) + assert.Nil(t, err) + publisher = topicState.Publishers[0] + assert.True(t, strings.HasPrefix(publisher.ClientVersion, "Pulsar Go version")) + assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client")) + +} diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 04a3cc83f9..84c4323d9c 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -176,7 +176,8 @@ type connection struct { keepAliveInterval time.Duration - lastActive time.Time + lastActive time.Time + description string } // connectionOptions defines configurations for creating connection. @@ -189,6 +190,7 @@ type connectionOptions struct { logger log.Logger metrics *Metrics keepAliveInterval time.Duration + description string } func newConnection(opts connectionOptions) *connection { @@ -218,6 +220,7 @@ func newConnection(opts connectionOptions) *connection { listeners: make(map[uint64]ConnectionListener), consumerHandlers: make(map[uint64]ConsumerHandler), metrics: opts.metrics, + description: opts.description, } cnx.state.Store(int32(connectionInit)) cnx.reader = newConnectionReader(cnx) @@ -305,7 +308,7 @@ func (c *connection) doHandshake() bool { c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval)) cmdConnect := &pb.CommandConnect{ ProtocolVersion: proto.Int32(PulsarProtocolVersion), - ClientVersion: proto.String(ClientVersionString), + ClientVersion: proto.String(c.getClientVersion()), AuthMethodName: proto.String(c.auth.Name()), AuthData: authData, FeatureFlags: &pb.FeatureFlags{ @@ -346,6 +349,16 @@ func (c *connection) doHandshake() bool { return true } +func (c *connection) getClientVersion() string { + var clientVersion string + if c.description == "" { + clientVersion = ClientVersionString + } else { + clientVersion = fmt.Sprintf("%s-%s", ClientVersionString, c.description) + } + return clientVersion +} + func (c *connection) IsProxied() bool { return c.logicalAddr.Host != c.physicalAddr.Host } @@ -832,7 +845,7 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) cmdAuthResponse := &pb.CommandAuthResponse{ ProtocolVersion: proto.Int32(PulsarProtocolVersion), - ClientVersion: proto.String(ClientVersionString), + ClientVersion: proto.String(c.getClientVersion()), Response: &pb.AuthData{ AuthMethodName: proto.String(c.auth.Name()), AuthData: authData, diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 5f858b1182..ee583825a9 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -52,8 +52,9 @@ type connectionPool struct { keepAliveInterval time.Duration closeCh chan struct{} - metrics *Metrics - log log.Logger + metrics *Metrics + log log.Logger + description string } // NewConnectionPool init connection pool. @@ -65,6 +66,7 @@ func NewConnectionPool( maxConnectionsPerHost int, logger log.Logger, metrics *Metrics, + description string, connectionMaxIdleTime time.Duration) ConnectionPool { p := &connectionPool{ connections: make(map[string]*connection), @@ -76,6 +78,7 @@ func NewConnectionPool( log: logger, metrics: metrics, closeCh: make(chan struct{}), + description: description, } go p.checkAndCleanIdleConnections(connectionMaxIdleTime) return p @@ -113,6 +116,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U keepAliveInterval: p.keepAliveInterval, logger: p.log, metrics: p.metrics, + description: p.description, }) p.connections[key] = conn p.Unlock()