Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pulsar/ack_grouping_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/bits-and-blooms/bitset"
)
Expand All @@ -40,6 +41,15 @@ type ackGroupingTracker interface {
close()
}

func isNonPersistentTopic(topic string) bool {
tn, err := internal.ParseTopicName(topic)
if err != nil {
// On a parse error, keep the default (persistent) behavior.
return false
}
return tn.Domain == "non-persistent"
}

func newAckGroupingTracker(options *AckGroupingOptions,
ackIndividual func(id MessageID),
ackCumulative func(id MessageID),
Expand Down
19 changes: 19 additions & 0 deletions pulsar/ack_grouping_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,22 @@ func TestTrackerPendingAcks(t *testing.T) {
assert.True(t, found)
assert.Equal(t, 0, len(ackSet)) // all messages in the batch are acknowledged
}

func TestIsNonPersistentTopic(t *testing.T) {
tests := []struct {
name string
topic string
want bool
}{
{"non-persistent scheme", "non-persistent://public/default/my-topic", true},
{"persistent scheme", "persistent://public/default/my-topic", false},
{"short name defaults to persistent", "my-topic", false},
{"namespace path defaults to persistent", "public/default/my-topic", false},
{"unparseable treated as persistent", "://invalid", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, isNonPersistentTopic(tt.topic))
})
}
}
16 changes: 15 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,21 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
ackGroupingOptions := options.ackGroupingOptions
if isNonPersistentTopic(pc.topic) {
// Non-persistent topics are not stored in BookKeeper, so the broker
// assigns every message the same placeholder MessageID (ledgerId=0,
// entryId=0). The grouping ack tracker's isDuplicate() dedupes by
// MessageID, so once the first message is acked, every later message
// looks like a duplicate of it and is silently dropped before reaching
// the application. Force the immediate (no-op isDuplicate) tracker for
// non-persistent topics, matching the Java and C++ clients, which use a
// dedicated non-persistent ack tracker that never dedupes. There is
// nothing to dedupe against anyway: non-persistent topics do not
// redeliver.
ackGroupingOptions = &AckGroupingOptions{MaxSize: 1}
}
pc.ackGroupingTracker = newAckGroupingTracker(ackGroupingOptions,
func(id MessageID) { pc.sendIndividualAck(id) },
func(id MessageID) { pc.sendCumulativeAck(id) },
func(ids []*pb.MessageIdData) {
Expand Down
50 changes: 50 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6508,3 +6508,53 @@ func TestConsumerWithDLQRetryTopicNoGetPartitionedTopicMetadata(t *testing.T) {
"GetPartitionedTopicMetadata should not be called with old Retry topic when custom Retry topic is provided")
}
}

// TestNonPersistentTopicReceiveAllMessages is a regression test for the bug
// where a consumer on a non-persistent topic received only the first message
// and then silently dropped the rest. See the non-persistent guard in
// newPartitionConsumer.
func TestNonPersistentTopicReceiveAllMessages(t *testing.T) {
client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
topic = "non-persistent://public/default/" + topic

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-non-persistent",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// Synchronous send-receive-ack loop: this is the pattern that
// deterministically triggered the "only the first message" bug, because
// each message arrives within the ack-grouping flush window right after the
// previous one was acked.
const total = 10
for i := 0; i < total; i++ {
_, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
})
assert.Nil(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
msg, err := consumer.Receive(ctx)
cancel()
assert.Nil(t, err, "did not receive message %d (dropped as a bogus duplicate?)", i)
if err != nil {
return
}
assert.Equal(t, fmt.Sprintf("msg-%d", i), string(msg.Payload()))
consumer.Ack(msg)
}
}