[Issue 1519][Consumer] Do not deduplicate messages on non-persistent topics#1520
Open
VitorNathanG wants to merge 1 commit into
Open
[Issue 1519][Consumer] Do not deduplicate messages on non-persistent topics#1520VitorNathanG wants to merge 1 commit into
VitorNathanG wants to merge 1 commit into
Conversation
…topics Non-persistent topics are not stored in BookKeeper, so the broker assigns every message the same placeholder MessageID (ledgerId=0, entryId=0). The grouping acknowledgment tracker deduplicates incoming messages by MessageID via isDuplicate(); once the first message on such a topic is acknowledged, its (0,0) key is recorded in the tracker and every subsequent message is judged a duplicate and dropped before it reaches Receive(). A consumer that acks each message before receiving the next (for example a synchronous request-reply loop) therefore receives only the first message and then blocks forever. A streaming consumer loses a large, timing-dependent fraction of its messages. Force the immediate ack tracker, whose isDuplicate() always returns false, for non-persistent topics. This matches the Java and C++ clients, which use a dedicated non-persistent ack tracker that never deduplicates. There is nothing to deduplicate against on a non-persistent topic in any case, since those topics never redeliver. The MessageID dedup was introduced in apache#957, which ported the grouping-ack tracker from the Java client but not its non-persistent counterpart. The Java client had the identical bug and fixed it the same way; see apache/pulsar#1967. Fixes apache#1519
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #1519
Motivation
A consumer on a non-persistent topic receives only the first message and then silently drops every message after it — no error is returned,
Receive()just blocks.Non-persistent topics are not stored in BookKeeper, so the broker assigns every message the same placeholder MessageID
(ledgerId=0, entryId=0). The grouping acknowledgment tracker deduplicates incoming messages by MessageID in the receive path:The default
timedAckGroupingTracker.isDuplicatereturnstrueonce a(ledgerId, entryId)key is in itspendingAcksmap. Because every non-persistent MessageID is(0,0), as soon as the first message is acknowledged every later message is treated as a duplicate of it and dropped before reaching the application. A synchronous consumer (ack each message before receiving the next) receives exactly one message and then blocks; a streaming consumer loses a large, timing-dependent fraction of its messages.The Java and C++ clients avoid this by using a dedicated non-persistent ack tracker that never deduplicates (Java:
NonPersistentAcknowledgmentGroupingTracker,isDuplicate()→return false; C++: the baseAckGroupingTrackerfor non-persistent topics). The Go client never made that distinction — tracker selection depends only onAckGroupingOptions.MaxSize, not on the topic domain. The MessageID-based dedup was introduced in #957, which ported the grouping-ack tracker from the Java client without its non-persistent counterpart. The Java client had the identical bug and fixed it the same way (apache/pulsar#1967).Modifications
isNonPersistentTopic(topic string) boolinack_grouping_tracker.go(uses the existinginternal.ParseTopicName; returnsfalseon parse error so the default persistent behavior is preserved).newPartitionConsumer, when the topic is non-persistent, force the immediate ack tracker (AckGroupingOptions{MaxSize: 1}), whoseisDuplicate()always returnsfalse. There is nothing to deduplicate against on a non-persistent topic anyway, since those topics never redeliver.Verifying this change
This change added tests and can be verified as follows:
TestIsNonPersistentTopic— unit test for the topic-domain helper (no broker required).TestNonPersistentTopicReceiveAllMessages— integration test that publishes 10 messages to a non-persistent topic in a synchronous send/receive/ack loop and asserts all 10 are received. Verified that it fails without the fix (times out on the 2nd message: "did not receive message 1") and passes with it.Does this pull request potentially affect one of the following parts:
AckGroupingOptionsdefault is unchanged)Documentation