Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type ProducerMessage struct {
Payload []byte

// Value and payload is mutually exclusive, `Value interface{}` for schema message.
// If you encode message externally (for ex: with a different avro encoder), leave
// Value nil and set the `Payload` to encoded `[]byte` value
Value interface{}

// Key sets the key of the message for routing policy
Expand Down
3 changes: 2 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
payload := msg.Payload
var schemaPayload []byte
var err error
if p.options.Schema != nil {
// if msg.Value is nil, encoding can happen externally to set msg.Payload directly.
if p.options.Schema != nil && msg.Value != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be checked within in the if statement and then log an error and return?

if p.options.Schema != nil {
  if msg.Value == nil {
    p.log.WithError(err).Errorf("Schema encode message failed nil message")
    return
  }

}

Or should the ecoders be able to handle a nil value and return an error?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the assumption is that if value is null, a payload is already encoded.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a safe assumption to make? What happens on the consumer side if that assumption does not hold?

I think we should at least add a comment explaining this because from looking at the code it's not obvious to me.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure - great call. Docs added.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more questions. I'm trying to figure out how to best solve this from an api perspective because if this goes in we are changing the api and will have to support it.

It seems like it's possible to pass a custom implementation of a schema is there a reason this can't solve the issue?

Why is app encoding the message before hand?

If the app is bypassing the internal encoder when producing does it need to bypass internal decoder on the consumer side as well?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can always bypass it on the consumer side if you don't call it and just read the Payload. You could implement a custom schema, maybe, but it would still be "hacky" for the use case I am proposing which is as a watermill adapter. Their interface requires payload to be a []byte, so we would have to set that to nil, then add the actual value to an untyped property.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the intention is to bypass the schema encoding/decoding why set the schema on the producer and or topic?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still want to take advantage of pushing schema to pulsar topic for schema validation

schemaPayload, err = p.options.Schema.Encode(msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
Expand Down