From 095823702457343f6e4da5f82e1d0312491b969d Mon Sep 17 00:00:00 2001 From: Stepan Mazurov Date: Tue, 27 Jul 2021 08:12:49 -0600 Subject: [PATCH 1/2] Avoid parsing nil msg.Value --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index abec4fc1f7..9c268dc426 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -353,7 +353,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { payload := msg.Payload var schemaPayload []byte var err error - if p.options.Schema != nil { + if p.options.Schema != nil && msg.Value != nil { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) From 041aa725676d402f776596bc0578c09f4b800d51 Mon Sep 17 00:00:00 2001 From: Stepan Mazurov Date: Tue, 27 Jul 2021 23:54:34 -0600 Subject: [PATCH 2/2] chore: add comments to better describe ext. encoding behavior --- pulsar/message.go | 4 +++- pulsar/producer_partition.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar/message.go b/pulsar/message.go index 23dfefb418..8bb8ac02ba 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -26,7 +26,9 @@ type ProducerMessage struct { // Payload for the message Payload []byte - //Value and payload is mutually exclusive, `Value interface{}` for schema message. + // Value and payload is mutually exclusive, `Value interface{}` for schema message. + // If you encode message externally (for ex: with a different avro encoder), leave + // Value nil and set the `Payload` to encoded `[]byte` value Value interface{} // Key sets the key of the message for routing policy diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9c268dc426..1785708cf9 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -353,6 +353,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { payload := msg.Payload var schemaPayload []byte var err error + // if msg.Value is nil, encoding can happen externally to set msg.Payload directly. if p.options.Schema != nil && msg.Value != nil { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil {