Skip to content

Commit fe8aaaf

Browse files
authored
[Reader] fix: deliver null-value tombstones instead of discarding them (#1482)
* fix(reader): deliver null-value tombstones instead of discarding them A message published with MessageMetadata.null_value = true (the Pulsar compaction tombstone convention) was conflated with a deserialization failure in partitionConsumer.MessageReceived and silently discarded. Because lastDequeuedMsg was never advanced past the tombstone, hasMoreMessages kept returning true and Reader.Next blocked forever when a tombstone was the last message on a topic. Consumer: when the reader yields an empty payload and msgMeta or the single-message metadata has null_value set, build a normal message with payLoad == nil and take the usual dispatch path so lastDequeuedMsg advances. Real corruption still routes through discardCorruptedMessage. Producer: set MessageMetadata.null_value / SingleMessageMetadata.null_value when both Value and Payload are nil, matching the Java client so Go-produced tombstones carry the flag consumers need. Message gains an IsNullValue() bool accessor so applications can tell tombstones apart from empty payloads. * fix: explicitly resetting err Quoting Copilot's feedback: When accepting a tombstone with err == internal.ErrEOM, the code keeps err non-nil and continues. It works today because err is not used later, but it’s easy to misread and could become a latent bug if future logic inspects/returns err after this block. Consider explicitly setting err = nil in the tombstone/ErrEOM acceptance path to make the intent unambiguous. * fix: suppress lint error
1 parent a333bb9 commit fe8aaaf

7 files changed

Lines changed: 116 additions & 1 deletion

File tree

pulsar/consumer_partition.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1308,10 +1308,28 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
13081308
)
13091309
for i := 0; i < numMsgs; i++ {
13101310
smm, payload, err := reader.ReadMessage()
1311-
if err != nil || payload == nil {
1311+
isNullValue := msgMeta.GetNullValue() || (smm != nil && smm.GetNullValue())
1312+
if err != nil {
1313+
// A null-value (tombstone) message has no payload bytes on the wire, so
1314+
// the non-batched reader returns ErrEOM. Accept it instead of discarding
1315+
// it as corrupted, matching the Java client's behavior for compaction
1316+
// tombstones.
1317+
if isNullValue && err == internal.ErrEOM {
1318+
payload = nil
1319+
// Explicit reset to make tombstone-acceptance
1320+
// intent unambiguous.
1321+
err = nil //nolint:ineffassign
1322+
} else {
1323+
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
1324+
return err
1325+
}
1326+
} else if payload == nil && !isNullValue {
13121327
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
13131328
return err
13141329
}
1330+
if isNullValue {
1331+
payload = nil
1332+
}
13151333
if ackSet != nil && !ackSet.Test(uint(i)) {
13161334
pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i)
13171335
skippedMessages++
@@ -1396,6 +1414,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
13961414
index: messageIndex,
13971415
brokerPublishTime: brokerPublishTime,
13981416
conn: pc._getConn(),
1417+
isNullValue: isNullValue,
13991418
}
14001419
} else {
14011420
msg = &message{
@@ -1417,6 +1436,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
14171436
index: messageIndex,
14181437
brokerPublishTime: brokerPublishTime,
14191438
conn: pc._getConn(),
1439+
isNullValue: isNullValue,
14201440
}
14211441
}
14221442

pulsar/impl_message.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ type message struct {
316316
index *uint64
317317
brokerPublishTime *time.Time
318318
conn internal.Connection
319+
isNullValue bool
319320
}
320321

321322
func (msg *message) Topic() string {
@@ -330,6 +331,10 @@ func (msg *message) Payload() []byte {
330331
return msg.payLoad
331332
}
332333

334+
// IsNullValue returns the isNullValue flag stored on the message. The consumer
// sets that flag when the message metadata or the single-message metadata
// carries null_value, i.e. the message is a tombstone.
func (msg *message) IsNullValue() bool {
335+
return msg.isNullValue
336+
}
337+
333338
func (msg *message) ID() MessageID {
334339
return msg.msgID
335340
}

pulsar/internal/pulsartracing/message_carrier_util_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func (msg *mockConsumerMessage) Payload() []byte {
8080
return nil
8181
}
8282

83+
// IsNullValue always returns false: this mock never represents a null-value
// (tombstone) message.
func (msg *mockConsumerMessage) IsNullValue() bool {
84+
return false
85+
}
86+
8387
func (msg *mockConsumerMessage) ID() pulsar.MessageID {
8488
return nil
8589
}

pulsar/message.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ type Message interface {
9090
// Payload returns the payload of the message
9191
Payload() []byte
9292

93+
// IsNullValue reports whether the message was published as a null-value
94+
// (tombstone) message, i.e. with MessageMetadata.null_value set. For such
95+
// messages Payload returns nil. Applications use this flag together with
96+
// Pulsar topic compaction to mark a key as deleted.
97+
IsNullValue() bool
98+
9399
// ID returns the unique message ID associated with this message.
94100
// The message id can be used to univocally refer to a message without having to keep the entire payload in memory.
95101
ID() MessageID

pulsar/negative_acks_tracker_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ func (msg *mockMessage1) Payload() []byte {
197197
return nil
198198
}
199199

200+
// IsNullValue always returns false: this mock never represents a null-value
// (tombstone) message.
func (msg *mockMessage1) IsNullValue() bool {
201+
return false
202+
}
203+
200204
func (msg *mockMessage1) ID() MessageID {
201205
return &messageID{
202206
ledgerID: 1,
@@ -273,6 +277,10 @@ func (msg *mockMessage2) Payload() []byte {
273277
return nil
274278
}
275279

280+
// IsNullValue always returns false: this mock never represents a null-value
// (tombstone) message.
func (msg *mockMessage2) IsNullValue() bool {
281+
return false
282+
}
283+
276284
func (msg *mockMessage2) ID() MessageID {
277285
return &messageID{
278286
ledgerID: 2,

pulsar/producer_partition.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage,
724724
UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
725725
}
726726

727+
if msg.Value == nil && msg.Payload == nil {
728+
mm.NullValue = proto.Bool(true)
729+
}
730+
727731
if !msg.EventTime.IsZero() {
728732
mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
729733
}
@@ -771,6 +775,10 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(
771775
PayloadSize: proto.Int32(int32(uncompressedSize)),
772776
}
773777

778+
if msg.Value == nil && msg.Payload == nil {
779+
smm.NullValue = proto.Bool(true)
780+
}
781+
774782
if !msg.EventTime.IsZero() {
775783
smm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
776784
}

pulsar/reader_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,70 @@ func TestReaderHasNext(t *testing.T) {
496496
assert.Equal(t, 10, i)
497497
}
498498

499+
// TestReaderNullValueTombstone verifies that a message published with a nil
500+
// Value / nil Payload (the Pulsar compaction tombstone convention) is delivered
501+
// to Reader.Next.
502+
func TestReaderNullValueTombstone(t *testing.T) {
503+
client, err := NewClient(ClientOptions{
504+
URL: lookupURL,
505+
})
506+
assert.Nil(t, err)
507+
defer client.Close()
508+
509+
topic := newTopicName()
510+
ctx := context.Background()
511+
512+
producer, err := client.CreateProducer(ProducerOptions{
513+
Topic: topic,
514+
DisableBatching: true,
515+
})
516+
assert.Nil(t, err)
517+
518+
_, err = producer.Send(ctx, &ProducerMessage{
519+
Key: "k",
520+
Payload: []byte("v1"),
521+
})
522+
assert.NoError(t, err)
523+
524+
tombstoneID, err := producer.Send(ctx, &ProducerMessage{
525+
Key: "k",
526+
Payload: nil,
527+
})
528+
assert.NoError(t, err)
529+
producer.Close()
530+
531+
reader, err := client.CreateReader(ReaderOptions{
532+
Topic: topic,
533+
StartMessageID: EarliestMessageID(),
534+
StartMessageIDInclusive: true,
535+
})
536+
assert.Nil(t, err)
537+
defer reader.Close()
538+
539+
lastID, err := reader.GetLastMessageID()
540+
assert.NoError(t, err)
541+
assert.Equal(t, tombstoneID.LedgerID(), lastID.LedgerID())
542+
assert.Equal(t, tombstoneID.EntryID(), lastID.EntryID())
543+
544+
firstCtx, firstCancel := context.WithTimeout(ctx, 5*time.Second)
545+
defer firstCancel()
546+
msg, err := reader.Next(firstCtx)
547+
assert.NoError(t, err)
548+
assert.Equal(t, []byte("v1"), msg.Payload())
549+
assert.False(t, msg.IsNullValue())
550+
551+
secondCtx, secondCancel := context.WithTimeout(ctx, 5*time.Second)
552+
defer secondCancel()
553+
tombstone, err := reader.Next(secondCtx)
554+
assert.NoError(t, err)
555+
assert.True(t, tombstone.IsNullValue())
556+
assert.Nil(t, tombstone.Payload())
557+
assert.Equal(t, lastID.LedgerID(), tombstone.ID().LedgerID())
558+
assert.Equal(t, lastID.EntryID(), tombstone.ID().EntryID())
559+
560+
assert.False(t, reader.HasNext())
561+
}
562+
499563
type myMessageID struct {
500564
data []byte
501565
}

0 commit comments

Comments
 (0)