
[Issue 1519][Consumer] Do not deduplicate messages on non-persistent topics #1520

Open
VitorNathanG wants to merge 1 commit into apache:master from VitorNathanG:fix/non-persistent-dedup

@VitorNathanG


Fixes #1519

Motivation

A consumer on a non-persistent topic receives only the first message and then silently drops every message after it. No error is returned; Receive() just blocks.

Non-persistent topics are not stored in BookKeeper, so the broker assigns every message the same placeholder MessageID (ledgerId=0, entryId=0). The grouping acknowledgment tracker deduplicates incoming messages by MessageID in the receive path:

```go
// pulsar/consumer_partition.go
if pc.ackGroupingTracker.isDuplicate(msgID) {
    skippedMessages++
    continue
}
```

The default timedAckGroupingTracker.isDuplicate returns true once a (ledgerId, entryId) key is in its pendingAcks map. Because every non-persistent MessageID is (0,0), as soon as the first message is acknowledged every later message is treated as a duplicate of it and dropped before reaching the application. A synchronous consumer (one that acks each message before receiving the next) receives exactly one message and then blocks; a streaming consumer loses a large, timing-dependent fraction of its messages.

The Java and C++ clients avoid this by using a dedicated non-persistent ack tracker that never deduplicates (Java: NonPersistentAcknowledgmentGroupingTracker, whose isDuplicate() returns false; C++: the base AckGroupingTracker for non-persistent topics). The Go client never made that distinction: tracker selection depends only on AckGroupingOptions.MaxSize, not on the topic domain. The MessageID-based dedup was introduced in #957, which ported the grouping-ack tracker from the Java client without its non-persistent counterpart. The Java client had the identical bug and fixed it the same way (apache/pulsar#1967).

Modifications

  • Add isNonPersistentTopic(topic string) bool in ack_grouping_tracker.go (uses the existing internal.ParseTopicName; returns false on parse error so the default persistent behavior is preserved).
  • In newPartitionConsumer, when the topic is non-persistent, force the immediate ack tracker (AckGroupingOptions{MaxSize: 1}), whose isDuplicate() always returns false. There is nothing to deduplicate against on a non-persistent topic anyway, since those topics never redeliver.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • TestIsNonPersistentTopic — unit test for the topic-domain helper (no broker required).
  • TestNonPersistentTopicReceiveAllMessages — integration test that publishes 10 messages to a non-persistent topic in a synchronous send/receive/ack loop and asserts all 10 are received. Verified that it fails without the fix (times out on the 2nd message: "did not receive message 1") and passes with it.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no (behavior change is internal and scoped to non-persistent topics; the public AckGroupingOptions default is unchanged)
  • The wire protocol: no

Documentation

  • Does this pull request introduce a new feature? no

…topics

Non-persistent topics are not stored in BookKeeper, so the broker assigns
every message the same placeholder MessageID (ledgerId=0, entryId=0). The
grouping acknowledgment tracker deduplicates incoming messages by MessageID
via isDuplicate(); once the first message on such a topic is acknowledged,
its (0,0) key is recorded in the tracker and every subsequent message is
judged a duplicate and dropped before it reaches Receive(). A consumer that
acks each message before receiving the next (for example a synchronous
request-reply loop) therefore receives only the first message and then
blocks forever. A streaming consumer loses a large, timing-dependent
fraction of its messages.

Force the immediate ack tracker, whose isDuplicate() always returns false,
for non-persistent topics. This matches the Java and C++ clients, which use
a dedicated non-persistent ack tracker that never deduplicates. There is
nothing to deduplicate against on a non-persistent topic in any case, since
those topics never redeliver.

The MessageID dedup was introduced in apache#957, which ported the grouping-ack
tracker from the Java client but not its non-persistent counterpart. The
Java client had the identical bug and fixed it the same way; see
apache/pulsar#1967.

Fixes apache#1519