diff --git a/pulsar/message.go b/pulsar/message.go index 3779cafc9c..0cd077eee3 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -27,6 +27,8 @@ type ProducerMessage struct { Payload []byte // Value and payload is mutually exclusive, `Value interface{}` for schema message. + // If you encode message externally (for ex: with a different avro encoder), leave + // Value nil and set the `Payload` to encoded `[]byte` value Value interface{} // Key sets the key of the message for routing policy diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index d67c0c0af2..9762efae0a 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -409,7 +409,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) { payload := msg.Payload var schemaPayload []byte var err error - if p.options.Schema != nil { + // if msg.Value is nil, encoding can happen externally to set msg.Payload directly. + if p.options.Schema != nil && msg.Value != nil { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)